Some time ago, I bought a Raspberry Pi 4B for my graduation project. After I passed my defense, the Pi sat idle. Besides turning it into a local cloud drive, I wanted to put the OpenCV I had been studying to use, so I bought a camera and a human sensor, connected them to the Raspberry Pi, and built a simple good-morning push notification. Every morning, when I sit down at my desk, it sends me a WeChat message with a greeting and the weather forecast.
The overall design idea is that at 7 AM every morning, the Raspberry Pi runs a Python script to detect if someone is at the desk using the human sensor. If someone is present, it turns on the camera to determine if it is the right person. If it is, it sends a message via WeChat. The concept is simple, and the coding went smoothly without any major issues, except that I accidentally reversed the positive and negative terminals when connecting the human sensor, which burned out one sensor and burned my hand a bit ∠( ᐛ」∠)_
Note
Preparation
🖥️ A Raspberry Pi with OpenCV + Mediapipe environment installed
📹 A camera
✋ A human sensor (or more)
🔌 Several female-to-female DuPont wires
🎤 A server set up with Wecom Chan (or it can also be set up on the Raspberry Pi, as you wish)
Human Sensor
Any human sensor from Taobao will do, but it is recommended to buy one with a shorter sensing distance (about 1 meter), because the sensor I bought has a sensing distance of 5 to 10 meters. Even when set to the minimum sensing distance, it is still a bit too sensitive for daily use.
The VCC pin of the human sensor connects to the 5V pin (pin 4) of the Raspberry Pi, the GND pin connects to the GND pin (pin 6) of the Raspberry Pi, and the OUT pin can be connected to any GPIO pin on the Raspberry Pi. Here, I chose GPIO.28 in wiringPi numbering, which is physical pin 38 and BCM GPIO 20 (the number used in the code below).
Important
Be careful not to reverse the VCC and GND connections, or it will directly burn out the sensor. The VCC pin should connect to the Raspberry Pi's 3.3V or 5V pin depending on the sensor's operating voltage.
Next, throw the following code into the Raspberry Pi and run it:
import RPi.GPIO as GPIO
import time

GPIO.setmode(GPIO.BCM)    # Use BCM pin numbering
GPIO.setup(20, GPIO.IN)   # GPIO.28 (BCM 20, physical pin 38) as input

while True:
    # Read the pin once per cycle and print its level
    if GPIO.input(20):
        print("HIGH")
    else:
        print("LOW")
    time.sleep(1)
If everything goes smoothly, when you place your hand in front of the human sensor, it will output HIGH, and it will stay HIGH for a while (the sensor has a delay set). If nothing is detected, it will output LOW. This completes the first step of debugging the human sensor's operation.
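Since the sensor can be overly sensitive, one option is to require several consecutive HIGH readings before treating the desk as occupied. The following is only a minimal sketch and not part of the original project: `is_present` and its parameters are hypothetical names, and the pin reader is passed in as a function so the logic can be tried without any hardware attached.

```python
import time

def is_present(read_pin, samples=5, interval=0.2, sleep=time.sleep):
    """Return True only if read_pin() is truthy for `samples` reads in a row.

    read_pin: zero-argument callable returning the pin level (hypothetical
              hook; on the Raspberry Pi it could be `lambda: GPIO.input(20)`).
    """
    for i in range(samples):
        if not read_pin():
            return False          # any LOW reading rejects the detection
        if i < samples - 1:
            sleep(interval)       # wait before the next sample
    return True

if __name__ == "__main__":
    # Simulated readings stand in for the real sensor here
    readings = iter([1, 1, 0, 1, 1])
    print(is_present(lambda: next(readings), samples=3, interval=0))
```

On the Pi itself, the call would look like `is_present(lambda: GPIO.input(20))`. The sample count and interval are guesses that would need tuning against the sensor's own hold delay.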
Preparation for Face Recognition
For face recognition, OpenCV + Mediapipe is used to capture the face image from the camera, and the captured face is uploaded to Megvii's Face++ platform via API for face recognition results.
Face++ Face Detection
First, prepare a selfie to serve as the reference face. POST it to the Face Detection API as multipart/form-data to obtain a face_token. Each face detected in the image gets its own unique face_token, which is used for subsequent face comparisons.
import urllib.request
import urllib.error
import time
import json

def uploadImage(postURL, imgURL):
    # Construct multipart/form-data request body
    border = '----------%s' % hex(int(time.time() * 1000))
    postData = []
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
    postData.append('XXXXXXXXXXX')  # api_key
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
    postData.append('XXXXXXXXXXX')  # api_secret
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file')
    postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
    with open(imgURL, 'rb') as fr:
        postData.append(fr.read())
    postData.append('--%s--\r\n' % border)
    # Encode the text parts as bytes, then join everything with CRLF
    for i, d in enumerate(postData):
        if isinstance(d, str):
            postData[i] = d.encode('utf-8')
    http_body = b'\r\n'.join(postData)
    req = urllib.request.Request(url=postURL, data=http_body)
    req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
    try:
        res = urllib.request.urlopen(req, timeout=5)
        qrcont = res.read()
        resJson = json.loads(qrcont.decode('utf-8'))
        print(resJson['faces'][0]['face_token'])
    except urllib.error.HTTPError as e:
        print(e.read().decode('utf-8'))

uploadImage(postURL='https://api-cn.faceplusplus.com/facepp/v3/detect', imgURL='./face.jpg')
After obtaining the face_token, since we did not store it in the faceset, it will automatically expire after 72 hours. Therefore, the next step is to create a face library to store the face_token for long-term use.
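The hand-built request body above appears again later for face comparison, so it could be factored into a small helper. The sketch below is one possible way to do that using only the standard library. `build_multipart` is a hypothetical name that does not appear in the original code, and the sample bytes are placeholders rather than a real image.

```python
import uuid

def build_multipart(fields, files):
    """Build a multipart/form-data body.

    fields: dict mapping field name -> string value
    files:  dict mapping field name -> (filename, bytes content)
    Returns (body_bytes, content_type_header_value).
    """
    boundary = '----------' + uuid.uuid4().hex
    delimiter = ('--%s' % boundary).encode('utf-8')
    parts = []
    for name, value in fields.items():
        parts.append(delimiter)
        parts.append(('Content-Disposition: form-data; name="%s"' % name).encode('utf-8'))
        parts.append(b'')                      # blank line ends the part headers
        parts.append(value.encode('utf-8'))
    for name, (filename, content) in files.items():
        parts.append(delimiter)
        parts.append(('Content-Disposition: form-data; name="%s"; filename="%s"'
                      % (name, filename)).encode('utf-8'))
        parts.append(b'Content-Type: application/octet-stream')
        parts.append(b'')
        parts.append(content)
    parts.append(delimiter + b'--')            # closing delimiter
    parts.append(b'')                          # trailing CRLF
    return b'\r\n'.join(parts), 'multipart/form-data; boundary=%s' % boundary

if __name__ == "__main__":
    body, content_type = build_multipart(
        {'api_key': 'XXXXXXXXXXX', 'api_secret': 'XXXXXXXXXXX'},
        {'image_file': ('face.jpg', b'placeholder-bytes')},
    )
    print(content_type)
    print(len(body), 'bytes')
```

The returned body and header value can be passed to `urllib.request.Request(url, data=body)` and `add_header('Content-Type', content_type)` in the same way as before.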
Face++ Create Face Library
You can create a face library by calling the Create Face Library API using the POST method.
import requests

CreatFacesetdata = {
    'api_key': 'XXXXXXXXXXX',    # api_key
    'api_secret': 'XXXXXXXXXXX'  # api_secret
}
CreatFacesetRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/create', data=CreatFacesetdata)
CreatFacesetResJson = CreatFacesetRes.json()
faceset_token = CreatFacesetResJson['faceset_token']
print(faceset_token)
After obtaining the faceset_token, you can add faces to the face library.
Face++ Add Face
Similarly, adding a face to the face library is also very simple and follows the same steps as above.
import requests

UploadFacedata = {
    'api_key': 'XXXXXXXXXXX',        # api_key
    'api_secret': 'XXXXXXXXXXX',     # api_secret
    'faceset_token': 'XXXXXXXXXXX',  # faceset_token
    'face_tokens': 'XXXXXXXXXXX'     # face_token
}
UploadFaceRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/addface', data=UploadFacedata)
UploadFaceResJson = UploadFaceRes.json()
print(UploadFaceResJson)
With that, the preparation for face recognition is complete. The Face++ documentation center describes each API in more detail, including the specific parameters and usage.
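The snippets above read tokens straight out of the response and would fail with a bare KeyError if a call were rejected. As a hedged sketch, and assuming that failed Face++ calls carry an `error_message` key in the JSON body (worth confirming against the documentation), a small check could look like this. `require_field` is a hypothetical helper and the sample dictionaries are made up for illustration.

```python
def require_field(response_json, field):
    """Return response_json[field], raising if the API reported an error.

    Assumption: failed calls include an 'error_message' key in the JSON body.
    """
    if 'error_message' in response_json:
        raise RuntimeError('Face++ error: %s' % response_json['error_message'])
    if field not in response_json:
        raise KeyError('missing %r in response' % field)
    return response_json[field]

if __name__ == "__main__":
    # Made-up responses, for illustration only
    ok = {'faceset_token': 'example-token'}
    print(require_field(ok, 'faceset_token'))
    try:
        require_field({'error_message': 'example failure'}, 'faceset_token')
    except RuntimeError as err:
        print(err)
```

In the earlier snippets this would replace direct indexing, for example `require_field(CreatFacesetResJson, 'faceset_token')`.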
Main Program
For weather information, I used the QWeather (HeFeng Weather) Real-time Weather API. Sending a GET request to https://devapi.qweather.com/v7/weather/now?location={City ID}&key={key} returns the current weather conditions. Face comparison is done by calling the Face Comparison API. Since it also requires a file upload, the request is again POSTed as multipart/form-data. The complete program is as follows:
import cv2
import time
import mediapipe as mp
import RPi.GPIO as GPIO
import requests
import urllib.request
import urllib.error
import datetime
import json
import os

GPIO.setmode(GPIO.BCM)
GPIO.setup(20, GPIO.IN)

def getWeather(location, Key):
    # Get weather
    weatherData = requests.get(
        'https://devapi.qweather.com/v7/weather/now?location=' + location + '&key=' + Key
    )
    weatherJson = weatherData.json()
    nowTemperature = weatherJson['now']['temp']
    nowWeather = weatherJson['now']['text']
    nowFeelsLike = weatherJson['now']['feelsLike']
    nowWindDir = weatherJson['now']['windDir']
    nowWindScale = weatherJson['now']['windScale']
    return nowWeather, nowFeelsLike, nowTemperature, nowWindDir, nowWindScale

def bodyCheak():
    # Human sensor information
    if GPIO.input(20):
        return 1
    else:
        return 0

def faceCompare(imgURL):
    # Face comparison
    border = '----------%s' % hex(int(time.time() * 1000))
    postData = []
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
    postData.append('XXXXXXXXXXX')  # api_key
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
    postData.append('XXXXXXXXXXX')  # api_secret
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file1')
    postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
    with open(imgURL, 'rb') as fr:
        postData.append(fr.read())
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'face_token2')
    postData.append('XXXXXXXXXXX')  # face_token
    postData.append('--%s--\r\n' % border)
    for i, d in enumerate(postData):
        if isinstance(d, str):
            postData[i] = d.encode('utf-8')
    http_body = b'\r\n'.join(postData)
    req = urllib.request.Request(
        url='https://api-cn.faceplusplus.com/facepp/v3/compare',
        data=http_body
    )
    req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
    try:
        res = urllib.request.urlopen(req, timeout=5)
        qrcont = res.read()
        resJson = json.loads(qrcont.decode('utf-8'))
        if 'confidence' in resJson:
            return resJson['confidence'], resJson['thresholds']['1e-5']
        else:
            print('No Face Detected')
            return 0, 100
    except urllib.error.HTTPError as e:
        print(e.read().decode('utf-8'))
        # Treat a failed request as "no match" so the caller can still unpack two values
        return 0, 100

weather, feelsLike, temperature, windDir, windScale = getWeather(
    location='XXXXXXXXXXX',
    Key='XXXXXXXXXXX'
)
pushText = 'Vinking, good morning~ Today is ' + str(datetime.date.today().year) + ' year ' \
    + str(datetime.date.today().month) + ' month ' \
    + str(datetime.date.today().day) + ' day ' \
    + ' , the weather is ' + weather + ' , the temperature is ' + temperature + '° , the feels like temperature ' \
    + feelsLike + '° , blowing ' + windDir + windScale + ' level, let’s work hard for the new day~'

while True:
    if bodyCheak():
        Capture = cv2.VideoCapture(0)
        mp_face_mesh = mp.solutions.face_mesh
        faceMesh = mp_face_mesh.FaceMesh(max_num_faces=1)
        faceDetectionTime, videosTime = 0, 0
        while True:
            success, videos = Capture.read()
            if not success:
                # No frame from the camera; give up on this attempt
                print('Camera read failed')
                break
            imgRGB = cv2.cvtColor(videos, cv2.COLOR_BGR2RGB)
            results = faceMesh.process(imgRGB)
            faces = []
            if results.multi_face_landmarks:
                for faceLms in results.multi_face_landmarks:
                    face = []
                    ih, iw, ic = videos.shape
                    for id, lm in enumerate(faceLms.landmark):
                        x, y = int(lm.x * iw), int(lm.y * ih)
                        face.append([x, y])
                    # Left contour feature point of the face
                    faceLeft = faceLms.landmark[234]
                    faceLeft_X = int(faceLeft.x * iw)
                    # Top contour feature point of the face
                    faceTop = faceLms.landmark[10]
                    faceTop_Y = int(faceTop.y * ih)
                    # Right contour feature point of the face
                    faceRight = faceLms.landmark[454]
                    faceRight_X = int(faceRight.x * iw)
                    # Bottom contour feature point of the face
                    faceBottom = faceLms.landmark[152]
                    faceBottom_Y = int(faceBottom.y * ih)
                    faces.append(face)
            videosTime += 1
            if len(faces) != 0:
                videosTime = 0
                faceDetectionTime += 1
            if faceDetectionTime >= 20:
                # A face has been seen in 20 frames: crop it and compare
                cv2.imwrite("./Face.jpg", videos[faceTop_Y: faceBottom_Y, faceLeft_X: faceRight_X])
                confidence, thresholds = faceCompare(imgURL='./Face.jpg')
                if confidence >= thresholds:
                    print('Success')
                    # WeChat push; params lets requests URL-encode the text
                    requests.get(
                        'https://speak.vinking.top/',
                        params={'text': '[Good Morning Push]' + pushText}
                    )
                    if os.path.exists('./Face.jpg'):
                        os.remove('./Face.jpg')
                    Capture.release()
                    cv2.destroyAllWindows()
                    GPIO.cleanup()
                    exit(0)
                else:
                    print('False')
                    if os.path.exists('./Face.jpg'):
                        os.remove('./Face.jpg')
                    break
            elif videosTime >= 60:
                # 60 consecutive frames without a face
                print('Timeout')
                break
        Capture.release()
    time.sleep(1)
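One detail worth guarding against in the crop step: MediaPipe landmark coordinates are normalized and can fall slightly outside the frame when the face is partly out of view, and a negative index in a NumPy slice counts from the other end of the array, which can yield an empty or wrong crop. A minimal sketch of clamping the box first is shown below. `face_box` is a hypothetical helper that is not in the original program, and the numbers in the demo are invented.

```python
def face_box(left_x, top_y, right_x, bottom_y, width, height):
    """Clamp a landmark-derived box to the frame.

    Returns (x0, y0, x1, y1) suitable for frame[y0:y1, x0:x1],
    or None if the clamped box has no area.
    """
    x0 = max(0, min(left_x, right_x))
    x1 = min(width, max(left_x, right_x))
    y0 = max(0, min(top_y, bottom_y))
    y1 = min(height, max(top_y, bottom_y))
    if x1 <= x0 or y1 <= y0:
        return None
    return x0, y0, x1, y1

if __name__ == "__main__":
    # Invented values: a face sticking out past the left and bottom edges
    print(face_box(-12, 40, 300, 520, width=640, height=480))
```

In the main loop this could be used as `box = face_box(faceLeft_X, faceTop_Y, faceRight_X, faceBottom_Y, iw, ih)`, skipping the frame when `box` is None.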
To run it at 7 AM every day, enter the command crontab -e, add the line 0 7 * * * python3 {file path} to the file, then press Ctrl + O to save and Ctrl + X to exit.
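Because cron starts a fresh copy of the script every morning and the main loop only exits after a successful match, a day when nobody sits down would leave the previous run still polling when the next one starts. One way to avoid that is to give the loop a deadline. The sketch below is only an illustration: `run_until` and its parameters are hypothetical, and `step` stands in for one pass of "check the sensor, then try recognition".

```python
import time

def run_until(deadline, step, now=time.time, sleep=time.sleep, interval=1.0):
    """Call step() repeatedly until it returns True or `deadline` passes.

    deadline: timestamp (same units as now()) after which to stop trying
    step:     zero-argument callable; returns True when the job is done
    Returns True if step() succeeded, False on timeout.
    """
    while now() < deadline:
        if step():
            return True
        sleep(interval)
    return False

if __name__ == "__main__":
    attempts = []

    def step():
        # Stand-in for one sensor + face check; succeeds on the third try
        attempts.append(1)
        return len(attempts) == 3

    print(run_until(time.time() + 5, step, interval=0))
```

With a deadline of, say, `time.time() + 3 * 3600`, the script would stop on its own by 10 AM. The three-hour window is an arbitrary choice.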
Finally
I have always wanted a complete set of smart home devices, but the price is a bit high, so I had to find another way to get similar functionality. Besides, seeing the feature run successfully is very fulfilling. Perhaps it is a kind of joy you cannot get from simply buying a finished product.
Ps. If you want to control smart home devices (like Xiaomi smart plugs) to turn them on and off at the same time, I found a Python library for Xiaomi smart devices, which might achieve similar functionality.
This article is synchronized by Mix Space to xLog. The original link is https://www.vinking.top/posts/daily/raspberry-pi-morning-alarm-weather-forecast