Last Changed Record Appender


This stage caches the latest data of each point and appends the latest data to each record.


For a single point of a single device:

  • If there is no existing record of the point
    • Cache the data of the point
    • Append the cached data to the /attr/lastChangedRecordAppender field of the record
  • If there is an existing record of the point
    • Append the current cached data to the /attr/lastChangedRecordAppender field of the record
    • Compare the data with the cached data. If there are differences, refresh the cached data with the arrived data.
  • This stage cannot guarantee idempotence of the calculation results due to failure retries caused by any reasons, such as cluster node exceptions.

Configuration

The configuration tabs for this stage are General, Basic, Input/Output, and CacheConfig.

General

Name Required? Description
Name Yes The name of the stage.
Description No The description of the stage.
Stage Library Yes The streaming operator library to which the stage belongs.
Required Fields No The fields that the data records must contain. If the specified fields are not included, the record will be filtered out.
Preconditions No The conditions that must be satisfied by the data records. Records that do not meet the conditions will be filtered out. For example, ${record:value('/value') > 0}. For the syntax of EL expressions, see Expression Language.
On Record Error Yes

The processing method for error data.

  • Discard: Error data will be discarded and ignored
  • Send to Error: Error messages will be reported
  • Stop Pipeline: The pipeline will be stopped

Basic

Name Required? Description
Quality Filter No Filter the data according to the data quality. Only records that meet the quality conditions will be processed by this stage.

Input/Output

Name Required? Description
Input/Output Yes The last changed data is attached to each record. Specify the input points that hold the last changed data, and the output points that receive the output results.
Input Point Yes Specify the input point that holds the last changed data, using the format {modelId}::{pointId}.
Output Point Yes Specify the output point that receives the output results, using the format {modelId}::{pointId}.

CacheConfig

Name Required? Description
Cache Type Yes

Select the storage type for cache data. Options are Redis and Local storage.

  • Redis: The advantage is that the cached data will not be lost after the stream processing pipeline is paused, restarted, or retried. The disadvantage is that the data processing speed is slow and that it is sensitive to network performance. It is recommended that the network delay should be less than 1ms. Otherwise, the data processing performance will be affected.
  • Local: The advantage is that the data processing speed is fast. The disadvantage is that the cached data will be lost after the stream processing pipeline is paused, restarted, or retried.

Output Results

The output results of this stage are included in the attr struct. The description of the fields are as follows:

Name Data Type Description
lastChangedRecordAppender Map The point attribute data object.
lastChangedTs Long The timestamp of the last changed record.
lastChangedValue Object The value of the last changed record.
lastChangedRecord Map The complete information of the last changed record.

Output Example

../../../_images/last_changed_record_appender.png