Added Lab 10

This commit is contained in:
Edward Bigos 2026-04-07 07:14:09 -04:00
commit b0d1253215
18 changed files with 885 additions and 0 deletions

View file

@ -0,0 +1,58 @@
import network
import time
import urequests
import machine
import dht
from secrets import WIFI_SSID, WIFI_PASSWORD, AIO_USERNAME, AIO_KEY
# DHT11 setup (GP14)
sensor = dht.DHT11(machine.Pin(14))
# Connect to Wi-Fi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
print("Connecting to Wi-Fi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# Adafruit IO URLs
temp_url = "https://io.adafruit.com/api/v2/{}/feeds/temperature/data".format(AIO_USERNAME)
hum_url = "https://io.adafruit.com/api/v2/{}/feeds/humidity/data".format(AIO_USERNAME)
headers = {
"X-AIO-Key": AIO_KEY,
"Content-Type": "application/json"
}
while True:
try:
sensor.measure()
temp_c = sensor.temperature()
humidity = sensor.humidity()
# Convert to Fahrenheit
temp_f = (temp_c * 9/5) + 32
print("Temp: {:.1f}°F Humidity: {}%".format(temp_f, humidity))
# Send temperature
temp_data = {"value": temp_f}
r = urequests.post(temp_url, json=temp_data, headers=headers)
r.close()
# Send humidity
hum_data = {"value": humidity}
r = urequests.post(hum_url, json=hum_data, headers=headers)
r.close()
print("Uploaded to Adafruit IO ✅")
except Exception as e:
print("Error:", e)
# Wait 60 seconds
time.sleep(60)

View file

@ -0,0 +1,22 @@
import machine
import utime
import dht
# Set up the DHT11 sensor on GPIO 14
sensor = dht.DHT11(machine.Pin(14))
while True:
try:
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()
print("Temperature: {}°C".format(temp))
print("Humidity: {}%".format(hum))
print("------------------------")
except OSError as e:
print("Sensor error:", e)
# Wait 60 seconds
utime.sleep(60)

View file

@ -0,0 +1,25 @@
import machine
import utime
import dht
# Set up the DHT11 sensor on GPIO 14
sensor = dht.DHT11(machine.Pin(14))
while True:
try:
sensor.measure()
temp_c = sensor.temperature()
hum = sensor.humidity()
# Convert to Fahrenheit
temp_f = (temp_c * 9/5) + 32
print("Temperature: {:.1f}°F".format(temp_f))
print("Humidity: {}%".format(hum))
print("------------------------")
except OSError as e:
print("Sensor error:", e)
# Wait 60 seconds
utime.sleep(60)

View file

@ -0,0 +1,95 @@
import network
import time
from umqtt.simple import MQTTClient
import secrets
# Global variable
tsetting = False
last_tsetting = None # Track changes
# WiFi setup
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# MQTT callback
def message_callback(topic, msg):
topic = topic.decode()
msg = msg.decode()
print("Received:", topic, msg)
if topic.endswith("/mytemperaturesetting"):
try:
value = int(msg)
print("temperaturesetting value:", value)
except:
print("Invalid temperaturesetting value")
elif topic.endswith("/led"):
print("LED command:", msg)
# MQTT setup
def connect_mqtt():
client_id = "picoW-client"
broker = "io.adafruit.com"
port = 1883
username = secrets.AIO_USERNAME
password = secrets.AIO_KEY
client = MQTTClient(client_id, broker, port=port,
user=username, password=password)
client.set_callback(message_callback)
client.connect()
# Feed paths
temperaturesetting_feed = f"{username}/feeds/mytemperaturesetting"
led_feed = f"{username}/feeds/led"
# Subscribe
client.subscribe(temperaturesetting_feed)
client.subscribe(led_feed)
print("Subscribed to feeds")
return client, led_feed
# Publish function
def publish_led(client, led_feed, value):
msg = "ON" if value else "OFF"
client.publish(led_feed, msg)
print("Published to LED feed:", msg)
# Main loop
def main():
global tsetting, last_tsetting
connect_wifi()
client, led_feed = connect_mqtt()
print("Running...")
while True:
client.check_msg()
# 🔁 Example: toggle tsetting every 10 seconds (replace with real logic)
if int(time.time()) % 10 == 0:
tsetting = not tsetting
# ✅ Only publish if value changed
if tsetting != last_tsetting:
publish_led(client, led_feed, tsetting)
last_tsetting = tsetting
time.sleep(1)
# Run
main()

View file

@ -0,0 +1,86 @@
import network
import time
from umqtt.simple import MQTTClient
import secrets
# WiFi setup
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# MQTT callback
def message_callback(topic, msg):
topic = topic.decode()
msg = msg.decode()
print("Received:", topic, msg)
if topic.endswith("/mytemperaturesetting"):
try:
value = int(msg)
print("temperaturesetting value:", value)
# Add temperaturesetting control here if needed
except:
print("Invalid temperaturesetting value")
elif topic.endswith("/led"):
print("LED command:", msg)
# Example: interpret ON/OFF
if msg.upper() == "ON":
print("Turn LED ON")
# Add LED ON code here
elif msg.upper() == "OFF":
print("Turn LED OFF")
# Add LED OFF code here
else:
print("Unknown LED command")
# MQTT setup
def connect_mqtt():
client_id = "picoW-client"
broker = "io.adafruit.com"
port = 1883
username = secrets.AIO_USERNAME
password = secrets.AIO_KEY
client = MQTTClient(client_id, broker, port=port,
user=username, password=password)
client.set_callback(message_callback)
client.connect()
# Feed paths
temperaturesetting_feed = f"{username}/feeds/temperaturesetting"
led_feed = f"{username}/feeds/led"
# Subscribe to both feeds
client.subscribe(temperaturesetting_feed)
client.subscribe(led_feed)
print("Subscribed to:")
print(" -", temperaturesetting_feed)
print(" -", led_feed)
return client
# Main loop
def main():
connect_wifi()
client = connect_mqtt()
print("Waiting for messages...")
while True:
client.check_msg()
time.sleep(1)
# Run
main()

View file

@ -0,0 +1,65 @@
import network
import time
from umqtt.simple import MQTTClient
import secrets
# WiFi setup
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# MQTT callback (runs when new message arrives)
def message_callback_temperaturesetting(topic, msg):
print("Received from feed:", topic)
print("Value:", msg)
# Convert to int if needed (for servo control later)
try:
value = int(msg)
print("Parsed value:", value)
print(type(msg))
except:
print("Non-integer value")
# MQTT setup
def connect_mqtt():
client_id = "picoW-client"
broker = "io.adafruit.com"
port = 1883
username = secrets.AIO_USERNAME
password = secrets.AIO_KEY
client = MQTTClient(client_id, broker, port=port,
user=username, password=password)
client.set_callback(message_callback_temperaturesetting)
client.connect()
# Subscribe to your feed
feed_path = f"{username}/feeds/temperaturesetting"
client.subscribe(feed_path)
print("Subscribed to:", feed_path)
return client
# Main loop
def main():
connect_wifi()
client = connect_mqtt()
print("Waiting for messages...")
while True:
client.check_msg() # Non-blocking
time.sleep(1)
# Run
main()

View file

@ -0,0 +1,86 @@
import network
import time
from umqtt.simple import MQTTClient
import secrets
# WiFi setup
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# MQTT callback
def message_callback(topic, msg):
topic = topic.decode()
msg = msg.decode()
print("Received:", topic, msg)
if topic.endswith("/myservo"):
try:
value = int(msg)
print("Servo value:", value)
# Add servo control here if needed
except:
print("Invalid servo value")
elif topic.endswith("/led"):
print("LED command:", msg)
# Example: interpret ON/OFF
if msg.upper() == "ON":
print("Turn LED ON")
# Add LED ON code here
elif msg.upper() == "OFF":
print("Turn LED OFF")
# Add LED OFF code here
else:
print("Unknown LED command")
# MQTT setup
def connect_mqtt():
client_id = "picoW-client"
broker = "io.adafruit.com"
port = 1883
username = secrets.AIO_USERNAME
password = secrets.AIO_KEY
client = MQTTClient(client_id, broker, port=port,
user=username, password=password)
client.set_callback(message_callback)
client.connect()
# Feed paths
servo_feed = f"{username}/feeds/temperaturesetting"
led_feed = f"{username}/feeds/led"
# Subscribe to both feeds
client.subscribe(servo_feed)
client.subscribe(led_feed)
print("Subscribed to:")
print(" -", servo_feed)
print(" -", led_feed)
return client
# Main loop
def main():
connect_wifi()
client = connect_mqtt()
print("Waiting for messages...")
while True:
client.check_msg()
time.sleep(1)
# Run
main()

View file

@ -0,0 +1,160 @@
import network
import time
from umqtt.simple import MQTTClient
import secrets
import dht
from machine import Pin
# ====== Globals ======
tsetting = False
last_tsetting = None
temperatureSetting = 60
# DHT11 setup (GP14)
dht_sensor = dht.DHT11(Pin(14))
furnaceRelay = machine.Pin('GP16', machine.Pin.OUT) #configure LED Pin as an output pin and create and led object for Pin class
# ====== WiFi ======
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected:", wlan.ifconfig())
# ====== MQTT Callback ======
def message_callback(topic, msg):
global tsetting, last_tsetting,temperatureSetting
topic = topic.decode()
msg = msg.decode()
print("Received:", topic, msg)
if topic.endswith("/temperaturesetting"):
try:
value = int(msg)
print("temperaturesetting value::", value)
temperatureSetting = value
print("Temperature Setting == ",temperatureSetting)
except:
print("Invalid temperaturesetting value")
if topic.endswith("/led"):
print("LED command:", msg)
# ====== MQTT Setup ======
def connect_mqtt():
client_id = "picoW-client"
broker = "io.adafruit.com"
port = 1883
username = secrets.AIO_USERNAME
password = secrets.AIO_KEY
client = MQTTClient(client_id, broker, port=port,
user=username, password=password)
client.set_callback(message_callback)
client.connect()
# Feed paths
base = secrets.AIO_USERNAME + "/feeds/"
temperaturesetting_feed = base + "temperaturesetting"
led_feed = base + "led"
humidity_feed = base + "humidity"
temperature_feed = base + "temperature"
# Subscribe
client.subscribe(temperaturesetting_feed)
client.subscribe(led_feed)
print("Subscribed to feeds")
return client, led_feed, humidity_feed, temperature_feed, temperaturesetting_feed
# ====== Publish helpers ======
def publish_led(client, led_feed, value):
msg = "ON" if value else "OFF"
client.publish(led_feed, msg)
print("Published LED:", msg)
def publish_temp_humidity(client, humidity_feed, temperature_feed, humidity, temp_f):
client.publish(humidity_feed, str(humidity))
client.publish(temperature_feed, str(temp_f))
print("Published Humidity:", humidity)
print("Published Temp (F):", temp_f)
# ====== Read DHT11 ======
def read_dht():
try:
dht_sensor.measure()
temp_c = dht_sensor.temperature()
humidity = dht_sensor.humidity()
# Convert to Fahrenheit
temp_f = (temp_c * 9/5) + 32
return temp_f, humidity
except Exception as e:
print("DHT read error:", e)
return None, None
# ====== Main ======
def main():
global tsetting, last_tsetting,temperatureSetting, furnaceRelay
connect_wifi()
client, led_feed, humidity_feed, temperature_feed, temperaturesetting_feed = connect_mqtt()
last_dht_time = 0
while True:
client.check_msg()
# ---- Example toggle logic (replace with real input) ----
# if int(time.time()) % 10 == 0:
# tsetting = not tsetting
# ---- Publish LED state if changed ----
# if tsetting != last_tsetting:
# publish_led(client, led_feed, tsetting)
# last_tsetting = tsetting
# ---- Read DHT every 5 seconds ----
if time.time() - last_dht_time > 15:
temp_f, humidity = read_dht()
if temp_f is not None:
publish_temp_humidity(
client,
humidity_feed,
temperature_feed,
humidity,
temp_f
)
last_dht_time = time.time()
if temperatureSetting > temp_f:
tsetting = True
furnaceRelay.value(True)
else:
tsetting = False
furnaceRelay.value(False)
print("Temperature Setting = ",temperatureSetting)
publish_led(client, led_feed, tsetting)
time.sleep(5)
# Run
main()

View file

@ -0,0 +1,204 @@
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
ssl=False, ssl_params={}):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7f) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5
i = 1
while sz > 0x7f:
premsg[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
self.sock.write(premsg, i + 2)
self.sock.write(msg)
#print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
#print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
#print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
#print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xf0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()

View file

@ -0,0 +1,26 @@
secrets = {
'location': 'Stcc',
'ssid': 'cset@stcc',
'password': 'c1s2e3t4',
}
WIFI_SSID = "cset@stcc"
WIFI_PASSWORD = "c1s2e3t4"
#AIO_USERNAME = "your_adafruit_username"
#AIO_KEY = "your_adafruit_key"
# The following data is INVALID and made up. Go to your Adafruit
# dashboard, open the key icon, then replace the information with
# your own username and key.
AIO_USERNAME = "csetuser"
AIO_KEY = "aio_qzZV9854jSpSpBSvKPNaNwP9hDaj"
# Be sure to create the following feeds on io.adafruit.com:
# humidity
# temperature
# led
# temperaturesetting

View file

@ -0,0 +1,32 @@
Run Code on Power Up
(From Google AI)
To run a MicroPython script automatically on power-up, you must
save your code to the device's filesystem with the specific filename main.py. The board's bootloader is programmed to look for this file and execute its contents after finishing its initial setup process.
Steps to Save and Run Code on Power-up
The general process involves connecting your board to a computer, accessing its file system, and saving your script with the required name.
Connect your board: Plug your MicroPython board (like a Raspberry Pi Pico or ESP32) into your computer via USB.
Open your IDE/Editor: Use a MicroPython-compatible IDE like Thonny to connect to the board's serial port.
Save your file as main.py:
In Thonny, write your code in the editor window.
Go to File > Save as.
Select the option to save the file to the "Raspberry Pi Pico"
(or your specific board's name) instead of your computer.
Name the file main.py and click OK or Save.
Power cycle the board: Eject or unmount the device safely from your computer's file system, and then unplug and re-plug the USB cable (or press the RST button if available).
Upon reconnection, the code within main.py will run automatically.
Advanced Boot Options
MicroPython actually uses two special files during the boot process:
boot.py: This file runs first and is intended for low-level configuration, such as setting up the system path or network connections. You typically do not need to modify this file unless you are customizing the boot process itself.
main.py: This file runs after boot.py and should contain your primary application logic.
By correctly naming your script main.py, you ensure it behaves like an embedded system program, running immediately when power is applied, without needing a computer connection or manual command.

View file

@ -0,0 +1,10 @@
# MicroPython Files
---
Directories & Files
---
MicroPython: MicroPython firmware for the Pico 2 W
Code: MicroPython code for the lab.
Documentation: Any additional documentation files.

16
Lab10/README.md Normal file
View file

@ -0,0 +1,16 @@
# Lab 10
## Reading Temperature and Humidity.
---
Directories & Files
---
MicroPython: MicroPython code and documentation. It includes the MicroPython Version 1.27.0 Pico W firmware.
flash_nuke.uf2: Erases all existing firmware on the Pico W.
MicroPython/code: Working code for the lab.
Documentation: Pin diagrams, web links, and other information.

BIN
Lab10/flash_nuke.uf2 Normal file

Binary file not shown.

Binary file not shown.