MQTTブローカにPublishされた値を若干手を入れてた後別のトピックでPublish

Xiaomi製Miスマート体組成計2 XMTZC05HMを見守り目的で購入した。 “一人暮らしの老人の見守りに対するアプローチ” で述べたことの実現の一環。体組成計で取得できるデータや,他のセンサーで検知できることを俯瞰的に見渡して総合的な判断ができるよう,データはMQTTブローカに集約しようとしている。そのため,Android TV準拠ストリーミング・メディア・プレーヤAmigo 7xJP導入したTermux環境内にMQTTブローカをインストールしてある。

体組成計で取得したデータをMQTTブローカに伝達させるには,サードパーティ製のOpenScaleアプリ( F-Droid版)(無償)にさらにOpenScale Sync – Apps on Google Play(有償)を組み合わせて行う。

OpenScale SyncでMQTTブローカに送られるデータ種別はなんと体重のみ,トピックは以下のように決め打ちで自由度が低い。QoSの指定もできなければRetainフラグのセットもできない。今回見守り目的だからともかく体組成計が使われたという事実が検知できればよいので体重だけでも目的は達成される。しかしそうではなく体組成計の測定データを自分で何か利用したいのであったなら,この非合理的な機能制限には大いに不満を持ったろう。

Adding a measurement

topic openScaleSync/measurements/insert

payload {"date":1572082380000,"weight":80.0}

Update a measurement

topic openScaleSync/measurements/update

payload {"date":1572082380000,"weight":120.0}

そこでこれらトピックにデータがpublishされたら,それをこちらが指定した別のトピックに自動的に「転記」されるようにすることにした。そのとき合わせて,QoS, retainフラグの設定や,元のトピックもJSONデータに含まれるようにする。Retainフラグが立ってないとブローカがデータを維持してくれないので,常時起動しているクライアントでなければ,OpenScale Syncからデータがpublishされているかがパッとわからない。

概略

基本的な流れは上記トピックにsubscribeし,何かデータがpublishされるたびそのデータを若干加工し,別のトピックにpublishするだけのシンプルなもの。MQTTクライアントとしてはMosquittoパッケージに含まれていた以下のコマンドを使う。

MQTTクライアントのコンフィギュレーション

オプションについてmosquitto_subコマンドのマニュアルには以下の記述がある。

The options below may be given on the command line, but may also be placed in a config file located at $XDG_CONFIG_HOME/mosquitto_sub or $HOME/.config/mosquitto_sub with one pair of -option value per line. The values in the config file will be used as defaults and can be overridden by using the command line.

mosquitto_pubについても設定ファイルは同様な扱いになっており,どちらもlocalhostで動作するMosquitto MQTTブローカに接続するクライアントであるためログイン情報など共通する部分も多い。なので $HOME/.config/mosquitto_subに設定を記述し,$HOME/.config/mosquitto_pubはそれへのシンボリックリンクとした。

内容は以下のようなもの:

# ~/.config/mosquitto_{pub,sub}                                                       
#                                                                                     
# Note only the lines that have a # as the first character are treated as comments.       
#                                                                                     
# The basics                                                                          
-h localhost                                                                          
-p 1883                                                                               
-u 秘密                                                                            
-P 秘密                                                                         
# Disable 'clean session' / enable persistent client mode.                            
-c                                                                                    
# Qos                                                                                 
-q 1                                                                                  
# Will                                                                                
#--will-qos 1                                                                         
#--will-retain                                                                        

Will関係は思うような動作にならなかったので外している。

Mosquitto_pubの-lオプション

Mosquitto_pubの標準入力から読み込む一行ごとにpublishする-l/–stdin-lineオプションがTermux版ではサポートされていない(”Error: ‘-l’ mode not available, threading support has not been compiled in.”)。Mosquitto_subはsubscribeしたトピックに対して何かデータがpublishがあるたびそのデータを出力し続けるので,それをパイプで加工しつつ,最終的にさらにパイプで-lオプションつきのmosquitto_pubに渡せばよいと思っていたがそれはできなかった。ただし,いずれにせよ加工の部分で1行ずつ読み取るという実装法になったので,そのままその1データごとにmosquitto_pubに渡す形にした。

ただし,その後一日経たずして-lオプションをサポートするpull requestが作成されたもよう。レスポンス鬼速い。

Jqをもっと理解できれば行単位でループするという処理はJq側で実現できるんじゃないかと思う。確信はないが…。

JqでJSONデータの操作

OpenScale SyncがpublishするデータはJSON形式。それに元のトピックを示すフィールドを追加したかったので,JSONデータの操作をする必要があった。 “Different Ways to Handle JSON in a Linux Shell | by Tate Galbraith | The Startup | Medium” なんかも参照したが,結局定番とされるJqを使うことに。ちなみに,Termuxは,JSONデータを操作するコマンドで特に出力に長けているとされるJoのパッケージも用意している。

Jqは関数型言語

安直にJqを選んでしまったが,わかってみればJqは関数型言語。Haskellなどに近いとのこと。その考え方に全く慣れてない自分は大いに戸惑った部分があった。最初からそれがわかっていたら避けたと思うが,Jqのホームページには “functional” という語すら見当たらずすぐ気づけなかった。

特に関数。todateという関数があれば todate(引数) という形で使われるかと思いきやそうではない。

~ $ jq 'todate(1630603473)'                                                                         
jq: error: todate/1 is not defined at <top-level>, line 1:                                           
todate(1630603473)                                                                                   
jq: 1 compile error                                                                                  
~ $ echo "1630603473" | jq 'todate'                                                                  
"2021-09-02T17:24:33Z"

