Archived
1
0

Moving to dotenv.

This commit is contained in:
Shaun Setlock
2025-04-26 21:27:05 -04:00
parent 33ba6f414b
commit 6deaf17a6a

113
main.py
View File

@@ -1,15 +1,16 @@
import network import network
import rp2 import rp2
import time import time
import os
import json import json
import machine import machine
from machine import Pin from machine import Pin
from umqtt.robust import MQTTClient from umqtt.robust import MQTTClient
import wifi_config
import mqtt_config
import ha_mqtt_config
from pico_funcs import read_cpu_temp, wlan_up, led_error_code from pico_funcs import read_cpu_temp, wlan_up, led_error_code
from hcsr04_funcs import read_hc_sr04 from hcsr04_funcs import read_hc_sr04
from dotenv import load_dotenv
load_dotenv()
# Change this GPIO PIN where your DHT22 sensor is connected # Change this GPIO PIN where your DHT22 sensor is connected
TRIG_PIN = 3 TRIG_PIN = 3
@@ -18,35 +19,34 @@ ECHO_PIN = 2
# Debug Mode # Debug Mode
DEBUG = False DEBUG = False
def main(): def main():
# Start Up Activities # Start Up Activities
if DEBUG: if DEBUG:
print("Start up") print("Start up")
count = 0 count = 0
led = machine.Pin('LED', machine.Pin.OUT) led = machine.Pin("LED", machine.Pin.OUT)
led.value(False) led.value(False)
led_error_code(led, 1) led_error_code(led, 1)
# Set Wi-Fi Country and Create a Wireless Interface # Set Wi-Fi Country and Create a Wireless Interface
rp2.country('US') rp2.country("US")
wlan = network.WLAN(network.STA_IF) wlan = network.WLAN(network.STA_IF)
# Create MQTT Client # Create MQTT Client
mqtt_client = MQTTClient( mqtt_client = MQTTClient(
client_id = mqtt_config.MQTT_CLIENT_ID, client_id=os.getenv("MQTT_CLIENT_ID"),
server = mqtt_config.MQTT_HOST_NAME, server=os.getenv("MQTT_HOST_NAME"),
) )
# Create Home Assistant MQTT Client # Create Home Assistant MQTT Client
ha_mqtt_client = MQTTClient( ha_mqtt_client = MQTTClient(
client_id = ha_mqtt_config.MQTT_CLIENT_ID, client_id=os.getenv("HA_MQTT_CLIENT_ID"),
server = ha_mqtt_config.MQTT_HOST_NAME, server=os.getenv("HA_MQTT_HOST_NAME"),
keepalive = ha_mqtt_config.MQTT_KEEP, keepalive=os.getenv("HA_MQTT_KEEP"),
user = ha_mqtt_config.MQTT_USERNAME, user=os.getenv("HA_MQTT_USERNAME"),
password = ha_mqtt_config.MQTT_PASSWORD, password=os.getenv("HA_MQTT_PASSWORD"),
) )
# Create HC-SR04 # Create HC-SR04
@@ -67,7 +67,7 @@ def main():
led.value(False) led.value(False)
count += 1 count += 1
if DEBUG: if DEBUG:
print(f'\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.') print(f"\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.")
# Create Local Flags for Control # Create Local Flags for Control
wifi_ready = False wifi_ready = False
@@ -84,7 +84,9 @@ def main():
try: try:
sensor_value = read_hc_sr04(trig, echo) sensor_value = read_hc_sr04(trig, echo)
if not first_time: if not first_time:
distance = distance * FILTER_CONSTANT + (1 - FILTER_CONSTANT) * sensor_value distance = (
distance * FILTER_CONSTANT + (1 - FILTER_CONSTANT) * sensor_value
)
else: else:
distance = sensor_value distance = sensor_value
first_time = False first_time = False
@@ -110,36 +112,39 @@ def main():
wifi_ready = True wifi_ready = True
ifconfig = wlan.ifconfig() ifconfig = wlan.ifconfig()
if DEBUG: if DEBUG:
print(f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}.") print(
f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}."
)
except: except:
continue continue
# MQTT Connection to Primary Broker. # MQTT Connection to Primary Broker.
try: try:
mqtt_client.connect(clean_session = False) mqtt_client.connect(clean_session=False)
mqtt_ready = True mqtt_ready = True
mqtt = 'mosquitto' mqtt = "mosquitto"
if DEBUG: if DEBUG:
print(f'Connected to Mosquitto MQTT broker.') print(f"Connected to Mosquitto MQTT broker.")
except Exception as e: except Exception as e:
if DEBUG: if DEBUG:
print("Trouble to connecting to Mosquitto MQTT: {}".format(e)) print("Trouble to connecting to Mosquitto MQTT: {}".format(e))
# MQTT Connection to Back Up Broker. # MQTT Connection to Back Up Broker.
if not mqtt_ready: if not mqtt_ready:
try: try:
try: try:
ha_mqtt_client.connect(clean_session = False) ha_mqtt_client.connect(clean_session=False)
mqtt_ready = True mqtt_ready = True
mqtt = 'ha' mqtt = "ha"
if DEBUG: if DEBUG:
print(f'Connected to the back up, Home Assistant, MQTT broker.') print(f"Connected to the back up, Home Assistant, MQTT broker.")
except Exception as f: except Exception as f:
if DEBUG: if DEBUG:
print("Trouble to connecting to Home Assistant MQTT: {}".format(f)) print(
"Trouble to connecting to Home Assistant MQTT: {}".format(f)
)
led_error_code(led, 2) led_error_code(led, 2)
time.sleep(5) time.sleep(5)
continue # Start back at the top of the While Loop continue # Start back at the top of the While Loop
@@ -152,50 +157,70 @@ def main():
if wifi_ready and mqtt_ready and hc_sr04_ready: if wifi_ready and mqtt_ready and hc_sr04_ready:
# Timestamp # Timestamp
time_now = time.localtime() time_now = time.localtime()
timestamp = "{}/{}/{} {}:{}:{}".format(time_now[1],time_now[2],time_now[0],time_now[3],time_now[4],time_now[5]) timestamp = "{}/{}/{} {}:{}:{}".format(
time_now[1],
time_now[2],
time_now[0],
time_now[3],
time_now[4],
time_now[5],
)
# Build JSON Payloads # Build JSON Payloads
hc_sr04_data = { hc_sr04_data = {
'Location':mqtt_config.MQTT_ZONE_ID, "Location": os.getenv("MQTT_ZONE_ID"),
'Distance':distance, "Distance": distance,
} }
hc_sr04_payload = json.dumps(hc_sr04_data) hc_sr04_payload = json.dumps(hc_sr04_data)
hw_data = { hw_data = {
'Timestamp':timestamp, "Timestamp": timestamp,
'CPU Temperature':cpu_temp, "CPU Temperature": cpu_temp,
'Device':mqtt_config.MQTT_HW_ID, "Device": os.getenv("MQTT_HW_ID"),
'WiFi Information':ifconfig, "WiFi Information": ifconfig,
} }
hw_payload = json.dumps(hw_data) hw_payload = json.dumps(hw_data)
if DEBUG: if DEBUG:
print(f'Trying to publish...') print(f"Trying to publish...")
if mqtt == 'mosquitto': if mqtt == "mosquitto":
# Publish # Publish
mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID),hc_sr04_payload, retain=True) mqtt_client.publish(
mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID),hw_payload, retain=True) "home/{}".format(os.getenv("MQTT_ZONE_ID")),
hc_sr04_payload,
retain=True,
)
mqtt_client.publish(
"hw/{}".format(os.getenv("MQTT_HW_ID")), hw_payload, retain=True
)
mqtt_client.disconnect() mqtt_client.disconnect()
if DEBUG: if DEBUG:
print(f'MQTT Disconnected.') print(f"MQTT Disconnected.")
if mqtt == 'ha': if mqtt == "ha":
# Publish # Publish
ha_mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID),hc_sr04_payload, retain=True) ha_mqtt_client.publish(
ha_mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID),hw_payload, retain=True) "home/{}".format(os.getenv("HA_MQTT_ZONE_ID")),
hc_sr04_payload,
retain=True,
)
ha_mqtt_client.publish(
"hw/{}".format(os.getenv("HA_MQTT_HW_ID")),
hw_payload,
retain=True,
)
ha_mqtt_client.disconnect() ha_mqtt_client.disconnect()
if DEBUG: if DEBUG:
print(f'MQTT Disconnected.') print("MQTT Disconnected.")
except: except:
continue continue
# Sleep for a bit. # Sleep for a bit.
if DEBUG: if DEBUG:
print(f'Finished loop #{count}.') print(f"Finished loop #{count}.")
time.sleep(5) time.sleep(5)
main() main()