Get Stage State

Get the intermediate state data of a specified operator (stage) in a stream processing pipeline.

Prerequisites

Stream data processing pipelines have been created with the calculator library of the Stream Processing service.

Request Format

POST https://{apigw-address}/streaming/v2.0/stage-state?action=get

Request Parameters (URI)

Name Location (Path/Query) Mandatory/Optional Data Type Description
orgId Query Mandatory String The organization ID. How to get the orgId>>

Request Parameters (Body)

Name Mandatory/Optional Data Type Description
pipelineId Mandatory String The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page.
stageInstanceName Mandatory String The stage instance name, which can be found on the Stream Development page. Click the stream processing pipeline name, select the target operator, and copy the stage instance name on the Info page.
assetIds Mandatory String The asset ID. Supports the query of multiple asset IDs, separated by commas.
pointIds Mandatory String The measurement point ID. Supports the query of multiple measurement point IDs, separated by commas. The upper limit of the number of measurement points that can be queried is 100,000.

Response Parameters

Name Data Type Description
data List<JSONObject> The list of operator intermediate state data. For more information, see items. Depending on the operator type being queried (distinguished by the stageInstanceName prefix in the request parameters), different lists of operator intermediate state data will be returned. Note: The data formats of different types of operators are also different.

items

Type 1:Generic Type

Operators of this type: LastRecordAppender, LastChangedRecordAppender, LatestRecordMerger, RecordCapturer

Name Data Type Description
modelId String The model ID.
modelIdPath String The model ID path.
attr Object The struct that contains the output results of the operator.
assetId String The asset ID.
pointId String The measurement point ID.
time Long The measurement point data timestamp (UNIX time, accurate to the second).
value String The measurement point data.
quality Integer The quality tagging for the measurement point data.
dq Long The quality indicator of the measurement point data.

Type 2:Simple Timestamp Type

Operators of this type: LatePointTagger

Name Data Type Description
assetId String The asset ID.
pointId String The measurement point ID.
time Long The measurement point data timestamp (UNIX time, accurate to the second).

Type 3:Window Aggregation Type

Operators of this type: FixedTimeWindowAggregator, SlidingTimeWindowAggregator

Name Data Type Description
assetId String The asset ID.
pointId String The measurement point ID.
%s::windowStrategy::%s Object The state information required by the window function calculation. %s represents the string determined according to information such as assets and measurement points.
%s::updated Object The context information for the window function. %s represents the string determined according to information such as assets and measurement points.
%s::watermark Long The watermark of the current asset of the window function. %s represents the string determined according to information such as assets and measurement points.

Type 4:Simple Window Aggregation Type

Operators of this type: SimplifiedTimeWindowAggregator

Name Data Type Description
assetId String The asset ID.
pointId String The measurement point ID.
time Long The measurement point data timestamp (UNIX time, accurate to the second).
windowSum Object The state information required by the window function calculation.

Error Code

Code Error Information Description
61102 Missing param xx The parameter format or value is not valid.
61105 Too many points The number of requested measurement points exceeds the limit.
61106 Wrong pipelineId or OU has no privilege The provided pipeline ID is not valid, or the pipeline ID and organization ID do not match.
61107 Not supported stage type The specified stage type is not supported.
61199   Other errors.

Samples

Request Sample

url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId

method: POST

requestBody
{
    "pipelineId":"yourPipelineId",
    "stageInstanceName":"yourStageInstanceName",
    "assetIds":"yourAssetIds",
    "pointIds":"yourPointIds"
}

Return Sample

{
  "code": 0,
  "msg": "OK",
  "data": {
    "items": [
  {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId",
        "pointId": "pointId",
        "time": 1582648020580,
        "value": 3.1,
        "quality": 0,
        "attr": {}
    },
    {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId2",
        "pointId": "pointId2",
        "time": 1582648020580,
        "value": 3.2,
        "quality": 1,
        "attr": {}
    }
    ]
  }
}

Java SDK Sample

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void GetStageState() {
        Request request = new Request();
        request.setBodyParams("assetIds", "yourAssetId");
        request.setBodyParams("pointIds", "yourPointId");
        request.setBodyParams("pipelineId", "yourPipelineId");
        request.setBodyParams("stageInstanceName", "yourStageInstanceName");

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/stage-state")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "get")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}