Archived
1
0
This repository has been archived on 2025-04-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
pico-dht22/main.py

208 lines
6.1 KiB
Python

import network
import rp2
import time
import json
import machine
from machine import Pin
from umqtt.simple import MQTTClient
import dht
import urequests
import wifi_config
import mqtt_config
# GPIO pin number handed to Pin() when the DHT22 sensor object is created.
# Change this to the pin your DHT22 data line is connected to.
DHT_22_GPIO_PIN = 3
# Debug mode: when True, the functions in this file print progress and
# error details; when False they stay silent.
DEBUG = False
def read_cpu_temp():
    """
    Return the on-chip temperature sensor reading in degrees Fahrenheit.

    The sensor is sampled through ADC channel 4. read_u16() yields a raw
    integer between 0 and 65535, which is first scaled back to volts using
    the 3.3 V maximum voltage of the Pico board.
    From the datasheet, 27 degrees Celsius corresponds to 0.706 V, and each
    additional degree lowers the voltage by 1.721 mV (0.001721 V). The
    Celsius value derived from that is finally converted to Fahrenheit.
    ref: https://how2electronics.com/read-temperature-sensor-value-from-raspberry-pi-pico/
    """
    raw_counts = machine.ADC(4).read_u16()
    # Raw ADC counts -> volts.
    volts = raw_counts * (3.3 / 65535)
    # Volts -> degrees Celsius, using the datasheet reference point and slope.
    celsius = 27 - (volts - 0.706) / 0.001721
    # Degrees Celsius -> degrees Fahrenheit.
    return celsius * 9/5. + 32.0
def read_dht_22(sensor):
    """
    Read the temperature and humidity from a dht.DHT22 sensor.

    Triggers a measurement with sensor.measure() and then reads both values
    back from the sensor object.

    Args:
        sensor: object exposing measure(), temperature() and humidity().

    Returns:
        tuple(temperature, humidity) if no errors.
        None if the read raised an error; in that case the function pauses
        for 2 seconds before returning (presumably to give the sensor time
        before the next attempt - confirm against the sensor datasheet).

    Only Exception subclasses count as a failed read. KeyboardInterrupt and
    SystemExit are deliberately NOT caught, so Ctrl-C still stops the
    program while a measurement is in progress.
    """
    try:
        sensor.measure()
        return sensor.temperature(), sensor.humidity()
    except Exception:
        # Best-effort read: report "no reading" instead of crashing the caller.
        time.sleep(2)
        return None
def wlan_up(wlan):
    """
    Activate the WLAN interface and join the configured Wi-Fi network.

    Starts the connection with the SSID/password from wifi_config, then
    polls wlan.status() once per second, for at most 10 polls, until the
    status is either negative or >= 3.

    Args:
        wlan: the WLAN interface object to bring up.

    Returns:
        The wlan.ifconfig() result when the link is up (status 3).
        None when the connection failed or did not come up in time.
    """
    wlan.active(True)
    wlan.connect(wifi_config.HOME_WIFI_SSID, wifi_config.HOME_WIFI_PWD)

    # Wait for connect or fail
    max_wait = 10
    while max_wait > 0:
        status = wlan.status()
        if status < 0 or status >= 3:
            break
        max_wait -= 1
        if DEBUG:
            print('Waiting for WiFi connection...')
        time.sleep(1)

    # Decide on the actual link state rather than on the loop counter:
    # a negative status leaves the loop early but is a failure, and a link
    # that came up during the final sleep is a success.
    if wlan.status() != 3:
        return None

    ifconfig = wlan.ifconfig()
    if DEBUG:
        print(ifconfig)
    return ifconfig
def led_error_code(led, error_code: int):
    """
    Blink the LED to signal an error code.

    A burst of three fast flashes (0.1 s on, 0.1 s off) marks the start of
    the sequence. It is followed by error_code slow blinks (1 s off, 1 s on),
    so the number of slow blinks equals the error code. The LED is always
    left switched off when the function returns.
    """
    if DEBUG:
        print("LED Error Status code: {}".format(error_code))

    # Fast preamble so an observer can tell where the sequence begins.
    for _ in range(3):
        for led_state in (True, False):
            led.value(led_state)
            time.sleep(0.1)

    # The error code itself: one slow blink per unit.
    blinks_left = error_code
    while blinks_left > 0:
        time.sleep(1)
        led.value(True)
        time.sleep(1)
        led.value(False)
        blinks_left -= 1

    # Leave the LED off no matter how the loops above ended.
    led.value(False)

    if DEBUG:
        print("LED Error Status code finished for: {}".format(error_code))
