事物订阅
订阅是接收事件并对事件进行响应的服务。订阅包含源 (通常为事物)。事物可以订阅通过某一操作作出响应的事件。例如,如果某个实体触发电动机过热 事件,则可通过触发关闭电动机 订阅来订阅该事件。事物可以从其使用的事物模板事物形态继承订阅。
订阅类似于标准服务,但它与事件显式关联。这样,您便可将事件从对其作出响应的代码中分离出来。与服务类似,可通过实现自定义业务逻辑来对事件作出响应。可以利用模型的功能,方法包括通过邮件服务器事物发送电子邮件、写入数据库或调用平台中可用的任何服务。订阅不能像服务那样具有显式返回输出。但是,订阅可以调用模型中线程安全上下文有权访问的任何其他服务。订阅的线程安全上下文被设置为与所触发事件使用相同的线程安全上下文。可以使用用于实现服务的 JavaScript 编辑环境。
订阅具有已定义的输入,该输入是由事件发布的数据包,称为事件数据。如果实体订阅了已定义事件,则会将事件数据传递给订阅函数。事件数据由事件数据形状进行描述。在订阅实现中,从事件传递的数据用作脚本函数的输入。例如,如果将某个实体订阅到事物属性数据更改事件,则会调用订阅脚本函数。因此,事物属性值以及来自事件的其他相关数据会作为事件数据的一部分传递给函数。
许多实体都可通过订阅订阅同一事件。每个实体可以使用传递的事件数据接收对订阅的调用。为达到解决方案的要求,实体可以采取订阅脚本中的任何操作。
使用此方法与使用从另一服务中调用的服务相比,其优点包括:
事件可通过一个或多个订阅进行订阅。
将根据系统活动调用事件,无需用户交互。
如果有多个事物订阅了某个事件,则可以使用订阅而不是链接多个服务。
一个订阅可订阅来自不同实体的多个事件。
* 
事件触发器和订阅异步执行。例如,属性更新操作完成后,属性更新 API 请求将得到即时响应。它不会等待响应数据更改事件的后续订阅完成。
多次订阅
订阅将使用用户定义的名称作为唯一标识符。实体可以对事物上的事件进行多次订阅。例如,如果某个实体触发电动机过热 事件,则它可以使用关闭电动机 订阅和创建工作订单 订阅来订阅该事件,以便对该引擎进行维护检查。也可以为该事件创建任意数量的其他订阅。
如果事物模板或事物形态实现了对某事件的订阅,则使用该事物模板或事物形态的事物也可创建同一事件的订阅,而无需其他替代方法来在触发事件时执行其他操作。
分布式订阅
分布式订阅允许在事件触发多个订阅实例时,将订阅分配到所有 ThingWorx 节点上执行。例如,当多个事物订阅相同的计时器或计划程序事件时。这样一来,在高可用性环境下的所有 ThingWorx 节点上执行基于计时器/计划程序的订阅时,可实现此类执行的水平可扩展性,进而优化资源利用率并提高性能。可通过“订阅信息”选项卡下的“分布式”复选框启用此行为。如果清除“分布式”复选框,则将在生成计时器或计划程序事件的同一节点内执行计时器和计划程序订阅。有关相关配置的详细信息,请参阅以下信息:
对于本地部署,请参阅为 Akka 配置 SSL/TLS
有关 Docker 环境,请参阅为 ThingWorx配置 Akka TLS 通信
多事件订阅
客户可通过多事件订阅功能为单个订阅订阅多个事件,甚至可以订阅来自不同事物的事件。一般而言,此为复杂订阅的必备功能。例如,客户可创建由多个不同属性更改触发的订阅,这些订阅可针对这些属性值运行简单的“如果-则-否则”规则以根据结果执行操作。有关示例,请参阅用例 2。或者,基于计时器事件创建订阅以设置时间窗口规则。有关示例,请参阅用例 3
* 
多事件订阅功能仅在创建新订阅时可见。使用 ThingWorx 9.4 及更高版本创建的旧版订阅将保持不变,且无法订阅多个事件。
多事件订阅功能催生了批量引入概念,即一次为一个订阅订阅多个事件,这些事件根据已更新属性的时间戳进行分组。有关详细信息,请参阅事物属性。有关远程事物的详细信息,请参阅远程事物服务
* 
如果正在使用 getSubscriptionsgetInstanceSubscriptions Java extensions API,则必须改用 getMultiEventSubscriptionsgetInstanceMultiEventSubscriptions 选项。getSubscriptionsgetInstanceSubscriptions 不支持 ThingWorx 9.5 及更高版本的订阅格式。此情况仅在实体包含 ThingWorx 9.5 之前和之后版本所创建订阅的组合时适用。新 API 支持旧版 (ThingWorx 9.5 之前的版本) 和新版 (ThingWorx 9.5 及更高版本) 订阅格式。
可通过订阅的“输入”选项卡下“事件”下拉列表旁的 + 符号启用此行为。订阅事件列表为可管理列表。例如,用户可添加/删除事件和编辑事件信息。所添加的每个事件均会获得一个用户定义的别名作为唯一标识符。此别名应在通过 JavaScript 访问 eventData 时使用。有关访问 eventData 的详细信息,请参见以下内容。
* 
更新同一时间戳的多个属性时,新 UpdatePropertyValuesBatchedUpdateSubscribedPropertyValuesBatched 服务优先于旧式 UpdatePropertyValuesUpdateSubscribedPropertyValues 服务。新服务允许针对每个时间戳仅执行一次多事件订阅,从而在同一执行中接收所有已更改属性的新值。与旧式服务相比,这不但可减少订阅执行的总次数,同时还降低了 CPU 占用率。
访问 eventData
使用事件别名通过 JavaScript 访问 eventData 信息。开发人员不能再通过 event.eventData.newValue 访问 eventData,而是必须使用唯一的 events[] 级别和别名名称来访问特定的 eventData,例如 events["AliasName"].eventData.newValue.value
* 
本指南仅适用于从 ThingWorx 9.5 及更高版本中创建的新订阅。在 9.5 之前创建的旧订阅仍可通过 event.eventData.newValue 访问 eventData。
可能只有部分关联事件会触发订阅。因此,访问其中一个未触发订阅的 eventData 可能会导致错误,所以必须先检查是否已定义 eventData 访问。例如:
try {
if (events["Me_DataChange_p1"].eventData.newValue.value ==44 {

} catch (error) {
logger.error(" p1 event is not defined " + error);
}
以下为另一示例。
if( events["Me_DataChange_p1"] !== undefined ){
if(events["Me_DataChange_p1"].eventData.newValue.value)==44;{

}
}
有序和有状态订阅
有序订阅适用于需要排序和原子性的用例。其仅在创建新订阅时可见。使用 ThingWorx 9.4 及更低版本创建的订阅将保持不变,且无法按顺序执行。
如果没有此功能,则将异步、并行执行订阅,且无法保持状态 (无法在多次执行间传递值),这可能会导致某些用例出现并发问题和错误结果,尤其是在订阅规则基于先前值的情况下。但有序订阅允许基于事件的时间戳顺序执行订阅。
可通过“订阅信息”选项卡下的“按顺序执行事件”复选框启用此行为。此外,一旦选择此功能,开发人员即可完全控制各个订阅的专用 JSON 对象的内容,从而在订阅执行间存储先前值、累积数据并执行简单的状态聚合。有关使用 thisSub.JSONState 的详细信息,请参阅下文。
* 
与无序订阅相比,有序订阅需要更多的 CPU 资源,因此将所有订阅标记为有序可能会影响整体的 CPU 占用率并降低订阅的执行速率。对于不需要保持状态且可并行执行而不产生副作用的订阅,建议清除“按顺序执行事件”复选框。
使用 thisSub.JSONState
可按如下方式通过 JavaScript 访问此对象 thisSub.JSONState
对于原生值,请使用 thisSub.JSONState.X=5
对于信息表,请使用 JSONObject(), thisSub.JSONstate.myInfoTable = Y.toJSONObject()
要对其进行访问,请使用 thisSub.state.myInfoTable.rows[0].value
存储在 JSONState 对象中的数据应谨慎使用,且仅限用于必要字段。例如,不建议使用 toJSONObject() 方法代替单个字段来存储完整的信息表对象,否则会占用更多的 CPU 且需要更多的状态内存存储。
要从状态对象中删除值,请使用 JavaScript delete 关键字 delete thisSub.JSONState.myInfoTable
要清除整个状态对象,请为其分配一个空 JSON {}:thisSub.JSONState = {}
* 
为防止因内存消耗过大而导致状态数据丢失,开发人员必须在所存储的数据失去利用价值后删除各个状态字段或清除整个 thisSub.JSONState 对象。
thisSub.JSONState 在内存中存储,因此,会在 ThingWorx 关闭时清除。此外,该状态会于以下情况清除:
编辑并保存声明订阅的实体。
禁用声明订阅的实体。
使用 DisableSubscription 服务等禁用订阅。
新的 ThingWorx 节点在 HA 群集中启动/停止,从而导致一些订阅在不同的节点上开始执行。
* 
ThingWorx 会针对单个订阅的 thisSub.JSONState 以及所有订阅状态的整体内存存储强制执行内存大小限制。这些限制可在 platform-settings.json 配置详细信息的 SubscriptionSettings 部分进行配置。
有关监控订阅状态内存的详细信息,请参阅订阅性能。有关最佳订阅做法的信息,请参阅构建 ThingWorx 解决方案的最佳做法概览
有关如何使用多事件订阅和有序订阅的汇总,请查看 ThingWorx 9.5 中的多事件订阅
以下“用例”部分包含代码示例,并借此展示了强大的多事件以及有序和有状态订阅功能,其中包括时间窗口和属性更新冲突处理等用例。
用例
用例 1 
// Usecase 1: alert will be triggered if voltage is higher than 118V
// Check if the voltage value exceeds 118 and trigger the appropriate action.
if (events["Me_DataChange_useCase1_Voltage"].eventData.newValue.value > 118){
logger.warn("Use-Case 1 subscription has been triggered");
}
用例 2 
// Usecase 2: alert will be triggered if voltage is higher than 118V and current is lower than 2A

//********************** Default initialization ****************************************
if (thisSub.JSONState.Voltage === undefined)
thisSub.JSONState.Voltage = me.useCase2_Voltage; // default value from VTQ
if (thisSub.JSONState.Current === undefined)
thisSub.JSONState.Current = 0; // default value 0
//*************************************************************************************
//************************* storing the values in JSONState from eventData ***************
var aliasArray = Object.keys(events.dataShape.fields);
for (var i=0; i < aliasArray.length; i ++ ){
if (aliasArray[i] === "Me_DataChange_useCase2_Current")
// Assign the new current value to the state
thisSub.JSONState.Current = events["Me_DataChange_useCase2_Current"].eventData.newValue.value;
if (aliasArray[i] === "Me_DataChange_useCase2_Voltage") {
// Assign the new voltage value to the state
thisSub.JSONState.Voltage = events["Me_DataChange_useCase2_Voltage"].eventData.newValue.value;
}
}
//******************************************************************************************
// Check if both voltage and current meet the conditions - voltage is higher than 118V and current is lower than 2A
if (thisSub.JSONState.Voltage > 118 &&
thisSub.JSONState.Current < 2) {
logger.warn("Use-Case 2 alert !!!, Voltage is: "+thisSub.JSONState.Voltage+" and Current is: "+thisSub.JSONState.Current);
}
else
logger.warn("Use-Case 2 NO alert, Voltage is: "+thisSub.JSONState.Voltage+" and Current is: "+thisSub.JSONState.Current);
用例 3 
// Usecase 3: alert will be triggered if voltage is higher than 118V and it has last more than 3 minutes.
// Timer will tick every X seconds

/*
When the DataChange event causing the subscription to run,
The value of Voltage will be checked:
1. If it would be above 118, timer would start running in case its undefined.
2. If it would be below or equal to 118, timer would back to undefined.
In case time has gotten to 3 minutes (180000MS), subscription would be triggered.
*/
// Getting the name of the event that triggered the subscription
let aliasArray = Object.keys(events.dataShape.fields);
if (aliasArray[0] === "Me_DataChange_useCase3_Voltage") {
// Valtage was changed
// We want to manage thisSub.JSONState.StartTime only. We don't save the Valtage in state

var Voltage = events[aliasArray[0]].eventData.newValue.value;

if( thisSub.JSONState.StartTime === undefined ) {
if( Voltage > 118 ){
thisSub.JSONState.StartTime = Date.now();
logger.warn("Voltage = " + Voltage + " Start time was set");
}
} else{
if( Voltage <= 118 ){
delete thisSub.JSONState.StartTime; // this will set to undefine
logger.warn("Voltage = " + Voltage + " Start time was unset");
}
}
}
else{
// We are in timer event
if( thisSub.JSONState.StartTime !== undefined && Date.now() - thisSub.JSONState.StartTime > 40000 )
logger.warn("Use-Case 3 timer was triggered - Alert !!! thisSub.JSONState.StartTime = " + thisSub.JSONState.StartTime);
else
logger.warn("Use-Case 3 timer was triggered - no Alert. thisSub.JSONState.StartTime = " + thisSub.JSONState.StartTime);
}

用例 4 
// Usecase 4: Input 32 bit integer will be translated into 32 status properties if there is any change,
// and alert on status property potentially will be triggered.

logger.warn("Use-Case 4 subscription has been triggered");
用例 5 
// Usecase 5: Alert will be triggered only on Error code which has severity level 1 or 2,
// where Error severity is defined in a relational table
// Get reference to the data table
var dataTable = Things["useCase5_DataTable"];
// Get the new value of ErrorCode from the event
var ErrorCodeValue = events["Me_DataChange_useCase5_ErrorCode"].eventData.newValue.value;
// Set parameters for querying the data table
var params = {
maxItems: 100, // Maximum number of entries to retrieve
source: undefined, // Filter entries by a specific source if needed
values: undefined, // Filter entries by specific column values if needed
orderBy: undefined // Order the retrieved entries by a specific column if needed
};
// Query the data table entries
var queryResult = dataTable.QueryDataTableEntries(params);
var rows = queryResult.rows;
// Iterate through the rows and check for matching ErrorCode and severity level
for (var i = 0; i < rows.length; i++) {
var thisRow = rows[i];

// Check if the ErrorCode matches the given ErrorCodeValue
if (thisRow.ErrorCode == ErrorCodeValue) {
// Check if the Severity is either 1 or 2
if (thisRow.Severity == 1 || thisRow.Severity == 2) {
// Log a warning message indicating the matched ErrorCode and its Severity
logger.warn("Error code: " + thisRow.ErrorCode + " has a severity of: " + thisRow.Severity + ". Use-Case 5 subscription has been triggered.");
}
}
}
用例 6 
// Usecase 6: Alert will be trigged if sum of product count is less than 10 in past 10 minutes
// Timer will tick every 1 seconds (1000MS)

// Getting the name of the event that triggered the subscription
var alias = events.dataShape.fields;
// Check if the alias matches "Me_DataChange_useCase6_ProductCount"
if (alias == "Me_DataChange_useCase6_ProductCount") {
// Check if the productCountDict exists in the state
if (thisSub.JSONState.productCountDict != undefined) {
// Delete entries older than 10 minutes
deleteOlderThan10Minutes();
// Add a new entry with the current timestamp and the new value from the event
thisSub.JSONState.productCountDict[Date.now().toString()] = events[alias].eventData.newValue.value;
} else {
// Initialize the productCountDict as an empty object and add a new entry
thisSub.JSONState.productCountDict = {};
thisSub.JSONState.productCountDict[Date.now().toString()] = events[alias].eventData.newValue.value;
}
} else {
// Check if the startTime is undefined and set it to the current time
if (thisSub.JSONState.startTime == undefined) {
thisSub.JSONState.startTime = Date.now();
}
// Check if the productCountDict exists in the state
if (thisSub.JSONState.productCountDict != undefined) {
// Delete entries older than 10 minutes
deleteOlderThan10Minutes();
// Check if an alert should be triggered and log a warning message
if (shouldTriggerAlert()) {
logger.warn("Use-Case 6 subscription has been triggered");
}
}
}
// Function to delete entries older than 10 minutes from productCountDict
function deleteOlderThan10Minutes() {
var keys = Object.keys(thisSub.JSONState.productCountDict);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
// Check if the time difference is greater than 10 minutes (600,000 milliseconds)
if (Date.now() - parseInt(key) > 600000) {
delete thisSub.JSONState.productCountDict[key];
}
}
}
// Function to check if an alert should be triggered
function shouldTriggerAlert() {
// Check if the sum of product count is less than 10 and if the time difference is greater than 10 minutes
if (productCountDictSUM() < 10 && Date.now() - thisSub.JSONState.startTime > 600000) {
return true;
} else {
return false;
}
}
// Function to calculate the sum of values in productCountDict
function productCountDictSUM() {
var keys = Object.keys(thisSub.JSONState.productCountDict);
var sum = 0;
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
var value = thisSub.JSONState.productCountDict[key];
sum += value;
}
return sum;
}
用例 7 
// Usecase 7: alert will be triggered if average current (5 minutes window) is higher than 2.5A,
// and it will be checked very 30 seconds.
// Timer will tick every 30 seconds (30000MS)
// Getting the name of the event that triggered the subscription
var alias = events.dataShape.fields;
// Checking if the data alias matches the expected value
if (alias == "Me_DataChange_useCase7_Current") {
// If the data dictionary exists, update it with the new current value
if (thisSub.JSONState.currentDict != undefined) {
deleteOlderThan5Minutes();
thisSub.JSONState.currentDict[Date.now().toString()] = events[alias].eventData.newValue.value;
} else {
// If the data dictionary doesn't exist, create it and set the start time
thisSub.JSONState.currentDict = {};
thisSub.JSONState.currentDict[Date.now().toString()] = events[alias].eventData.newValue.value;
}
} else {
if(thisSub.JSONState.startTime == undefined){
thisSub.JSONState.startTime = Date.now();
}
// If the data alias doesn't match, perform data management and potential alert triggering
if (thisSub.JSONState.currentDict != undefined) {
// Delete entries in the data dictionary older than 5 minutes
deleteOlderThan5Minutes();
// Check if conditions for triggering the alert are met
if (shouldTriggerAlert()) {
logger.warn("Use-Case 7 subscription has been triggered");
}
}
}
// Function to delete data entries older than 5 minutes
function deleteOlderThan5Minutes() {
var keys = Object.keys(thisSub.JSONState.currentDict);
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
if (Date.now() - parseInt(key) > 300000) {
delete thisSub.JSONState.currentDict[key];
}
}
}
// Function to determine if the alert should be triggered
// Only in case 5 minutes has been passed, and AVG is higher then 2.5
function shouldTriggerAlert() {
if (currentDictAVG() > 2.5 && Date.now() - thisSub.JSONState.startTime > 300000) {
return true;
} else {
return false;
}
}
// Function to calculate the average of currentDict
function currentDictAVG() {
var keys = Object.keys(thisSub.JSONState.currentDict);
var sum = 0;
var count = 0;
for (var i = 0; i < keys.length; i++) {
var key = keys[i];
var value = thisSub.JSONState.currentDict[key];
sum += value;
count++;
}
var average = sum / count;
return average;
}
////at any data change i sum the current
////when 30 seconds has been passed i calculate AVG
//
//
//
////change property event:
//// check if there is any value older then 5, if so - delete. (can implement by get keys)
//// add the value to the dict with key of timestamp
////timer event:
//// check if there is any value older then 5, if so - delete. (can implement by get keys)
//// AVG calculation, and condition check. - not only per 30 seconds, but for all the data at 5 minutes
//
//// build function to: change if there is any value older then 5, if so - delete. (can implement by get keys)
//
// ** the first timer event will be the startTime point of the algorithm
用例 8 
// Usecase 8: the alert will be triggered when most recent value is greater than the previous 10 values
// Check if valuesArray state is undefined and initialize it
if (thisSub.JSONState.valuesArray === undefined) {
thisSub.JSONState.valuesArray = {};
}
// Check if valuesArray length is less than 10
let indexCounter = Object.keys(thisSub.JSONState.valuesArray).length;
if ( indexCounter < 3) {
// Add the new value to valuesArray
thisSub.JSONState.valuesArray["i" + indexCounter] = events["Me_DataChange_useCase8_prop"].eventData.newValue.value;
} else {
// The valuesArray is full

var max = 0;
// Find the maximum value in valuesArray
for (let i = 0; i < Object.keys(thisSub.JSONState.valuesArray).length; i++) {
if (thisSub.JSONState.valuesArray["i" + i] > max)
max = thisSub.JSONState.valuesArray["i" + i];
}

// Check if the new value is greater than the maximum value
if (events["Me_DataChange_useCase8_prop"].eventData.newValue.value > max)
logger.warn("Use-Case 8 Alert!!! max is " + events["Me_DataChange_useCase8_prop"].eventData.newValue.value);
// Shift the values in valuesArray by one position
for (var j = 0; j < parseInt((Object.keys(thisSub.JSONState.valuesArray)).length) - 1; j++) {
thisSub.JSONState.valuesArray["i" + j] = thisSub.JSONState.valuesArray["i" + (j + 1)];
}
// Add the new value to valuesArray using the current indexCounter
thisSub.JSONState.valuesArray["i" + (indexCounter - 1)] = events["Me_DataChange_useCase8_prop"].eventData.newValue.value;
}
logger.warn("Use-Case 8 End. valuesArray = " + thisSub.JSONState.valuesArray);
这对您有帮助吗?