#!/usr/bin/env python3
"""Automatically switch an amplifier on and off from a Raspberry Pi.

The amplifier is powered on when an mpd client connects or when the ALSA
playback device reports that it is running, and powered off again once
nothing has been playing for a configurable amount of time.
"""
import logging
import re
import subprocess
import time

import RPi.GPIO as gpio


class amp_controller:
    """Amp controller class to hold certain attributes and methods for
    automatically turning on and off the amplifier.
    """

    # Idle time (in seconds) after which the amplifier is switched off.
    SHUTOFF_TIMER_DURATION = 5 * 60
    # Idle time (in seconds) granted when a client merely connects.
    SHORT_TIMER_DURATION = 60
    # Polling interval of the main loop, in seconds.
    CHECK_FREQUENCY = 1
    MPD_LOG_FILE = "/var/log/mpd/mpd.log"
    PULSE_CARD_PATH = "/proc/asound/card0/pcm0p/sub0/status"
    # Pin numbers use the BCM scheme selected by gpio.setmode() below.
    GPIO_PINS = {"Relay": 2, "Phono": 4, "Tape": 14, "CD": 15}
    # Last state requested through power(); not read back from the pin.
    powered = False
    # Most recent mpd log line seen, used to detect new log entries.
    last_mpd_output = ""

    # NOTE: these two calls run once, when the class body is executed
    # (i.e. at import time), not per instance.
    gpio.setmode(gpio.BCM)
    logging.basicConfig(level=logging.DEBUG)

    def setup_pins(self) -> None:
        """Set all GPIO pins to output."""
        for pin in self.GPIO_PINS.values():
            gpio.setup(pin, gpio.OUT)

    def set_channel(self, channel_name: str) -> None:
        """Set the channel on the amplifier.

        Drives the pin mapped to *channel_name* high for half a second and
        then low again (presumably emulating a button press -- confirm
        against the wiring).

        Raises:
            KeyError: if *channel_name* is not a key of ``GPIO_PINS``.
        """
        pin = self.GPIO_PINS[channel_name]
        gpio.output(pin, True)
        time.sleep(0.5)
        gpio.output(pin, False)

    def power(self, state: str) -> None:
        """Turn the amplifier on or off via the GPIO pins.

        ``"on"`` drives the relay pin high; any other value drives it low.
        ``self.powered`` is updated to match.
        """
        relay_pin = self.GPIO_PINS["Relay"]
        if state == "on":
            gpio.output(relay_pin, True)
            # Pause before returning so a following set_channel() does not
            # fire immediately after the relay is switched (presumably to
            # let the amplifier come up -- confirm).
            time.sleep(0.5)
            self.powered = True
        else:
            gpio.output(relay_pin, False)
            self.powered = False

    def pulse_playing(self) -> bool:
        """Return True if there is something playing on the specified
        Pulseaudio card. Return False in every other case.

        The first line of ``PULSE_CARD_PATH`` is inspected for the word
        ``RUNNING``. An unreadable or missing status file counts as
        "not playing" instead of raising.
        """
        try:
            # Read the status file directly; no need to spawn a shell.
            with open(self.PULSE_CARD_PATH) as status_file:
                state = status_file.readline()
        except OSError as err:
            logging.debug("Could not read %s: %s", self.PULSE_CARD_PATH, err)
            return False

        if "RUNNING" in state:
            logging.debug("Pulseaudio connected")
            return True
        return False

    def mpd_client_connected(self) -> bool:
        """Return True if an mpd client is connected and False if not.

        Only a *new* last log line is considered: the line is remembered in
        ``self.last_mpd_output`` so the same connection is reported once.
        A log file that cannot be read yields False.

        Requires 'log_level="secure"' or "debug" to be set in mpd.conf.
        """
        try:
            output = subprocess.check_output(
                ["tail", "-n", "1", self.MPD_LOG_FILE], universal_newlines=True
            )
        except (OSError, subprocess.CalledProcessError) as err:
            # E.g. the log is missing (rotation) or tail is unavailable;
            # keep the daemon alive and simply report "no new client".
            logging.debug("Could not read %s: %s", self.MPD_LOG_FILE, err)
            return False

        if output == self.last_mpd_output:
            return False

        self.last_mpd_output = output
        if re.search(r" opened from ", output):
            logging.debug("mpd client connected")
            return True
        return False


def main() -> None:
    """Run the control loop forever.

    Powers the amplifier on at start-up, then polls every
    ``CHECK_FREQUENCY`` seconds. A countdown runs while nothing is playing;
    when it runs out the amplifier is switched off. Playback resets the
    countdown to ``SHUTOFF_TIMER_DURATION``; a new mpd client connection
    raises it to at least ``SHORT_TIMER_DURATION``.
    """
    ac = amp_controller()
    ac.setup_pins()
    ac.power("on")
    ac.set_channel("CD")
    timer = ac.SHUTOFF_TIMER_DURATION

    while True:
        # Check for changes every CHECK_FREQUENCY seconds.
        time.sleep(ac.CHECK_FREQUENCY)

        # A client connected (nothing necessarily playing yet).
        if ac.mpd_client_connected():
            if not ac.powered:
                ac.power("on")
                ac.set_channel("CD")
            # (Re)set the timer to the short duration if a client connects.
            # Don't overwrite a running timer currently greater than the
            # short duration one. This also covers an expired timer
            # (timer <= 0), so a client connecting exactly at expiry keeps
            # the amplifier on.
            if timer <= ac.SHORT_TIMER_DURATION:
                timer = ac.SHORT_TIMER_DURATION
            logging.debug("Timer set to {}".format(timer))

        # "<= 0" rather than "== 0" so the shut-off still triggers when
        # CHECK_FREQUENCY does not evenly divide the durations; the powered
        # check makes it fire only once per expiry.
        if timer <= 0 and ac.powered:
            logging.debug("Timer ran out")
            ac.power("off")

        # Query the playback state once per iteration so both decisions
        # below see the same value.
        playing = ac.pulse_playing()

        if playing and not ac.powered:
            ac.power("on")
            ac.set_channel("CD")

        # Run the countdown while nothing is currently playing.
        if not playing:
            timer -= ac.CHECK_FREQUENCY
            if timer < -1000:
                # Keep the expired timer bounded (and negative) while idle.
                timer = -1
            logging.debug("Nothing playing.")
            logging.debug("Timer: " + str(timer))
            continue

        # Something is playing: reset the countdown.
        timer = ac.SHUTOFF_TIMER_DURATION
        logging.debug("Something is playing.")
        logging.debug("Timer: " + str(timer))


if __name__ == "__main__":
    main()