Pythonロボティクスコース レッスン 35
テーマ.9-2 マルチスレッド機能を利用した並行処理
マルチスレッド機能を使ってサッカーのPKゲームを作成しよう!
チャプター1
このレッスンで学ぶこと
前のレッスンでは、「タイマ」や「汎用入出力端子(外部端子)」を使用した「割込み処理」について学習しました。割込み処理は、コンピュータ上で複数の処理を並行して切り替えながら行う「並行処理/マルチタスク」のための手段として、広く利用されています。
Pythonでは、並行処理を実現する手段として他にもいくつかの機能が提供されています。このレッスンではその中の1つとして「マルチスレッド機能」を紹介します。
チャプター2
マルチスレッド機能について
Pythonで提供されている平行処理のための機能は、そのすべてがMicroPythonでサポートされている訳ではありません。理由は前のレッスンでも説明したように、MicroPythonがマイコン用に小型軽量化されたPythonの実行環境であるためです。サポートされている数少ない機能の中で、割込みと並び利用頻度の高いのがマルチスレッド機能を提供する「_thread
モジュール」です。ここでは、thread
モジュールを使用した平行処理のコードの書き方を説明していきます。
2. 1 マルチスレッドとは
そもそも「スレッド」とはプログラムを実行するときの処理の最小単位を表すコンピュータ用語ですが、この説明だけでは分かりにくいので、料理を作る過程に例えてマルチスレッドついて理解していきましょう。
料理ではよく、複数の作業を同時に行うことがあります。例えば、お味噌汁を作るときは次のような流れで調理を進めていくことが多いのではないでしょうか。
「お湯を沸かす」や「豆腐を切る」という作業の単位が、コンピュータでいう1つの処理の単位となっていて、これを「スレッド」と呼びます。また、このお味噌汁の調理過程では、スレッドを進めていく流れが途中で2つに枝分かれしています。このように複数のスレッドを進める流れをもつことを「マルチスレッド」といいます。反対に、スレッドを進める流れが1つしかない場合は「シングルスレッド」といいます。
2. 2 _threadモジュールを使用したマルチスレッドなプログラム
例えば、「ボタンAを押すとブザーから音が鳴る処理」と「ボタンBを押すとLEDディスプレイに文字がスクロール表示される処理」の2つを、前のレッスンで説明したポーリング(※)でコードを書くと次のようになります。
※特定のイベント(例えば「ボタンが押される」や「センサの値が変化した」など)が発生しているかどうかを定期的に監視することで、イベントが発生したときに対応した処理を行う仕組み。
【 サンプルコード 2-2-1 】
from pystubit.board import display, buzzer
from pystubit.board import button_a, button_b
def sound(): # ブザーから音を鳴らす関数
buzzer.on("C4", duration=1000)
def scroll(): # LEDディスプレイ上に文字「Hi!」をスクロール表示する関数
display.scroll("Hi!", delay=50)
while True: # 各ボタンの状態を監視して、対応する関数を実行するループ処理(ポーリング)
if button_a.is_pressed():
sound()
if button_b.is_pressed():
scroll()
このプログラムを実行すると、確かにそれぞれのボタンを押したときに対応した処理が行われますが、一方の処理が行われている間に、別の処理に対応するボタンを押しても反応はできません。これを_thread
モジュールを利用したマルチスレッドなコードで書き替えると、一方の処理が行われている間も、他のボタンを押すとそれに対応した処理が行われるようになります。
では、実際に_thread
モジュールを使ってコードを書き直していきましょう。まずは、プログラムの先頭で新たに_thread
モジュールをインポートします。
追加【1行目】
import _thread # モジュールのインポート
from pystubit.board import display, buzzer
from pystubit.board import button_a, button_b
次に、sound()
関数とscroll()
関数の中に、それぞれwhile
文を使った、ボタンの状態を監視するループ処理を移動します。
追加・変更【6行目~8行目、11行目~13行目】
import _thread
from pystubit.board import display, buzzer
from pystubit.board import button_a, button_b
def sound():
while True: # ボタンAを監視するループ処理
if button_a.is_pressed():
buzzer.on("C4", duration=1000)
def scroll():
while True: # ボタンBを監視するループ処理
if button_b.is_pressed():
display.scroll("Hi!", delay=50)
新しくスレッドを処理する流れを立ち上げるときは、_thread
モジュールのstart_new_thread()
関数を使います。このstart_new_thread()
関数は、次の3つの引数を受け取ります。
_thread.start_new_thread(function, args [,kwargs])
第1引数のfucntion
には、新しいスレッドの流れで処理したい関数の名前を指定します。第2引数のargs
には、第1引数に指定した関数に渡したい位置引数をタプルの形式で指定します。そして、第3引数のkwargs
には、第1引数に指定した関数に渡したいキーワード引数を辞書の形式で指定します。第2引数までは必須ですが、最後の第3引数は省略することができます。
【 start_new_thread()
関数の使用例 】
import _thread
def func(arg1, arg2, kwarg1, kwarg2):
print("positional argument:", arg1, arg2)
print("keyword argument:", kwarg1, kwarg2)
_thread.start_new_thread(func, ("A","B"), {"kwarg1":"C", "kwarg2":"D"})
(実行結果)
>>> positional argument: A B keyword argument: C D
では、start_new_thread()
関数で、2つのスレッドの流れを新たに開始して、sound()
関数とscroll()
関数をそれぞれ処理してみましょう。
【 サンプルコード 2-2-2 】
追加【15行目、16行目】
import _thread
from pystubit.board import display, buzzer
from pystubit.board import button_a, button_b
def sound():
while True:
if button_a.is_pressed():
buzzer.on("C4", duration=1000)
def scroll():
while True:
if button_b.is_pressed():
display.scroll("Hi!", delay=50)
_thread.start_new_thread(sound, ()) # sound()関数は引数を受け取らないため、空のタプルを指定
_thread.start_new_thread(scroll, ()) # 同じ理由で空のタプルを指定
では、このプログラムを実行して、ボタンBを押して文字がスクロール表示されている間にボタンAを押してみましょう。【 サンプルコード 2-2-1 】と違い、ブザーから音が鳴るはずです。この結果からも、2つのスレッドの流れが処理できるマルチスレッド機能の良さが理解できるのではないでしょうか。
2. 3 マルチスレッドなプログラムを作成するときの注意点
1つの流れで処理すると複雑なコードを書かなければならない場合でも、マルチスレッド機能を使用すると、機能ごとにコードを分けてかくことができるため、コード全体をすっきりとまとめることができます。このように大変便利なマルチスレッド機能ですが、使用するときにはいくつか注意しなければ、致命的な不具合を引き起こしてしまう可能性もあります。ここでは、その中でも特に注意すべき2つの点について説明をします。
■ スレッド間はグローバルスコープを共有している
複数のスレッドの流れを立ち上げたとき、その間ではグル―バルスコープにあるグローバル変数や関数を共有しています。例えば、次のサンプルプログラムでは、それぞれボタンAとボタンBの状態を監視して押された回数を記録する2つの関数を、異なるスレッドの流れで処理しています。
【 ボタンAとボタンBボタンが押された回数を記録するプログラム 】
import _thread
from pystubit.board import button_a, button_b
count = 0 # グローバル変数(グローバルスコープに存在)
def event_button_a(): # ボタンAを監視して、押された回数を記録する処理
global count
while True:
if button_a.was_pressed():
count += 1
print(count)
def event_button_b(): # ボタンBを監視して、押された回数を記録する処理
global count
while True:
if button_b.was_pressed():
count += 1
print(count)
_thread.start_new_thread(event_button_a, ())
_thread.start_new_thread(event_button_b, ())
グローバル変数のcount
は、event_button_a()
関数とevent_button_b()
関数のどちらからも参照されています。もし、このプログラムが「両方のボタンが押された回数の合計を記録する」目的で作られているならば、意図通りに動きます。しかし、そうではなく「それぞれのボタンが押された回数を記録する」目的であったなら、意図しない結果となってしまいます。
これは簡単な例でしたが、複雑な処理をマルチスレッド機能で実現していると、意図せず変更してはいけない変数を書き換えてしまい、他のスレッドに悪影響が出てしまうことがあります。そのため、各スレッドではできるだけローカル変数を使うようにし、スレッドからグローバル変数を書き換える必要がある場合は、他に悪影響を及ぼさないことを充分に確認してください。
■ 有限な計算資源を分配している
マルチスレッドなプログラムは、シングルスレッドのプログラムに比べて、コンピュータへの計算負荷が大きくなります。これは、スレッド間はあくまでも1つの計算資源(CPU)を共有しており、コンピュータはその計算資源を細かく時間単位で分けて、各スレッドへ配分しているためです。
この配分を行うこと自体にも計算資源を割かなければいけないため、立ち上げるスレッドの数を増やし過ぎると、全体の処理が大幅に遅くなってしまうことがあります。
では実際に、「1~1000までの数字を足す処理」をシングルスレッドとマルチスレッドで実行し、計算に掛かった時間を比較してみましょう。
【 指定した数のスレッドを立ち上げ、それぞれで1~1000までの数字を足して表示するプログラム 】
import _thread
import time
def addition(): # 1~1000までの数字を足す処理
start_time = time.ticks_ms() # このスレッドの処理の開始時間を記録
_sum = 0 # 合計(sumはPythonの予約語なので、先頭に「 _ 」をつけて重複を回避)
for num in range(1, 1000):
_sum += num
print(time.ticks_diff(time.ticks_ms(), start_time)) # 計算にかかった時間を表示
def make_threads(num): # 指定された数だけスレッドを立ち上げる処理
for _ in range(num):
_thread.start_new_thread(addition, ()) # 各スレッドで足し算を実行
このプログラムで、1個のスレッドを立てて実行した場合と、10個のスレッドを立てて実行した場合で比べると、処理に掛かる時間が大幅に変わることが分かります。
【 スレッドを1つだけ立ち上げた場合 】
>>> make_threads(1) >>> 4
【 スレッドを10個立ち上げた場合 】
>>> make_threads(10) >>> 3100 3100 3100 3100 3100 3100 3090 3080 3100 3130
※ 大幅に処理が遅くなっていることが確認できれば、それぞれ計算に掛かった時間が上記と異なっていても問題ありません。
このように、立ち上げたスレッドの個数以上に処理に時間が掛かってしまうのは、上でも説明したように、それだけ計算資源を配分するための処理に時間が取られてしまうためです。
チャプター3
PKゲーム機の組立て
ここからは、学習したマルチスレッド機能を利用して、サッカーの「PKゲーム機」の制作を行います。以下の組立説明書を確認して、ブロックを組み立てましょう。
3. 1 組み立てに必要なパーツ
【 パーツ一覧 】
- Studuino:bit×1
- ロボット拡張ユニット×1
- 電池ボックス×1
- サーボモーター×1
- 赤外線フォトリフレクタ×1
- センサー接続コード(3芯15cm)×1
- ブロック基本四角(白)×5
- ブロック基本四角(グレー)×4
- ブロック基本四角(黒)×1
- ブロック基本四角(赤)×4
- ブロック基本四角(緑)×1
- ブロックハーフA(グレー)×6
- ブロックハーフB(赤)×1
- ブロックハーフC(白)×18
- ステー×4
- ギヤ大×1
- ラック×1
【 アーテックブロックの形状 】
3. 2 組立説明書
以下のリンク先から組立説明書を確認しましょう。
チャプター4
PKゲーム機のプログラムの作成
組み立てたPKゲーム機のプログラムを作成する前に、まずはこのゲームの遊び方を見ていきましょう。
4. 1 PKゲーム機の遊び方
このゲームでは、プレイヤーはボールに見立てた白色のブロックを指ではじき、制限時間内のゴールを目指します。ゴールはキーパーに見立てた緑色のブロックがランダムに動くことで守られているため、タイミング良くすき間をねらいボールをはじかなくてはいけません。では、実際にプレイしている様子を下の動画で見てみましょう。
【 PKゲーム機のプレイ動画】
4. 2 PKゲーム機の機能説明
PKゲーム機の組立てで使用した各パーツは、それぞれ次の機能を実現するために利用しています。
パーツ名 | 用途 | 機能 |
---|---|---|
サーボモーター+ギヤ | ゴールキーパー | 左右にランダムに動くことで、ボールからゴールを守ります。 |
LEDディスプレイ | 電光掲示板 | メッセージを繰り返し表示します。 |
赤外線フォトリフレクタ | 判定用センサ | ボールがゴールに入ったかどうかを判定します。 |
ブザー | 音響装置 | 繰り返しメロディを流します。 |
各パーツの制御は同時平行で行う必要がありますが、タイミングを考えながら制御対象を切り替えるコードを書くのは大変です。そこで、マルチスレッド機能を使います。上の4つの機能をそれぞれ関数にまとめ、同時に4つのスレッドを立ち上げて各関数を実行するようにしましょう。
4. 3 プログラムの作成手順
まずは、順番に4つの機能に関するコードを書いていきましょう。
■ ゴールキーパーをランダムに動かす処理
サーボモーターに取り付けたギヤとラックによって、サーボモーターの回転する動きを直線的な動きに変換しています。この機構を「ラック&ピニオン」といいます。サーボモーターをおよそ60°~120°の範囲で回転させると、ゴール全体を守るように緑色のブロックが移動します。
同じ動作を繰り返していると、簡単にゴールできてしまうため、60°~120°の範囲でランダムに角度を選び、一定時間(例えば200ミリ秒)おきに移動させることでゲームの難易度をアップします。このとき、60°~120°の範囲から1°単位で角度を選んでしまうと、乱数の出方によっては、同じ場所に長い時間留まってしまう可能性があるため、10°単位で選ぶように調整します。
では、この処理をgoal_keeper()
関数として次のようにまとめましょう。
import time
import random # 乱数を発生させるため、randomモジュールをインポート
from pyatcrobo2.parts import Servomotor
# ゴールキーパー(緑色のブロック)をランダムに動かす処理
def goal_keeper():
servo = Servomotor("P13")
while True:
angle = random.randint(6, 12) * 10 # 60°~120°の範囲から10°単位でランダムに角度を選ぶ
servo.set_angle(angle) # サーボモーターを動かしてゴールキーパーを移動
time.sleep_ms(200) # 次に動かすまで200ミリ秒だけ間を空ける
動作を確認するときは、このプログラムを実行して、ターミナルからgoal_keeper()
関数を呼び出してください。確認が終わり、動作を停止したい場合はStuduino:bitのリセットボタンを押してください。
■ ボールがゴールに入ったかどうかを判定する処理
赤外線フォトリフレクタの正面をボール(白色のブロック)が通過すると、そこで赤外線が反射されるため、赤外線フォトリフレクタから得られる数値が大きくなります。この変化を監視して、ボールがゴールに入ったかどうかを判定します。
また、ボールがゴール入った場合は、ゴールキーパーの動きも停止させます。そのため、グローバル変数を用意して、他のスレッドと情報を共有するようにします。
では、以上のことを踏まえてjudgement()
関数として処理をまとめましょう。
追加・変更【3行目、5行目、11行目、17行目~27行目】
import time
import random
from pyatcrobo2.parts import Servomotor, IRPhotoReflector # 赤外線フォトリフレクタの制御用クラスをインポート
is_goal = False # ゴールされたかどうかを共有するための変数(ゴールされた場合はTrue)
def goal_keeper():
servo = Servomotor("P13")
while not is_goal: # ゴールされると動作を停止する。つまり、Falseの場合のみ継続する
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
# ボールがゴールに入ったかどうかを判定する処理
def judgement():
global is_goal # グローバル変数を関数内で書き換えるためのグローバル宣言
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200 # 最初に取得した、正面に何もないときの値からしきい値を設定
while not is_goal: # ゴールされるまで動作を繰り返す
time.sleep_ms(1) # 1ミリ秒おきに赤外線フォトリフレクタの値を取得する
val = irp.get_value()
if val > threshold: # しきい値よりも大きくなった場合はボールが通過したと判断
is_goal = True
グローバル変数としてis_goal
を用意したので、11行目では、これがTrue
に書き換えらるとゴールキーパーの動作も停止するようにwhile
文の条件を変更しています。
また、21行目では最初に取得した赤外線フォトリフレクタの値から、ボールが正面を通過したと判断する基準となるしきい値を決めています(※)。もちろん、あらかじめボールが正面にあるときの赤外線フォトリフレクタの値を調べて、しきい値を決めても良いですが、このように自動的に決定する方法もあることを覚えておきましょう。
※ 赤外線フォトリフレクタの値はサーボモーターの動きによる振動で値が微小な範囲で常に変化しています。そのため、しきい値を決めるときは、最初に調べた値でそのまま設定するのではなく、200のように少し大きな数値を足すことで、振動が影響していないはっきりとした変化が起きていることを確定させ、ゴールを判定できるように調整しています。
そして、特に気を付けておきたいのが24行目です。赤外線フォトリフレクタの値を待ち時間なく取得していると、このスレッドだけで計算資源を占有してしまう恐れがあります。そのため、このように短い待ち時間を設けています。
では、この2つの関数をそれぞれスレッドを立ち上げて実行してみましょう。
追加・変更【3行目、31行目、32行目】
import time
import random
import _thread # マルチスレッド機能のモジュールを新たにインポート
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
is_goal = False
def goal_keeper():
servo = Servomotor("P13")
while not is_goal:
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal:
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
# スレッドの立ち上げて実行
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
ではこのプログラムを実行して、ゴールを決めるとキーパーの動作が停止することを確認しましょう。この時点でゲームの中心となる2つの機能ができました。ここからは、ゲームをさらに盛り上げるための機能を追加していきましょう。
■ メロディを繰り返し流す処理
今度は、ブザーから簡単なメロディを繰り返し流す処理をplay_melody()
関数として用意します。好きなメロディをタプルにまとめ、ループ処理で1音ずつ鳴らすようにしましょう。
追加・変更【5行目、31行目~58行目、63行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer # ブザーを使用するため、オブジェクトをインポート
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal:
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
# 繰り返しメロディを流す処理
def play_melody():
# ブザーから流すメロディの定義、以下は例です。
melody = (
("A5" , 600), # 音の高さと長さをタプルにまとめます。
("C6" , 900),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("F5" , 1200),
("A5" , 300),
("A5" , 300),
("A5" , 300),
("G5" , 1200),
("G5" , 300),
("G5" , 300),
("G5" , 300),
("A5" , 1200),
("F5" , 600),
)
# 順番に音を鳴らす処理
while not is_goal: # ゴールが決まるとループを抜ける
for sound in melody: # 1音ずつ取り出して再生
buzzer.on(sound[0], duration=sound[1])
if is_goal: # ゴールが決まった場合、途中でfor文のループを抜ける
break
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ()) # 新たにスレッドの立ち上げて実行
ループ処理で1音ずつ鳴らすのは、ゴールされたときにメロディの途中でも再生を停止できるようにするためです。実際に、58行目のbreak
文でループを抜けるようになっています。
■ 電光掲示板にメッセージを繰り返し表示する処理
続けて、電光掲示板として使うLEDディスプレイに、文字列を順番に表示する処理をbulltin_board()
関数としてまとめます。文字列もタプルやリストと同じくシーケンスなデータであることを利用して、for
文で1文字ずつ取り出して表示しましょう。
※ 「bulltin board」は英語で「掲示板」を表す言葉です。
追加・変更【5行目、60行目~68行目、74行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display # LEDディスプレイを使用するため、オブジェクトをインポート
while not is_goal:
for sound in melody:
buzzer.on(sound[0], duration=sound[1])
if is_goal:
break
# 電光掲示板にメッセージを繰り返し表示する処理
def bulltin_board():
message = "PKGAME" # 表示するメッセージ。何でも構いません。
# 順番にメッセージの文字を表示するループ処理
while not is_goal:
for word in message: # メッセージから順番に文字を取り出す
display.show(word)
if is_goal: # ゴールが決まった場合、for文のループ処理を途中で抜ける
break
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ()) # 新たにスレッドの立ち上げて実行
ここまでで、最初に確認した4つの機能ができました。最後に「時間の経過を監視する処理」と「ゲームの結果を表示する処理」、それから「ボタンAを押すとゲームを開始する処理」を順番に追加してゲームを完成させましょう。
■ 時間の経過を監視する処理
ここでは一旦、制限時間を「30秒(30000ミリ秒)」に設定します。プログラムの先頭で定数として定義しておきましょう。ゲームの難易度を変更したい場合は、あとからこの数値を調整してください。
追加【7行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display
LIMIT_TIME = 30000 # 制限時間を30秒(30000ミリ秒)に設定
準備してきた4つの機能をスレッドの立ち上て実行する処理も含め、ゲームの開始時に行うすべての処理をひとつの関数にまとめます。この関数内で、ゲームの状態のリセットと時間の経過を監視します。時間が経過して制限時間を超えた場合も、ゴールが決まった場合と同様に各機能の動作を停止させる必要があります。そのため、この情報をスレッド間で共有するためのグローバル変数も用意しましょう。
以上のことを踏まえて、start_game()
関数として処理をまとめましょう。
追加・変更【10行目、70行目~88行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display
LIMIT_TIME = 30000
is_goal = False
over_limit_time = False # 制限時間を超えたかどうかを共有するための変数
def bulltin_board():
message = "PKGAME"
while not is_goal:
for word in message:
display.show(word)
if is_goal:
break
# ゲームの開始時に行うすべての処理をまとめた関数
def start_game():
global is_goal, over_limit_time
# ゲームの状態をリセット
is_goal = False
over_limit_time = False
# スレッドを立ち上げて各関数を実行するコードはここに移動
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
start_time = time.ticks_ms() # ゲームの開始時間を記録
# ゴールが決まるか、制限時間を超えるまで監視するループ処理
while not is_goal and not over_limit_time:
# ゲーム開始から経過した時間と制限時間を比較
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True # 制限時間を超えたことをグローバル変数を介して他のスレッドと共有
制限時間を超えたことを共有する変数を用意したので、各機能もこの情報で動作を停止できるようにコードを変更しましょう。
変更【16行目、28行目、56行目、59行目、66行目、69行目】
def goal_keeper():
servo = Servomotor("P13")
while not is_goal and not over_limit_time: # どちらもFalseの場合のみ継続
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal and not over_limit_time: # どちらもFalseの場合のみ継続
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
def play_melody():
melody = (
("A5" , 600),
("C6" , 900),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("F5" , 1200),
("A5" , 300),
("A5" , 300),
("A5" , 300),
("G5" , 1200),
("G5" , 300),
("G5" , 300),
("G5" , 300),
("A5" , 1200),
("F5" , 600),
)
while not is_goal and not over_limit_time: # どちらもFalseの場合のみ継続
for sound in melody:
buzzer.on(sound[0], duration=sound[1])
if is_goal or over_limit_time: # どちらかが成り立てば抜ける必要があるので、or演算子で条件を追加
break
def bulltin_board():
message = "PKGAME"
while not is_goal and not over_limit_time: # どちらもFalseの場合のみ継続
for word in message:
display.show(word)
if is_goal or over_limit_time: # どちらかが成り立てば抜ける必要があるので、or演算子で条件を追加
break
■ ゲームの結果を表示する処理
次は、結果を表示する処理を追加します。ゴールが決まりって成功した場合は、笑顔のイメージを表示して高い音を鳴らします。反対に失敗した場合は、悲しい顔のイメージを表示して低い音を鳴らしましょう。
上の2つのイメージは、StuduinoBitImage
(Image
)クラスにあらかじめ用意されているので、それを使いましょう。
追加・変更【5行目、94行目~103行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display, Image # Imageクラスを追加でインポート
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
# 結果を表示する処理
def show_result():
if is_goal: # ゴールが決まり、成功したとき
display.show(Image.HAPPY, color=(0, 31, 0)) # 緑色で表示
buzzer.on("C7", duration=200) # 成功音
buzzer.on("A6", duration=600)
else: # 制限時間を超えてしまい、失敗したとき
display.show(Image.SAD, color=(0, 0, 31)) # 青色で表示
buzzer.on("C4", duration=800) # 失敗音
time.sleep_ms(1000) # 少し時間を空けてからLEDディスプレイの表示を消す
display.clear()
結果を表示する関数が定義できたので、start_game()
関数の最後に呼び出すようにしましょう。
追加【93行目、94行目】
def start_game():
global is_goal, over_limit_time
is_goal = False
over_limit_time = False
for num in range(3, -1, -1):
display.show(str(num))
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000) # ゲーム終了後、少し時間をあけてから結果を表示する
show_result()
■ ボタンAを押すとゲームを開始する処理
あとは、start_game()
関数をボタンAが押されたときに実行されるようにできたらプログラムの完成です。ゲーム開始までカウントダウンを行う処理も加えて、最後のコードを書き加えましょう。
追加【5行目、108行目~110行目】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display, Image, button_a # ボタンAのオブジェクトをインポート
def show_result():
if is_goal:
display.show(Image.HAPPY, color=(0, 31, 0))
buzzer.on("C7", duration=200)
buzzer.on("A6", duration=600)
else:
display.show(Image.SAD, color=(0, 0, 31))
buzzer.on("C4", duration=800)
time.sleep_ms(1000)
display.clear()
# ボタンAが押されるとゲームを開始する処理
while True:
if button_a.is_pressed():
# 3,2,1,0とカウントダウンを行う処理
for num in range(3, -1, -1):
display.show(str(num))
start_game()
完成したプログラムを実行して、遊んでみましょう。ゲームの難易度を変更したい場合は、「制限時間」や「サーボモーターを動かす時間の間隔」「キーパーとして使用するブロックの数を2個にする」などで調整してみてください。
もし、プログラムのエラーが改善できない場合は、以下のサンプルコードと見比べて修正しましょう。
【 サンプルコード 4-3-1 】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display, Image, button_a
LIMIT_TIME = 30000
is_goal = False
over_limit_time = False
def goal_keeper():
servo = Servomotor("P13")
while not is_goal and not over_limit_time:
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal and not over_limit_time:
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
def play_melody():
melody = (
("A5" , 600),
("C6" , 900),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("F5" , 1200),
("A5" , 300),
("A5" , 300),
("A5" , 300),
("G5" , 1200),
("G5" , 300),
("G5" , 300),
("G5" , 300),
("A5" , 1200),
("F5" , 600),
)
while not is_goal and not over_limit_time:
for sound in melody:
buzzer.on(sound[0], duration=sound[1])
if is_goal or over_limit_time:
break
def bulltin_board():
message = "PKGAME"
while not is_goal and not over_limit_time:
for word in message:
display.show(word)
if is_goal or over_limit_time:
break
def start_game():
global is_goal, over_limit_time
is_goal = False
over_limit_time = False
_thread.start_new_thread(goal_keeper, ())
_thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000)
show_result()
def show_result():
if is_goal:
display.show(Image.HAPPY, color=(0, 31, 0))
buzzer.on("C7", duration=200)
buzzer.on("A6", duration=600)
else:
display.show(Image.SAD, color=(0, 0, 31))
buzzer.on("C4", duration=800)
time.sleep_ms(1000)
display.clear()
while True:
if button_a.is_pressed():
for num in range(3, -1, -1):
display.show(str(num))
start_game()
チャプター5
課題:マルチスレッドを使わないプログラムの作成
ここでは課題として、さきほどマルチスレッド機能を利用して作成したPKゲームのプログラムの一部機能を、マルチスレッドを使わないコードに書き替えてみましょう。
対象とする機能はgoal_keeper()
関数とjudgement()
関数の2つです。これらをコメント化して、代わりとなるコードをstart_game()
関数内に書いていきましょう。
""" # コメント化
def goal_keeper():
servo = Servomotor("P13")
while not is_goal and not over_limit_time:
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal and not over_limit_time:
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
"""
def start_game():
global is_goal, over_limit_time
is_goal = False
over_limit_time = False
# _thread.start_new_thread(goal_keeper, ()) # ここもコメント化
# _thread.start_new_thread(judgement, ()) # ここもコメント化
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000)
show_result()
5. 1 プログラムの変更手順
スレッドを立ち上げて関数を実行できないため、代わりにstart_game()
関数内のメインのループ処理(85行目のwhile
文)の中に「ゴールキーパーをランダムに動かす機能」と「ゴールを判定する機能」を実現するコードを書くことになります。
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
まずは、「ゴールを判定する機能」から作成していきましょう。基本的にはjudgement()
関数の中身を移植するだけで終わりです。変更は1点だけあり、赤外線フォトリフレクタの値を取得する周期を1ミリ秒から10ミリ秒に変えてください。1ミリ秒のままでもエラーにはなりませんが、少しサーボモーターの動作に遅れが出てしまいます。
追加・変更【76・77行目、89行目~92行目】
def start_game():
global is_goal, over_limit_time
irp = IRPhotoReflector("P0") # 赤外線フォトリフレクタのオブジェクトを作成
threshold = irp.get_value() + 200 # しきい値を設定
is_goal = False
over_limit_time = False
# _thread.start_new_thread(goal_keeper, ())
# _thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
time.sleep_ms(10) # 1ミリ秒から10秒おきのチェックに変更
val = irp.get_value()
if val > threshold:
is_goal = True
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000)
show_result()
次に「ゴールキーパーをランダムに動かす機能」を移植します。ここに少し工夫が必要です。サーボモーターは200ミリ秒おきに動かしますが、それを以下のようにgoal_keeper()
からそのまま移植してしまうと、赤外線フォトリフレクタの値を取得するときの遅延も重なり、どちらも210ミリ秒おきに実行されることになってしまいます。
【 そのまま移植して失敗する例 】
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
time.sleep_ms(10) # この遅延はサーボモーターを動かす処理にも影響する
val = irp.get_value()
if val > threshold:
is_goal = True
angle = random.randint(6, 12) * 10 # ここにそのまま追加
servo.set_angle(angle)
time.sleep_ms(200) # この遅延は赤外線フォトリフレクタの値の取得にも影響する
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
そのため、遅延処理は89行目のtime.sleep_ms(10)
だけに留めておき、200ミリ秒の遅延は新たに変数を用意してループの回数を数えることで代替します。以下のようにコードを書き替えましょう。
追加・変更【76行目、88行目、98行目~101行目】
def start_game():
global is_goal, over_limit_time
servo = Servomotor("P13") # サーボモーターのオブジェクトを作成
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
is_goal = False
over_limit_time = False
# _thread.start_new_thread(goal_keeper, ())
# _thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
count = 0 # ループした回数を数えるための変数
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
count += 1 # ループの回数をカウントする
time.sleep_ms(10)
val = irp.get_value()
if val > threshold:
is_goal = True
if count == 20: # ループの回数が20回に達したときが200ミリ秒の経過と判断できる
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
count = 0 # 回数をリセットし、ふたたび20を数え直す
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000)
show_result()
では、変更したコードを実行して同じように動作することを確認しましょう。このプログラムの全体は以下のようになります。(転送し実行してください)
【 サンプルコード 5-1-1 】
import time
import random
import _thread
from pyatcrobo2.parts import Servomotor, IRPhotoReflector
from pystubit.board import buzzer, display, Image, button_a
LIMIT_TIME = 30000
is_goal = False
over_limit_time = False
"""
def goal_keeper():
servo = Servomotor("P13")
while not is_goal and not over_limit_time:
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
time.sleep_ms(200)
def judgement():
global is_goal
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
while not is_goal and not over_limit_time:
time.sleep_ms(1)
val = irp.get_value()
if val > threshold:
is_goal = True
"""
def play_melody():
melody = (
("A5" , 600),
("C6" , 900),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("C6" , 300),
("A5" , 300),
("F5" , 1200),
("A5" , 300),
("A5" , 300),
("A5" , 300),
("G5" , 1200),
("G5" , 300),
("G5" , 300),
("G5" , 300),
("A5" , 1200),
("F5" , 600),
)
while not is_goal and not over_limit_time:
for sound in melody:
buzzer.on(sound[0], duration=sound[1])
if is_goal or over_limit_time:
break
def bulltin_board():
message = "PKGAME"
while not is_goal and not over_limit_time:
for word in message:
display.show(word)
if is_goal or over_limit_time:
break
def start_game():
global is_goal, over_limit_time
servo = Servomotor("P13")
irp = IRPhotoReflector("P0")
threshold = irp.get_value() + 200
is_goal = False
over_limit_time = False
# _thread.start_new_thread(goal_keeper, ())
# _thread.start_new_thread(judgement, ())
_thread.start_new_thread(play_melody, ())
_thread.start_new_thread(bulltin_board, ())
count = 0
start_time = time.ticks_ms()
while not is_goal and not over_limit_time:
count += 1
time.sleep_ms(10)
val = irp.get_value()
if val > threshold:
is_goal = True
if count == 20:
angle = random.randint(6, 12) * 10
servo.set_angle(angle)
count = 0
if time.ticks_diff(time.ticks_ms(), start_time) > LIMIT_TIME:
over_limit_time = True
time.sleep_ms(1000)
show_result()
def show_result():
if is_goal:
display.show(Image.HAPPY, color=(0, 31, 0))
buzzer.on("C7", duration=200)
buzzer.on("A6", duration=600)
else:
display.show(Image.SAD, color=(0, 0, 31))
buzzer.on("C4", duration=800)
time.sleep_ms(1000)
display.clear()
while True:
if button_a.is_pressed():
for num in range(3, -1, -1):
display.show(str(num))
start_game()
そこまで大変な変更ではありませんでしたが、遅延処理のタイミングを合わせるのに、変数count
を用意するなど、変則的なコードを書くことになってしまいました。では、ここへさらに「play_melody()
関数とbulltin_board()
関数もマルチスレッド機能を使わずに書き直してください」と言われたらどうでしょう。相当面倒な作業が待っていることが何となく予測できますね。この課題の取り組みからも、マルチスレッド機能がいかに便利で、上手く利用することで、コードをよりシンプルにまとめることができることが理解できたのではないでしょうか。
チャプター6
おわりに
6. 1 このレッスンのまとめ
このレッスンでは、新たに「マルチスレッド機能」について学習しました。マルチスレッド機能を利用すると、複数の処理の流れを同時に平行して進めることができ、1つの処理の流れでは複雑なコードを書かなければならない動作も、よりシンプルなコードで実現することができました。
マルチスレッドは便利な反面、計算負荷が大きくなったり、複数の流れがあるため、思わぬところで不具合が発生した場合に原因が特定しにくくなったりといった短所もあります。そのため、むやみに利用するのではなく、あらかじめ処理の流れを図などに表して整理した上で、必要なときだけ使うようにしましょう。
6. 2 次のレッスンについて
次回のレッスンでは、マルチスレッドのプログラムにおいて、スレッド間で連携しながらひとつの仕事を行うときに用いられるデータの受け渡し方法について新たに学習します。