From 313370476a88c6737b6e4a4ea724f84afacdff1b Mon Sep 17 00:00:00 2001 From: Shaun Setlock Date: Sat, 26 Apr 2025 21:43:24 -0400 Subject: [PATCH] Moving to dotenv. --- main.py | 182 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 105 insertions(+), 77 deletions(-) diff --git a/main.py b/main.py index 343dfd5..acbdf1c 100644 --- a/main.py +++ b/main.py @@ -2,14 +2,15 @@ import network import rp2 import time import json +import os import machine from machine import Pin from umqtt.robust import MQTTClient import dht import urequests -import wifi_config -import mqtt_config -import ha_mqtt_config +from dotenv import load_dotenv + +load_dotenv() # Change this GPIO PIN where your DHT22 sensor is connected DHT_22_GPIO_PIN = 3 @@ -17,29 +18,31 @@ DHT_22_GPIO_PIN = 3 # Debug Mode DEBUG = False + def read_cpu_temp(): """ - If you print the value of the temperature value you are going to get an integer number between 0 and 65535. - So, we have to convert this value either to the Celsius degree scales. + If you print the value of the temperature value you are going to get an integer number between 0 and 65535. + So, we have to convert this value either to the Celsius degree scales. - The temperature sensor works by delivering a voltage to the ADC4 pin that is proportional to the temperature. - From the datasheet, a temperature of 27 degrees Celsius delivers a voltage of 0.706 V. - With each additional degree the voltage reduces by 1.721 mV or 0.001721 V. - The first step in converting the 16-bit temperature is to convert it back to volts, which is done based on the 3.3 V maximum voltage used by the Pico board. - ref: https://how2electronics.com/read-temperature-sensor-value-from-raspberry-pi-pico/ + The temperature sensor works by delivering a voltage to the ADC4 pin that is proportional to the temperature. + From the datasheet, a temperature of 27 degrees Celsius delivers a voltage of 0.706 V. + With each additional degree the voltage reduces by 1.721 mV or 0.001721 V. + The first step in converting the 16-bit temperature is to convert it back to volts, which is done based on the 3.3 V maximum voltage used by the Pico board. + ref: https://how2electronics.com/read-temperature-sensor-value-from-raspberry-pi-pico/ """ cpu_temp_conversion_factor = 3.3 / 65535 cpu_temp_sensor = machine.ADC(4) reading = cpu_temp_sensor.read_u16() * cpu_temp_conversion_factor temperature_c = 27 - (reading - 0.706) / 0.001721 - temperature_f = temperature_c * 9/5. + 32.0 + temperature_f = temperature_c * 9 / 5.0 + 32.0 return temperature_f + def read_dht_22(sensor): """ - reads the temperature and humidity from dht.DHT22 sensor. - returns tuple(temperature, humidity) if no errors - returns None if there was an error + reads the temperature and humidity from dht.DHT22 sensor. + returns tuple(temperature, humidity) if no errors + returns None if there was an error """ try: sensor.measure() @@ -50,10 +53,11 @@ def read_dht_22(sensor): time.sleep(2) return None + def wlan_up(wlan): wlan.active(True) - wlan.connect(wifi_config.HOME_WIFI_SSID, wifi_config.HOME_WIFI_PWD) - + wlan.connect(os.getenv("HOME_WIFI_SSID"), os.getenv("HOME_WIFI_PWD")) + # Wait for connect or fail max_wait = 10 while max_wait > 0: @@ -61,22 +65,23 @@ def wlan_up(wlan): break max_wait -= 1 if DEBUG: - print('Waiting for WiFi connection...') + print("Waiting for WiFi connection...") time.sleep(1) if max_wait == 0: return None - + ifconfig = wlan.ifconfig() if DEBUG: print(ifconfig) return ifconfig - + + def led_error_code(led, error_code: int): """Blink LED for a given error code (int). error code == number of times to blink""" if DEBUG: print("LED Error Status code: {}".format(error_code)) - + # Run a quick 'start error code sequence' # So we know when LED error sequence starts start_sequence_counter = 0 @@ -86,7 +91,7 @@ def led_error_code(led, error_code: int): led.value(False) time.sleep(0.1) start_sequence_counter += 1 - + # Run real error code sequence blink_counter = 0 while blink_counter < error_code: @@ -99,40 +104,40 @@ def led_error_code(led, error_code: int): led.value(False) if DEBUG: print("LED Error Status code finished for: {}".format(error_code)) - - + + def main(): # Start Up Activities if DEBUG: print("Start up") count = 0 - led = machine.Pin('LED', machine.Pin.OUT) + led = machine.Pin("LED", machine.Pin.OUT) led.value(False) led_error_code(led, 1) # Set Wi-Fi Country - rp2.country('US') + rp2.country("US") wlan = network.WLAN(network.STA_IF) - + # Create MQTT Client mqtt_client = MQTTClient( - client_id = mqtt_config.MQTT_CLIENT_ID, - server = mqtt_config.MQTT_HOST_NAME, - ) + client_id=os.getenv("MQTT_CLIENT_ID"), + server=os.getenv("MQTT_HOST_NAME"), + ) # Create Home Assistant MQTT Client ha_mqtt_client = MQTTClient( - client_id = ha_mqtt_config.MQTT_CLIENT_ID, - server = ha_mqtt_config.MQTT_HOST_NAME, - keepalive = ha_mqtt_config.MQTT_KEEP, - user = ha_mqtt_config.MQTT_USERNAME, - password = ha_mqtt_config.MQTT_PASSWORD, - ) + client_id=os.getenv("MQTT_CLIENT_ID"), + server=os.getenv("MQTT_HOST_NAME"), + keepalive=os.getenv("MQTT_KEEP"), + user=os.getenv("MQTT_USERNAME"), + password=os.getenv("MQTT_PASSWORD"), + ) # Create DHT22 sensor = dht.DHT22(Pin(DHT_22_GPIO_PIN)) - + # Let's Go! if DEBUG: print("Enter main loop") @@ -143,8 +148,8 @@ def main(): led.value(False) count += 1 if DEBUG: - print(f'\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.') - + print(f"\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.") + # Create Local Flags for Control wifi_ready = False mqtt_ready = False @@ -157,10 +162,10 @@ def main(): # DHT22 Reading. dht22_reading = read_dht_22(sensor) - #debug_str = "None" + # debug_str = "None" if dht22_reading is not None: - temp,hum = dht22_reading - temp = temp * 9/5. + 32.0 + temp, hum = dht22_reading + temp = temp * 9 / 5.0 + 32.0 dht22_ready = True except: continue @@ -183,93 +188,116 @@ def main(): wifi_ready = True ifconfig = wlan.ifconfig() if DEBUG: - print(f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}.") + print( + f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}." + ) except: continue # MQTT Connection to Primary Broker. try: - mqtt_client.connect(clean_session = False) + mqtt_client.connect(clean_session=False) mqtt_ready = True - mqtt = 'mosquitto' + mqtt = "mosquitto" if DEBUG: - print(f'Connected to Mosquitto MQTT broker.') - + print(f"Connected to Mosquitto MQTT broker.") + except Exception as e: if DEBUG: print("Trouble to connecting to Mosquitto MQTT: {}".format(e)) - # MQTT Connection to Back Up Broker. if not mqtt_ready: try: try: - ha_mqtt_client.connect(clean_session = False) + ha_mqtt_client.connect(clean_session=False) mqtt_ready = True - mqtt = 'ha' + mqtt = "ha" if DEBUG: - print(f'Connected to the back up, Home Assistant, MQTT broker.') - + print(f"Connected to the back up, Home Assistant, MQTT broker.") + except Exception as f: if DEBUG: - print("Trouble to connecting to Home Assistant MQTT: {}".format(f)) + print( + "Trouble to connecting to Home Assistant MQTT: {}".format(f) + ) led_error_code(led, 2) time.sleep(5) - continue # Start back at the top of the While Loop + continue # Start back at the top of the While Loop except: continue - # Ready to Publish? try: if wifi_ready and mqtt_ready and dht22_ready: # Timestamp time_now = time.localtime() - timestamp = "{}/{}/{} {}:{}:{}".format(time_now[1],time_now[2],time_now[0],time_now[3],time_now[4],time_now[5]) - + timestamp = "{}/{}/{} {}:{}:{}".format( + time_now[1], + time_now[2], + time_now[0], + time_now[3], + time_now[4], + time_now[5], + ) + # Build JSON Payloads dht_data = { - 'Location':mqtt_config.MQTT_ZONE_ID, - 'Temperature':temp, - 'Relative Humidity':hum, + "Location": os.getenv("MQTT_ZONE_ID"), + "Temperature": temp, + "Relative Humidity": hum, } dht_payload = json.dumps(dht_data) - + hw_data = { - 'Timestamp':timestamp, - 'CPU Temperature':cpu_temp, - 'Device':mqtt_config.MQTT_HW_ID, - 'WiFi Information':ifconfig, + "Timestamp": timestamp, + "CPU Temperature": cpu_temp, + "Device": os.getenv("MQTT_HW_ID"), + "WiFi Information": ifconfig, } hw_payload = json.dumps(hw_data) if DEBUG: - print(f'Trying to publish...') + print(f"Trying to publish...") - - if mqtt == 'mosquitto': + if mqtt == "mosquitto": # Publish - mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID),dht_payload, retain=True) - mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID),hw_payload, retain=True) + mqtt_client.publish( + "home/{}".format(os.getenv("MQTT_ZONE_ID")), + dht_payload, + retain=True, + ) + mqtt_client.publish( + "hw/{}".format(os.getenv("MQTT_HW_ID")), hw_payload, retain=True + ) mqtt_client.disconnect() if DEBUG: - print(f'MQTT Disconnected.') - - if mqtt == 'ha': + print(f"MQTT Disconnected.") + + if mqtt == "ha": # Publish - ha_mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID),dht_payload, retain=True) - ha_mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID),hw_payload, retain=True) + ha_mqtt_client.publish( + "home/{}".format(os.getenv("HA_MQTT_ZONE_ID")), + dht_payload, + retain=True, + ) + ha_mqtt_client.publish( + "hw/{}".format(os.getenv("HA_MQTT_HW_ID")), + hw_payload, + retain=True, + ) ha_mqtt_client.disconnect() if DEBUG: - print(f'MQTT Disconnected.') - + print(f"MQTT Disconnected.") + except: continue # Sleep for a bit. if DEBUG: - print(f'Finished loop #{count}.') + print(f"Finished loop #{count}.") time.sleep(5) + main()