Thing の購読
購読は
イベントを受信してこれに応答するサービスです。購読にはソース (通常は
Thing) が含まれています。Thing には、操作で応答するイベントの購読を設定できます。たとえば、あるエンティティで「モーターが過熱している」というイベントが発生した場合、それは「モーターをオフにする」という購読をトリガーすることによって、そのイベントを購読できます。Thing は、使用している
Thing Templateおよび
Thing Shapeから購読を継承できます。
購読は標準のサービスとよく似ていますが、購読はイベントに明示的にリンクしています。これにより、イベントに応答するコードからイベントを切り離すことができます。サービスと同様に、イベントに応答するカスタムビジネスロジックを実装できます。メールサーバー Thing を介して電子メールを送信したり、データベースに書き込んだり、そのプラットフォームで利用可能なサービスを呼び出したりすることによって、モデルの能力を活用できます。購読ではサービスのように明示的な出力は返されません。ただし、購読はスレッドセキュリティコンテキストがアクセスできるモデル内に、その他のサービスを呼び出すことはできます。購読のスレッドセキュリティコンテキストは、発生したイベントと同じスレッドセキュリティコンテキストに設定されます。サービスの実装に使用されているものと同じ JavaScript 編集環境を使用できます。
購読には定義済みの入力があり、これはイベントによって発行されるデータパケットで、イベントデータと呼ばれます。エンティティが定義済みのイベントを購読している場合、イベントデータは購読関数に渡されます。イベントデータはイベント
データシェイプによって定義されます。購読の実装内で、イベントから渡されたデータはスクリプト関数への入力として機能します。たとえば、あるエンティティが Thing プロパティデータ変更イベントを購読している場合、その購読スクリプト関数が呼び出されます。その結果、Thing プロパティ値が、イベントからのその他の関連データとともにイベントデータの一部としてこの関数に渡されます。
購読によって、多数のエンティティが同じイベントを購読できます。各エンティティはイベントデータが渡された購読への呼び出しを受信します。エンティティは、ソリューションの要件を満たすために必要なあらゆる操作を購読スクリプトから実行できます。
別のサービスから呼び出されるサービスを使用するよりも、この手法の方が有利な点には、以下があります。
• イベントを 1 つまたは多数の購読によって購読できること。
• イベントがシステムのアクティビティに基づいて呼び出され、ユーザーの操作が不要であること。
• イベントが複数の Thing によって購読される場合は、複数のサービスを連携する代わりに、購読を使用できます。
• 1 つの購読によって、異なるエンティティから複数のイベントを購読できます。
|
イベントトリガーと購読は非同期に実行されます。たとえば、プロパティ更新操作が完了するとただちに、プロパティ更新 API リクエストは応答を受信します。これはデータ変更イベントの完了に応答する後続の購読を待機しません。
|
複数購読
購読には一意の識別子としてユーザー定義の名前が付けられます。エンティティは、Thing のイベントを複数購読できます。たとえば、あるエンティティで「モーターが過熱している」というイベントが発生した場合、それは「モーターをオフにする」という購読と「作業命令書を作成する」という購読の両方によってそのイベントを購読できます。これにより、メンテナンスでエンジンが点検されます。そのイベントに対して、任意の数のほかの購読を作成することもできます。
Thing Template または Thing Shape によってあるイベントの購読が実装されている場合、その Thing Template または Thing Shape を使用する Thing も同じイベントの購読を作成でき、これらのイベントの発生時に追加の操作を行う次善策は必要ありません。
配分購読
配分購読を使用すると、イベントによって多数の購読のインスタンスがトリガーされた場合に、すべての ThingWorx ノードに購読の実行を分散できます。たとえば、多数の Thing が同じタイマーまたはスケジューライベントを購読している場合などです。これにより、高可用性環境のすべての ThingWorx ノードにおけるタイマー/スケジューラベースの購読実行の水平スケーラビリティが実現し、リソース使用率とパフォーマンスが向上します。この動作を有効にするには、「購読情報」タブの「配分」チェックボックスをオンにします。「均等」チェックボックスがオフの場合、タイマーおよびスケジューラの購読は、タイマーまたはスケジューライベントが生成されたノードで実行されます。関連するコンフィギュレーションの詳細については、以下を参照してください。
マルチイベント購読
マルチイベント購読機能では、1 つの購読で、さまざまな Thing からでも、複数のイベントを購読できます。これは主に複雑な購読で必要です。たとえば、複数の異なるプロパティ変更からトリガーされる購読を作成し、これらのプロパティ値に対して単純な if-then-else 規則を実行することで、結果に基づいた操作を実行できます。例については、
ユースケース 2を参照してください。または、タイマーのイベントに基づく購読を作成して、タイムウィンドウ規則を設定します。例については、
ユースケース 3を参照してください。
|
マルチイベント購読機能は、新規購読を作成する場合にのみ表示されます。ThingWorx バージョン 9.4 以降で作成されたレガシー購読は同じままとなり、複数のイベントを購読することはできません。
|
マルチイベント購読の機能により、更新されたプロパティのタイムスタンプに基づいてグループ化された、一度に複数のイベントを購読に配信するバッチ取得の概念が導入されています。詳細については、
「Thing プロパティ」を参照してください。Remote Thing の詳細については、
「Remote Thing サービス」を参照してください。
|
getSubscriptions または getInstanceSubscriptions Java 拡張機能 API を現在使用している場合、代わりにオプション getMultiEventSubscriptions または getInstanceMultiEventSubscriptions を使用する必要があります。getSubscriptions および getInstanceSubscriptions は ThingWorx 9.5 以降の購読フォーマットをサポートしていません。これは、ThingWorx 9.5 の前に作成された購読と ThingWorx 9.5 以降に作成された購読の両方が含まれているエンティティがある場合にのみ適用されます。新しい API では、レガシー (ThingWorx 9.5 より前) 購読フォーマットと新しい (ThingWorx 9.5 以降) 購読フォーマットがサポートされています。
|
購読の「入力」タブの「イベント」ドロップダウンリストの横にある + 記号によって、この動作が有効になります。購読のイベントのリストを管理できます。たとえば、イベントを追加/削除したり、イベント情報を編集したりできます。追加された各イベントは、一意の識別子としてユーザー定義のエイリアスを取得します。このエイリアスは、JavaScript を介して eventData にアクセスするときに使用します。eventData へのアクセスの詳細については、以下を参照してください。
eventData へのアクセス
JavaScript を介して eventData 情報にアクセスするには、イベントのエイリアスを使用します。開発者は event.eventData.newValue によって eventData にアクセスできなくなりますが、events[] レベルとエイリアスの一意の名前を使用して、特定の eventData にアクセスする必要があります (events["AliasName"].eventData.newValue.value など)。
関連付けられているイベントのうちの一部だけが購読をトリガーする可能性があります。したがって、購読をトリガーしなかった eventData のいずれかにアクセスするとエラーが発生する可能性があるので、アクセス対象の eventData が定義されているかどうかを最初にチェックする必要があります。次に例を示します。
try {
if (events["Me_DataChange_p1"].eventData.newValue.value ==44 {
…
} catch (error) {
logger.error(" p1 event is not defined " + error);
}
または、別の例を示します。
if( events["Me_DataChange_p1"] !== undefined ){
if(events["Me_DataChange_p1"].eventData.newValue.value)==44;{
…
}
}
順序付けされたステートフルな購読 (ベータ)
順序付けされた購読は、順序付けと不可分性を必要とするユースケースで使用されます。これは新規購読を作成する場合にのみ表示されます。ThingWorx 9.4 以前で作成された購読は同じままとなり、連続して実行することはできません。
この機能がない場合、購読の実行は非同期で並列に実行され、ステートフルにはできません (実行間で値を渡すことはできません)。一部のユースケースでは、これにより、同時実行の問題や、主に購読の規則が以前の値に基づいている場合に間違った結果が生じる可能性があります。ただし、順序付けされた購読により、イベントのタイムスタンプの順序に基づいて購読を連続して実行できます。
この動作を有効にするには、「購読情報」タブの「Execute sequentially」チェックボックスをオンにします。さらに、一度選択すると、開発者は購読ごとに専用の JSON オブジェクトのコンテンツを完全に制御できるようになり、購読の実行間で以前の値を保存したり、データを累積したり、単純なステートフル集約を実行したりできます。thisSub.JSONState の使用の詳細については、以下を参照してください。
thisSub.JSONState の使用
このオブジェクト thisSub.JSONState は JavaScript から次のようにアクセスできます。
• ネイティブの値の場合、thisSub.JSONState.X=5 を使用します。
• インフォテーブルの場合、JSONObject(), thisSub.JSONstate.myInfoTable = Y.toJSONObject() を使用します。
これにアクセスするには、thisSub.state.myInfoTable.rows[0].value を使用します。
状態オブジェクトから値を削除するには、JavaScript delete キーワード delete thisSub.JSONState.myInfoTable を使用します。
状態オブジェクト全体をクリアするには、これに空の JSON {} を割り当てます: thisSub.JSONState = {}。
|
thisSub.JSONState の柔軟性により、開発者が格納するデータが多すぎて ThingWorx が失敗するリスクが高くなります。そのため、開発者はこのオブジェクトを使用する際には必ずオブジェクトクリーンアッププロセスを埋め込む必要があります。
|
thisSub.JSONState はメモリに格納されるので、ThingWorx のシャットダウン時にクリアされます。さらに、以下の場合に状態がクリアされます。
• 購読を宣言しているエンティティが編集されて保存された場合。
• 購読を宣言しているエンティティが無効になった場合。
• DisableSubscription サービスを使用するなど、購読が無効になった場合。
• HA クラスタで新しい ThingWorx ノードが起動/停止し、一部の購読が別のノードで実行を開始した場合。
ユースケース
以下のユースケースは、マルチイベント購読機能と順序付けされたステートフルな購読機能を示しています。
ユースケース 1
// Usecase 1: alert will be triggered if voltage is higher than 118V
// Check if the voltage value exceeds 118 and trigger the appropriate action.
if (events["Me_DataChange_useCase1_Voltage"].eventData.newValue.value > 118){
logger.warn("Use-Case 1 subscription has been triggered");
}
ユースケース 2
// Usecase 2: alert will be triggered if voltage is higher than 118V and current is lower than 2A
//********************** Default initialization ****************************************
if (thisSub.JSONState.Voltage === undefined)
thisSub.JSONState.Voltage = me.useCase2_Voltage; // default value from VTQ
if (thisSub.JSONState.Current === undefined)
thisSub.JSONState.Current = 0; // default value 0
//*************************************************************************************
//************************* storing the values in JSONState from eventData ***************
var aliasArray = Object.keys(events.dataShape.fields);
for (var i=0; i < aliasArray.length; i ++ ){
if (aliasArray[i] === "Me_DataChange_useCase2_Current")
// Assign the new current value to the state
thisSub.JSONState.Current = events["Me_DataChange_useCase2_Current"].eventData.newValue.value;
if (aliasArray[i] === "Me_DataChange_useCase2_Voltage") {
// Assign the new voltage value to the state
thisSub.JSONState.Voltage = events["Me_DataChange_useCase2_Voltage"].eventData.newValue.value;
}
}
//******************************************************************************************
// Check if both voltage and current meet the conditions - voltage is higher than 118V and current is lower than 2A
if (thisSub.JSONState.Voltage > 118 &&
thisSub.JSONState.Current < 2) {
logger.warn("Use-Case 2 alert !!!, Voltage is: "+thisSub.JSONState.Voltage+" and Current is: "+thisSub.JSONState.Current);
}
else
logger.warn("Use-Case 2 NO alert, Voltage is: "+thisSub.JSONState.Voltage+" and Current is: "+thisSub.JSONState.Current);
ユースケース 3
// Usecase 3: alert will be triggered if voltage is higher than 118V and it has last more than 3 minutes.
// Timer will tick every X seconds
/*
When the DataChange event causing the subscription to run,
The value of Voltage will be checked:
1. If it would be above 118, timer would start running in case its undefined.
2. If it would be below or equal to 118, timer would back to undefined.
In case time has gotten to 3 minutes (180000MS), subscription would be triggered.
*/
// Getting the name of the event that triggered the subscription
let aliasArray = Object.keys(events.dataShape.fields);
if (aliasArray[0] === "Me_DataChange_useCase3_Voltage") {
// Valtage was changed
// We want to manage thisSub.JSONState.StartTime only. We don't save the Valtage in state
var Voltage = events[aliasArray[0]].eventData.newValue.value;
if( thisSub.JSONState.StartTime === undefined ) {
if( Voltage > 118 ){
thisSub.JSONState.StartTime = Date.now();
logger.warn("Voltage = " + Voltage + " Start time was set");
}
} else{
if( Voltage <= 118 ){
delete thisSub.JSONState.StartTime; // this will set to undefine
logger.warn("Voltage = " + Voltage + " Start time was unset");
}
}
}
else{
// We are in timer event
if( thisSub.JSONState.StartTime !== undefined && Date.now() - thisSub.JSONState.StartTime > 40000 )
logger.warn("Use-Case 3 timer was triggered - Alert !!! thisSub.JSONState.StartTime = " + thisSub.JSONState.StartTime);
else
logger.warn("Use-Case 3 timer was triggered - no Alert. thisSub.JSONState.StartTime = " + thisSub.JSONState.StartTime);
}
ユースケース 4
// Usecase 4: Input 32 bit integer will be translated into 32 status properties if there is any change,
// and alert on status property potentially will be triggered.
logger.warn("Use-Case 4 subscription has been triggered");
ユースケース 5
// Usecase 5: Alert will be triggered only on Error code which has severity level 1 or 2,
// where Error severity is defined in a relational table
// Get reference to the data table
var dataTable = Things["useCase5_DataTable"];
// Get the new value of ErrorCode from the event
var ErrorCodeValue = events["Me_DataChange_useCase5_ErrorCode"].eventData.newValue.value;
// Set parameters for querying the data table
var params = {
maxItems: 100, // Maximum number of entries to retrieve
source: undefined, // Filter entries by a specific source if needed
values: undefined, // Filter entries by specific column values if needed
orderBy: undefined // Order the retrieved entries by a specific column if needed
};
// Query the data table entries
var queryResult = dataTable.QueryDataTableEntries(params);
var rows = queryResult.rows;
// Iterate through the rows and check for matching ErrorCode and severity level
for (var i = 0; i < rows.length; i++) {
var thisRow = rows[i];
// Check if the ErrorCode matches the given ErrorCodeValue
if (thisRow.ErrorCode == ErrorCodeValue) {
// Check if the Severity is either 1 or 2
if (thisRow.Severity == 1 || thisRow.Severity == 2) {
// Log a warning message indicating the matched ErrorCode and its Severity
logger.warn("Error code: " + thisRow.ErrorCode + " has a severity of: " + thisRow.Severity + ". Use-Case 5 subscription has been triggered.");
}
}
}
ユースケース 6
// Usecase 6: Alert will be trigged if sum of product count is less than 10 in past 10 minutes
// Timer will tick every 1 seconds (1000MS)
// Getting the name of the event that triggered the subscription
var alias = events.dataShape.fields;
// Check if the alias matches "Me_DataChange_useCase6_ProductCount"
if (alias == "Me_DataChange_useCase6_ProductCount") {
// Check if the productCountDict exists in the state
if (thisSub.JSONState.productCountDict != undefined) {
// Delete entries older than 10 minutes
deleteOlderThan10Minutes();
// Add a new entry with the current timestamp and the new value from the event
thisSub.JSONState.productCountDict[Date.now().toString()] = events[alias].eventData.newValue.value;
} else {
// Initialize the productCountDict as an empty object and add a new entry
thisSub.JSONState.productCountDict = {};
thisSub.JSONState.productCountDict[Date.now().toString()] = events[alias].eventData.newValue.value;
}
} else {
// Check if the startTime is undefined and set it to the current time
if (thisSub.JSONState.startTime == undefined) {
thisSub.JSONState.startTime = Date.now();
}
// Check if the productCountDict exists in the state
if (thisSub.JSONState.productCountDict != undefined) {
// Delete entries older than 10 minutes
deleteOlderThan10Minutes();
// Check if an alert should be triggered and log a warning message
if (shouldTriggerAlert()) {
logger.warn("Use-Case 6 subscription has been triggered");
}
}
}
// Function to delete entries older than 10 minutes from productCountDict
function deleteOlderThan10Minutes() {
var keys = Object.keys(thisSub.JSONState.productCountDict);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
// Check if the time difference is greater than 10 minutes (600,000 milliseconds)
if (Date.now() - parseInt(key) > 600000) {
delete thisSub.JSONState.productCountDict[key];
}
}
}
// Function to check if an alert should be triggered
function shouldTriggerAlert() {
// Check if the sum of product count is less than 10 and if the time difference is greater than 10 minutes
if (productCountDictSUM() < 10 && Date.now() - thisSub.JSONState.startTime > 600000) {
return true;
} else {
return false;
}
}
// Function to calculate the sum of values in productCountDict
function productCountDictSUM() {
var keys = Object.keys(thisSub.JSONState.productCountDict);
var sum = 0;
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
var value = thisSub.JSONState.productCountDict[key];
sum += value;
}
return sum;
}
ユースケース 7
// Usecase 7: alert will be triggered if average current (5 minutes window) is higher than 2.5A,
// and it will be checked very 30 seconds.
// Timer will tick every 30 seconds (30000MS)
// Getting the name of the event that triggered the subscription
var alias = events.dataShape.fields;
// Checking if the data alias matches the expected value
if (alias == "Me_DataChange_useCase7_Current") {
// If the data dictionary exists, update it with the new current value
if (thisSub.JSONState.currentDict != undefined) {
deleteOlderThan5Minutes();
thisSub.JSONState.currentDict[Date.now().toString()] = events[alias].eventData.newValue.value;
} else {
// If the data dictionary doesn't exist, create it and set the start time
thisSub.JSONState.currentDict = {};
thisSub.JSONState.currentDict[Date.now().toString()] = events[alias].eventData.newValue.value;
}
} else {
if(thisSub.JSONState.startTime == undefined){
thisSub.JSONState.startTime = Date.now();
}
// If the data alias doesn't match, perform data management and potential alert triggering
if (thisSub.JSONState.currentDict != undefined) {
// Delete entries in the data dictionary older than 5 minutes
deleteOlderThan5Minutes();
// Check if conditions for triggering the alert are met
if (shouldTriggerAlert()) {
logger.warn("Use-Case 7 subscription has been triggered");
}
}
}
// Function to delete data entries older than 5 minutes
function deleteOlderThan5Minutes() {
var keys = Object.keys(thisSub.JSONState.currentDict);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
if (Date.now() - parseInt(key) > 300000) {
delete thisSub.JSONState.currentDict[key];
}
}
}
// Function to determine if the alert should be triggered
// Only in case 5 minutes has been passed, and AVG is higher then 2.5
function shouldTriggerAlert() {
if (currentDictAVG() > 2.5 && Date.now() - thisSub.JSONState.startTime > 300000) {
return true;
} else {
return false;
}
}
// Function to calculate the average of currentDict
function currentDictAVG() {
var keys = Object.keys(thisSub.JSONState.currentDict);
var sum = 0;
var count = 0;
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
var value = thisSub.JSONState.currentDict[key];
sum += value;
count++;
}
var average = sum / count;
return average;
}
////at any data change i sum the current
////when 30 seconds has been passed i calculate AVG
//
//
//
////change property event:
//// check if there is any value older then 5, if so - delete. (can implement by get keys)
//// add the value to the dict with key of timestamp
////timer event:
//// check if there is any value older then 5, if so - delete. (can implement by get keys)
//// AVG calculation, and condition check. - not only per 30 seconds, but for all the data at 5 minutes
//
//// build function to: change if there is any value older then 5, if so - delete. (can implement by get keys)
//
// ** the first timer event will be the startTime point of the algorithm
ユースケース 8
// Usecase 8: the alert will be triggered when most recent value is greater than the previous 10 values
// Check if valuesArray state is undefined and initialize it
if (thisSub.JSONState.valuesArray === undefined) {
thisSub.JSONState.valuesArray = {};
}
// Check if valuesArray length is less than 10
let indexCounter = Object.keys(thisSub.JSONState.valuesArray).length;
if ( indexCounter < 3) {
// Add the new value to valuesArray
thisSub.JSONState.valuesArray["i" + indexCounter] = events["Me_DataChange_useCase8_prop"].eventData.newValue.value;
} else {
// The valuesArray is full
var max = 0;
// Find the maximum value in valuesArray
for (let i = 0; i < Object.keys(thisSub.JSONState.valuesArray).length; i++) {
if (thisSub.JSONState.valuesArray["i" + i] > max)
max = thisSub.JSONState.valuesArray["i" + i];
}
// Check if the new value is greater than the maximum value
if (events["Me_DataChange_useCase8_prop"].eventData.newValue.value > max)
logger.warn("Use-Case 8 Alert!!! max is " + events["Me_DataChange_useCase8_prop"].eventData.newValue.value);
// Shift the values in valuesArray by one position
for (var j = 0; j < parseInt((Object.keys(thisSub.JSONState.valuesArray)).length) - 1; j++) {
thisSub.JSONState.valuesArray["i" + j] = thisSub.JSONState.valuesArray["i" + (j + 1)];
}
// Add the new value to valuesArray using the current indexCounter
thisSub.JSONState.valuesArray["i" + (indexCounter - 1)] = events["Me_DataChange_useCase8_prop"].eventData.newValue.value;
}
logger.warn("Use-Case 8 End. valuesArray = " + thisSub.JSONState.valuesArray);