An old Onkyo HiFi receiver automated via a Raspberry Pi.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

138 lines
4.1 KiB

  1. #!/usr/bin/env python3
  2. import RPi.GPIO as gpio
  3. import time
  4. import logging
  5. import re
  6. import subprocess
  7. class amp_controller:
  8. """Amp controller class to hold certain attributes and methods for automatically
  9. turning on and off the amplifier.
  10. """
  11. SHUTOFF_TIMER_DURATION = 5 * 60 # in seconds
  12. SHORT_TIMER_DURATION = 60
  13. CHECK_FREQUENCY = 1 # in seconds
  14. MPD_LOG_FILE = "/var/log/mpd/mpd.log"
  15. PULSE_CARD_PATH = "/proc/asound/card0/pcm0p/sub0/status"
  16. GPIO_PINS = {"Relay": 2, "Phono": 4, "Tape": 14, "CD": 15}
  17. powered = False
  18. last_mpd_output = ""
  19. gpio.setmode(gpio.BCM)
  20. logging.basicConfig(level=logging.DEBUG)
  21. def setup_pins(self) -> None:
  22. """Set all GPIO pins to output."""
  23. for pin in self.GPIO_PINS.values():
  24. gpio.setup(pin, gpio.OUT)
  25. def set_channel(self, channel_name) -> None:
  26. """Set the channel on the amplifier."""
  27. pin = self.GPIO_PINS[channel_name]
  28. gpio.output(pin, True)
  29. time.sleep(0.5)
  30. gpio.output(pin, False)
  31. def power(self, state) -> None:
  32. """Turn the amplifier on or off via the GPIO pins."""
  33. if state == "on":
  34. gpio.output(self.GPIO_PINS["Relay"], True)
  35. time.sleep(0.5)
  36. self.powered = True
  37. else:
  38. gpio.output(self.GPIO_PINS["Relay"], False)
  39. self.powered = False
  40. def pulse_playing(self) -> bool:
  41. """Return True if there is something playing on the specified Pulseaudio card.
  42. Return False in every other case.
  43. """
  44. output = subprocess.check_output(
  45. "cat " + self.PULSE_CARD_PATH, shell=True, universal_newlines=True
  46. )
  47. state = output.split("\n")[0]
  48. if "RUNNING" in state:
  49. logging.debug("Pulseaudio connected")
  50. return True
  51. return False
  52. def mpd_client_connected(self) -> bool:
  53. """Return True if an mpd client is connected and False if not.
  54. Requires 'log_level="secure"' or "debug" to be set in mpd.conf.
  55. """
  56. output = subprocess.check_output(
  57. ["tail", "-1", self.MPD_LOG_FILE], universal_newlines=True
  58. )
  59. if output != self.last_mpd_output:
  60. self.last_mpd_output = output
  61. if re.match(".* opened from .*", output):
  62. logging.debug("mpd client connected")
  63. return True
  64. return False
  65. def main():
  66. ac = amp_controller()
  67. ac.setup_pins()
  68. ac.power("on")
  69. ac.set_channel("CD")
  70. timer = ac.SHUTOFF_TIMER_DURATION
  71. while True:
  72. # Check for changes every second.
  73. time.sleep(ac.CHECK_FREQUENCY)
  74. # Only connected, nothing playing.
  75. if ac.mpd_client_connected():
  76. if not ac.powered:
  77. ac.power("on")
  78. ac.set_channel("CD")
  79. # (Re)set the timer to the short duration if a client connects. Don't
  80. # overwrite a running timer currently greater than the short duration one.
  81. if timer > 0 and timer <= ac.SHORT_TIMER_DURATION:
  82. timer = ac.SHORT_TIMER_DURATION
  83. elif timer < 0:
  84. timer = ac.SHORT_TIMER_DURATION
  85. logging.debug("Timer set to {}".format(timer))
  86. if timer == 0:
  87. logging.debug("Timer ran out")
  88. ac.power("off")
  89. if ac.pulse_playing() and not ac.powered:
  90. ac.power("on")
  91. ac.set_channel("CD")
  92. # Reset the timer.
  93. timer = ac.SHUTOFF_TIMER_DURATION
  94. # Start the counter if nothing is currently playing.
  95. if not ac.pulse_playing():
  96. timer -= ac.CHECK_FREQUENCY
  97. logging.debug("Nothing playing.")
  98. logging.debug("Timer: " + str(timer))
  99. continue
  100. if timer < -1000: # Just to be safe and avoid weird overflows.
  101. timer = -1
  102. # If there is something playing (no ifs matched) just reset the counter.
  103. timer = ac.SHUTOFF_TIMER_DURATION
  104. logging.debug("Something is playing.")
  105. logging.debug("Timer: " + str(timer))
  106. if __name__ == "__main__":
  107. main()