Refactor alarm clock application structure and add core functionality
- Moved legacy entry point to init.py - Created src/__init__.py for module initialization - Implemented AlarmClock class in src/alarm_clock.py - Added audio playback capabilities in src/audio.py - Configured Shelly device interaction in src/shelly.py - Introduced wakeup action handling in src/wakeup_action.py - Added location utilities for sun times in src/location.py - Established configuration settings in src/config.py - Developed main application logic in src/main.py
This commit is contained in:
321
init.py
321
init.py
@@ -1,317 +1,10 @@
|
|||||||
from time import time
|
"""
|
||||||
from astral import LocationInfo
|
Legacy entry point - redirects to src.main
|
||||||
from astral.sun import sun
|
|
||||||
from datetime import date, datetime, timedelta
|
|
||||||
from zoneinfo import ZoneInfo
|
|
||||||
from typing import Callable
|
|
||||||
import requests
|
|
||||||
import subprocess
|
|
||||||
|
|
||||||
from grove.gpio import GPIO
|
|
||||||
from grove.display.jhd1802 import JHD1802
|
|
||||||
|
|
||||||
shelly_server = "https://shelly-237-eu.shelly.cloud"
|
|
||||||
shelly_token = "M2M5YTYxdWlkEFD82A2301693D62637E530B4CDDC0DC6E2CDD8F01BE5E5102000B92638A2DA74019E6A81E6D17E0"
|
|
||||||
|
|
||||||
shelly_deviceId = "8cbfea9fd6d0"
|
|
||||||
|
|
||||||
def setShellyPlugState(deviceId: str, state: bool) -> dict:
|
|
||||||
"""
|
|
||||||
Set the state of a Shelly plug/switch device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
deviceId: The Shelly device id
|
|
||||||
state: True to turn on, False to turn off
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Response from the API as a dictionary
|
|
||||||
"""
|
|
||||||
url = f"{shelly_server}/v2/devices/api/set/switch"
|
|
||||||
params = {"auth_key": shelly_token}
|
|
||||||
payload = {
|
|
||||||
"id": deviceId,
|
|
||||||
"on": state
|
|
||||||
}
|
|
||||||
|
|
||||||
response = requests.post(url, params=params, json=payload)
|
|
||||||
|
|
||||||
if response.status_code == 200:
|
|
||||||
return {"success": True}
|
|
||||||
else:
|
|
||||||
return response.json()
|
|
||||||
|
|
||||||
city = LocationInfo("Seinäjoki", "Finland", "Europe/Helsinki", 62.7900, 22.8400)
|
|
||||||
s = sun(city.observer, date=date.today())
|
|
||||||
print(f"City: {city.name}, {city.region}")
|
|
||||||
print(f"Timezone: {city.timezone}")
|
|
||||||
print(f"Latitude: {city.latitude:.6f}; Longitude: {city.longitude:.6f}")
|
|
||||||
|
|
||||||
def isSunUp() -> bool:
|
|
||||||
now = datetime.now(city.tzinfo)
|
|
||||||
return s["sunrise"] <= now <= s["sunset"]
|
|
||||||
|
|
||||||
def playAudio(filePath: str, loop: bool = False) -> Callable[[], None]:
|
|
||||||
"""
|
|
||||||
Play an audio file through the Raspberry Pi's standard aux output.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
filePath: Path to the audio file (.wav, .mp3, .ogg, etc.)
|
|
||||||
loop: Whether to loop the audio playback until stopped (default: False)
|
|
||||||
Returns:
|
|
||||||
A callable that stops the audio playback when invoked.
|
|
||||||
"""
|
|
||||||
if filePath.endswith(".wav"):
|
|
||||||
if loop:
|
|
||||||
# Use sox's play command with repeat for looping wav files
|
|
||||||
process = subprocess.Popen(["play", filePath, "repeat", "-"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
||||||
else:
|
|
||||||
process = subprocess.Popen(["aplay", filePath], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
||||||
else:
|
|
||||||
# Use mpg123 for mp3
|
|
||||||
if loop:
|
|
||||||
process = subprocess.Popen(["mpg123", "-q", "--loop", "-1", filePath], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
||||||
else:
|
|
||||||
process = subprocess.Popen(["mpg123", "-q", filePath], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
||||||
|
|
||||||
def stop():
|
|
||||||
process.terminate()
|
|
||||||
|
|
||||||
return stop
|
|
||||||
|
|
||||||
|
|
||||||
class WakeUpAction:
|
|
||||||
offsetSeconds: int # how many seconds before alarm time to trigger
|
|
||||||
action: Callable[[], None]
|
|
||||||
dismissAction: Callable[[], None] | None = None
|
|
||||||
triggered: bool = False
|
|
||||||
|
|
||||||
def __init__(self, offsetSeconds: int, action: Callable[[], None], dismissAction: Callable[[], None] | None):
|
|
||||||
self.offsetSeconds = offsetSeconds
|
|
||||||
self.action = action
|
|
||||||
self.dismissAction = dismissAction
|
|
||||||
self.triggered = False
|
|
||||||
|
|
||||||
|
|
||||||
# sunset and sunrise
|
|
||||||
class AlarmClock:
|
|
||||||
wakeupActions: list[WakeUpAction] = []
|
|
||||||
ledGpio: GPIO
|
|
||||||
buttonGpio: GPIO
|
|
||||||
relayGpio: GPIO
|
|
||||||
lastKnownButtonState: bool = False
|
|
||||||
lcd: JHD1802
|
|
||||||
lastButtonPressTime: float = 0.0
|
|
||||||
lastButtonStateChangeTime: float = 0.0
|
|
||||||
debounceDelay: float = 0.05 # 50ms debounce delay
|
|
||||||
longPressThreshold: float = 1.0 # 1 second for long press
|
|
||||||
buttonPressStartTime: float = 0.0
|
|
||||||
configMode: str = "normal" # modes: normal, set_hour, set_minute
|
|
||||||
alarmTime: datetime = datetime.now(ZoneInfo("Europe/Helsinki")).replace(hour=9, minute=51, second=0, microsecond=0)
|
|
||||||
actionsResetForToday: bool = False
|
|
||||||
alarmActive: bool = False
|
|
||||||
def __init__(self, pins: dict = {}):
|
|
||||||
self.ledGpio = GPIO(pins.get("led", 5), GPIO.OUT)
|
|
||||||
self.buttonGpio = GPIO(pins.get("button", 6), GPIO.IN)
|
|
||||||
self.relayGpio = GPIO(pins.get("relay", 16), GPIO.OUT)
|
|
||||||
self.lcd = JHD1802()
|
|
||||||
self.wakeupActions = []
|
|
||||||
self.addWakeUpAction(0, lambda: self.triggerAlarm()) # Activate alarm state
|
|
||||||
self.setLcdText("AlarmClock Init")
|
|
||||||
|
|
||||||
def addWakeUpAction(self, offsetSeconds: int, action: Callable[[], None], dismissAction: Callable[[], None] | None = None):
|
|
||||||
"""Add a wakeup action to be triggered offsetSeconds before alarm time"""
|
|
||||||
self.wakeupActions.append(WakeUpAction(offsetSeconds, action, dismissAction))
|
|
||||||
|
|
||||||
def resetWakeUpActions(self):
|
|
||||||
for action in self.wakeupActions:
|
|
||||||
action.triggered = False
|
|
||||||
self.actionsResetForToday = True
|
|
||||||
print("Wakeup actions reset")
|
|
||||||
|
|
||||||
def triggerAlarm(self):
|
|
||||||
if self.alarmActive:
|
|
||||||
return
|
|
||||||
self.alarmActive = True
|
|
||||||
print("Alarm is now active!")
|
|
||||||
|
|
||||||
def dismissAlarm(self):
|
|
||||||
if not self.alarmActive:
|
|
||||||
return
|
|
||||||
self.alarmActive = False
|
|
||||||
for action in self.wakeupActions:
|
|
||||||
if action.dismissAction:
|
|
||||||
action.dismissAction()
|
|
||||||
print("Alarm dismissed")
|
|
||||||
|
|
||||||
def checkWakeUpActions(self):
|
|
||||||
"""Check and trigger any wakeup actions that are due"""
|
|
||||||
if not self.wakeupActions:
|
|
||||||
return
|
|
||||||
|
|
||||||
# if not in normal mode, skip wakeup actions
|
|
||||||
if self.configMode != "normal":
|
|
||||||
return
|
|
||||||
|
|
||||||
now = datetime.now(ZoneInfo(city.timezone))
|
|
||||||
today_alarm = now.replace(
|
|
||||||
hour=self.alarmTime.hour,
|
|
||||||
minute=self.alarmTime.minute,
|
|
||||||
second=0,
|
|
||||||
microsecond=0
|
|
||||||
)
|
|
||||||
|
|
||||||
# Find the earliest action (largest offset)
|
|
||||||
max_offset = max(action.offsetSeconds for action in self.wakeupActions)
|
|
||||||
first_trigger_time = today_alarm - timedelta(seconds=max_offset)
|
|
||||||
reset_time = first_trigger_time - timedelta(seconds=10)
|
|
||||||
alarm_window_end = today_alarm + timedelta(minutes=1)
|
|
||||||
|
|
||||||
# Reset actions 10 seconds before the first one should trigger
|
|
||||||
if now >= reset_time and now < first_trigger_time and not self.actionsResetForToday:
|
|
||||||
self.resetWakeUpActions()
|
|
||||||
|
|
||||||
# After alarm window ends, allow reset to happen again tomorrow
|
|
||||||
if now >= alarm_window_end:
|
|
||||||
self.actionsResetForToday = False
|
|
||||||
|
|
||||||
for wakeupAction in self.wakeupActions:
|
|
||||||
if wakeupAction.triggered:
|
|
||||||
continue
|
|
||||||
trigger_time = today_alarm - timedelta(seconds=wakeupAction.offsetSeconds)
|
|
||||||
if now >= trigger_time and now < today_alarm + timedelta(minutes=1):
|
|
||||||
print(f"Triggering wakeup action (offset: {wakeupAction.offsetSeconds}s)")
|
|
||||||
wakeupAction.action()
|
|
||||||
wakeupAction.triggered = True
|
|
||||||
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
print("AlarmClock started")
|
|
||||||
while True:
|
|
||||||
self.loop()
|
|
||||||
|
|
||||||
def setRelayState(self, state: bool):
|
|
||||||
self.relayGpio.write(1 if state else 0)
|
|
||||||
|
|
||||||
def loop(self):
|
|
||||||
currentButtonState = not self.buttonGpio.read()
|
|
||||||
currentTime = time()
|
|
||||||
|
|
||||||
if (self.configMode == "normal"):
|
|
||||||
currentMinute = datetime.now(ZoneInfo(city.timezone)).minute
|
|
||||||
currentHour = datetime.now(ZoneInfo(city.timezone)).hour
|
|
||||||
currrentSecond = datetime.now(ZoneInfo(city.timezone)).second
|
|
||||||
|
|
||||||
if self.alarmActive:
|
|
||||||
self.setLcdText(f"!! ALARM !!\nPress to dismiss")
|
|
||||||
else:
|
|
||||||
self.setLcdText(f"Time {currentHour:02d}:{currentMinute:02d}:{currrentSecond:02d}\nAlarm {self.alarmTime.hour:02d}:{self.alarmTime.minute:02d}")
|
|
||||||
|
|
||||||
# Check and trigger wakeup actions
|
|
||||||
self.checkWakeUpActions()
|
|
||||||
|
|
||||||
# Only process button state change if debounce delay has passed
|
|
||||||
if currentButtonState != self.lastKnownButtonState:
|
|
||||||
if (currentTime - self.lastButtonStateChangeTime) >= self.debounceDelay:
|
|
||||||
self.lastKnownButtonState = currentButtonState
|
|
||||||
self.lastButtonStateChangeTime = currentTime
|
|
||||||
self.onButtonPress("button", currentButtonState)
|
|
||||||
|
|
||||||
# Check for long press while button is held
|
|
||||||
if currentButtonState and self.buttonPressStartTime > 0:
|
|
||||||
pressDuration = currentTime - self.buttonPressStartTime
|
|
||||||
if pressDuration >= self.longPressThreshold:
|
|
||||||
self.onLongPress()
|
|
||||||
self.buttonPressStartTime = 0.0 # Reset to prevent repeated triggers
|
|
||||||
|
|
||||||
def onButtonPress(self, button: str, state: bool):
|
|
||||||
print(f"Button '{button}' pressed state: {state}")
|
|
||||||
currentTime = time()
|
|
||||||
|
|
||||||
if button == "button":
|
|
||||||
if state: # Button pressed down
|
|
||||||
self.buttonPressStartTime = currentTime
|
|
||||||
else: # Button released
|
|
||||||
if self.buttonPressStartTime > 0:
|
|
||||||
pressDuration = currentTime - self.buttonPressStartTime
|
|
||||||
self.buttonPressStartTime = 0.0
|
|
||||||
|
|
||||||
# Only handle as short press if it wasn't a long press
|
|
||||||
if pressDuration < self.longPressThreshold:
|
|
||||||
self.onShortPress()
|
|
||||||
|
|
||||||
def onShortPress(self):
|
|
||||||
"""Handle short button press based on current mode"""
|
|
||||||
if self.configMode == "normal":
|
|
||||||
if self.alarmActive:
|
|
||||||
# Dismiss active alarm
|
|
||||||
self.dismissAlarm()
|
|
||||||
else:
|
|
||||||
# Normal mode: toggle relay
|
|
||||||
self.setRelayState(not self.relayGpio.read())
|
|
||||||
setShellyPlugState(shelly_deviceId, self.relayGpio.read() == 0)
|
|
||||||
elif self.configMode == "set_hour":
|
|
||||||
# Increment hour
|
|
||||||
self.alarmTime = self.alarmTime.replace(hour=(self.alarmTime.hour + 1) % 24)
|
|
||||||
self.setLcdText(f"Set Hour: {self.alarmTime.hour:02d}")
|
|
||||||
elif self.configMode == "set_minute":
|
|
||||||
# Increment minute
|
|
||||||
self.alarmTime = self.alarmTime.replace(minute=(self.alarmTime.minute + 1) % 60)
|
|
||||||
self.setLcdText(f"Set Min: {self.alarmTime.minute:02d}")
|
|
||||||
self.resetWakeUpActions() # Reset wakeup actions to apply new alarm time immediately
|
|
||||||
|
|
||||||
def onLongPress(self):
|
|
||||||
"""Handle long button press - cycle through configuration modes"""
|
|
||||||
if self.configMode == "normal":
|
|
||||||
self.configMode = "set_hour"
|
|
||||||
self.setLcdText(f"Set Hour: {self.alarmTime.hour:02d}")
|
|
||||||
print(f"Entering hour setting mode")
|
|
||||||
elif self.configMode == "set_hour":
|
|
||||||
self.configMode = "set_minute"
|
|
||||||
self.setLcdText(f"Set Min: {self.alarmTime.minute:02d}")
|
|
||||||
print(f"Switching to minute setting mode")
|
|
||||||
elif self.configMode == "set_minute":
|
|
||||||
self.configMode = "normal"
|
|
||||||
self.setLcdText(f"Saved: {self.alarmTime.hour:02d}:{self.alarmTime.minute:02d}")
|
|
||||||
print(f"Alarm time saved: {self.alarmTime.hour:02d}:{self.alarmTime.minute:02d}")
|
|
||||||
|
|
||||||
lastLcdText: str = ""
|
|
||||||
def setLcdText(self, text: str):
|
|
||||||
if text == self.lastLcdText:
|
|
||||||
return
|
|
||||||
self.lastLcdText = text
|
|
||||||
self.lcd.clear()
|
|
||||||
rows = text.split("\n")
|
|
||||||
for i, row in enumerate(rows):
|
|
||||||
if i == 0:
|
|
||||||
self.lcd.setCursor(0, 0)
|
|
||||||
self.lcd.write(f"{(" "+row):<17}")
|
|
||||||
elif i == 1:
|
|
||||||
self.lcd.setCursor(1, 0)
|
|
||||||
self.lcd.write(row)
|
|
||||||
#print(f"LCD: {text}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
pins = {
|
|
||||||
"button": 6,
|
|
||||||
"led": 5,
|
|
||||||
"buzzer": 12,
|
|
||||||
}
|
|
||||||
alarm_clock = AlarmClock(pins)
|
|
||||||
|
|
||||||
# Add wakeup actions (offsetSeconds = seconds before alarm to trigger)
|
|
||||||
alarm_clock.addWakeUpAction(120, lambda: print("2 minutes to alarm!")) # 2 minutes before alarm
|
|
||||||
alarm_clock.addWakeUpAction(90, lambda: setShellyPlugState(shelly_deviceId, True)) # activate plug 90s before alarm
|
|
||||||
alarm_clock.addWakeUpAction(30, lambda: print("30 seconds to alarm!")) # 30 seconds before alarm
|
|
||||||
alarm_clock.addWakeUpAction(0, lambda: print("Alarm triggered!")) # At alarm time
|
|
||||||
|
|
||||||
alarm_clock.addWakeUpAction(0, lambda: playAudio("./boxing_bell_multiple.wav")) # Play sound at alarm time
|
|
||||||
|
|
||||||
alarm_clock.start()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
For new development, use: python -m src.main
|
||||||
|
"""
|
||||||
|
|
||||||
|
from src.main import main
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
1
src/__init__.py
Normal file
1
src/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# Alarm Clock IoT Application
|
||||||
229
src/alarm_clock.py
Normal file
229
src/alarm_clock.py
Normal file
@@ -0,0 +1,229 @@
|
|||||||
|
from time import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
from grove.gpio import GPIO
|
||||||
|
from grove.display.jhd1802 import JHD1802
|
||||||
|
|
||||||
|
from .config import CITY, SHELLY_DEVICE_ID
|
||||||
|
from .shelly import set_shelly_plug_state
|
||||||
|
from .wakeup_action import WakeUpAction
|
||||||
|
|
||||||
|
|
||||||
|
class AlarmClock:
|
||||||
|
"""Main alarm clock controller with LCD display and GPIO controls."""
|
||||||
|
|
||||||
|
wakeup_actions: list[WakeUpAction] = []
|
||||||
|
led_gpio: GPIO
|
||||||
|
button_gpio: GPIO
|
||||||
|
relay_gpio: GPIO
|
||||||
|
last_known_button_state: bool = False
|
||||||
|
lcd: JHD1802
|
||||||
|
last_button_press_time: float = 0.0
|
||||||
|
last_button_state_change_time: float = 0.0
|
||||||
|
debounce_delay: float = 0.05 # 50ms debounce delay
|
||||||
|
long_press_threshold: float = 1.0 # 1 second for long press
|
||||||
|
button_press_start_time: float = 0.0
|
||||||
|
config_mode: str = "normal" # modes: normal, set_hour, set_minute
|
||||||
|
alarm_time: datetime = datetime.now(ZoneInfo("Europe/Helsinki")).replace(
|
||||||
|
hour=9, minute=59, second=0, microsecond=0
|
||||||
|
)
|
||||||
|
actions_reset_for_today: bool = False
|
||||||
|
alarm_active: bool = False
|
||||||
|
last_lcd_text: str = ""
|
||||||
|
|
||||||
|
def __init__(self, pins: dict = {}):
|
||||||
|
self.led_gpio = GPIO(pins.get("led", 5), GPIO.OUT)
|
||||||
|
self.button_gpio = GPIO(pins.get("button", 6), GPIO.IN)
|
||||||
|
self.relay_gpio = GPIO(pins.get("relay", 16), GPIO.OUT)
|
||||||
|
self.lcd = JHD1802()
|
||||||
|
self.wakeup_actions = []
|
||||||
|
self.add_wakeup_action(0, lambda: self.trigger_alarm()) # Activate alarm state
|
||||||
|
self.set_lcd_text("AlarmClock Init")
|
||||||
|
|
||||||
|
def add_wakeup_action(
|
||||||
|
self,
|
||||||
|
offset_seconds: int,
|
||||||
|
action: Callable[[], None],
|
||||||
|
dismiss_action: Callable[[], None] | None = None
|
||||||
|
):
|
||||||
|
"""Add a wakeup action to be triggered offset_seconds before alarm time."""
|
||||||
|
self.wakeup_actions.append(WakeUpAction(offset_seconds, action, dismiss_action))
|
||||||
|
|
||||||
|
def reset_wakeup_actions(self):
|
||||||
|
"""Reset all wakeup actions to untriggered state."""
|
||||||
|
for action in self.wakeup_actions:
|
||||||
|
action.triggered = False
|
||||||
|
self.actions_reset_for_today = True
|
||||||
|
print("Wakeup actions reset")
|
||||||
|
|
||||||
|
def trigger_alarm(self):
|
||||||
|
"""Activate the alarm state."""
|
||||||
|
if self.alarm_active:
|
||||||
|
return
|
||||||
|
self.alarm_active = True
|
||||||
|
print("Alarm is now active!")
|
||||||
|
|
||||||
|
def dismiss_alarm(self):
|
||||||
|
"""Dismiss the active alarm and call dismiss actions."""
|
||||||
|
if not self.alarm_active:
|
||||||
|
return
|
||||||
|
self.alarm_active = False
|
||||||
|
for action in self.wakeup_actions:
|
||||||
|
if action.dismiss_action:
|
||||||
|
action.dismiss_action()
|
||||||
|
print("Alarm dismissed")
|
||||||
|
|
||||||
|
def check_wakeup_actions(self):
|
||||||
|
"""Check and trigger any wakeup actions that are due."""
|
||||||
|
if not self.wakeup_actions:
|
||||||
|
return
|
||||||
|
|
||||||
|
# If not in normal mode, skip wakeup actions
|
||||||
|
if self.config_mode != "normal":
|
||||||
|
return
|
||||||
|
|
||||||
|
now = datetime.now(ZoneInfo(CITY.timezone))
|
||||||
|
today_alarm = now.replace(
|
||||||
|
hour=self.alarm_time.hour,
|
||||||
|
minute=self.alarm_time.minute,
|
||||||
|
second=0,
|
||||||
|
microsecond=0
|
||||||
|
)
|
||||||
|
|
||||||
|
# Find the earliest action (largest offset)
|
||||||
|
max_offset = max(action.offset_seconds for action in self.wakeup_actions)
|
||||||
|
first_trigger_time = today_alarm - timedelta(seconds=max_offset)
|
||||||
|
reset_time = first_trigger_time - timedelta(seconds=10)
|
||||||
|
alarm_window_end = today_alarm + timedelta(minutes=1)
|
||||||
|
|
||||||
|
# Reset actions 10 seconds before the first one should trigger
|
||||||
|
if now >= reset_time and now < first_trigger_time and not self.actions_reset_for_today:
|
||||||
|
self.reset_wakeup_actions()
|
||||||
|
|
||||||
|
# After alarm window ends, allow reset to happen again tomorrow
|
||||||
|
if now >= alarm_window_end:
|
||||||
|
self.actions_reset_for_today = False
|
||||||
|
|
||||||
|
for wakeup_action in self.wakeup_actions:
|
||||||
|
if wakeup_action.triggered:
|
||||||
|
continue
|
||||||
|
trigger_time = today_alarm - timedelta(seconds=wakeup_action.offset_seconds)
|
||||||
|
if now >= trigger_time and now < today_alarm + timedelta(minutes=1):
|
||||||
|
print(f"Triggering wakeup action (offset: {wakeup_action.offset_seconds}s)")
|
||||||
|
wakeup_action.action()
|
||||||
|
wakeup_action.triggered = True
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the alarm clock main loop."""
|
||||||
|
print("AlarmClock started")
|
||||||
|
while True:
|
||||||
|
self.loop()
|
||||||
|
|
||||||
|
def set_relay_state(self, state: bool):
|
||||||
|
"""Set the relay GPIO state."""
|
||||||
|
self.relay_gpio.write(1 if state else 0)
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
"""Main loop iteration - handles display and button input."""
|
||||||
|
current_button_state = not self.button_gpio.read()
|
||||||
|
current_time = time()
|
||||||
|
|
||||||
|
if self.config_mode == "normal":
|
||||||
|
now = datetime.now(ZoneInfo(CITY.timezone))
|
||||||
|
current_hour = now.hour
|
||||||
|
current_minute = now.minute
|
||||||
|
current_second = now.second
|
||||||
|
|
||||||
|
if self.alarm_active:
|
||||||
|
self.set_lcd_text("!! ALARM !!\nPress to dismiss")
|
||||||
|
else:
|
||||||
|
self.set_lcd_text(
|
||||||
|
f"Time {current_hour:02d}:{current_minute:02d}:{current_second:02d}\n"
|
||||||
|
f"Alarm {self.alarm_time.hour:02d}:{self.alarm_time.minute:02d}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check and trigger wakeup actions
|
||||||
|
self.check_wakeup_actions()
|
||||||
|
|
||||||
|
# Only process button state change if debounce delay has passed
|
||||||
|
if current_button_state != self.last_known_button_state:
|
||||||
|
if (current_time - self.last_button_state_change_time) >= self.debounce_delay:
|
||||||
|
self.last_known_button_state = current_button_state
|
||||||
|
self.last_button_state_change_time = current_time
|
||||||
|
self.on_button_press("button", current_button_state)
|
||||||
|
|
||||||
|
# Check for long press while button is held
|
||||||
|
if current_button_state and self.button_press_start_time > 0:
|
||||||
|
press_duration = current_time - self.button_press_start_time
|
||||||
|
if press_duration >= self.long_press_threshold:
|
||||||
|
self.on_long_press()
|
||||||
|
self.button_press_start_time = 0.0 # Reset to prevent repeated triggers
|
||||||
|
|
||||||
|
def on_button_press(self, button: str, state: bool):
|
||||||
|
"""Handle button press/release events."""
|
||||||
|
print(f"Button '{button}' pressed state: {state}")
|
||||||
|
current_time = time()
|
||||||
|
|
||||||
|
if button == "button":
|
||||||
|
if state: # Button pressed down
|
||||||
|
self.button_press_start_time = current_time
|
||||||
|
else: # Button released
|
||||||
|
if self.button_press_start_time > 0:
|
||||||
|
press_duration = current_time - self.button_press_start_time
|
||||||
|
self.button_press_start_time = 0.0
|
||||||
|
|
||||||
|
# Only handle as short press if it wasn't a long press
|
||||||
|
if press_duration < self.long_press_threshold:
|
||||||
|
self.on_short_press()
|
||||||
|
|
||||||
|
def on_short_press(self):
|
||||||
|
"""Handle short button press based on current mode."""
|
||||||
|
if self.config_mode == "normal":
|
||||||
|
if self.alarm_active:
|
||||||
|
# Dismiss active alarm
|
||||||
|
self.dismiss_alarm()
|
||||||
|
else:
|
||||||
|
# Normal mode: toggle relay
|
||||||
|
self.set_relay_state(not self.relay_gpio.read())
|
||||||
|
set_shelly_plug_state(SHELLY_DEVICE_ID, self.relay_gpio.read() == 0)
|
||||||
|
elif self.config_mode == "set_hour":
|
||||||
|
# Increment hour
|
||||||
|
self.alarm_time = self.alarm_time.replace(hour=(self.alarm_time.hour + 1) % 24)
|
||||||
|
self.set_lcd_text(f"Set Hour: {self.alarm_time.hour:02d}")
|
||||||
|
elif self.config_mode == "set_minute":
|
||||||
|
# Increment minute
|
||||||
|
self.alarm_time = self.alarm_time.replace(minute=(self.alarm_time.minute + 1) % 60)
|
||||||
|
self.set_lcd_text(f"Set Min: {self.alarm_time.minute:02d}")
|
||||||
|
self.reset_wakeup_actions() # Reset wakeup actions to apply new alarm time immediately
|
||||||
|
|
||||||
|
def on_long_press(self):
|
||||||
|
"""Handle long button press - cycle through configuration modes."""
|
||||||
|
if self.config_mode == "normal":
|
||||||
|
self.config_mode = "set_hour"
|
||||||
|
self.set_lcd_text(f"Set Hour: {self.alarm_time.hour:02d}")
|
||||||
|
print("Entering hour setting mode")
|
||||||
|
elif self.config_mode == "set_hour":
|
||||||
|
self.config_mode = "set_minute"
|
||||||
|
self.set_lcd_text(f"Set Min: {self.alarm_time.minute:02d}")
|
||||||
|
print("Switching to minute setting mode")
|
||||||
|
elif self.config_mode == "set_minute":
|
||||||
|
self.config_mode = "normal"
|
||||||
|
self.set_lcd_text(f"Saved: {self.alarm_time.hour:02d}:{self.alarm_time.minute:02d}")
|
||||||
|
print(f"Alarm time saved: {self.alarm_time.hour:02d}:{self.alarm_time.minute:02d}")
|
||||||
|
|
||||||
|
def set_lcd_text(self, text: str):
|
||||||
|
"""Update the LCD display with new text."""
|
||||||
|
if text == self.last_lcd_text:
|
||||||
|
return
|
||||||
|
self.last_lcd_text = text
|
||||||
|
self.lcd.clear()
|
||||||
|
rows = text.split("\n")
|
||||||
|
for i, row in enumerate(rows):
|
||||||
|
if i == 0:
|
||||||
|
self.lcd.setCursor(0, 0)
|
||||||
|
self.lcd.write(f"{(' ' + row):<17}")
|
||||||
|
elif i == 1:
|
||||||
|
self.lcd.setCursor(1, 0)
|
||||||
|
self.lcd.write(row)
|
||||||
61
src/audio.py
Normal file
61
src/audio.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import subprocess
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
class AudioPlayer:
|
||||||
|
def __init__(self, file_path: str, loop: bool = False):
|
||||||
|
self.current_stop_function: Callable[[], None] | None = None
|
||||||
|
self.play(file_path, loop)
|
||||||
|
def play(self, file_path: str, loop: bool = False):
|
||||||
|
"""Play the specified audio file, stopping any currently playing audio."""
|
||||||
|
self.stop() # Stop any currently playing audio
|
||||||
|
self.current_stop_function = play_audio(file_path, loop)
|
||||||
|
def stop(self):
|
||||||
|
"""Stop any currently playing audio."""
|
||||||
|
if self.current_stop_function:
|
||||||
|
self.current_stop_function()
|
||||||
|
self.current_stop_function = None
|
||||||
|
|
||||||
|
def play_audio(file_path: str, loop: bool = False) -> Callable[[], None]:
|
||||||
|
"""
|
||||||
|
Play an audio file through the Raspberry Pi's standard aux output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to the audio file (.wav, .mp3, .ogg, etc.)
|
||||||
|
loop: Whether to loop the audio playback until stopped (default: False)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A callable that stops the audio playback when invoked.
|
||||||
|
"""
|
||||||
|
if file_path.endswith(".wav"):
|
||||||
|
if loop:
|
||||||
|
# Use sox's play command with repeat for looping wav files
|
||||||
|
process = subprocess.Popen(
|
||||||
|
["play", file_path, "repeat", "-"],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
process = subprocess.Popen(
|
||||||
|
["aplay", file_path],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Use mpg123 for mp3
|
||||||
|
if loop:
|
||||||
|
process = subprocess.Popen(
|
||||||
|
["mpg123", "-q", "--loop", "-1", file_path],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
process = subprocess.Popen(
|
||||||
|
["mpg123", "-q", file_path],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL
|
||||||
|
)
|
||||||
|
|
||||||
|
def stop():
|
||||||
|
process.terminate()
|
||||||
|
|
||||||
|
return stop
|
||||||
9
src/config.py
Normal file
9
src/config.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from astral import LocationInfo
|
||||||
|
|
||||||
|
# Shelly Cloud API Configuration
|
||||||
|
SHELLY_SERVER = "https://shelly-237-eu.shelly.cloud"
|
||||||
|
SHELLY_TOKEN = "M2M5YTYxdWlkEFD82A2301693D62637E530B4CDDC0DC6E2CDD8F01BE5E5102000B92638A2DA74019E6A81E6D17E0"
|
||||||
|
SHELLY_DEVICE_ID = "8cbfea9fd6d0"
|
||||||
|
|
||||||
|
# Location Configuration
|
||||||
|
CITY = LocationInfo("Seinäjoki", "Finland", "Europe/Helsinki", 62.7900, 22.8400)
|
||||||
28
src/location.py
Normal file
28
src/location.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
from datetime import date, datetime
|
||||||
|
from astral.sun import sun
|
||||||
|
from .config import CITY
|
||||||
|
|
||||||
|
class LocationUtils:
|
||||||
|
@staticmethod
|
||||||
|
def get_sun_times(target_date: date = None):
|
||||||
|
"""Get sunrise and sunset times for the configured city."""
|
||||||
|
if target_date is None:
|
||||||
|
target_date = date.today()
|
||||||
|
return sun(CITY.observer, date=target_date)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_sun_up() -> bool:
|
||||||
|
"""Check if the sun is currently up at the configured location."""
|
||||||
|
s = LocationUtils.get_sun_times()
|
||||||
|
now = datetime.now(CITY.tzinfo)
|
||||||
|
return s["sunrise"] <= now <= s["sunset"]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def print_location_info():
|
||||||
|
"""Print information about the configured location."""
|
||||||
|
s = LocationUtils.get_sun_times()
|
||||||
|
print(f"City: {CITY.name}, {CITY.region}")
|
||||||
|
print(f"Timezone: {CITY.timezone}")
|
||||||
|
print(f"Latitude: {CITY.latitude:.6f}; Longitude: {CITY.longitude:.6f}")
|
||||||
|
print(f"Sunrise: {s['sunrise']}")
|
||||||
|
print(f"Sunset: {s['sunset']}")
|
||||||
40
src/main.py
Normal file
40
src/main.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
from .config import SHELLY_DEVICE_ID
|
||||||
|
from .shelly import ShellyDevice
|
||||||
|
from .audio import AudioPlayer
|
||||||
|
from .alarm_clock import AlarmClock
|
||||||
|
|
||||||
|
shellyDevice = ShellyDevice(SHELLY_DEVICE_ID)
|
||||||
|
audio_player = AudioPlayer('./boxing_bell_multiple.wav', loop=True)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main entry point for the alarm clock application."""
|
||||||
|
alarm_clock = AlarmClock()
|
||||||
|
|
||||||
|
alarm_clock.add_wakeup_action(
|
||||||
|
120, lambda: print("2 minutes to alarm!")
|
||||||
|
) # 2 minutes before alarm
|
||||||
|
|
||||||
|
alarm_clock.add_wakeup_action(
|
||||||
|
15, lambda: shellyDevice.set_state(True)
|
||||||
|
) # activate plug 15 seconds before alarm
|
||||||
|
|
||||||
|
alarm_clock.add_wakeup_action(
|
||||||
|
30, lambda: print("30 seconds to alarm!")
|
||||||
|
) # 30 seconds before alarm
|
||||||
|
|
||||||
|
alarm_clock.add_wakeup_action(
|
||||||
|
0, lambda: print("Alarm triggered!")
|
||||||
|
) # At alarm time
|
||||||
|
|
||||||
|
alarm_clock.add_wakeup_action(
|
||||||
|
0,
|
||||||
|
action=lambda: audio_player.play(),
|
||||||
|
dismiss_action=lambda: audio_player.stop()
|
||||||
|
)
|
||||||
|
|
||||||
|
alarm_clock.start()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
37
src/shelly.py
Normal file
37
src/shelly.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import requests
|
||||||
|
from .config import SHELLY_SERVER, SHELLY_TOKEN
|
||||||
|
|
||||||
|
class ShellyDevice:
|
||||||
|
"""Class to represent a Shelly device and interact with it via the API."""
|
||||||
|
|
||||||
|
def __init__(self, device_id: str):
|
||||||
|
self.device_id = device_id
|
||||||
|
|
||||||
|
def set_state(self, state: bool) -> dict:
|
||||||
|
"""Set the state of the Shelly device."""
|
||||||
|
return set_shelly_plug_state(self.device_id, state)
|
||||||
|
|
||||||
|
def set_shelly_plug_state(device_id: str, state: bool) -> dict:
|
||||||
|
"""
|
||||||
|
Set the state of a Shelly plug/switch device.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_id: The Shelly device id
|
||||||
|
state: True to turn on, False to turn off
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Response from the API as a dictionary
|
||||||
|
"""
|
||||||
|
url = f"{SHELLY_SERVER}/v2/devices/api/set/switch"
|
||||||
|
params = {"auth_key": SHELLY_TOKEN}
|
||||||
|
payload = {
|
||||||
|
"id": device_id,
|
||||||
|
"on": state
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(url, params=params, json=payload)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
return {"success": True}
|
||||||
|
else:
|
||||||
|
return response.json()
|
||||||
21
src/wakeup_action.py
Normal file
21
src/wakeup_action.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
|
||||||
|
class WakeUpAction:
|
||||||
|
"""Represents an action to be triggered relative to alarm time."""
|
||||||
|
|
||||||
|
offset_seconds: int # how many seconds before alarm time to trigger
|
||||||
|
action: Callable[[], None]
|
||||||
|
dismiss_action: Callable[[], None] | None = None
|
||||||
|
triggered: bool = False
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
offset_seconds: int,
|
||||||
|
action: Callable[[], None],
|
||||||
|
dismiss_action: Callable[[], None] | None = None
|
||||||
|
):
|
||||||
|
self.offset_seconds = offset_seconds
|
||||||
|
self.action = action
|
||||||
|
self.dismiss_action = dismiss_action
|
||||||
|
self.triggered = False
|
||||||
Reference in New Issue
Block a user