banner
Vinking

Vinking

你写下的每一个BUG 都是人类反抗被人工智能统治的一颗子弹

ラズベリーパイ、おはよう、私と夏

前段時間、卒業設計のためにラズベリーパイ 4B を購入しました。卒業発表が終わった後、ラズベリーパイは放置されていました。ローカルネットワークのクラウドストレージを構築する以外に、以前学んだ OpenCV を使って、カメラと人体センサーを購入し、ラズベリーパイに接続して簡単な「おはよう」通知機能を作りました。毎朝、机の前に座るときに WeChat で「おはよう」の挨拶と天気予報を送信します。

おはようの挨拶 & 天気予報

全体の設計思路は、毎朝 7 時にラズベリーパイが py スクリプトを実行し、人体センサーで机の前に人がいるかどうかを検出します。もし人がいれば、カメラを開いて本人かどうかを判断し、本人であれば WeChat でメッセージを送信します。思考はとてもシンプルで、コードも順調に書けて大きな問題には直面しませんでした。ただ、人体センサーを接続するときに正負極を間違えてしまい、1 つの人体センサーを壊してしまい、手も少し火傷しました∠(ᐛ」∠)_

Note

前期準備

🖥️ OpenCV + Mediapipe 環境がインストールされたラズベリーパイ

📹 カメラ 1 台

✋ 人体センサー 1 個 (またはそれ以上)

🔌 母対母のジャンパー線数本

🎤 Wecom 酱を構築したサーバー(またはラズベリーパイ内に構築することも可能)

人体センサー#

人体センサーは淘宝で適当なものを購入できますが、感知距離が少し短い(約 1 メートル程度)のものをお勧めします。ここで購入したセンサーは感知距離が 5〜10 メートルで、最小の感知距離に調整しても、日常使用では少し敏感すぎると感じました。

ラズベリーパイピン配置図 & 人体センサーのピン

人体センサーの VCC ピンをラズベリーパイの 4 番 5V ピンに接続し、GND ピンをラズベリーパイの 6 番 GND ピンに接続します。OUT ピンはラズベリーパイの任意の GPIO ピンに接続できますが、ここでは 38 番の GPIO.28 ピンに接続します。

Important

VCC と GND を逆に接続しないように注意してください。そうしないとセンサーが壊れてしまいます。VCC ピンはラズベリーパイの 3.3V ピンまたは 5V ピンに接続するかは、センサーの動作電圧を確認してください。

センサー接続

次に、以下のコードをラズベリーパイに投入して実行します:

import RPi.GPIO as GPIO
import time

GPIO.setmode(GPIO.BCM) # ピン番号をBCMモードに設定
GPIO.setup(20, GPIO.IN) # GPIO.28 (BCM番号は20)を入力モードに設定
while True:
    if GPIO.input(20):
        print("HIGH")
    elif GPIO.input(20) == 0:
        print("LOW")
    time.sleep(1)

すべてが順調であれば、手を人体センサーの前に置くと HIGH と出力され、しばらく HIGH の状態が続きます(センサーには遅延が設定されています)。検出されない場合は LOW と出力されます。これで人体センサーの動作の初期調整が完了しました。

人体センサー出力

顔認識の前期準備#

顔認識には OpenCV + Mediapipe を使用してカメラから顔画像を取得し、取得した顔を API を通じて Megvii のFace++ プラットフォームにアップロードして顔認識の結果を得ます。

Face++ 顔検出#

まず、データベースの顔用に自撮りの写真を用意します。multipart/form-dataメソッドを使用して顔検出 APIを POST 呼び出しし、face_token を取得します。face_token はこの画像から検出された各顔を表し、一意性を持ち、後で顔比較に使用できます。

import urllib.request
import urllib.error
import time
import json

def uploadImage(postURL,imgURL):
    # multipart/form-dataリクエストボディを構築
    border = '----------%s' % hex(int(time.time() * 1000))
    postData = []
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
    postData.append('XXXXXXXXXXX') # api_key
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
    postData.append('XXXXXXXXXXX') # api_secret
    postData.append('--%s' % border)
    fr = open(imgURL, 'rb')
    postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file')
    postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
    postData.append(fr.read())
    fr.close()
    postData.append('--%s--\r\n' % border)
    for i, d in enumerate(postData):
        if isinstance(d, str):
            postData[i] = d.encode('utf-8')
    http_body = b'\r\n'.join(postData)
    req = urllib.request.Request(url=postURL, data=http_body)
    req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
    try:
        res = urllib.request.urlopen(req, timeout=5)
        qrcont = res.read()
        resJson = json.loads(qrcont.decode('utf-8'))
        print(resJson['faces'][0]['face_token'])
    except urllib.error.HTTPError as e:
        print(e.read().decode('utf-8'))
        pass

