前段時間、卒業設計のためにラズベリーパイ 4B を購入しました。卒業発表が終わった後、ラズベリーパイは放置されていました。ローカルネットワークのクラウドストレージを構築する以外に、以前学んだ OpenCV を使って、カメラと人体センサーを購入し、ラズベリーパイに接続して簡単な「おはよう」通知機能を作りました。毎朝、机の前に座るときに WeChat で「おはよう」の挨拶と天気予報を送信します。
全体の設計思路は、毎朝 7 時にラズベリーパイが py スクリプトを実行し、人体センサーで机の前に人がいるかどうかを検出します。もし人がいれば、カメラを開いて本人かどうかを判断し、本人であれば WeChat でメッセージを送信します。思考はとてもシンプルで、コードも順調に書けて大きな問題には直面しませんでした。ただ、人体センサーを接続するときに正負極を間違えてしまい、1 つの人体センサーを壊してしまい、手も少し火傷しました∠(ᐛ」∠)_
Note
前期準備
🖥️ OpenCV + Mediapipe 環境がインストールされたラズベリーパイ
📹 カメラ 1 台
✋ 人体センサー 1 個 (またはそれ以上)
🔌 母対母のジャンパー線数本
🎤 Wecom 酱を構築したサーバー(またはラズベリーパイ内に構築することも可能)
人体センサー#
人体センサーは淘宝で適当なものを購入できますが、感知距離が少し短い(約 1 メートル程度)のものをお勧めします。ここで購入したセンサーは感知距離が 5〜10 メートルで、最小の感知距離に調整しても、日常使用では少し敏感すぎると感じました。
人体センサーの VCC ピンをラズベリーパイの 4 番 5V ピンに接続し、GND ピンをラズベリーパイの 6 番 GND ピンに接続します。OUT ピンはラズベリーパイの任意の GPIO ピンに接続できますが、ここでは 38 番の GPIO.28 ピンに接続します。
Important
VCC と GND を逆に接続しないように注意してください。そうしないとセンサーが壊れてしまいます。VCC ピンはラズベリーパイの 3.3V ピンまたは 5V ピンに接続するかは、センサーの動作電圧を確認してください。
次に、以下のコードをラズベリーパイに投入して実行します:
import RPi.GPIO as GPIO
import time
GPIO.setmode(GPIO.BCM) # ピン番号をBCMモードに設定
GPIO.setup(20, GPIO.IN) # GPIO.28 (BCM番号は20)を入力モードに設定
while True:
if GPIO.input(20):
print("HIGH")
elif GPIO.input(20) == 0:
print("LOW")
time.sleep(1)
すべてが順調であれば、手を人体センサーの前に置くと HIGH と出力され、しばらく HIGH の状態が続きます(センサーには遅延が設定されています)。検出されない場合は LOW と出力されます。これで人体センサーの動作の初期調整が完了しました。
顔認識の前期準備#
顔認識には OpenCV + Mediapipe を使用してカメラから顔画像を取得し、取得した顔を API を通じて Megvii のFace++ プラットフォームにアップロードして顔認識の結果を得ます。
Face++ 顔検出#
まず、データベースの顔用に自撮りの写真を用意します。multipart/form-data
メソッドを使用して顔検出 APIを POST 呼び出しし、face_token を取得します。face_token はこの画像から検出された各顔を表し、一意性を持ち、後で顔比較に使用できます。
import urllib.request
import urllib.error
import time
import json
def uploadImage(postURL,imgURL):
# multipart/form-dataリクエストボディを構築
border = '----------%s' % hex(int(time.time() * 1000))
postData = []
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
postData.append('XXXXXXXXXXX') # api_key
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
postData.append('XXXXXXXXXXX') # api_secret
postData.append('--%s' % border)
fr = open(imgURL, 'rb')
postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file')
postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
postData.append(fr.read())
fr.close()
postData.append('--%s--\r\n' % border)
for i, d in enumerate(postData):
if isinstance(d, str):
postData[i] = d.encode('utf-8')
http_body = b'\r\n'.join(postData)
req = urllib.request.Request(url=postURL, data=http_body)
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
try:
res = urllib.request.urlopen(req, timeout=5)
qrcont = res.read()
resJson = json.loads(qrcont.decode('utf-8'))
print(resJson['faces'][0]['face_token'])
except urllib.error.HTTPError as e:
print(e.read().decode('utf-8'))
pass
uploadImage(postURL='https://api-cn.faceplusplus.com/facepp/v3/detect',imgURL='./face.jpg')
face_token を取得した後、私たちはそれを faceset に保存していないため、72 時間後に自動的に無効になります。次のステップでは、face_token を長期使用のために保存する顔ライブラリを作成する必要があります。
Face++ 顔ライブラリの作成#
POST メソッドを使用して顔ライブラリ APIを呼び出すことで、顔ライブラリを作成できます。
import requests
CreatFacesetdata = {
'api_key': 'XXXXXXXXXXX', # api_key
'api_secret': 'XXXXXXXXXXX' # api_secret
}
CreatFacesetRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/create', data=CreatFacesetdata)
CreatFacesetResJson = CreatFacesetRes.json()
faceset_token = CreatFacesetResJson['faceset_token']
print(faceset_token)
faceset_token を取得した後、顔ライブラリに顔を追加できます。
Face++ 顔の追加#
同様に、顔ライブラリに顔を追加するのも非常に簡単で、上記の手順と同じです。
import requests
UploadFacedata = {
'api_key': 'XXXXXXXXXXX', # api_key
'api_secret': 'XXXXXXXXXXX', # api_secret
'faceset_token': 'XXXXXXXXXXX', # faceset_token
'face_tokens': 'XXXXXXXXXXX' # face_token
}
UploadFaceRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/addface', data=UploadFacedata)
UploadFaceResJson = UploadFaceRes.json()
print(UploadFaceResJson)
これで顔認識の前期準備が完了しました。 Face++ のドキュメントセンターには、さまざまな API の詳細な説明がありますので、具体的な使用方法を確認できます。
メインプログラム#
天気情報の取得には、和風天気のリアルタイム天気 APIを使用します。https://devapi.qweather.com/v7/weather/now?location={都市ID}&key={key}
に GET リクエストを送信して天気状況を取得します。顔比較は顔比較 APIを呼び出すことで実現します。同様にファイルをアップロードする必要があるため、multipart/form-data
メソッドを使用して API を POST 呼び出しします。完全なプログラムは以下の通りです:
import cv2
import time
import mediapipe as mp
import RPi.GPIO as GPIO
import requests
import urllib.request
import urllib.error
import time
import datetime
import json
import os
GPIO.setmode(GPIO.BCM)
GPIO.setup(20, GPIO.IN)
def getWeather(location, Key):
# 天気を取得
weatherData = requests.get(
'https://devapi.qweather.com/v7/weather/now?location=' + location + '&key=' + Key
)
weatherJson = weatherData.json()
nowTemperature = weatherJson['now']['temp']
nowWeather = weatherJson['now']['text']
nowFeelsLike = weatherJson['now']['feelsLike']
nowWindDir = weatherJson['now']['windDir']
nowWindScale = weatherJson['now']['windScale']
return nowWeather, nowFeelsLike, nowTemperature, nowWindDir, nowWindScale
def bodyCheak():
# 人体センサー情報
if GPIO.input(20):
return 1
else:
return 0
def faceCompare(imgURL):
# 顔比較
border = '----------%s' % hex(int(time.time() * 1000))
postData = []
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
postData.append('XXXXXXXXXXX') # api_key
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
postData.append('XXXXXXXXXXX') # api_secret
postData.append('--%s' % border)
fr = open(imgURL, 'rb')
postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file1')
postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
postData.append(fr.read())
fr.close()
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'face_token2')
postData.append('XXXXXXXXXXX') # face_token
postData.append('--%s--\r\n' % border)
for i, d in enumerate(postData):
if isinstance(d, str):
postData[i] = d.encode('utf-8')
http_body = b'\r\n'.join(postData)
req = urllib.request.Request(
url='https://api-cn.faceplusplus.com/facepp/v3/compare',
data=http_body
)
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
try:
res = urllib.request.urlopen(req, timeout=5)
qrcont = res.read()
resJson = json.loads(qrcont.decode('utf-8'))
if 'confidence' in resJson:
return resJson['confidence'], resJson['thresholds']['1e-5']
else:
print('顔が検出されませんでした')
return 0, 100
except urllib.error.HTTPError as e:
print(e.read().decode('utf-8'))
weather, feelsLike, temperature, windDir, windScale = getWeather(
location='XXXXXXXXXXX',
Key='XXXXXXXXXXX'
)
pushText = 'Vinking、おはよう~ 今日は' + str(datetime.date.today().year) + '年' \
+ str(datetime.date.today().month) + '月' \
+ str(datetime.date.today().day) + '日' \
+ '、天気は' + weather + '、温度は' + temperature + '°、体感温度は' \
+ feelsLike + '°、' + windDir + windScale + '級の風が吹いています。新しい一日を頑張りましょう~'
while True:
if bodyCheak():
Capture = cv2.VideoCapture(0)
mp_face_mesh = mp.solutions.face_mesh
faceMesh = mp_face_mesh.FaceMesh(max_num_faces=1)
faceDetectionTime, videosTime = 0, 0
while True:
success, videos = Capture.read()
imgRGB = cv2.cvtColor(videos, cv2.COLOR_BGR2RGB)
results = faceMesh.process(imgRGB)
faces = []
if results.multi_face_landmarks:
for faceLms in results.multi_face_landmarks:
face = []
for id, lm in enumerate(faceLms.landmark):
ih, iw, ic = videos.shape
x, y = int(lm.x * iw), int(lm.y * ih)
# 顔の左輪郭特徴点
faceLeft = faceLms.landmark[234]
faceLeft_X = int(faceLeft.x * iw)
# 顔の上輪郭特徴点
faceTop = faceLms.landmark[10]
faceTop_Y = int(faceTop.y * ih)
# 顔の右輪郭特徴点
faceRight = faceLms.landmark[454]
faceRight_X = int(faceRight.x * iw)
# 顔の下輪郭特徴点
faceBottom = faceLms.landmark[152]
faceBottom_Y = int(faceBottom.y * ih)
face.append([x, y])
faces.append(face)
videosTime += 1
if len(faces) != 0:
videosTime = 0
faceDetectionTime += 1
if faceDetectionTime >= 20:
cv2.imwrite("./Face.jpg", videos[faceTop_Y: faceBottom_Y, faceLeft_X: faceRight_X])
confidence, thresholds = faceCompare(imgURL='./Face.jpg')
if confidence >= thresholds:
print('成功')
requests.get(
# WeChat通知
'https://speak.vinking.top/?text=[おはよう通知]' + pushText
)
if os.path.exists('./Face.jpg'):
os.remove('./Face.jpg')
Capture.release()
cv2.destroyAllWindows()
GPIO.cleanup()
exit(0)
else:
print('失敗')
if os.path.exists('./Face.jpg'):
os.remove('./Face.jpg')
break
elif videosTime >= 60:
print('タイムアウト')
break
Capture.release()
time.sleep(1)
毎日 7 時に定時実行するには、crontab -e
コマンドを入力し、ファイルに0 7 * * * python3 {ファイルパス}
と入力して、Ctrl + O で保存し、Ctrl + X で終了すれば OK です。
最後に#
ずっと一整套のスマートホームを持ちたいと思っていましたが、価格が少し高いため、別の方法で類似の機能を実現することにしました。そして、機能が成功して動作したときは非常に達成感があります。おそらく、直接購入して体験することでは得られない喜びの一種でしょう。
Ps. スマートホーム(例えば、小米のスマートプラグ)を同時に制御したい場合、ここで見つけた小米スマートデバイスの Python ライブラリを使えば、類似の機能を実現できるかもしれません。
この記事はMix Spaceによって xLog に同期更新されました。元のリンクはhttps://www.vinking.top/posts/daily/raspberry-pi-morning-alarm-weather-forecast