Pythonロボティクスコース レッスン 37
テーマ.9-4 スレッド間で共有するデータの保護
スレッド間で共有するデータを保護して安全にマルチタスクを行おう!
チャプター1
このレッスンで学ぶこと
電気製品の中には、カーオーディオや炊飯器など、デジタルな時計表示の付いたものがあります。マイコンには、リアルタイムに日付や時刻の経過を追うための機能が備わっているものがあり、MicroPythonでもそのためのモジュールが提供されています。
また、これまでのレッスンで学習してきたマルチスレッドのプログラムでは、スレッド間で共有しているデータに対して、別々のスレッドから同時に参照と変更が行われると、予期せぬ不具合につながってしまう可能性があります。これを防ぐ方法として、Pythonでは一時的に他のスレッドからデータが利用できないように保護する仕組みが提供されています。
このレッスンで、これら2つの機能について新たに学習をしていきましょう。
チャプター2
日付・時刻の経過を追うリアルタイムクロック
MicrpPythonでは、正確な日付・時刻の経過を追うための「リアルタイムクロック(Real Time Clock)機能」が用意されています。
2. 1 日付・時刻の設定と取得
リアルタイムクロック機能は、machine
モジュール内でRTC
クラスとして提供されています。新たに日付・時刻を設定するときは、このクラスのdatetime()
メソッドを使用します。
datetime()
メソッドは以下の順番でデータが並ぶタプルを引数として受け取ります。
datetime((年, 月, 日, 曜日, 時, 分, 秒, ミリ秒))
この中で曜日は、文字列の代わりに以下の数字で指定します。
0 | 1 | 2 | 3 | 4 | 5 | 6 |
---|---|---|---|---|---|---|
月 | 火 | 水 | 木 | 金 | 土 | 日 |
また、日付・時刻を取得するときも同じdatetime()
メソッドを使います。この場合は引数を省略します。
では、現在の日付と時刻を設定し、10秒後に日付と時刻を取得するサンプルコードを書いて実行してみましょう。
import time
from machine import RTC
rtc = RTC() # RTCクラスのインスタンスを作成
# 学習時の日付・時刻を設定。この例では「2020年4月1日(水)17:30:30」を設定
rtc.datetime((2020, 4, 1, 2, 17, 30, 30, 0))
time.sleep(10) # sleep()メソッドの引数の単位は「秒」
print(rtc.datetime()) # 引数を省略して、日付・時刻を取得
(実行結果)
(2020, 4, 1, 2, 17, 30, 40, 380)
実は、曜日については日付から自動的に正しい情報が設定されるようになっているため、代わりに「0」を入力しても同じ結果になります。
変更【6行目】
import time
from machine import RTC
rtc = RTC()
rtc.datetime((2020, 4, 1, 0, 17, 30, 30, 0)) # 4つめの要素を「2」から「0」に変更
time.sleep(10)
print(rtc.datetime())
(実行結果)
(2020, 4, 1, 2, 17, 30, 40, 369)
2. 2 timeモジュールを使用した日付・時刻の取得
日付や時刻の取得はtime
モジュールからも行えます。それにはlocaltime()
メソッドを使います。localtime()
メソッドで取得する日付や時刻はリアルタイムクロックを参照しているため、これが正しく設定できていないと正確な日付・時刻を取得できません。
一度Studuino:bitのリセットボタンを押して、リアルタイムクロックの設定を初期化したあとで、以下のプログラムを実行して結果を確認しましょう。
import time
from machine import RTC
rtc = RTC()
print("before:", time.localtime()) # リアルタイムクロックの設定前
rtc.datetime((2020, 4, 1, 2, 17, 30, 30, 0))
print("after:", time.localtime()) # リアルタイムクロックの設定後
(実行結果)
before: (2000, 1, 1, 0, 0, 2, 5, 1) after: (2020, 4, 1, 17, 30, 30, 2, 92)
※ リアルタイムクロックは、起動時に「2000年1月1日 00:00:00」で初期設定されます。
この結果から、localtime()
関数がリアルタイムクロックを参照していることが分かります。また、localtime()
関数の戻り値はRTC
クラスのdatetime()
メソッドの戻り値と比べて、データの並びと内容が少し違うので注意してください。
【 localtime()関数の戻り値のデータの並び 】
(年, 月, 日, 時, 分, 秒, 曜日, 年内の通算日数を表す数字1~366)
チャプター3
データを保護してスレッド間で安全に共有する方法
前のレッスンでは、スタックやキューを使用して、複数のスレッド間でデータの受け渡しを行うプログラムを作成しました。そして、このプログラムでは別々のスレッドから同時に同じデータへの参照と変更を行っていました。このケースでは特に問題は見られませんでしたが、内容によっては、スレッド間でのデータ共有が思わぬ不具合につながってしまう可能性があります。
3. 1 スレッド間でのデータの共有が問題になるケース
例として、仮想の「メール予約配信システム」を構築し、どのような問題が起きうるのかを見ていきましょう。
■ 変更が反映されない問題
この仮想のメール予約配信システムでは、リストに登録されているメンバー宛て対して、設定した時間の経過後に自動でメッセージを送信できます。
以下のサンプルコードでは、このシステムを利用しているユーザーが、メールの配信を設定した後で、リストへのメンバーの登録漏れに気づき、更新作業を行っているところをシミュレートしています。
【 サンプルコード 3-1-1 】
import time
import _thread
from machine import Timer
mytimer = Timer(1) # タイマ割込みを利用して、設定した時間の経過後にメールを配信する
# メンバーの名前とメールアドレスのリスト
mailing_list = {
"Daisuke": "daisuke@python.com",
"Ichiro": "ichiro@python.com",
"Taichi": "taichi@python.com",
}
# リスト内のメンバーへ順番にメールを送る処理
def send_mail():
for name, address in mailing_list.items():
time.sleep_ms(100)
print("sending a mail to {}: Hello, {}.".format(address, name))
# タイマで設定された時間になると、メールを送る関数を新たなスレッドを立てて実行する処理
def wake_up(t):
_thread.start_new_thread(send_mail, ())
t.deinit() # 1回切りのため、利用後はタイマを解放する
# ユーザーがリストの内容を更新する作業をシミュレートする処理
def update_mailing_list():
global mailing_list # 内容を書き換えるため、グローバル宣言が必要
# ユーザーが1秒おきに、メンバーを追加する作業
time.sleep_ms(1000)
mailing_list["Kenta"] = "kenta@python.com"
time.sleep_ms(1000)
mailing_list["Kaito"] = "kaito@python.com"
time.sleep_ms(1000)
mailing_list["Taro"] = "taro@python.com"
# タイマ割込みでメールの配信を予約
mytimer.init(mode=Timer.ONE_SHOT, period=5000, callback=wake_up)
time.sleep_ms(3000) # 予約してから少し時間が経ったところで、
update_mailing_list() # ユーザーがリストを更新
もちろんユーザーが期待しているのは、更新されたリストで6名全てのメンバーへメールが配信されることです。しかし、実際はこのサンプルコードを実行すると、次のような結果になります。
(実行結果)
sending a mail to kenta@python.com: Hello, Kenta. sending a mail to daisuke@python.com: Hello, Daisuke. sending a mail to taichi@python.com: Hello, Taichi. sending a mail to kaito@python.com: Hello, Kaito. sending a mail to ichiro@python.com: Hello, Ichiro.
6名のメンバーに対して、メールは5名にしか送られていません。これは、上のサンプルコードが次の時間軸で処理を進めていたためです。
図からも分かるように、ユーザーがリストを更新している途中でメールの送信が開始されてしまったため、最後にリストへ追加したTaro
さんには、メールを送ることができなかったのです。
このようにマルチスレッドのプログラムにおいては、共有しているデータをあるスレッドから更新する場合、別のスレッドが同じデータを利用していると、思わぬ問題を引き起こす可能性があります。そこで、Pythonには一時的にデータを保護し、他のスレッドから利用できないようにする仕組みが用意されています。
3. 2 ロックオブジェクトを利用したデータの保護
上の問題を避ける手段として、Pythonには他のスレッドから一時的にアクセスできないように、ロックできるオブジェクトが用意されています。これを「ロックオブジェクト」といい、_thread
モジュールのallocate_lock()
関数で作成できます。
※ 「allocate」は「割り当てる」という意味があります。
lock_obj = _thread.allocate_lock()
ロックオブジェクトは特殊なオブジェクトで、「ロック」と「解除(アンロック)」の2つの状態があり、スレッドから以下のメソッドを実行すると、状態を変更することができます。
【 ロックオブジェクトのメソッド 】
メソッド名 | 処理の内容 |
---|---|
acquire(waitflag=1, timeout=-1) | ロックを要求します。もし、既に他のスレッドからロックされている場合は、引数の値に従って解除まで待機します。例えば、引数waitflag に「0」を指定した場合は、その場でロックできる場合のみロックを要求し、できない場合は解除を待たずに次の処理に移ります。反対に、引数waitflag に「1」を指定した場合は、引数timeout の時間(秒)が経過するまでは解除を待ち、それを過ぎると次の処理に移ります。他にも、引数timeout を「-1」に設定した場合は、解除まで無期限に待機します。 |
release() | ロックを解除します。 |
locked() | 現在ロックされているかどうかをブール値(True まはたFalse )で返します。 |
ロックオブジェクトの振る舞いは、まさに鍵付きのロッカーと同じです。acquire()
メソッドを実行した場合、ロッカーに鍵が差してあれば、その鍵を使用してロックします。もし鍵がなく、ロックされていた場合は、それをロックした人が鍵を開けるまで(release()
メソッドを実行して解除するまで)待つこともできますが、他のロッカーを探しに行ったり、利用をあきらめることもあります。
下のサンプルコードは、このロックオブジェクトを使用する一連の流れを書いたものです。
import time
import _thread
lock_obj = _thread.allocate_lock() # ロックオブジェクトの作成
lock_obj.acquire() # ロックを要求する
print("lock_obj is locked.")
time.sleep_ms(1000) # この間はロックされている
lock_obj.release() # ロックを解除する
print("lock_obj is released.")
また、ロックオブジェクトはwith
構文に対応しています。そのため、以下のサンプルコードは上と同じ処理を行います。
import time
import _thread
lock_obj = _thread.allocate_lock()
with lock_obj: # ロックを要求する。既にロック中の場合は解除されるまで待つ
print("lock_obj is locked.")
time.sleep_ms(1000) # この間はロックされている
print("lock_obj is released.") # with構文内の処理を終えると、自動的にロックが解除される
では、さきほど問題のあったプログラムをロックオブジェクトを使って、以下のように書き換えてみましょう。
【 ロックオブジェクトを使用して書き換えた サンプルコード 3-1-1 】
追加・変更【6行目、16行目~19行目、30行目~36行目】
import time
import _thread
from machine import Timer
mytimer = Timer(1)
lock_obj = _thread.allocate_lock() # ロックオブジェクトを作成
mailing_list = {
"Daisuke": "daisuke@python.com",
"Ichiro": "ichiro@python.com",
"Taichi": "taichi@python.com",
}
def send_mail():
with lock_obj: # ロックが解除されるまで待ってからリストのデータを参照する
for name, address in mailing_list.items():
time.sleep_ms(100)
print("sending a mail to {}: Hello, {}.".format(address, name))
def wake_up(t):
_thread.start_new_thread(send_mail, ())
t.deinit()
def update_mailing_list():
global mailing_list
with lock_obj: # ロックを掛けてからリストに変更を加える
time.sleep_ms(1000)
mailing_list["Kenta"] = "kenta@python.com"
time.sleep_ms(1000)
mailing_list["Kaito"] = "kaito@python.com"
time.sleep_ms(1000)
mailing_list["Taro"] = "taro@python.com"
# ここでロックを解除
mytimer.init(mode=Timer.ONE_SHOT, period=5000, callback=wake_up)
time.sleep_ms(3000)
update_mailing_list()
このプログラムを実行すると、update_mailing_list()
関数での更新処理がすべて終了してから、メールが送信されるようになったことが分かります。
(実行結果)
>>> sending a mail to kenta@python.com: Hello, Kenta. sending a mail to daisuke@python.com: Hello, Daisuke. sending a mail to taichi@python.com: Hello, Taichi. sending a mail to kaito@python.com: Hello, Kaito. sending a mail to taro@python.com: Hello, Taro. sending a mail to ichiro@python.com: Hello, Ichiro.
このように、マルチスレッドなプログラムでは、ロックオブジェクトを使用してデータを保護することで、予期せぬ結果が生じてしまうのを防ぐことができます。しかし、逆にロックオブジェクトを使うことで、別の不具合が起きるケースもあるのです。
3. 3 デッドロックの問題
ロックオブジェクトは便利な機能ですが、ロックを解除するのを忘れたことで、他の処理が進まなくなってしまったり、また複数のロックオブジェクトを使用することにより起きる「デッドロック」という落とし穴があったりします。
3. 4 デッドロックとは
まずは、以下のサンプルコードを見てください。このサンプルコードでは、work1()
とwork2()
の2つの関数の間で、2つのロックオブジェクトを使用し、ロックと解除を一定時間おきに繰り返す処理を行っています。
【 サンプルコード 3-2-1 】
import _thread
import time
# 2つのロックオブジェクトの作成
lock_obj1 = _thread.allocate_lock()
lock_obj2 = _thread.allocate_lock()
# 1秒おきにlock_obj1のロックを要求し、その中でさらにlock_obj2のロックを要求する処理
def work1():
while True:
time.sleep_ms(1000)
with lock_obj1:
print("work1: lock_obj1 is locked.")
with lock_obj2:
print("work1: lock_obj2 is locked.")
print("work1: lock_obj2 is released.")
print("work1: lock_obj1 is released.")
# 1.5秒おきにlock_obj2のロックを要求し、その中でさらにlock_obj1のロックを要求する処理
def work2():
while True:
time.sleep_ms(1500)
with lock_obj2:
print("work2: lock_obj2 is locked.")
with lock_obj1:
print("work2: lock_obj1 is locked.")
print("work2: lock_obj1 is released.")
print("work2: lock_obj2 is released.")
# それぞれの関数をスレッドを立ち上げて実行
_thread.start_new_thread(work1, ())
_thread.start_new_thread(work2, ())
このプログラムを実行すると、最初のうちは問題なく動作しますが、しばらくすると以下の文が表示されたあと、処理がストップしてしまいます。
. . . . . work1: lock_obj1 is locked. work2: lock_obj2 is locked.
このような状態が起きるは、work1
のlock_obj1を解除する条件にlock_obj2
のロックが含まれており、反対にwork2
では、lock_obj2
の解除条件にlock_obj1
の取得が含まれているためです。このようなケースでは、互いが同時にlock_obj1
とlock_obj2
にロックを掛けてしまうと、どちらも解除ができない状態に陥り、処理を先に進めることができなくなってしまいます。
これを「デッドロック」といい、ロックオブジェクトを使用する際に最も注意しなければいけないエラーのひとつです。
チャプター4
一定時間ごとの平均温度を記録するシステムの制作
ここからは、学習したことを踏まえて、より実践的なプログラムを作成します。
作成するプログラムでは、まず温度センサで一定時間おきに温度を取得し、キューにその情報を蓄積します。そして、ある程度のデータ量が蓄積できたところで、集計を行い平均温度と計算して、そのときの時刻とともに記録します。その後、ユーザーがボタンAを押すと、最新のデータ(時刻と平均温度)をLEDディスプレイにスクロール表示するというシステムです。下の図はこのシステムの全体像を表しています。
【 制作するシステムの全体像 】
このシステムでは、3つの関数を同時並行で処理しています。また、関数の間では特定のデータを共有します。
それではこのシステムを、これまでに学習してきた「タイマ割込み」「マルチスレッド」「キュー」「リアルタイムクロック」「ロックオブジェクト」を使用して作成していきましょう。
4. 1 プログラムの作成手順
上の図で確認した3つの関数を、それぞれ次の名前で順番に作成します。
関数名 | 内容 |
---|---|
logging_temperature() | 500ミリ秒おきに温度を取得する関数 |
calc_average() | 30秒間の計測結果を集計して平均温度を計算する関数 |
main() | メインのループ関数 |
※ logging(ロギング)・・・起きた出来事についての情報などを一定の形式で時系列に記録・蓄積すること。
※ calculate(カルキュレート)・・・計算すること。
※ average(アベレージ)・・・平均値のこと。
■ 500ミリ秒おきに温度を取得する関数の作成
まずは、温度センサから500ミリ秒おきに取得する処理をlogging_temperature()
関数としてまとめましょう。
import time
from pystubit.board import temperature
def logging_temperature():
while True: # 500ミリ秒おきに繰り返し現在の温度を取得
time.sleep_ms(500)
_temp = temperature.get_celsius()
取得した温度は、キューを用意して蓄積します。このキューはcalc_average()
関数と共有しているため、更新するときはロックオブジェクトを利用して一時的にデータを保護します。これで関数が作成できました。
追加【2行目、5・6行目、13・14行目】
import time
import _thread # マルチスレッドのモジュールのインポート
from pystubit.board import temperature
que = [] # 温度センサから取得したデータを蓄積するためのキュー
lock_obj = _thread.allocate_lock() # ロックオブジェクトの作成
def logging_temperature():
while True: # 500ミリ秒おきに繰り返し現在の温度を取得
time.sleep_ms(500)
_temp = temperature.get_celsius()
with lock_obj: # ロックを要求しロックできるまで待機
que.append(_temp) # 取得したデータをキューに蓄積
■ 30秒間の計測結果を集計して平均温度を計算する関数の作成
この関数では、30秒おきにキューに蓄積した温度データを集計し、その平均値を計算します。また、その平均値とそのときの時刻をリアルタイムクロックで取得し、最新の温度情報としてmain()
関数と共有するグローバル変数に記録します。まずは、そこまでのコードを書きましょう。
追加【3行目、8・9行目、20行目~30行目】
import time
import _thread
from machine import RTC
from pystubit.board import temperature
que = []
lock_obj = _thread.allocate_lock()
latest_data = None # 最新のデータを記録するためのグローバル変数
rtc = RTC() # リアルタイムクロックのオブジェクト作成。日付・時刻の設定はmain()関数で行う。
def logging_temperature():
while True:
time.sleep_ms(500)
_temp = temperature.get_celsius()
with lock_obj:
que.append(_temp)
def calc_average():
global latest_data # 内容を変更するためグローバル宣言が必要
num = len(que) # 現在のキューの長さを取得
with lock_obj: # キューの内容を変更する前にロックを要求
_sum = 0 # 集計用の変数
for _ in range(num): # 現在のキューの長さだけ繰り返す
_sum += que.pop(0) # キューの先頭から要素を削除してその値を取得
average = round(_sum/num, 2) # 集計した合計値から平均値を計算し小数点第二位までで値を丸める
latest_data = (rtc.datetime(), average) # 最新の温度情報をタプルにまとめて記録
print(latest_data) # 動作確認用にターミナルにも表示
28行目のコードでは、_sum/num
で求めた平均値を小数第2位までに丸めるために、round()
関数を利用しています。round()
関数は第2引数を指定することで、整数だけでなく小数点以下の桁でも数値を丸めることができます。
そして、作成したcalc_average()
関数を30秒おきにスレッドを立てて実行するために、タイマ割込みを利用します。タイマ割込みで呼び出すための関数を新たに作成し、その中でスレッドを立ち上げるようにしましょう。
追加・変更【3行目、10行目、34・35行目】
import time
import _thread
from machine import RTC, Timer # タイマ割込み用のクラスをインポート
from pystubit.board import temperature
que = []
lock_obj = _thread.allocate_lock()
latest_data = None
rtc = RTC()
mytimer = Timer(1) # タイマオブジェクトの作成
def calc_average():
global latest_data
num = len(que)
with lock_obj:
_sum = 0
for _ in range(num):
_sum += que.pop(0)
average = round(_sum/num, 2)
latest_data = (rtc.datetime(), average)
print(latest_data)
def wake_up(t): # タイマ割込みで呼び出す関数。引数の「t」は呼び出し元のタイマオブジェクトが格納される。
_thread.start_new_thread(calc_average, ()) # スレッドを立ち上げて実行する
■ メインのループ関数の作成
最後に、ボタンAを押すと、最新の温度情報をLEDディスプレイに表示するループ処理をまとめた関数を作成します。また、この関数の冒頭では、リアルタイムクロックの日付・時刻を引数を受け取って設定したり、用意した関数をスレッドを立ち上げて実行したり、タイマ割込みを設定したりする処理も行います。
追加・変更【4行目、43行目~57行目】
import time
import _thread
from machine import RTC, Timer
from pystubit.board import temperature, display, button_a # LEDディスプレイとボタンAのオブジェクトをインポート
def wake_up(t):
_thread.start_new_thread(calc_average, ())
def main(_datetime): # 引数として開始時点の日付・時刻を受け取る
rtc.datetime(_datetime) # リアルタイムクロックに現在の日付・時刻を設定する
_thread.start_new_thread(logging_temperature, ()) # スレッドを立ち上げて実行
mytimer.init(mode=Timer.PERIODIC, period=30000, callback=wake_up) # タイマ割込みの設定
while True: # ボタンAを押されたときに最新の温度情報をスクロール表示
if button_a.is_pressed():
if latest_data: # 最新データがある場合(※スタート後30秒間はこのデータがない)
hour = latest_data[0][4] # 時(rtc.datetime()メソッドの戻り値のタプルの4番の要素)
minute = latest_data[0][5] # 分(同じく5番の要素)
second = latest_data[0][6] # 秒(同じく6番の要素)
_time = "{}:{}:{}".format(hour, minute, second) # 「h:m:s」のフォーマットで時刻を表示
_temp = str(latest_data[1]) # 最新の平均温度
display.scroll(_time, delay=50) # 時刻からスクロール表示
display.scroll(_temp, delay=50) # 続けて平均温度をスクロール表示
else: # 最新データがまだ無い場合はターミナルにて知らせる
print("no data")
while button_a.is_pressed(): # ボタンAが一度はなされるまで待つ
pass
time.sleep_ms(100) # 計算負荷を掛け過ぎないように、100ミリ秒に1回のボタンAを監視
■ 動作の確認
これでプログラムの完成です。実行後、ターミナルから現在の日時・時刻をmain()
に指定して呼び出します。30秒後に、最新のデータがターミナルに表示され、ボタンAを押すと同じデータがLEDディスプレイにスクロール表示されれば成功です。
※ 日付・時刻はタプルにまとめて渡す必要があるので注意してください。
※ 下の表示の4行目が記録された最新のデータです。このあとでボタンAを押してください。
MicroPython v1.10-229-gd579623-dirty on 2019-12-04; ESP32 module with ESP32 Type "help()" for more information. >>> main((2020, 4, 1, 0, 12, 0, 0, 0)) ((2020, 4, 1, 2, 12, 0, 30, 3354), 31.66)
エラーが出て改善できない場合は、以下のサンプルコードと見比べて、誤りがないかを確認しましょう。
【 サンプルコード 4-1-1 】
import time
import _thread
from machine import RTC, Timer
from pystubit.board import temperature, display, button_a
que = []
lock_obj = _thread.allocate_lock()
latest_data = None
rtc = RTC()
mytimer = Timer(1)
def logging_temperature():
while True:
time.sleep_ms(500)
_temp = temperature.get_celsius()
with lock_obj:
que.append(_temp)
def calc_average():
global latest_data
num = len(que)
with lock_obj:
_sum = 0
for _ in range(num):
_sum += que.pop(0)
average = round(_sum/num, 2)
latest_data = (rtc.datetime(), average)
print(latest_data)
def wake_up(t):
_thread.start_new_thread(calc_average, ())
def main(_datetime):
rtc.datetime(_datetime)
_thread.start_new_thread(logging_temperature, ())
mytimer.init(mode=Timer.PERIODIC, period=30000, callback=wake_up)
while True:
if button_a.is_pressed():
if latest_data:
hour = latest_data[0][4]
minute = latest_data[0][5]
second = latest_data[0][6]
_time = "{}:{}:{}".format(hour, minute, second)
_temp = str(latest_data[1])
display.scroll(_time, delay=50)
display.scroll(_temp, delay=50)
else:
print("no data")
while button_a.is_pressed():
pass
time.sleep_ms(100)
チャプター5
課題:一定時間ごとの光センサの平均値を記録する機能の追加
レッスンの最後に、前のチャプターで作成したプログラムへ、新たに光センサの平均値を記録する機能を追加する課題に取り組みましょう。そして、記録した最新情報は、ボタンBを押すとLEDディスプレイにスクロール表示されるようにもしましょう。
【追加する機能に必要な関数と共有データ】
上の図の通り、追加する機能に必要な関数や共有データ等の構造は温度センサの場合と同じです。そのため、温度センサ用の変数や関数を名前を変えて複製し、必要な箇所だけ書き換えることで、プログラムを作成しましょう。
5. 1 プログラムの作成例
温度センサ用の変数や関数を複製し、それぞれ次のように名前を変更します。
温度センサ用の関数名や変数名 | 光センサ用に複製した関数名や変数名 |
---|---|
que | que_light |
lock_obj | lock_obj_light |
latest_data | latest_data_light |
logging_temperature() | logging_lightsensor() |
calc_average() | calc_average_light() |
そして、それぞれの関数内のコードを光センサ用に書き換えましょう。
追加・変更【5行目、8行目、10行目、12行目、25行目~30行目、46行目~56行目】
import time
import _thread
from machine import RTC, Timer
from pystubit.board import temperature, display, button_a, button_b
from pystubit.board import lightsensor # 光センサのオブジェクトをインポート
que = []
que_light = [] # 光センサ用
lock_obj = _thread.allocate_lock()
lock_obj_light = _thread.allocate_lock() # 光センサ用
latest_data = None
latest_data_light = None # 光センサ用
rtc = RTC()
mytimer = Timer(1)
def logging_temperature():
while True:
time.sleep_ms(500)
_temp = temperature.get_celsius()
with lock_obj:
que.append(_temp)
# 光センサ用に複製して書き換える
def logging_lightsensor():
while True:
time.sleep_ms(500)
_light = lightsensor.get_value() # 光センサの値を取得
with lock_obj_light: # 光センサ用のロックオブジェクト
que_light.append(_light) # 光センサ用のキュー
def calc_average():
global latest_data
num = len(que)
with lock_obj:
_sum = 0
for _ in range(num):
_sum += que.pop(0)
average = round(_sum/num, 2)
latest_data = (rtc.datetime(), average)
print(latest_data)
# 光センサ用に複製して書き換える
def calc_average_light():
global latest_data_light # 光センサ用のグローバル変数
num = len(que_light) # 光センサ用のキューの長さを取得
with lock_obj_light: # 光センサ用のロックオブジェクト
_sum = 0
for _ in range(num):
_sum += que_light.pop(0) # 光センサ用のキューの先頭から要素を削除して値を取得
average = round(_sum/num) # 光センサの値は0~4095までの整数となっているため、平均値も整数で丸める
latest_data_light = (rtc.datetime(), average) # 最新データを記録
print(latest_data_light) # 確認用にターミナルに表示
これで、関数と変数の複製ができたので、今度はスレッドの立ち上げやタイマ割込みの設定を行い、ボタンBが押されたときに最新データを表示する処理を追加します。
追加【62行目、69行目、87行目~100行目】
def wake_up(t):
_thread.start_new_thread(calc_average, ())
# 光センサの平均値を計算する処理をスレッドを立てて実行
_thread.start_new_thread(calc_average_light, ())
def main(_datetime):
rtc.datetime(_datetime)
_thread.start_new_thread(logging_temperature, ())
# 光センサの値を取得する処理をスレッドを立てて実行
_thread.start_new_thread(logging_lightsensor, ())
mytimer.init(mode=Timer.PERIODIC, period=30000, callback=wake_up)
while True:
if button_a.is_pressed():
if latest_data:
hour = latest_data[0][4]
minute = latest_data[0][5]
second = latest_data[0][6]
_time = "{}:{}:{}".format(hour, minute, second)
_temp = str(latest_data[1])
display.scroll(_time, delay=50)
display.scroll(_temp, delay=50)
else:
print("no data")
while button_a.is_pressed():
pass
# ボタンBが押されたときに、光センサの最新データをスクロール表示
if button_b.is_pressed():
if latest_data_light:
hour = latest_data_light[0][4]
minute = latest_data_light[0][5]
second = latest_data_light[0][6]
_time = "{}:{}:{}".format(hour, minute, second)
_light = str(latest_data_light[1])
# この例では、温度と区別するために白色でスクロール表示するように設定
display.scroll(_time, delay=50, color=(31, 31, 31))
display.scroll(_light, delay=50, color=(31, 31, 31))
else:
print("no data")
while button_b.is_pressed():
pass
time.sleep_ms(100)
これでプログラムの完成です。実行して、ターミナルからmain()
関数を呼び出し、動作を確認しましょう。もし、エラーが改善できない場合は、以下のサンプルコードと見比べて、誤りがないか確かめましょう。
※ターミナルの入力は main((年,月,日,曜日(数字で),時間,分,秒(ひとまずは0で),0))
【 サンプルコード 5-1-1 】
import time
import _threa
dfrom machine import RTC, Timer
from pystubit.board import temperature, display, button_a, button_b
from pystubit.board import lightsensor
que = []
que_light = []
lock_obj = _thread.allocate_lock()
lock_obj_light = _thread.allocate_lock()
latest_data = None
latest_data_light = None
rtc = RTC()
mytimer = Timer(1)
def logging_temperature():
while True:
time.sleep_ms(500)
_temp = temperature.get_celsius()
with lock_obj:
que.append(_temp)
def logging_lightsensor():
while True:
time.sleep_ms(500)
_light = lightsensor.get_value()
with lock_obj_light:
que_light.append(_light)
def calc_average():
global latest_data
num = len(que)
with lock_obj:
_sum = 0
for _ in range(num):
_sum += que.pop(0)
average = round(_sum/num, 2)
latest_data = (rtc.datetime(), average)
print(latest_data)
def calc_average_light():
global latest_data_light
num = len(que_light)
with lock_obj_light:
_sum = 0
for _ in range(num):
_sum += que_light.pop(0)
average = round(_sum/num)
latest_data_light = (rtc.datetime(), average)
print(latest_data_light)
def wake_up(t):
_thread.start_new_thread(calc_average, ())
_thread.start_new_thread(calc_average_light, ())
def main(_datetime):
rtc.datetime(_datetime)
_thread.start_new_thread(logging_temperature, ())
_thread.start_new_thread(logging_lightsensor, ())
mytimer.init(mode=Timer.PERIODIC, period=30000, callback=wake_up)
while True:
if button_a.is_pressed():
if latest_data:
hour = latest_data[0][4]
minute = latest_data[0][5]
second = latest_data[0][6]
_time = "{}:{}:{}".format(hour, minute, second)
_temp = str(latest_data[1])
display.scroll(_time, delay=50)
display.scroll(_temp, delay=50)
else:
print("no data")
while button_a.is_pressed():
pass
if button_b.is_pressed():
if latest_data_light:
hour = latest_data_light[0][4]
minute = latest_data_light[0][5]
second = latest_data_light[0][6]
_time = "{}:{}:{}".format(hour, minute, second)
_light = str(latest_data_light[1])
display.scroll(_time, delay=50, color=(31, 31, 31))
display.scroll(_light, delay=50, color=(31, 31, 31))
else:
print("no data")
while button_b.is_pressed():
pass
time.sleep_ms(100)
チャプター6
おわりに
6. 1 このレッスンのまとめ
このレッスンでは新たに、正確な日付・時刻の経過を追う「リアルタイムクロック機能」とマルチスレッドなプログラムにおいて、他のスレッドからデータを一時的に保護するための「ロックオブジェクト」について学習しました。
そして、テーマ9のまとめとして、これまでに学習してきた「タイマ割込み」「マルチスレッド」「キュー」「リアルタイムクロック」「ロックオブジェクト」をすべて使用して、より実践的なプログラムを作成しました。
テーマ9で学習したいくつかの機能は、ハードウェアによっては使用できないものもあります。これから、Pythonでマイコンのような機能が限定されたハードウェアを制御するときは、あらかじめどのような機能が提供されているのかを、開発元から出されているデータを見て確認するようにしましょう。
※Studuino:bitに搭載しているESP32というマイコンの場合、以下のMicroPython公式ドキュメントから使用できる機能の一覧が確認できます。
6. 2 次のレッスンについて
次のレッスンから、テーマ10に入ります。テーマ10では、コンピュータ上で扱われるデータについて、より深く学んでいきます。次のテーマ.10-1では、コンピュータ上で数値を扱うときに使われている「2進数」や「8進数」「16進数」などの数値の表現方法について学習しましょう。