uploadImage(postURL='https://api-cn.faceplusplus.com/facepp/v3/detect',imgURL='./face.jpg')

face_token を取得した後、私たちはそれを faceset に保存していないため、72 時間後に自動的に無効になります。次のステップでは、face_token を長期使用のために保存する顔ライブラリを作成する必要があります。

Face++ 顔ライブラリの作成#

POST メソッドを使用して顔ライブラリ APIを呼び出すことで、顔ライブラリを作成できます。

import requests

CreatFacesetdata = {
    'api_key': 'XXXXXXXXXXX', # api_key
    'api_secret': 'XXXXXXXXXXX' # api_secret
}
CreatFacesetRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/create', data=CreatFacesetdata)
CreatFacesetResJson = CreatFacesetRes.json()
faceset_token = CreatFacesetResJson['faceset_token']
print(faceset_token)

faceset_token を取得した後、顔ライブラリに顔を追加できます。

Face++ 顔の追加#

同様に、顔ライブラリに顔を追加するのも非常に簡単で、上記の手順と同じです。

import requests

UploadFacedata = {
    'api_key': 'XXXXXXXXXXX', # api_key
    'api_secret': 'XXXXXXXXXXX', # api_secret
    'faceset_token': 'XXXXXXXXXXX', # faceset_token
    'face_tokens': 'XXXXXXXXXXX' # face_token
}
UploadFaceRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/addface', data=UploadFacedata)
UploadFaceResJson = UploadFaceRes.json()
print(UploadFaceResJson)

これで顔認識の前期準備が完了しました。 Face++ のドキュメントセンターには、さまざまな API の詳細な説明がありますので、具体的な使用方法を確認できます。

メインプログラム#

天気情報の取得には、和風天気のリアルタイム天気 APIを使用します。https://devapi.qweather.com/v7/weather/now?location={都市ID}&key={key}に GET リクエストを送信して天気状況を取得します。顔比較は顔比較 APIを呼び出すことで実現します。同様にファイルをアップロードする必要があるため、multipart/form-dataメソッドを使用して API を POST 呼び出しします。完全なプログラムは以下の通りです:

import cv2
import time
import mediapipe as mp
import RPi.GPIO as GPIO
import requests
import urllib.request
import urllib.error
import time
import datetime
import json
import os

GPIO.setmode(GPIO.BCM)
GPIO.setup(20, GPIO.IN)

def getWeather(location, Key):
    # 天気を取得
    weatherData = requests.get(
        'https://devapi.qweather.com/v7/weather/now?location=' + location + '&key=' + Key
    )
    weatherJson = weatherData.json()
    nowTemperature = weatherJson['now']['temp']
    nowWeather = weatherJson['now']['text']
    nowFeelsLike = weatherJson['now']['feelsLike']
    nowWindDir = weatherJson['now']['windDir']
    nowWindScale = weatherJson['now']['windScale']
    return nowWeather, nowFeelsLike, nowTemperature, nowWindDir, nowWindScale

def bodyCheak():
    # 人体センサー情報
    if GPIO.input(20):
        return 1
    else:
        return 0

def faceCompare(imgURL):
    # 顔比較
    border = '----------%s' % hex(int(time.time() * 1000))
    postData = []
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
    postData.append('XXXXXXXXXXX') # api_key
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
    postData.append('XXXXXXXXXXX') # api_secret
    postData.append('--%s' % border)
    fr = open(imgURL, 'rb')
    postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file1')
    postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
    postData.append(fr.read())
    fr.close()
    postData.append('--%s' % border)
    postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'face_token2')
    postData.append('XXXXXXXXXXX') # face_token
    postData.append('--%s--\r\n' % border)
    for i, d in enumerate(postData):
        if isinstance(d, str):
            postData[i] = d.encode('utf-8')
    http_body = b'\r\n'.join(postData)
    req = urllib.request.Request(
        url='https://api-cn.faceplusplus.com/facepp/v3/compare',
        data=http_body
    )
    req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
    try:
        res = urllib.request.urlopen(req, timeout=5)
        qrcont = res.read()
        resJson = json.loads(qrcont.decode('utf-8'))
        if 'confidence' in resJson:
            return resJson['confidence'], resJson['thresholds']['1e-5']
        else:
            print('顔が検出されませんでした')
            return 0, 100
    except urllib.error.HTTPError as e:
        print(e.read().decode('utf-8'))