def main():
    """
    Run the sensor loop: connect Wi-Fi and MQTT, read sensors, publish.

    Start-up: sets up the on-board LED (one blink via led_error_code), the
    Wi-Fi country, the WLAN interface, the MQTT client and the DHT22 sensor.

    Each loop iteration then:
      1. brings Wi-Fi up when wlan.status() is not 3 (on failure: LED error
         code 3, then retry from the top of the loop);
      2. connects to the MQTT broker (on failure: LED error code 2, 5 s
         pause, then retry from the top of the loop);
      3. reads the DHT22 and the CPU temperature;
      4. publishes retained JSON payloads to "home/<zone id>" (only when the
         DHT22 read succeeded) and "hw/<hw id>", then disconnects;
      5. sleeps 5 seconds.

    The loop has no exit condition; the function only ends if an unhandled
    exception propagates out of it.
    """
    # Start Up Activities
    if DEBUG:
        print("Start up")
    count = 0
    led = machine.Pin('LED', machine.Pin.OUT)
    led.value(False)
    led_error_code(led, 1)

    # Set Wi-Fi Country
    rp2.country('US')
    wlan = network.WLAN(network.STA_IF)

    # Create MQTT Client
    mqtt_client = MQTTClient(mqtt_config.MQTT_CLIENT_ID, mqtt_config.MQTT_HOST_NAME)

    # Create DHT22
    sensor = dht.DHT22(Pin(DHT_22_GPIO_PIN))

    # Let's Go!
    if DEBUG:
        print("Enter main loop")

    while True:
        # Loop Clean-Up and Prep
        led.value(False)
        count += 1
        if DEBUG:
            print(f'\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.')

        # WiFi Connection.
        if wlan.status() != 3:
            ifconfig = wlan_up(wlan)
            if ifconfig is None:
                if DEBUG:
                    # No exception object exists on this path, so report the
                    # WLAN status code instead.
                    print("Trouble to connecting WiFi: {}".format(wlan.status()))
                led_error_code(led, 3)
                continue  # Start back at the top of the While Loop
            if DEBUG:
                print("Connected to WiFi: {}".format(ifconfig))
        else:
            ifconfig = wlan.ifconfig()
            if DEBUG:
                print(f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}.")

        # MQTT Connection.
        try:
            mqtt_client.connect()
            if DEBUG:
                print('MQTT Connected.')
        except Exception as e:
            if DEBUG:
                print("Trouble to connecting to MQTT: {}".format(e))
            led_error_code(led, 2)
            time.sleep(5)
            continue  # Start back at the top of the While Loop

        # DHT22 Reading. None means the read failed this time around.
        dht22_reading = read_dht_22(sensor)

        # CPU Reading.
        cpu_temp = read_cpu_temp()

        # Timestamp (month/day/year hour:minute:second, not zero-padded)
        time_now = time.localtime()
        timestamp = "{}/{}/{} {}:{}:{}".format(time_now[1], time_now[2], time_now[0], time_now[3], time_now[4], time_now[5])

        # Build and publish the sensor payload only for a fresh reading, so
        # temp/hum are never used unbound or carried over from an earlier loop.
        if dht22_reading is not None:
            temp, hum = dht22_reading
            # Same formula read_cpu_temp() uses to go from Celsius to Fahrenheit.
            temp = temp * 9/5. + 32.0
            dht_data = {
                'Location': mqtt_config.MQTT_ZONE_ID,
                'Temperature': temp,
                'Relative Humidity': hum,
            }
            dht_payload = json.dumps(dht_data)
            mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID), dht_payload, retain=True)
        elif DEBUG:
            print("DHT22 read failed; skipping sensor payload.")

        # The hardware payload does not depend on the DHT22.
        hw_data = {
            'Timestamp': timestamp,
            'CPU Temperature': cpu_temp,
            'Device': mqtt_config.MQTT_HW_ID,
            'WiFi Information': ifconfig,
        }
        hw_payload = json.dumps(hw_data)
        mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID), hw_payload, retain=True)

        mqtt_client.disconnect()
        if DEBUG:
            print('MQTT Disconnected.')

        # Sleep for a bit.
        if DEBUG:
            print(f'Finished loop #{count}.')
        time.sleep(5)
# Script entry point: start the sensor/publish loop as soon as this file runs.
main()