Pythonロボティクスコース レッスン 45
テーマ.11-4 日常で使う道具のIoT化
傘立てのIoT化をモチーフにしたロボットを制作しよう!
チャプター1
このレッスンで学ぶこと
テーマ.11-2とテーマ.11-3で学習した内容を踏まえて、日用品のIoT(Internet of Things)化をモチーフにしたロボット制作に取り組みます。今回、モチーフにするのは「傘立てのIoT化」です。人が近づいたことを感知すると、その日の気象予報を表示する機能と、さらに気象が雨の場合は折り畳み式の傘を渡す機能をもつ傘立てロボットをつくり、家の玄関に設置してみましょう。
チャプター2
新しいPython文法の学習
はじめに、今回作成するプログラムで新たに利用するMicroPythonのモジュールについて学習しましょう。
2. 1 ujsonモジュールを利用した辞書型オブジェクトとJSON文字列の相互変換
テーマ.11-2では、「JSON(JavaScript Object Notation)」というデータフォーマットについて学習しました。そのときに作成したプログラムでは、JSONで表された文字列からPythonの辞書型オブジェクトへの変換を行いました。
これから紹介するMicroPythonのujson
モジュールを使うと、その反対の変換を行うことができます。
まずは、ujson
モジュールで定義されている4つの関数を確認しましょう。
【 ujsonモジュールで定義されている関数 】
関数名 | 処理内容 |
---|---|
dump(obj, stream) | 辞書型のオブジェクト(obj )をJSON文字列に変換し、ファイルなどのストリーム※(stream )へ書き込む。 |
dumps(obj) | 辞書型のオブジェクト(obj )をJSON文字列に変換する。 |
load(stream) | ファイルなどのストリーム(stream )内のデータをJSON文字列として解釈し、辞書型のオブジェクトに変換する。データが正しいJSON形式になっていない場合はValueError が発生する。 |
loads(str) | JSON文字列として解釈し、辞書型のオブジェクトに変換する。データが正しいJSON形式になっていない場合はValueError が発生する。 |
※ ストリーム(stream)は英語で「小川」や「流れ」などの意味があります。IT分野では連続したデータの流れのことを指し、例えばデータの送受信や入出力の処理を連続して行うことを意味します。また、ファイルのようにデータの入出力を扱うオブジェクトを表す言葉としても使われます。
基本は辞書型オブジェクトからJSON文字列への変換を行う「dump
系の関数」と、その逆の変換を行う「load
系の関数」の2種類に大別できます。dump()
関数とdumps()
関数の違いは、ストリームに変換した結果を書き込むかどうかという点にあります。load()
関数とloads()
関数も同様にストリームを利用するかどうかという点に違いがあります。
では実際に、これらの関数を使ったコードを書いてみましょう。まずは、ある辞書型オブジェクトに対してdumps()
関数でJSON文字列に変換したあと、loads()
関数で再び辞書型オブジェクトに戻してみます。
【 サンプルコード 2-1-1 】
import ujson
# 入れ子構造になっている辞書型オブジェクトのサンプル
_dict = {"name": "Tanaka", "age": 16, "score": {"Mathematics": 82, "Physics": 76, "Chemistry": 96}}
# 辞書型オブジェクトをJSON文字列に変換する
data = ujson.dumps(_dict) # JSON文字列への変換
print(type(data), "\n", data) # 変換結果の表示
# JSON文字列を解析して辞書型のオブジェクトを返す
try: # 対象の文字列が正しいJSONのフォーマットになっていない場合はValueErrorが発生
data = ujson.loads(data) # 辞書型オブジェクトへの再変換
print(type(data), "\n", data) # 変換結果の表示
except ValueError: # エラー発生時はそのことを伝えるためのメッセージを表示
print("The data is not in JSON format.")
上のコードを実行すると、以下の結果がターミナルに表示されます。type()
関数でオブジェクトの型を確認すると、文字列型と辞書型を行き来したことが分かります。
(実行結果)
<class 'str'> {"name": "Tanaka", "age": 16, "score": {"Mathematics": 82, "Physics": 76, "Chemistry": 96}} <class 'dict'> {'score': {'Mathematics': 82, 'Physics': 76, 'Chemistry': 96}, 'name': 'Tanaka', 'age': 16}
今度は、同じ辞書型オブジェクトのサンプルに対して、dump()
関数とload()
関数でファイルへの入出力を兼ねた相互変換を行ってみましょう。
【 サンプルコード 2-1-2 】
import ujson
_dict = {"name": "Tanaka", "age": 16, "score": {"Mathematics": 82, "Physics": 76, "Chemistry": 96}}
# JSON文字列に変換し、ファイルへの書き込みを行う
file = open("example.json", mode="w+t")
ujson.dump(_dict, file) # JSON文字列としてファイルに書き込む
file.close() # 一度ファイルを閉じる
# ファイル内のデータをJSON文字列として解釈し、辞書型のオブジェクトを返す
file = open("example.json", mode="rt") # 同じファイルを読み取り専用のテキストモードで開く
try: # ファイル内のデータが正しいJSONフォーマットになっていない場合はValueErrorが発生
data = ujson.load(file) # ファイル内のデータを辞書型オブジェクトへ変換
print(type(data), "\n", data) # 変換結果の表示
except ValueError: # エラー発生時はそのことを伝えるためのメッセージを表示
print("The data is not in JSON format.")
file.close() # ファイルを閉じる
※ JSONデータのファイル拡張子は「.json
」です。
このコードを実行すると、ファイルにJSON文字列として書き込んだ辞書型オブジェクトを再び同じオブジェクトとして取り込めていることが確認できます。
(実行結果)
<class 'dict'> {'score': {'Mathematics': 82, 'Physics': 76, 'Chemistry': 96}, 'name': 'Tanaka', 'age': 16}
そのままでは外部ファイルに記録できないオブジェクトも、JSON文字列に変換することで保管が可能になります。このような目的にもJSONが利用できるということを覚えておきましょう。
チャプター3
気象予報データの取得とNTPを利用した時刻補正についての復習
テーマ.11-2では、世界気象機関(WMO)のWebサイトへHTTPでリクエストを送り、自分の住んでいる都市(または最も近い都市)の気象予報データをJSON形式で取得しました。また、テーマ.11-3ではNTPを利用して日本の国立研究開発法人情報通信研究機構(NICT)が運営するサーバーから協定世界時(UTC)を取得し、Studuino:bitのリアルタイムクロック(RTC)の時刻補正を行いました。このレッスンでは、これら2つのテーマを踏まえてロボットの制作を行います。制作を始める前にこのチャプターで、各テーマで学習した内容を簡単に振り返りましょう。
3. 1 気象予報データの取得方法
世界気象機関(WMO)のWebサイト内に存在する以下のURLに対し、HTTPのGETメソッドでリクエストを送信すると、1週間分の気象予報を含むJSONデータが取得できます。
https://worldweather.wmo.int/en/json/[City ID]_en.json
上記のURLの[City_ID]
には、世界各国の都市に割り当てられた番号を指定します。自分の住んでいる都市に割り当てられた番号は以下のページで確認できました。
https://worldweather.wmo.int/en/json/full_city_list.txt
※ 自分の住んでいる都市が一覧にない場合は最も距離が近い都市を選択してください。
日本では、以下の7つの都市に番号が割り当てられています。
【 日本の各都市に割り当てられた番号 】
都市名 | [City ID] |
---|---|
札幌("Sapporo" ) | 181 |
仙台("Sendai" ) | 182 |
東京("Tokyo" ) | 183 |
名古屋("Nagoya" ) | 355 |
大阪("Osaka" ) | 184 |
福岡("Fukuoka" ) | 185 |
那覇("Naha" ) | 186 |
また、気象予報データを表すJSONのフォーマットは以下のように定められていました。
【 日本の東京(Tokyo)を指定して取得したJSONデータ 】
{ "city":{ "lang":"en", # 言語(文字列) "cityName":"Tokyo", # 都市名(文字列) "cityLatitude":"35.680000000", # 経度(文字列) "cityLongitude":"139.770000000", # 緯度(文字列) "cityId":183, # 都市のID(数値) "isCapital":true, # その国の首都かどうか(ブール値) "stationName":"Tokyo", # 駅名または空港名(文字列) "tourismURL":"www.jnto.go.jp", # 観光局のURL(文字列) "tourismBoardName":"Japan National Tourist Organization", # 観光局の名前(文字列) "isDep":false, # 政治的に完全に独立していない地域かどうか(文字列) "timeZone":"+0900", # タイムゾーン(文字列) "isDST":"N", # 都市がサマータイムにあるかどうか(文字列) "member":{} # 世界気象機関の組織メンバーとしての情報(辞書型)※ 表記を省略しています "forecast":{ # 1週間の気象予報のデータ(辞書型) "issueDate":"2020-07-28 11:00:00", # 交付日時(文字列) "timeZone":"Local", # 表示されている日付のタイムゾーン(文字列) "forecastDay":"forecastDay":[ # 気象予報の全データ(リスト) { # リスト内は要素は下記の辞書型のデータになっている "forecastDate":"2020-07-29", # 予報日(文字列) "wxdesc":"", # 詳細に記述された気象情報(文字列) "weather":"Rain", # 気象(文字列) "minTemp":"24", # 最低気温-セ氏温度(文字列) "maxTemp":"27", # 最高気温-セ氏温度(文字列) "minTempF":"75", # 最低気温-華氏温度(文字列) "maxTempF":"81", # 最高気温-華氏温度(文字列) "weatherIcon":1401 # 気象に関するアイコンを表す番号(数値) }, . . . ] } "climate":{} # 気候に関するデータ ※ 表記を省略しています。 } }
フォーマットの詳細な定義については、以下のページ(英語版)で確認できます。
http://worldweather.wmo.int/en/json/WWIS_json_schema_v2.json
そして、JSONから辞書型オブジェクトへ変換することで、以下のように各情報へアクセスすることができました。
data["city"]["forecast"]["forecastDay"][0]["weather"] # 明日(または今日)の気象 data["city"]["forecast"]["forecastDay"][0]["maxTemp"] # 明日(または今日)の最高気温-セ氏温度 data["city"]["forecast"]["forecastDay"][0]["minTemp"] # 明日(または今日)の最低気温-セ氏温度
日本の場合、午前11:00(日本標準時)に翌日以降の気象予報を表すデータが新たに交付されます。そのため、午前0時から午前11時までに取得した場合は"forcastDay"
の0番目の要素はその日の気象予報を表しており、午前11時以降に取得した場合は翌日からの気象予報を表していることになります。
3. 2 リアルタイムクロックの時刻を補正する方法
現在の時刻からリアルタイムクロック(RTC)の時刻がズレている場合、「NTP(Netowork Time Protocol)」を利用して、その誤差を補正することができました。NTPはネットワーク内に存在しているコンピュータの時計を「世界協定時(UTC)」に同期させるための通信プロトコルで、クライアントは以下のデータフォーマットでNTPサーバーと通信し、時刻情報を取得することができます。
【 NTPのデータフォーマット 】
※ LIはうるう秒指示子、VNはNTPのバージョン、MODEは動作モードをそれぞれ表しています。
そして、このデータから受信タイムスタンプと送信タイムスタンプを利用して、以下の計算式でクライアントの時計とNTPサーバーの時計の時差を求めます。
【 NTPサーバーとの時差を求める計算式 】
T1
:クライアントがNTPサーバーへリクエストを発信した時刻。T2
:受信タイムスタンプ。NTPサーバーがリクエストを受け取った時刻。T3
:送信タイムスタンプ。NTPサーバーがクライアントへレスポンスを発信した時刻。T4
:クライアントがNTPサーバーからレスポンスを受け取った時刻。
NTPサーバーが返す時刻は協定世界時(UTC0)です。そのため、自分の住んでいる地域の時刻に合わせるためには、以下の標準時間帯(タイムゾーン)を考慮する必要がありました。
【 世界の標準時間帯 】
以上のことから、レッスンで作成したプログラムでは、以下の計算式でリアルタイムクロック(RTC)の時刻を補正しました。
補正後のRTCの時刻 = 現在のRTCの時刻 + offset + 自分の地域のタイムゾーンの時差
※ 実際のNTPでは、NTPサーバーの時刻も協定世界時からズレている可能性があるため、複数のNTPサーバーから時刻情報を取得して、少しずつ誤差を補正する複雑な仕組みになっています。
チャプター4
傘立てロボットの組立て
組立て説明書を確認して、以下の傘立てロボットを組み立てましょう。
4. 1 組み立てに必要なパーツ
【 パーツ一覧 】
- Studuino:bit×1
- ロボット拡張ユニット×1
- 電池ボックス×1
- DCモーター×1
- サーボモーター×2
- 赤外線フォトリフレクタ×1
- 超音波距離センサー×1
- センサー接続コード(3芯15cm)×1
- センサー接続コード(3芯30cm)×1
- ブロック基本四角(白)×6
- ブロック基本四角(グレー)×6
- ブロック基本四角(黒)×6
- ブロック基本四角(赤)×4
- ブロック三角(グレー)×4
- ブロック三角(赤)×4
- ブロックハーフA(グレー)×2
- ブロックハーフB(グレー)×2
- ブロックハーフC(白)×16
- ブロックハーフD(白)×10
- ステー×4
【 アーテックブロックの形状 】
4. 2 組立説明書
以下のリンク先から組立説明書を確認しましょう。
傘立てロボットの組立説明書ここまでできたらクリック
チャプター5
傘立てロボットのプログラムの作成
これから傘立てロボットのプログラムでは、次の4つの機能を実現します。
- NTPサーバーから協定世界時を取得して、内部の時計(リアルタイムクロック)の時刻を補正する機能
- 世界気象機関(WMO)から気象予報データを取得して、内部のテキストファイルに記録する機能
- 超音波距離センサで人が近づいたことを感知すると、2で記録したデータからその日の気象予報(気象マーク、最高気温、最低気温)をLEDディスプレイに表示する機能
- 3のときにもし雨だった場合は、折り畳み式傘を差しだす機能
1と2の機能は、それぞれテーマ.11-2とテーマ.11-3で作成したプログラムからコードを流用して作成します。3と4の機能は今回新たにコーディングします。
それでは、以下の手順に沿ってプログラムの作成を進めていきましょう。
5. 1 プログラムの作成手順
上で確認した4つの機能を実現するために、以下の関数を用意します。
関数名 | 説明 |
---|---|
give_umbrella() | 2つのサーボモーターを同期制御して、傘を差し出す動作を行う。 |
get_icon(weather) | 気象名(weather )を引数に受け取り、LEDディスプレイに表示するアイコンのイメージオブジェクトを返す。※テーマ.11-2で作成したプログラムから流用。 |
get_forecast(t=None) | WMOから気象予報データを取得して、テキストファイルに記録する。この関数はタイマ割込みを設定して、毎日決められた時間に実行する。※テーマ.11-2で作成したプログラムから流用。 |
inform_forecast() | get_forecast() 関数で取得したデータを参照して、その日の(時間帯によっては明日の)気象予報をLEDディスプレイに表示して知らせる。 |
correct_time(t=None) | NTPサーバーと通信し、リアルタイムクロック(RTC)の時刻補正を行う。この関数は、タイマ割込みを設定して、10分おきに実行する。※テーマ.11-3で作成したプログラムから流用。 |
main() | 起動時に実行するメイン関数。最初の時刻補正や気象予報データの取得だけでなく、超音波距離センサで人を感知する処理もこの関数にまとめる。 |
上の表の並びに沿って、それぞれの関数を定義していきましょう。
■ 傘を差し出す動作を行う関数の定義
今日の気象予報が雨だった場合は、おじぎをするような動きで傘を差しだします。そして、一定時間が経過した後に元の姿勢に戻ります。この動作では、2個のサーボモーターを同期制御しています。
サーボモーターの同期制御については、テーマ.8-1で学習しました。そのときに作成した以下のモジュールを今回のプログラムでも利用します。
【 テーマ.8-1で作成したサーボモーターの同期制御用モジュール「myservo.py」 】
import time
from pyatcrobo2.parts import Servomotor
class MyServomotor(Servomotor):
def __init__(self, pin):
super().__init__(pin)
self.angle = 90
super().set_angle(self.angle)
def set_angle(self, degree):
super().set_angle(degree)
self.angle = degree
def get_angle(self):
return self.angle
def syncRotateServos(duration, *args):
speeds = {}
for arg in args:
myservo = arg[0]
degree = arg[1]
diff = degree - myservo.get_angle()
speeds[id(myservo)] = diff/duration
for _ in range(duration):
for arg in args:
myservo = arg[0]
degree = myservo.get_angle() + speeds[id(myservo)]
myservo.set_angle(degree)
time.sleep_ms(1)
Studuino:bitからこのモジュールを削除している場合は、上記のコードを複製して「myservo(.py)」と名前を付けてファイルを保存し、Muエディタの「ファイル」機能をつかって、Studuino:bitにそのファイルをコピーしましょう。また、同じファイルは以下のリンクから取得することもできます。リンク上で右クリックをして、名前を付けて保存してください。
これで準備ができたので、関数を定義していきます。まずは、使用するモジュールやクラスをインポートして、各パーツの制御に必要なオブジェクトを作成しましょう。
import time
import myservo
myservo_p13 = myservo.MyServomotor("P13") # 正面から見て右側のサーボモーターのオブジェクト
myservo_p14 = myservo.MyServomotor("P14") # 正面から見て左側のサーボモーターのオブジェクト
次に、傘を差しだす動作をgive_umbrella()
関数としてまとめます。傘を差しだすときは、P13に接続した右のサーボモーターを45度に、P14に接続した左のサーボモーターを135度まで回転させます。
そして、これら2つのサーボモーターの同期制御を、myservo
モジュールのsyncRotateServos()
メソッドで行います。
myservo.syncRotateServos(duration, (MyServomotor1, degrees), (MyServomotor2, degrees), ....)
30秒後に元の姿勢(両方のサーボモーターが90度の位置)に戻す動作を加え、次のようにコードをまとめましょう。
追加【7行目~10行目】
import time
import myservo
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
def give_umbrella(): # 新しく関数を定義
myservo.syncRotateServos(500, (myservo_p13, 45), (myservo_p14, 135)) # おじぎをして傘を差しだす
time.sleep_ms(30000) # 30秒間待機
myservo.syncRotateServos(500, (myservo_p13, 90), (myservo_p14, 90)) # 元の姿勢に戻る
ここまでのプログラムを実行し、ターミナルから定義したgive_umbrella()
を呼び出して、動作を確認しておきましょう。
>>> give_umbrella()
■ 気象名から気象を表すアイコンのイメージを返す関数の定義
次は、気象名を引数に取り、LEDディスプレイに表示する気象を表すアイコンのイメージオブジェクト(StuduinoBitImage
クラスのインスタンス)を返す「get_icon()
関数」を定義します。この関数は、テーマ.11-2の課題で作成したものと同じです。気象を以下の5つの分類に分けてアイコンとなるイメージを設定します。
追加【3行目、6~16行目、19~23行目、35~46行目】
import time
import myservo
from pystubit.board import Image # イメージを扱うクラスのインポート
# 気象の分類
WORDS_FINE = {"No Rain", "Fine", "Sunny", "Clear", "Clearing", "Bright",
"Fair", "Mild"} # 晴れ
WORDS_RAINY = {"Thunderstorms", "Thundershowers", "Storm", "Showers",
"Heavy Showers", "Rainshower", "Occasional Showers",
"Scattered Showers", "Isolated Showers", "Light Showers",
"Freezing Rain", "Rain", "Drizzle", "Light Rain", "Squall"} # 雨
WORDS_CLOUDY = {"Cloudy", "Partly Cloudy", "Sunny Intervals",
"Mostly Cloudy", "Partly Bright"} # くもり
WORDS_SNOWY = {"Snow", "Flurries", "Heavy Snow", "Snowfall", "Light Snow",
"Blizzard", "Blowing Snow", "Hail", "Snow Showers",
"Snowstorm", "Snowdrift"} # 雪
# 分類した各気象を表すアイコンのイメージオブジェクト
IMG_FINE = Image("01110:11111:11111:11111:01110:", color=(30, 10, 0)) # 晴れ
IMG_RAINY = Image("00100:01110:11111:00100:01100:", color=(0, 10, 30)) # 雨
IMG_CLOUDY = Image("00000:01110:11111:01110:00000:", color=(15, 15, 15)) # くもり
IMG_SNOWY = Image("10101:01110:11011:01110:10101:", color=(30, 30, 30)) # 雪
IMG_OTHERS = Image("00000:00000:01110:00000:00000:", color=(10, 5, 15)) # その他
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
def give_umbrella():
myservo.syncRotateServos(500, (myservo_p13, 45), (myservo_p14, 135))
time.sleep_ms(30000)
myservo.syncRotateServos(500, (myservo_p13, 90), (myservo_p14, 90))
def get_icon(weather): # 気象を表すアイコンのイメージを返す関数
if weather in WORDS_FINE:
img = IMG_FINE
elif weather in WORDS_RAINY:
img = IMG_RAINY
elif weather in WORDS_CLOUDY:
img = IMG_CLOUDY
elif weather in WORDS_SNOWY:
img = IMG_SNOWY
else:
img = IMG_OTHERS
return img
■ 気象予報データを取得する関数の定義
今度は、世界気象機関(WMO)から気象予報を取得して、その結果をStuduino:bit内にテキストファイルとして保存する関数を定義します。
プログラムの先頭でHTTPリクエスト用のモジュールとWi-Fi接続用のクラスをインポートします。そして、Wi-FiのSSIDとパスワード、それから気象を取得したい都市に割り当てられたIDを定数として定義しましょう。
追加【2行目、5行目、7~9行目】
import time
import urequests # HTTPリクエスト用のモジュール
import myservo
from pystubit.board import Image
from mywifi import MyWifi # テーマ.11-1で作成したWi-Fi接続用のクラス
SSID = "SSID" # Wi-FiのSSID
PASSWORD = "PASSWORD" # Wi-Fiのパスワード
CITY_ID = 183 # 気象予報データを取得したい都市のIDを設定。183は日本の東京。
次に、インポートしたWi-Fi接続用のクラスのオブジェクトを作成します。
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi() # Wi-Fi接続用のオブジェクト
get_forecast()
関数を新たに定義します。WMOのWebサイトにHTTPリクエストを送り、気象予報を取得して、テキストファイルに記録するコードを次のようにまとめましょう。
追加【56~68行目】
def get_forecast(t=None): # 関数を新たに定義。タイマ割込みで呼び出すため引数「t」を設定。
if not wifi.isconnected(): # Wi-Fi未接続の場合は接続を試みる
if not wifi.connect(SSID, PASSWORD):
return # 接続に失敗した場合はここで処理を終える
url_format = "https://worldweather.wmo.int/en/json/[City ID]_en.json" # 気象予報を取得するページのURLのフォーマット
url = url_format.replace("[City ID]", str(CITY_ID)) # URLの一部を指定した都市IDに変換する
res = urequests.get(url) # GETメソッドでリクエストを送り、レスポンスを受け取る
file = open("weather_report.json", mode="w+t") # 読み書き可能なテキストモードで開く
file.write(res.text) # ファイルにデコーディングされたレスポンスボディを書き込む(中身はJSON文字列)
file.close() # ファイルを閉じる
動作確認のため、ここまでのプログラムを実行し、ターミナルからget_forecast()
関数を呼び出しましょう。そのあとで、作成されたファイル(weather_report.json
)を読み取り専用のテキストモード(mode="rt"
)で開き、中身を表示してみましょう。確認後はファイルを閉じるのを忘れないように注意してください。
>>> get_forecast() . . . Connected. ('172.20.10.2', '255.255.255.240', '172.20.10.1', '172.20.10.1') >>> file = open("weather_report.json", mode="rt") >>> print(file.read()) ----ここに取得した気象予報データが表示される---- >>> file.close()
■ 気象予報をLEDディスプレイに表示する関数の定義
get_forecast()
関数でテキストファイルに記録したJSON文字列を辞書型オブジェクトへ変換して、今日(または明日)の「気象」「最高気温(セ氏温度)」「最低気温(セ氏温度)」を参照し、LEDディスプレイに表示します。
まずはプログラムの先頭で、JSON文字列を扱うモジュールとLEDディスプレイの制御オブジェクトをインポートしましょう。
import time
import urequests
import ujson # JSON文字列を辞書型のオブジェクトへ変換するために利用
import myservo
from pystubit.board import Image, display # LEDディスプレイの制御用オブジェクト
from mywifi import MyWifi
次にwith
構文を用いて、Studuino:bit内に保存されているテキストファイルを開きます。そして、そのファイルストリームをujson
モジュールのload()
関数に渡して、JSON文字列から辞書型オブジェクトに変換します。そして、変換で得られた辞書型のオブジェクトから「気象:"weather"
」「最高気温:"maxTemp"
」「最低気温:"minTemp"
」を参照して、それぞれ変数に格納します。
追加【72~77行目】
def inform_forecast(): # 今日の気象予報を知らせる関数
with open("weather_report.json", mode="rt") as file: # 読み取り専用のテキストモードで開く
data = ujson.load(file) # ファイルストリームをJSONとして解釈
weather = data["city"]["forecast"]["forecastDay"][0]["weather"] # 気象
max_temp = data["city"]["forecast"]["forecastDay"][0]["maxTemp"] # 最高気温
min_temp = data["city"]["forecast"]["forecastDay"][0]["minTemp"] # 最低気温
get_icon()
関数を利用して、気象からアイコンのイメージオブジェクトを取得します。そして、3つの情報を順番にLEDディスプレイに表示しましょう。
追加【78~81行目】
def inform_forecast():
with open("weather_report.json", mode="rt") as file:
data = ujson.load(file)
weather = data["city"]["forecast"]["forecastDay"][0]["weather"]
max_temp = data["city"]["forecast"]["forecastDay"][0]["maxTemp"]
min_temp = data["city"]["forecast"]["forecastDay"][0]["minTemp"]
img = get_icon(weather) # 気象からアイコンのイメージを取得
display.show(img, delay=3000) # イメージの表示
display.scroll("max:"+max_temp, delay=25, color=(31, 0, 0)) # 最高気温の表示(赤色)
display.scroll("min:"+min_temp, delay=25, color=(0, 0, 31)) # 最低気温の表示(青色)
気象が雨だった場合は、最後に傘を渡します。上で定義したgive_umbrella()
関数を実行しましょう。
追加【82・83行目】
def inform_forecast():
with open("weather_report.json", mode="rt") as file:
data = ujson.load(file)
weather = data["city"]["forecast"]["forecastDay"][0]["weather"]
max_temp = data["city"]["forecast"]["forecastDay"][0]["maxTemp"]
min_temp = data["city"]["forecast"]["forecastDay"][0]["minTemp"]
img = get_icon(weather)
display.show(img, delay=3000)
display.scroll("max:"+max_temp, delay=25, color=(31, 0, 0))
display.scroll("min:"+min_temp, delay=25, color=(0, 0, 31))
if weather in WORDS_RAINY: # 雨の場合
give_umbrella() # 傘を差し出す
ここまでのプログラムを実行し、inform_forecast()
関数をターミナルから呼び出して、動作を確認しておきましょう。
>>> inform_forecast()
■ リアルタイムクロックの時刻補正を行う関数の定義
この関数は、テーマ.11-3で作成したものと同じです。まず、プログラムの先頭で必要なモジュールやクラスをインポートします。
追加【4~6行目】
import time
import urequests
import ujson
import usocket # ソケットを利用した通信を行うために利用
import ubinascii # バイト列から数値に変換するために利用
from machine import RTC # リアルタイムクロックのクラス
import myservo
from pystubit.board import Image, display
from mywifi import MyWifi
NTPサーバーは日本の国立研究開発法人情報通信研究機構(NICT)を選択します。このサーバーのドメインは「ntp.nict.jp」です。
また、NTPサーバーから取得する協定世界時(UTC)は、1900-1-1 0:0:0
を開始時点として、これまでに経過した時間(単位は秒)で現在の時刻が表されていました。一方でStuduino:bitのリアルタイムクロックは、2000-1-1 0:0:0
を開始時点としています。そのため、1900-1-1 0:0:0
から2000-1-1 0:0:0
までに経過した時間を差し引いてリアルタイムクロックに設定する時刻を求めます。
さらに、現地の時刻に合わせるためには「標準時間帯(タイムゾーン)」を考慮する必要もありました。これら2つの時間の補正値を定数として定義しましょう。
追加【14~16行目】
SSID = "SSID"
PASSWORD = "PASSWORD"
CITY_ID = 183
NTP_SERVER = "ntp.nict.jp" # 利用するNTPサーバーのドメイン
DELTA = 3155673600 # 1900-1-1 0:0:0 ~ 2000-1-1 0:0:0 の間に経過した時間(単位は秒)
UTC_TIMEZONE_JP = 32400 # UTC0とUTC+9(日本)のタイムゾーンの時刻差(単位は秒)
続けてリアルタイムクロックのオブジェクトを作成します。
追加【40行目】
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi()
rtc = RTC() # リアルイムクロックのオブジェクト
テーマ.11-3で作成したプログラムと同じコードでcorrect_time()
関数を定義します。この関数は、初回起動時を除き、タイマ割込みによって10分おきに呼び出します。
def correct_time(t=None): # タイマ割込み処理で呼び出されたときは、引数「t」に呼び出し元のタイマオブジェクトが渡される。
if not wifi.isconnected(): # Wi-Fi未接続の場合は、接続を試みる
if not wifi.connect(SSID, PASSWORD):
return # Wi-Fiの接続に失敗した場合はここで処理を終える
addrinfo = usocket.getaddrinfo(NTP_SERVER, 123) # NTPサーバーのアドレス情報の取得
sockaddr = addrinfo[0][-1] # ソケットのコネクション確立に必要なアドレス情報のみ抽出
s = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM) # ソケットの作成
s.connect(sockaddr) # ソケットのコネクションの確立
data = bytearray(48) # 48バイトのバイト列データの作成
data[0] = 0x23 # 先頭の1バイトでうるう秒指示子(2ビット)、NTPのバージョン(3ビット)、動作モード(3ビット)の情報を指定
client_transmit = time.time() # リクエストの送信時刻(RTC)の取得
s.send(data) # ソケットを利用したリクエストの送信
msg = s.recv(48) # ソケットを利用したレスポンスデータの取得
client_receive = time.time() # レスポンスの受信時刻(RTC)の取得
s.close() # ソケットを閉じる
server_receive = int(ubinascii.hexlify(msg[32:36]), 16) - DELTA # サーバーのリクエストの受信時刻(整数部のみ)を10進数に変換
server_transmit = int(ubinascii.hexlify(msg[40:44]), 16) - DELTA # サーバーのレスポンスの送信時刻(整数部のみ)を10進数に変換
offset = ((server_receive - client_transmit) + (server_transmit - client_receive)) / 2 # RTCと協定世界時の時間差を計算
current_time = time.time() + offset + UTC_TIMEZONE_JP # 補正後の現在時刻を求める
datetime = time.localtime(int(current_time)) # 秒数から日付・時刻の情報に変換
datetime = datetime[:3] + (datetime[6],) + datetime[3:6] + (0,) # RTCの日付・時刻のフォーマットに合わせて要素の並び順を入れ替える
rtc.init(datetime) # RTCに補正後の時刻を設定する
ここまでのプログラムを実行し、ターミナルからcorrect_time()
関数を呼び出します。リアルタイムクロックのdatetime()
メソッドで現在の日付・時刻を取得し、正しく時刻補正が行えていることを確認しておきましょう。
(実行結果の例)
>>> correct_time() >>> print(rtc.datetime()) (2020, 8, 7, 4, 15, 13, 14, 333870)
■ メイン関数の定義
起動時に実行するメイン関数では、以下の処理を順番に行います。
- リアルタイムクロックの時刻補正を行う
- タイマ割込みで10分おきに時刻補正を行うように設定する
- 気象予報データを取得する
- タイマ割込みで次回の気象予報データの取得を設定する
- 超音波距離センサで人の通過を感知する
- 人を感知した場合はLEDディスプレイにその日の(時間帯によっては明日の)気象予報を表示する
- 無限ループで5と6の処理を繰り返す
気象予報データの取得は起動時を除き、1日に1回のみ行います。あらかじめ何時何分に行うか決めておきましょう。例えば日本の場合、毎日午前11時(日本標準時)に気象予報データが更新されます。そこで、少し余裕を見て「午前11時10分」に設定しておくと、最新の気象予報データが取得できます。
まずは、プログラムの先頭でタイマ割込み用のクラスと超音波距離センサのクラスをインポートしましょう。
追加【6行目、9行目】
import time
import urequests
import ujson
import usocket
import ubinascii
from machine import RTC, Timer # タイマ割込み用のクラス
import myservo
from pystubit.board import Image, display
from pyatcrobo2.parts import UltrasonicSensor # 超音波距離センサのクラス
from mywifi import MyWifi
気象予報データを取得する時刻を定数で定義します。ここでは、比較しやすいように秒単位に変換しておきましょう。
追加【18行目】
SSID = "SSID"
PASSWORD = "PASSWORD"
CITY_ID = 183
NTP_SERVER = "ntp.nict.jp"
DELTA = 3155673600
UTC_TIMEZONE_JP = 32400
WEATHER_REPORT_REQUEST_TIME = 11*3600 + 10*60 # 気象予報を取得する時刻(11:10:00)
続けて、割込み処理用のタイマオブジェクトと超音波距離センサのオブジェクトを作成します。タイマオブジェクトは、時刻補正用と気象予報データ取得用にそれぞれ用意します。
追加【43~45行目】
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi()
rtc = RTC()
timer_for_rtc = Timer(1) # 時刻補正の割込み処理用のタイマオブジェクト
timer_for_weather = Timer(2) # 気象予報データ取得の割込み処理用のタイマオブジェクト
us = UltrasonicSensor("P0") # 超音波距離センサの制御用オブジェクト
main()
関数を新たに定義します。まずはリアルタイムクロックの時刻補正と、10分おきのタイマ割込みを設定しましょう。
追加【121~123行目】
def main():
correct_time() # リアルタイムクロックの時刻補正を行う
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time) # タイマ割込みで10分おきの時刻補正を設定
次に、気象予報データの取得と次回実行分をタイマ割り込みで設定します。次回実行するまでの時間は現在の時刻によって異なります。まだその日の取得予定時刻を過ぎていない場合は、その時刻との時間差を求めます。既に過ぎている場合は、翌日の取得予定時刻との時間差を求めましょう。
追加【124~131行目】
def main():
correct_time()
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time)
get_forecast() # 気象予報データの取得
datetime = rtc.datetime() # 現在時刻の取得
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6] # 現在時刻を秒に換算
if now < WEATHER_REPORT_REQUEST_TIME: # 今日の気象予報の取得予定時刻を過ぎていない場合
diff = WEATHER_REPORT_REQUEST_TIME - now # その時刻との時間差を求める
else: # 既に過ぎている場合
diff = (WEATHER_REPORT_REQUEST_TIME + 86400) - now # 翌日の予定時刻との時間差を求める
timer_for_weather.init(mode=Timer.ONE_SHOT, period=diff*1000, callback=get_forecast) # 1回のみのタイマ割込み処理を設定。periodにはミリ秒単位で時間を設定する必要があることに注意。
また、これ以降のタイマ割込みでの呼び出しの設定は、get_forecast()
関数内で行います。起動時の1回を除き、この関数はタイマ割込みで呼び出されます。そこで、引数t
にタイマオブジェクトが渡されているときだけ、翌日の同じ時間にタイマ割込みを設定する処理を追加しましょう。
追加【69・70行目】
def get_forecast(t=None):
if t: # タイマ割込みで呼び出された場合は、翌日の同じ時間(24時間後)にタイマ割込みを設定
t.init(mode=Timer.ONE_SHOT, period=86400*1000, callback=get_forecast)
if not wifi.isconnected():
if not wifi.connect(SSID, PASSWORD):
return
url_format = "https://worldweather.wmo.int/en/json/[City ID]_en.json"
url = url_format.replace("[City ID]", str(CITY_ID))
res = urequests.get(url)
file = open("weather_report.json", mode="w+t")
file.write(res.text)
file.close()
while
文で無限ループをつくり、繰り返し超音波距離センサの値を取得します。使用する場所に合わせて人を感知したと判断する距離を決めておきましょう。そして人を感知したら、inform_forecast()
関数を実行します。最後にmain()
関数を実行するコードを追加してプログラムの完成です。
追加【135~139行目、142行目】
def main():
correct_time()
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time)
get_forecast()
datetime = rtc.datetime()
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6]
if now < WEATHER_REPORT_REQUEST_TIME:
diff = WEATHER_REPORT_REQUEST_TIME - now
else:
diff = (WEATHER_REPORT_REQUEST_TIME + 86400) - now
timer_for_weather.init(mode=Timer.ONE_SHOT, period=diff*1000, callback=get_forecast)
while True: # 無限ループ
d = us.get_distance() # 超音波距離センサの値を取得
if d > 0 and d < 30: # 人を感知した(この例では距離が30cm未満の)場合
inform_forecast() # 気象予報をLEDディスプレイに表示する
time.sleep_ms(20) # 超音波距離センサを使用する場合は次に値を取得するまで少し時間を空ける必要がある
main() # メイン関数の実行
5. 2 完成プログラムの動作確認
完成したプログラム例を以下に示します。実行して動作を確認しましょう。
【 サンプルコード 5-3-1 】
import time
import urequests
import ujson
import usocket
import ubinascii
from machine import RTC, Timer
import myservo
from pystubit.board import Image, display
from pyatcrobo2.parts import UltrasonicSensor
from mywifi import MyWifi
SSID = "SSID"
PASSWORD = "PASSWORD"
CITY_ID = 183
NTP_SERVER = "ntp.nict.jp"
DELTA = 3155673600
UTC_TIMEZONE_JP = 32400
WEATHER_REPORT_REQUEST_TIME = 11*3600 + 10*60
WORDS_FINE = {"No Rain", "Fine", "Sunny", "Clear", "Clearing", "Bright",
"Fair", "Mild"}
WORDS_RAINY = {"Thunderstorms", "Thundershowers", "Storm", "Showers",
"Heavy Showers", "Rainshower", "Occasional Showers",
"Scattered Showers", "Isolated Showers", "Light Showers",
"Freezing Rain", "Rain", "Drizzle", "Light Rain", "Squall"}
WORDS_CLOUDY = {"Cloudy", "Partly Cloudy", "Sunny Intervals",
"Mostly Cloudy", "Partly Bright"}
WORDS_SNOWY = {"Snow", "Flurries", "Heavy Snow", "Snowfall", "Light Snow",
"Blizzard", "Blowing Snow", "Hail", "Snow Showers",
"Snowstorm", "Snowdrift"}
IMG_FINE = Image("01110:11111:11111:11111:01110:", color=(30, 10, 0))
IMG_RAINY = Image("00100:01110:11111:00100:01100:", color=(0, 10, 30))
IMG_CLOUDY = Image("00000:01110:11111:01110:00000:", color=(15, 15, 15))
IMG_SNOWY = Image("10101:01110:11011:01110:10101:", color=(30, 30, 30))
IMG_OTHERS = Image("00000:00000:01110:00000:00000:", color=(10, 5, 15))
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi()
rtc = RTC()
timer_for_rtc = Timer(1)
timer_for_weather = Timer(2)
us = UltrasonicSensor("P0")
def give_umbrella():
myservo.syncRotateServos(500, (myservo_p13, 45), (myservo_p14, 135))
time.sleep_ms(30000)
myservo.syncRotateServos(500, (myservo_p13, 90), (myservo_p14, 90))
def get_icon(weather):
if weather in WORDS_FINE:
img = IMG_FINE
elif weather in WORDS_RAINY:
img = IMG_RAINY
elif weather in WORDS_CLOUDY:
img = IMG_CLOUDY
elif weather in WORDS_SNOWY:
img = IMG_SNOWY
else:
img = IMG_OTHERS
return img
def get_forecast(t=None):
if t:
t.init(mode=Timer.ONE_SHOT, period=86400*1000, callback=get_forecast)
if not wifi.isconnected():
if not wifi.connect(SSID, PASSWORD):
return
url_format = "https://worldweather.wmo.int/en/json/[City ID]_en.json"
url = url_format.replace("[City ID]", str(CITY_ID))
res = urequests.get(url)
file = open("weather_report.json", mode="w+t")
file.write(res.text)
file.close()
def inform_forecast():
with open("weather_report.json", mode="rt") as file:
data = ujson.load(file)
weather = data["city"]["forecast"]["forecastDay"][0]["weather"]
max_temp = data["city"]["forecast"]["forecastDay"][0]["maxTemp"]
min_temp = data["city"]["forecast"]["forecastDay"][0]["minTemp"]
img = get_icon(weather)
display.show(img, delay=3000)
display.scroll("max:"+max_temp, delay=25, color=(31, 0, 0))
display.scroll("min:"+min_temp, delay=25, color=(0, 0, 31))
if weather in WORDS_RAINY:
give_umbrella()
def correct_time(t=None):
if not wifi.isconnected():
if not wifi.connect(SSID, PASSWORD):
return
addrinfo = usocket.getaddrinfo(NTP_SERVER, 123)
sockaddr = addrinfo[0][-1]
s = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
s.connect(sockaddr)
data = bytearray(48)
data[0] = 0x23
client_transmit = time.time()
s.send(data)
msg = s.recv(48)
client_receive = time.time()
s.close()
server_receive = int(ubinascii.hexlify(msg[32:36]), 16) - DELTA
server_transmit = int(ubinascii.hexlify(msg[40:44]), 16) - DELTA
offset = ((server_receive - client_transmit) + (server_transmit - client_receive)) / 2
current_time = time.time() + offset + UTC_TIMEZONE_JP
datetime = time.localtime(int(current_time))
datetime = datetime[:3] + (datetime[6],) + datetime[3:6] + (0,)
rtc.init(datetime)
def main():
correct_time()
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time)
get_forecast()
datetime = rtc.datetime()
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6]
if now < WEATHER_REPORT_REQUEST_TIME:
diff = WEATHER_REPORT_REQUEST_TIME - now
else:
diff = (WEATHER_REPORT_REQUEST_TIME + 86400) - now
timer_for_weather.init(mode=Timer.ONE_SHOT, period=diff*1000, callback=get_forecast)
while True:
d = us.get_distance()
if d > 0 and d < 30:
inform_forecast()
time.sleep_ms(20)
main()
チャプター6
課題:傘立てロボットの多機能化
【 サンプルコード 5-3-1 】では、どの時間帯に玄関を通っても、その日または翌日の気象予報を表示します。しかし、実際の利用シーンを考えると、気象予報を知りたいのは、ほとんどが朝の出かける時間帯です。むしろその他の時間帯については、次のような機能に切り替わっている方が便利です。
- 夕方に家へ帰宅する時間帯
「センサー式ライト機能」:人を感知するとLEDディスプレイが一定時間明るく点灯する。 - 就寝中の時間帯
「防犯機能」:人を感知すると警告として、ブザーから繰り返しアラーム音を鳴らす。
そこで、この課題では上の2つの機能を新たに追加します。そして、時間帯によって実行する機能が切り替わるようにプログラムを改造してみましょう。
6. 1 プログラム作成のヒント
まずは、自分の生活を振り返り、どの時間帯にどの機能を実行すると便利になるのかを考えましょう。
【 時間帯の設定例 】
また、センサー式ライト機能と防犯機能については、以下の単体でのコード例を参考にして追加してみましょう。
【 センサー式ライト機能単体のコード例 】
from pystubit.board import Image, display
from pyatcrobo2.parts import UltrasonicSensor
us = UltrasonicSensor("P0")
img_all = Image("11111:11111:11111:11111:11111")
def light_on():
display.show(img_all, delay=30000, clear=True, color=(31, 31, 31))
while True:
d = us.get_distance()
if d > 0 and d < 30:
light_on()
time.sleep_ms(20)
【 防犯機能単体のコード例 】
from pystubit.board import buzzer
from pyatcrobo2.parts import UltrasonicSensor
def alarm():
while True: # Studuino:bitがリセットされるまで繰り返す
buzzer.on("C7", duration=200)
time.sleep_ms(100)
while True:
d = us.get_distance()
if d > 0 and d < 30:
alarm()
time.sleep_ms(20)
6. 2 プログラムの作成手順
ここでは、以下のように3つの時間帯を設定した場合のプログラムの作成手順を説明します。
- 朝の出かける時間帯:7:30~8:30
- 夕方の帰宅する時間帯:18:00~19:30
- 夜の就寝中の時間帯:23:00~翌日の6:00
■ 3つの時間帯の定義
まず、3つの時間帯を(開始時刻、終了時刻)
のタプル形式で定数として定義します。また、開始時刻と終了時刻はどちらも秒に換算しておきましょう。
SSID = "SSID"
PASSWORD = "PASSWORD"
CITY_ID = 183
NTP_SERVER = "ntp.nict.jp"
DELTA = 3155673600UTC_TIMEZONE_JP = 32400
PERIOD_GO_OUT = (7*3600+30*60, 8*3600+30*60) # 朝出かける時間帯(7:30 ~ 8:30)
PERIOD_RETURN_HOME = (18*3600, 19*3600+30*60) # 夕方家に戻る時間帯(18:00 ~ 19:30)
PERIOD_SLEEP = (23*3600, 6*3600) # 夜就寝中の時間帯(23:00 ~ 翌日の6:00)
■ センサー式ライト機能用の関数の定義
LEDディスプレイ全体を一定時間(30秒など)白色で明るく点灯させる処理をlight_on()
関数としてまとめます。そのときに必要な全灯のイメージオブジェクトも用意しておきましょう。
追加【47行目、125・126行目】
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi()
rtc = RTC()
timer_for_rtc = Timer(1)
timer_for_weather = Timer(2)
us = UltrasonicSensor("P0")
img_all = Image("11111:11111:11111:11111:11111") # LEDディスプレイを全灯するためのイメージ
def light_on(): # LEDディスプレイを一定時間白色で明るく点灯させる関数
display.show(img_all, delay=30000, clear=True, color=(31, 31, 31))
■ 防犯機能用の関数の定義
防犯用のアラーム音をブザーから繰り返し鳴らす処理をalarm()
関数としてまとめます。このアラーム音はStuduino:bitがリセットされるまで続くようにしておきましょう。
変更【8行目、129~132行目】
import time
import urequests
import ujson
import usocket
import ubinascii
from machine import RTC, Timer
import myservo
from pystubit.board import Image, display, buzzer # ブザー制御用オブジェクトの追加
from pyatcrobo2.parts import UltrasonicSensor
from mywifi import MyWifi
def alarm(): # Studuino:bitがリセットされるまでアラーム音を繰り返す関数
while True:
buzzer.on("C7" , duration=200)
time.sleep_ms(100)
■ メイン関数の変更
最後にmain()
関数を変更します。人を感知したときに、リアルタイムクロックから現在の時刻を取得し、どの時間帯にあるかを確認して、実行する機能を選択するようにしましょう。
変更・追加【150~157行目】
def main():
correct_time()
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time)
get_forecast()
datetime = rtc.datetime()
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6]
if now < WEATHER_REPORT_REQUEST_TIME:
diff = WEATHER_REPORT_REQUEST_TIME - now
else:
diff = (WEATHER_REPORT_REQUEST_TIME + 86400) - now
timer_for_weather.init(mode=Timer.ONE_SHOT, period=diff*1000, callback=get_forecast)
while True:
d = us.get_distance()
if d > 0 and d < 30:
datetime = rtc.datetime() # RTCから現在時刻を取得
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6] # 現在時刻を秒に換算
if now >= PERIOD_GO_OUT[0] and now <= PERIOD_GO_OUT[1]: # 朝でかける時間帯のとき
inform_forecast() # 気象予報の表示機能を実行
elif now >= PERIOD_RETURN_HOME[0] and now <= PERIOD_RETURN_HOME[1]: # 夕方帰宅する時間帯のとき
light_on() # センサー式ライト機能を実行
elif now >= PERIOD_SLEEP[0] or now <= PERIOD_SLEEP[1]: # 夜就寝中の時間帯のとき
alarm() # 防犯機能を実行
time.sleep_ms(20)
■ プログラムの完成例
以上でプログラムの完成です。動作テストを行うときは、実際に使用するときの時間帯から範囲を変更しながら実行して、すべての機能に問題がないことを確認しましょう。
【 サンプルコード 6-2-1 】
import time
import urequests
import ujson
import usocket
import ubinascii
from machine import RTC, Timer
import myservo
from pystubit.board import Image, display, buzzer
from pyatcrobo2.parts import UltrasonicSensor
from mywifi import MyWifi
SSID = "SSID"
PASSWORD = "PASSWORD"
CITY_ID = 183
NTP_SERVER = "ntp.nict.jp"
DELTA = 3155673600
UTC_TIMEZONE_JP = 32400
WEATHER_REPORT_REQUEST_TIME = 11*3600 + 10*60
PERIOD_GO_OUT = (7*3600, 8*3600)
PERIOD_RETURN_HOME = (18*3600, 19*3600)
PERIOD_SLEEP = (23*3600+30*60, 6*3600)
WORDS_FINE = {"No Rain", "Fine", "Sunny", "Clear", "Clearing", "Bright",
"Fair", "Mild"}
WORDS_RAINY = {"Thunderstorms", "Thundershowers", "Storm", "Showers",
"Heavy Showers", "Rainshower", "Occasional Showers",
"Scattered Showers", "Isolated Showers", "Light Showers",
"Freezing Rain", "Rain", "Drizzle", "Light Rain", "Squall"}
WORDS_CLOUDY = {"Cloudy", "Partly Cloudy", "Sunny Intervals",
"Mostly Cloudy", "Partly Bright"}
WORDS_SNOWY = {"Snow", "Flurries", "Heavy Snow", "Snowfall", "Light Snow",
"Blizzard", "Blowing Snow", "Hail", "Snow Showers",
"Snowstorm", "Snowdrift"}
IMG_FINE = Image("01110:11111:11111:11111:01110:", color=(30, 10, 0))
IMG_RAINY = Image("00100:01110:11111:00100:01100:", color=(0, 10, 30))
IMG_CLOUDY = Image("00000:01110:11111:01110:00000:", color=(15, 15, 15))
IMG_SNOWY = Image("10101:01110:11011:01110:10101:", color=(30, 30, 30))
IMG_OTHERS = Image("00000:00000:01110:00000:00000:", color=(10, 5, 15))
myservo_p13 = myservo.MyServomotor("P13")
myservo_p14 = myservo.MyServomotor("P14")
wifi = MyWifi()
rtc = RTC()
timer_for_rtc = Timer(1)
timer_for_weather = Timer(2)
us = UltrasonicSensor("P0")
img_all = Image("11111:11111:11111:11111:11111")
def give_umbrella():
myservo.syncRotateServos(500, (myservo_p13, 45), (myservo_p14, 135))
time.sleep_ms(30000)
myservo.syncRotateServos(500, (myservo_p13, 90), (myservo_p14, 90))
def get_icon(weather):
if weather in WORDS_FINE:
img = IMG_FINE
elif weather in WORDS_RAINY:
img = IMG_RAINY
elif weather in WORDS_CLOUDY:
img = IMG_CLOUDY
elif weather in WORDS_SNOWY:
img = IMG_SNOWY
else:
img = IMG_OTHERS
return img
def get_forecast(t=None):
if t:
t.init(mode=Timer.ONE_SHOT, period=86400*1000, callback=get_forecast)
if not wifi.isconnected():
if not wifi.connect(SSID, PASSWORD):
return
url_format = "https://worldweather.wmo.int/en/json/[City ID]_en.json"
url = url_format.replace("[City ID]", str(CITY_ID))
res = urequests.get(url)
file = open("weather_report.json", mode="w+t")
file.write(res.text)
file.close()
def inform_forecast():
with open("weather_report.json", mode="rt") as file:
data = ujson.load(file)
weather = data["city"]["forecast"]["forecastDay"][0]["weather"]
max_temp = data["city"]["forecast"]["forecastDay"][0]["maxTemp"]
min_temp = data["city"]["forecast"]["forecastDay"][0]["minTemp"]
img = get_icon(weather)
display.show(img, delay=3000)
display.scroll("max:"+max_temp, delay=25, color=(31, 0, 0))
display.scroll("min:"+min_temp, delay=25, color=(0, 0, 31))
if weather in WORDS_RAINY:
give_umbrella()
def correct_time(t=None):
if not wifi.isconnected():
if not wifi.connect(SSID, PASSWORD):
return
addrinfo = usocket.getaddrinfo(NTP_SERVER, 123)
sockaddr = addrinfo[0][-1]
s = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
s.connect(sockaddr)
data = bytearray(48)
data[0] = 0x23
client_transmit = time.time()
s.send(data)
msg = s.recv(48)
client_receive = time.time()
s.close()
server_receive = int(ubinascii.hexlify(msg[32:36]), 16) - DELTA
server_transmit = int(ubinascii.hexlify(msg[40:44]), 16) - DELTA
offset = ((server_receive - client_transmit) + (server_transmit - client_receive)) / 2
current_time = time.time() + offset + UTC_TIMEZONE_JP
datetime = time.localtime(int(current_time))
datetime = datetime[:3] + (datetime[6],) + datetime[3:6] + (0,)
rtc.init(datetime)
def light_on():
display.show(img_all, delay=30000, clear=True, color=(31, 31, 31))
def alarm():
while True:
buzzer.on("C7" , duration=200)
time.sleep_ms(100)
def main():
correct_time()
timer_for_rtc.init(mode=Timer.PERIODIC, period=600000, callback=correct_time)
get_forecast()
datetime = rtc.datetime()
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6]
if now < WEATHER_REPORT_REQUEST_TIME:
diff = WEATHER_REPORT_REQUEST_TIME - now
else:
diff = (WEATHER_REPORT_REQUEST_TIME + 86400) - now
timer_for_weather.init(mode=Timer.ONE_SHOT, period=diff*1000, callback=get_forecast)
while True:
d = us.get_distance()
if d > 0 and d < 30:
datetime = rtc.datetime()
now = datetime[4] * 3600 + datetime[5] * 60 + datetime[6]
if now >= PERIOD_GO_OUT[0] and now <= PERIOD_GO_OUT[1]:
inform_forecast()
elif now >= PERIOD_RETURN_HOME[0] and now <= PERIOD_RETURN_HOME[1]:
light_on()
elif now >= PERIOD_SLEEP[0] or now <= PERIOD_SLEEP[1]:
alarm()
time.sleep_ms(20)
main()
チャプター7
おわりに
7. 1 このレッスンのまとめ
今回はテーマ.11-2とテーマ.11-3で学習した、インターネットを利用した「気象予報データの取得」と「リアルタイムクロックの時刻補正」を応用して、傘立てのIoT化をモチーフにしたロボット制作に取り組みました。
IoTの世界では、このように身近な道具をインターネットに接続して様々なサービスと連携することで、新たな価値を生み、私たちの暮らしをより一層豊かにしていくことが期待されています。
また、テーマ.11では紹介できませんでしたが、HTTPリクエストを送るだけでメールを送信したり、AIを使った音声認識や画像認識の技術を無料で提供するサービスもあります。ほとんどが会員登録を必要とするものですが、興味のある人は検索して調べて自分のロボットづくりに利用してみましょう。
7. 2 次回のレッスンについて
テーマ.12では、「Studuino:bitをWebサーバーとして起動する」方法と、「オープンソース※として公開されているMicroPython用の拡張パッケージをインストールして利用する」方法について学習していきます。
次回のレッスンでは、手持ちのタブレットやスマートフォンからStuduino:bitを制御する方法を紹介します。家からWi-Fi接続ができるタブレットやスマートフォンを持参しましょう。