|
- #!/usr/bin/env python3
-
- import RPi.GPIO as gpio
- import time
- import logging
- import re
- import subprocess
-
-
- class amp_controller:
- """Amp controller class to hold certain attributes and methods for automatically
- turning on and off the amplifier.
- """
-
- SHUTOFF_TIMER_DURATION = 5 * 60 # in seconds
- SHORT_TIMER_DURATION = 60
- CHECK_FREQUENCY = 1 # in seconds
- MPD_LOG_FILE = "/var/log/mpd/mpd.log"
- PULSE_CARD_PATH = "/proc/asound/card0/pcm0p/sub0/status"
- GPIO_PINS = {"Relay": 2, "Phono": 4, "Tape": 14, "CD": 15}
-
- powered = False
- last_mpd_output = ""
-
- gpio.setmode(gpio.BCM)
- logging.basicConfig(level=logging.DEBUG)
-
- def setup_pins(self) -> None:
- """Set all GPIO pins to output."""
- for pin in self.GPIO_PINS.values():
- gpio.setup(pin, gpio.OUT)
-
- def set_channel(self, channel_name) -> None:
- """Set the channel on the amplifier."""
- pin = self.GPIO_PINS[channel_name]
- gpio.output(pin, True)
- time.sleep(0.5)
- gpio.output(pin, False)
-
- def power(self, state) -> None:
- """Turn the amplifier on or off via the GPIO pins."""
- if state == "on":
- gpio.output(self.GPIO_PINS["Relay"], True)
- time.sleep(0.5)
- self.powered = True
- else:
- gpio.output(self.GPIO_PINS["Relay"], False)
- self.powered = False
-
- def pulse_playing(self) -> bool:
- """Return True if there is something playing on the specified Pulseaudio card.
- Return False in every other case.
- """
- output = subprocess.check_output(
- "cat " + self.PULSE_CARD_PATH, shell=True, universal_newlines=True
- )
-
- state = output.split("\n")[0]
-
- if "RUNNING" in state:
- logging.debug("Pulseaudio connected")
- return True
-
- return False
-
- def mpd_client_connected(self) -> bool:
- """Return True if an mpd client is connected and False if not.
- Requires 'log_level="secure"' or "debug" to be set in mpd.conf.
- """
- output = subprocess.check_output(
- ["tail", "-1", self.MPD_LOG_FILE], universal_newlines=True
- )
-
- if output != self.last_mpd_output:
- self.last_mpd_output = output
- if re.match(".* opened from .*", output):
- logging.debug("mpd client connected")
- return True
-
- return False
-
-
- def main():
- ac = amp_controller()
- ac.setup_pins()
- ac.power("on")
- ac.set_channel("CD")
-
- timer = ac.SHUTOFF_TIMER_DURATION
- while True:
- # Check for changes every second.
- time.sleep(ac.CHECK_FREQUENCY)
-
- # Only connected, nothing playing.
- if ac.mpd_client_connected():
- if not ac.powered:
- ac.power("on")
- ac.set_channel("CD")
-
- # (Re)set the timer to the short duration if a client connects. Don't
- # overwrite a running timer currently greater than the short duration one.
- if timer > 0 and timer <= ac.SHORT_TIMER_DURATION:
- timer = ac.SHORT_TIMER_DURATION
- elif timer < 0:
- timer = ac.SHORT_TIMER_DURATION
-
- logging.debug("Timer set to {}".format(timer))
-
- if timer == 0:
- logging.debug("Timer ran out")
- ac.power("off")
-
- if ac.pulse_playing() and not ac.powered:
- ac.power("on")
- ac.set_channel("CD")
-
- # Reset the timer.
- timer = ac.SHUTOFF_TIMER_DURATION
-
- # Start the counter if nothing is currently playing.
- if not ac.pulse_playing():
- timer -= ac.CHECK_FREQUENCY
- logging.debug("Nothing playing.")
- logging.debug("Timer: " + str(timer))
- continue
-
- if timer < -1000: # Just to be safe and avoid weird overflows.
- timer = -1
-
- # If there is something playing (no ifs matched) just reset the counter.
- timer = ac.SHUTOFF_TIMER_DURATION
- logging.debug("Something is playing.")
- logging.debug("Timer: " + str(timer))
-
-
- if __name__ == "__main__":
- main()
|