weather, feelsLike, temperature, windDir, windScale = getWeather(
    location='XXXXXXXXXXX',
    Key='XXXXXXXXXXX'
)
pushText = 'Vinking、おはよう~ 今日は' + str(datetime.date.today().year) + '年' \
            + str(datetime.date.today().month) + '月' \
            + str(datetime.date.today().day) + '日' \
            + '、天気は' + weather + '、温度は' + temperature + '°、体感温度は' \
            + feelsLike + '°、' + windDir + windScale + '級の風が吹いています。新しい一日を頑張りましょう~'

while True:
    if bodyCheak():
        Capture = cv2.VideoCapture(0)
        mp_face_mesh = mp.solutions.face_mesh
        faceMesh = mp_face_mesh.FaceMesh(max_num_faces=1)
        faceDetectionTime, videosTime = 0, 0
        while True:
            success, videos = Capture.read()
            imgRGB = cv2.cvtColor(videos, cv2.COLOR_BGR2RGB)
            results = faceMesh.process(imgRGB)
            faces = []
            if results.multi_face_landmarks:
                for faceLms in results.multi_face_landmarks:
                    face = []
                    for id, lm in enumerate(faceLms.landmark):
                        ih, iw, ic = videos.shape
                        x, y = int(lm.x * iw), int(lm.y * ih)
                        # 顔の左輪郭特徴点
                        faceLeft = faceLms.landmark[234]
                        faceLeft_X = int(faceLeft.x * iw)
                        # 顔の上輪郭特徴点
                        faceTop = faceLms.landmark[10]
                        faceTop_Y = int(faceTop.y * ih)
                        # 顔の右輪郭特徴点
                        faceRight = faceLms.landmark[454]
                        faceRight_X = int(faceRight.x * iw)
                        # 顔の下輪郭特徴点
                        faceBottom = faceLms.landmark[152]
                        faceBottom_Y = int(faceBottom.y * ih)
                        face.append([x, y])
                    faces.append(face)
            videosTime += 1
            if len(faces) != 0:
                videosTime = 0
                faceDetectionTime += 1
            if faceDetectionTime >= 20:
                cv2.imwrite("./Face.jpg", videos[faceTop_Y: faceBottom_Y, faceLeft_X: faceRight_X])
                confidence, thresholds = faceCompare(imgURL='./Face.jpg')
                if confidence >= thresholds:
                    print('成功')
                    requests.get(
                        # WeChat通知
                        'https://speak.vinking.top/?text=[おはよう通知]' + pushText
                    )
                    if os.path.exists('./Face.jpg'):
                        os.remove('./Face.jpg')
                    Capture.release()
                    cv2.destroyAllWindows()
                    GPIO.cleanup()
                    exit(0)
                else:
                    print('失敗')
                    if os.path.exists('./Face.jpg'):
                        os.remove('./Face.jpg')
                    break
            elif videosTime >= 60:
                print('タイムアウト')
                break
        Capture.release()
    time.sleep(1)

毎日 7 時に定時実行するには、crontab -eコマンドを入力し、ファイルに0 7 * * * python3 {ファイルパス}と入力して、Ctrl + O で保存し、Ctrl + X で終了すれば OK です。

最後に#

ずっと一整套のスマートホームを持ちたいと思っていましたが、価格が少し高いため、別の方法で類似の機能を実現することにしました。そして、機能が成功して動作したときは非常に達成感があります。おそらく、直接購入して体験することでは得られない喜びの一種でしょう。

Ps. スマートホーム(例えば、小米のスマートプラグ)を同時に制御したい場合、ここで見つけた小米スマートデバイスの Python ライブラリを使えば、類似の機能を実現できるかもしれません。

この記事はMix Spaceによって xLog に同期更新されました。元のリンクはhttps://www.vinking.top/posts/daily/raspberry-pi-morning-alarm-weather-forecast

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。