Pythonロボティクスコース レッスン 36
テーマ.9-3 スタックやキューを利用したタスク間の連携処理
複数のタスクで情報の受け渡しを行いながら全体の処理を進める方法を学ぼう!
チャプター1
このレッスンで学ぶこと
割り込み処理やマルチスレッドのように、並行処理のプログラムでは、複数のタスク(※)の間でデータの受け渡しを行いながら、ひとつの仕事を進めるプログラムを組むことができます。タスク間でのデータの受け渡しには、グローバルスコープにあるグローバル変数が用いられ、データを一時的に貯めておく保管庫のような役割を担います。今回のレッスンでは、「スタック」と「キュー」という上記の役割で用いられる2種類のデータ構造について学習し、複数のタスクが連携してブロックの運搬と分別を行う装置をつくります。
タスク・・・コンピュータ用語では「処理される作業の最小単位」を表します。
チャプター2
タスク間のデータの受け渡しについて
まずは実生活を例にして、複数のタスク間でデータを受け渡しながらひとつの仕事を行うときの処理の流れを見ていきましょう。
例えば、ファストフード店の仕事はまさにマルチタスクです。お客が商品を注文して受け取るまでの間に、店舗側では複数のスタッフが連携しながらそれぞれが担当するタスクを行っています。
この図の「注文内容」や「料理」、「商品」がタスク間で受け渡しされるデータや品物になります。実際に、この流れに沿ってそれぞれがタスクを進めていくときは、あることに注意しなければいけません。それは、データや品物を渡す側と受け取る側のタイミングは必ずしも一致しないということです。
例えば、調理担当は調理中に新たな注文内容を伝えられても、それをすぐに受け取って調理に取り掛かることはできません。一方で、注文口担当は調理担当に注文内容を受け取ってもらえないと、会計や次のお客の注文を受けることができず時間をロスしてしまいます。
この問題の解決方法はシンプルで、「注文内容を一時的に貯めておく場所を用意する」ことです。注文口担当は注文内容をその場所に貯めることで、すぐに次の作業に取り掛かることができ、調理担当は今の調理を終えたら、その場所へ次の注文内容を取りに行くだけで済みます。ファストフード店だけでなく、飲食店ならどこもこの方法を取り入れているのではないでしょうか。
コンピュータにおいて、タスク間で連携して処理を進めるときのデータの受け渡し方法も、基本はこれと同じです。そして、受け渡しを行うときに利用する代表的なデータ構造が次で説明する「スタック」と「キュー」です。
2. 1 スタックとキューのデータ構造
「スタック」や「キュー」のデータ構造はとてもシンプルで、どちらもデータの入口と出口が限定されたリストと捉えることができます。
■ スタックのデータ構造
スタック(stack)は、データの入口と出口が同じ1つの箇所に限定されたデータ構造です。そのため、スタック内のデータは最後に入れられたものから取り出されます。
このような特徴から、スタックは後入れ先出しのLIFO(Last-in First-out)と呼ばれています。
現実では、商品の入ったダンボールを倉庫に積み上げて保管し、それを取り出す作業がスタックにあたります。
■ キューのデータ構造
一方でキュー(que)は、データの入口と出口が分かれています。そのため、キュー内のデータは最初に入れられたものから順番に取り出されます。
このような特徴から、キューは先入れ先出しのFIFO(First-in First-out)と呼ばれています。
※ところてんと同じ。キューと押し出す!
現実では、上で取り挙げたファストフード店の例キューにあたります。もし、スタックを利用して、注文内容の受け渡しをしてしまうと、最初に注文したお客はいつまで経っても商品を受け取ることができません。
2. 2 スタックとキューを利用したサンプルプログラム
実は、Pythonにはスタックやキューを扱うためのcollections
モジュールが用意されていますが、MicroPythonのcollections
はこれらに対応していません。そのため、代わりにリスト型のデータを使いスタックとキューの振る舞いを再現します。それでは、サンプルプログラムを通して、スタックとキューの使い方を見ていきましょう。
■ スタック(stack)の使い方
list
型を使い、データを入れるときはappend()
メソッドを、データを取り出すときはpop()
メソッドを利用すると、スタックとしての振る舞いになります。
append()
メソッド ・・・ リストの末尾に要素を追加します。pop()
メソッド ・・・ 指定した位置の要素をリストから削除し、その値を取得します。引数を省略した場合は末尾の要素が対象となります。
それでは、データの受け渡しを行う次の2つの関数を用意します。
関数名 | 処理内容 |
---|---|
send_data() | 1秒に1回データをスタックへ渡す処理。1~9の数字を順番に渡し終えたら処理を終了する。 |
receive_data() | 2秒に1回データをスタックから受け取って表示する処理。スタック内のデータを確認して、3回連続して空だった場合は処理を終了する。 |
それぞれ以下のようにコードを書きましょう。
import time # 時間を計るためにtimeモジュールを使用
stack = [] # スタック用のリスト
def send_data(): # データを渡す関数
global stack
num = 0 # 1~9の数字を1秒おきにスタックへ入れる
while num < 9:
time.sleep_ms(1000)
num += 1
stack.append(num) # 末尾にデータを追加
def receive_data(): # データを受け取る関数
global stack
count = 0 # スタックが空だった回数を数えるための変数
while count < 3: # 3回連続で空だったときまで繰り返す
time.sleep_ms(2000)
if len(stack): # スタックが空かどうかのチェック
data = stack.pop() # 末尾からデータを取り出す
print(data) # 取り出したデータを表示
count = 0 # スタックからデータを受け取れた場合は回数をリセット
else:
count += 1 # スタックが空だった場合は1回カウント
print("Finished.") # 処理を終える
12行目のstack.append()
では末尾にデータを追加し、22行目のstack.pop()
では末尾からデータを取り出しています。このように、入口と出口が末尾で固定されているため、スタックとしての振る舞いになっています。
では、この2つの関数をマルチスレッド機能を使用して実行します。以下のようにコードを追加しましょう。
【 サンプルコード 2-3-1 】
追加【2行目、31・32行目】
import time
import _thread # マルチスレッド機能を使用するためのモジュール
stack = []
def send_data():
global stack
num = 0
while num < 9:
time.sleep_ms(1000)
num += 1
stack.append(num)
def receive_data():
global stack
count = 0
while count < 3:
time.sleep_ms(2000)
if len(stack):
data = stack.pop()
print(data)
count = 0
else:
count += 1
print("Finished.")
# それぞれの関数をスレッドを立ち上げて実行する
_thread.start_new_thread(send_data, ())
_thread.start_new_thread(receive_data, ())
完成したプログラムを実行してみましょう。次のような結果になるはずです。
(実行結果)
>>> 1 3 5 7 9 8 6 4 2 Finished.
スタックは最後に追加したデータから取り出されるため、send_data()
関数から渡した順番通りに数字が表示されるわけではありません。
■ キュー(que)の使い方
キューもスタックと同様に、list
型のappend()
メソッドと、pop()
メソッドを利用して、その振る舞いを再現します。ただし、要素は末尾ではなく先頭から取り出すため、pop()
メソッドの引数には「0」を指定します。
では、スタックのときに作成したサンプルプログラムの一部を、今度はキューを使用したコードへ書き換えて、実行結果の違いを見てみましょう。
【 サンプルコード 2-3-2 】
変更【4行目、7行目、13行目、17行目、22行目、23行目】
import time
import _thread
que = [] # キュー用のリストを作成
def send_data():
global que # キューに変更
num = 0
while num < 9:
time.sleep_ms(1000)
num += 1
que.append(num) # キューの末尾にデータを追加
def receive_data():
global que # キューに変更
count = 0
while count < 3:
time.sleep_ms(2000)
if len(que): # キューが空でない場合だけデータを取り出す
data = que.pop(0) # キューの場合は先頭から取り出す
print(data)
count = 0
else:
count += 1
print("Finished.")
_thread.start_new_thread(send_data, ())
_thread.start_new_thread(receive_data, ())
(実行結果)
>>> 1 2 3 4 5 6 7 8 9 Finished.
キューは、先に追加されたものからデータが取り出されるため、スタックとは違い、send_data()
関数から渡した順番通りに数字が表示されています。
チャプター3
分別ロボットの組立て
ここからは学習した内容を踏まえて、ロボットの製作を行います。今回は、運ばれてきたブロックを色で分別するロボットです。以下の組立説明書を確認して、ブロックを組み立てましょう。
3. 1 組み立てに必要なパーツ
【 パーツ一覧 】
- Studuino:bit×1
- ロボット拡張ユニット×1
- 電池ボックス×1
- DCモーター×1
- サーボモーター×2
- 赤外線フォトリフレクタ×1
- カラーセンサ×1
- センサー接続コード(3芯30cm)×1
- センサー接続コード(4芯30cm)×1
- ブロック基本四角(白)×5
- ブロック基本四角(グレー)×5
- ブロック基本四角(黒)×6
- ブロック基本四角(赤)×4
- ブロック三角(グレー)×4
- ブロック三角(赤)×2
- ブロックハーフA(グレー)×6
- ブロックハーフB(グレー)×2
- ブロックハーフB(黒)×2
- ブロックハーフC(白)×20
- ブロックハーフD(白)×8
- ステー×7
- 丸(目玉)×2
- ギヤ小×1
- ギヤ大×1
- ラック×1
- ギヤ用タイヤゴム×1
【 アーテックブロックの形状 】
3. 2 組立説明書
以下のリンク先から組立説明書を確認しましょう。
チャプター4
分別ロボットのプログラムの作成
組み立てたロボットが行う仕事は、次の3つの工程に分かれていて、これら3つの工程を経て1つずつブロックが分別されます。
下の動画で実際に動作している様子を確認しましょう。
【 振り分けロボットの動作動画 】
4. 1 プログラムの作成手順
上で確認した3つの工程に対してそれぞれ以下の名前で関数を用意し、別々のスレッドを立てて実行します。
関数名 | 工程 |
---|---|
work1_detect | 流れてきたブロックを検出する工程 |
work2_transport | ブロックを運搬する工程 |
work3_separate | ブロックを色で振り分ける工程 |
また、複数の工程で連携して仕事を進めていくために、キューを利用してブロックの受け渡しを行います。各工程間において、それぞれque_1to2
とque_2to3
という名前で専用のキューを用意します。
それでは、順番にコードを書いていきましょう。
■ ブロックを検出する処理
最初のブロックを検出する工程では、赤外線フォトリフレクタを使用し、その値を監視することで、ブロックが流れてきたことを検知します。
ブロックが赤外線フォトリフレクタの正面を流れていく間は次のように値が変化します。そのため、一度しきい値を超えて、その後でしきい値を下回った時点でブロックが通過したと判断できます。
そこで、フラグとなる変数を用意し、最初にしきい値を超えたタイミングでフラグを立て、その後でしきい値を下回るとこのフラグを降ろします。そして、フラグを降ろしたタイミングで検出したブロックに1番から順番に番号を割り振って、その情報をキューに保存し、次の工程へ渡します。
ここまでの一連の処理をコードで書くと次のようになります。
import time
from pyatcrobo2.parts import IRPhotoReflector # 赤外線フォトリフレクタのクラス
que_1to2 = [] # work1からwork2へブロックの情報を渡すためのキュー
# ブロックの通過を検出する関数
def work1_detect():
irq = IRPhotoReflector("P0") # 赤外線フォトリフレクタのオブジェクトを作成
threshold = irq.get_value() + 100 # 起動直後の値からしきい値を自動で決める
is_passing = False # ブロックが通過中かどうかを示すフラグ
num = 0 # 検出したブロックに対して1から番号をつける
while True: # 10ミリ秒おきに赤外線フォトリフレクタの値を取得する
time.sleep_ms(10)
val = irq.get_value()
# 通過中でなければ、新しくブロックが流れてきたと判断してフラグを立てる
if val > threshold and not is_passing:
is_passing = True
elif val < threshold and is_passing: # 通過中にしきい値を下回る変化がおきたら
is_passing = False # 通過し終えたとして判断してフラグを降ろす
num += 1 # 番号を割り振る
que_1to2.append(num) # 割り振った番号をブロックの情報としてキューに保存
9行目のthreshold = irq.get_value() + 100
は、前のレッスンで製作したPKゲームでも行ったように、何もないときの赤外線フォトリフレクタの値からしきい値を自動で決める処理になっています。ここは、あらかじめ正面にブロックがあるときとないときの赤外線フォトリフレクタの値を調べておき、そこから決めたしきい値を代入しても構いません。
■ ブロックを運搬する処理
2番目の工程では、2つのサーボモーターを順番に動かして、3番目の工程へブロックを運び、キューに情報を保存します。
また、この工程は3番目の工程と密接に連携しているため、3番目の工程が完了するまで次のブロックを運ぶことができません。
そのため、キューが空になるまで待機してから元のリフトを元の位置に戻し、次の運搬を行います。
では、これらの動作を行うコードを書いていきましょう。
プログラムの先頭では、サーボモーターの制御用クラスのインポートと、3番目の工程と情報を共有するためのキューを用意します。
追加【4行目、7行目】
import time
import _thread
from pyatcrobo2.parts import IRPhotoReflector
from pyatcrobo2.parts import Servomotor # サーボモーターのオブジェクトをインポート
que_1to2 = []
que_2to3 = [] # work2からwork3へタスクを渡すためのキュー
2つのサーボモーターを順番に制御するコードをwork2_transport()
関数としてまとめます。プログラムの実行直後に、各サーボモーターを初期位置に移動させたあと、while
文を使ったループ処理で100ミリ秒おきに、工程1と共有しているキューque_1to2
を確認します。このキューにブロックの情報がある場合は受け取って工程3へ運びます。工程3へ運んだあとは、キューque_2to3
に情報を渡し、工程3でブロックが分別されるまで待機します。分別が終わると、リフトを下げて元の位置に戻します。
追加【28行目~53行目】
def work2_transport():
servo13 = Servomotor("P13") # サーボモーターのオブジェクトを作成
servo14 = Servomotor("P14")
# 初期位置に移動(1)
servo13.set_angle(180)
servo14.set_angle(75)
# 100ミリ秒おきにキューを確認
while True:
time.sleep_ms(100)
if len(que_1to2): # キューにタスクがある場合は受け取って、そのブロックを運ぶ
num = que_1to2.pop(0) # キューからブロックの情報を受け取る
time.sleep_ms(500) # 検出から流れてくるまでに掛かる時間を考慮
servo14.set_angle(145) # ブロックを受け取る(2)
time.sleep_ms(1000) # サーボモーターが動き終えるまで1秒時間を空ける
servo14.set_angle(75) # ブロックをリフトへ移す(3)
time.sleep_ms(1000)
servo13.set_angle(5) # リフトを上げて工程3へ運ぶ(4)
time.sleep_ms(1000)
que_2to3.append(num) # キューにブロックの情報を追加
while len(que_2to3): # 次の工程がタスクを完了する(キューが空になる)まで待つ
time.sleep_ms(100) # 計算負荷を掛け過ぎないように100ミリ秒おきに確認
pass
servo13.set_angle(180) # リフトを下げる
time.sleep_ms(1000)
■ ブロックを分別する処理
この工程では、キューque_2to3
に情報があれば、カラーセンサでブロックの色を調べ、DCモーターを回転して左右に振り分けます。赤色と緑色のブロックは右側へ、黄色と青色のブロックは左側へ運びます。
また、分別が終わったあとは、ターミナルへ番号と色名を表示します。
これらの処理をコードで書くと次のようになります。
プログラムの先頭では、新たにカラーセンサとDCモーター用のクラスをインポートします。
追加【3・4行目】
import time
import _thread
from pyatcrobo2.parts import IRPhotoReflector, ColorSensor # カラーセンサ用クラスのインポート
from pyatcrobo2.parts import Servomotor, DCMotor # DCモーター用クラスのインポート
色を調べて分別する処理はwork3_separate()
関数としてまとめます。100ミリ秒おきに、キューque_2to3
を確認して、情報があればカラーセンサーで色を調べ、色によって異なる向きにDCモーターを回転させます。
追加【54行目~80行目】
def work3_separate():
cs = ColorSensor("I2C")
dcm = DCMotor("M1")
dcm.power(255)
# 100ミリ秒おきにキューを確認
while True:
time.sleep_ms(100)
if len(que_2to3):
color = cs.get_colorcode() # 色を調べる
if color == ColorSensor.COLOR_RED: # 赤色の場合
color_name = "red" # ターミナルへの表示用に色名も変数に記録
dcm.cw()
elif color == ColorSensor.COLOR_GREEN: # 緑色の場合
color_name = "green"
dcm.cw()
elif color == ColorSensor.COLOR_YELLOW: # 黄色の場合
color_name = "yellow"
dcm.ccw()
else:
color_name = "blue" # 青色の場合
dcm.ccw()
time.sleep_ms(2000) # DCモーターは2秒間回転させて停止する
dcm.stop()
# 工程を終えたので結果を表示
num = que_2to3.pop(0) # 工程を終えてからキューより情報を削除
print("{}:{}".format(num, color_name)) # 番号と色名を表示
この工程では、分別の処理を終えてからキューque_2to3
より情報を取り出しています(79行目)。これによって分別を終えてから、工程2(work2_transport()
関数)でリフトが戻るようになっています。
■ 各関数をスレッドを立ち上げて実行する
これで、3つの工程に対して関数が用意できたので、スレッドを立ち上げて実行します。最後に以下のコードを追加して、プログラムを実行しましょう。
【 サンプルコード 4-1-1 】
追加【2行目、81行目~83行目】
import time
import _thread # マルチスレッド用モジュールのインポート
from pyatcrobo2.parts import IRPhotoReflector, ColorSensor
from pyatcrobo2.parts import Servomotor, DCMotor
que_1to2 = []
que_2to3 = []
def work1_detect():
irq = IRPhotoReflector("P0")
threshold = irq.get_value() + 100
is_passing = False
num = 0
while True:
time.sleep_ms(10)
val = irq.get_value()
if val > threshold and not is_passing:
is_passing = True
num += 1
que_1to2.append(num)
elif val < threshold and is_passing:
is_passing = False
def work2_transport():
servo13 = Servomotor("P13")
servo14 = Servomotor("P14")
servo13.set_angle(180)
servo14.set_angle(75)
while True:
time.sleep_ms(100)
if len(que_1to2):
num = que_1to2.pop(0)
time.sleep_ms(500)
servo14.set_angle(145)
time.sleep_ms(1000)
servo14.set_angle(75)
time.sleep_ms(1000)
servo13.set_angle(5)
time.sleep_ms(1000)
que_2to3.append(num)
while len(que_2to3):
time.sleep_ms(100)
pass
servo13.set_angle(180)
time.sleep_ms(1000)
def work3_separate():
cs = ColorSensor("I2C")
dcm = DCMotor("M1")
dcm.power(255)
while True:
time.sleep_ms(100)
if len(que_2to3):
color = cs.get_colorcode()
if color == ColorSensor.COLOR_RED:
color_name = "red"
dcm.cw()
elif color == ColorSensor.COLOR_GREEN:
color_name = "green"
dcm.cw()
elif color == ColorSensor.COLOR_YELLOW:
color_name = "yellow"
dcm.ccw()
else:
color_name = "blue"
dcm.ccw()
time.sleep_ms(2000)
dcm.stop()
num = que_2to3.pop(0)
print("{}:{}".format(num, color_name))
# スレッドを立ち上げて各関数を実行する
_thread.start_new_thread(work1_detect, ())
_thread.start_new_thread(work2_transport, ())
_thread.start_new_thread(work3_separate, ())
チャプター5
おわりに
5. 1 このレッスンのまとめ
今回は新たに「スタック」と「キュー」という2つのデータ構造について学びました。どちらも比較的シンプルな構造なので、分かりやすかったのではないかと思います。スタックは「先入れ後出し」、キューは「先入れ先出し」と覚えておきましょう。
レッスン内では、複数の処理が1つのコンピュータ内で同時に行われるケースでスタックやキューを扱ったマルチタスクを行いましたが、複数のコンピュータ同士がつながるインターネットの世界では、コンピュータ間でタスクを分散し、互いに通信しながら膨大なデータを処理することも行われています。
また、レッスン11では、Studuino:bitをwi-fiに接続してインターネットから情報を取得し、ロボットを制御する方法を紹介しますが、ここでもまたキューを利用します。少し先の学習なりますので、忘れてしまった場合はここに戻って復習してください。
5. 2 次のレッスンについて
次回のレッスンでは、マイコン内で時間の経過を正確に追うための機能と、マルチスレッドなプログラムにおいて、スレッド間で共有するデータを保護する機能について紹介していきます。