時刻の扱い

時刻はUnix time (epoch time)を秒単位で扱うことにする。OpenScale Syncが吐くJSONデータ(以下に具体例)の “date” 属性の値は “the Unix date/time in milliseconds” であるがミリ秒の精度が必要になることはなかろう。

{ "date": 1572082380000, "weight": 80 }

Mosquitto_subの出力フォーマットでstrftime(3)に対応したフォーマットが利用できるので,Unix timeを出力してくれる “%s” を利用する。ただしMosquitto_subの出力フォーマットにそれを指定する際には “%” を “@” に変えて “@s” としないといけない。

機械処理をする部分ではUnix timeでやるにしても,人間(主に自分)がデータを見たときに具体的時刻がわからないと困るので,それをJqでJSONデータに追加しようとして苦戦した。それについては別稿にまとめた

この処理の結果,publishするデータは以下のようになる。OpenScaleで測定した時刻を示す time と,このスクリプトで処理した時刻を示した time-processed,さらにそれぞれ人間が見てわかりやすい表記(ISO-8601のオフセット付き表記)にした ~-readable を用意した。

{
  "weight": 80,
  "time": 1572082380,
  "time-readable": "2019-10-26T18:33:00+0900",
  "orig-topic": "openScaleSync/measurements/insert",
  "time-processed": 1630602011,
  "time-processed-readable": "2021-09-03T02:00:11+0900"
}

もっと簡潔なキーがいいとは思うんだが…。

追記: キー名にダッシュ(-)が入っていると,例えばこのJSONデータをJavaScriptで処理する際,obj.time-readableと書くと(obj.time)-readableと解釈されてしまう。obj['time-readable']と書けばよいのだが,煩わしいのでダッシュの代わりにアンダスコア(_)を使用することにした。 この変更は以下のスクリプトで行われたが,以下では変更前のものを提示している。 “Are dashes allowed in javascript property names? – Stack Overflow” 参照。

スクリプト

最終的なBashスクリプト~/bin/cp-scale-val.shは以下 ⇦ IFSの扱いを訂正した版が以下にあり

#!/data/data/com.termux/files/usr/bin/bash -f

# The output format specified by the -F option of 'mosquitto_sub'
# needs to match how it is subsequently 'read'.
mosquitto_sub -t 'openScaleSync/measurements/#' -F '%t\t%p\t@s\t%I' \
              -i "Scale Val Sub Client $$" \
| while true; do
    new_payload=$( IFS=$'\t' read -r topic payload time_processed time_processed_reada
ble;
                   echo "$payload" \
                   | jq ". + { \
                               \"time\": ( .date / 1000 ), \
                               \"time-readable\": \
                                  ( ( .date / 1000 ) \
                                      | strflocaltime(\"%Y-%m-%dT%H:%M:%S+0900\") \
                                  ), \
                               \"orig-topic\": \"${topic}\", \
                               \"time-processed\": ${time_processed}, \
                               \"time-processed-readable\": \"${time_processed_readabl
e}\" \
                             } \
                         | del(.date)" \
                 )
    mosquitto_pub -r -t '秘密/scale/weight' -m "$new_payload" \
                  -i "Scale Val Pub Client $$" ;
  done
exit 0

重要な注意: 上記スクリプトはOpenScale SyncによってpublishされるJSONデータが一行に収まることを前提にしている。この前提が壊れると正しく動作しなくなる。

ちなみにTermuxの自動起動スクリプトは現在以下のようになっている。

#!/data/data/com.termux/files/usr/bin/sh
termux-wake-lock
/data/data/com.termux/files/home/bin/no-ip.sh \
        -c=/data/data/com.termux/files/home/.no-ip/no-ip.conf &
sshd
autossh -M 0 \
        -f -N -T \
        -R 外部サーバのポート1:localhost:8022 \
        -R 外部サーバのポート2:localhost:1883 \
        ユーザ名@外部サーバ
mosquitto -c /data/data/com.termux/files/home/.mosquitto/mosquitto.conf -d
/data/data/com.termux/files/home/bin/cp-scale-val.sh

IFSの扱い

スクリプト中IFSを設定してreadしているが,IFS='\t'ではDashで動作しない。Bourne Shellではこれでよかったようにぼんやりと記憶しているんだが自信はない。深入りする余裕はないのでIFS=$'\t'として黙ってBashを使うことにする。

IFSへの代入方法に誤解があることがわかったので改定。

付記

なお,Amazfit Band 5自分専用の関連写真アルバム)もサードパーティアプリNotifyの使用で見守り目的に使おうとしているNotifyは有償機能の一つとしてブロードキャスト・インテントとして流してくれる。これをMQTTブローカにさらに伝搬させるところは自分で実現しないといけないが,それについては別稿で考察した

 

MQTTブローカにPublishされた値を若干手を入れてた後別のトピックでPublish」への4件のフィードバック

  1. ピンバック: Xiaomi製Miスマート体組成計2 XMTZC05HMを見守り目的で購入 | あくまで暫定措置としてのブログ
  2. ピンバック: Termux環境下でアプリを強制終了 | あくまで暫定措置としてのブログ
  3. ピンバック: DashでタブをIFSに代入 | あくまで暫定措置としてのブログ
  4. ピンバック: ストリーミング・メディア・プレーヤAmigo 7xJPを使ってみて | あくまで暫定措置としてのブログ

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください