Consuming Subscribed Data for EnOS Edge Users


After the data subscription job starts running, you can use the data subscription SDK to develop applications and consume the subscribed data. This topic shows how to install the data subscription SDK and provides code samples for consuming the subscribed data.

EnOS Edge provides the following SDKs for consuming subscribed data:

  • Java SDK
  • Go SDK
  • C SDK

For details about the EnOS SDKs and their download addresses, see EnOS SDKs and tools.

For Java SDK

The steps for installing the Java SDK and consuming the subscribed data are as follows.

Installing Data Subscription SDK for Java

Get the Maven dependency information of the data subscription SDK and add it to your development project.

  1. Open the Maven repository of the SDK at Maven dependency for EnOS Edge.

  2. Open your development environment and add the Maven dependencies for the SDK to your Java project, as in the following example.

    <dependency>
      <groupId>com.envisioniot.sub</groupId>
      <artifactId>enos-subscribe-impl</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    

Code Sample for Consuming the Subscribed Real-time Data

The following code sample consumes the subscribed asset real-time data with a specified consumer group. If the data volume is large, you can run two consumer clients in the same consumer group to increase consumption throughput.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.data.IDataHandler;
import com.envisioniot.sub.client.data.IDataService;
import com.envisioniot.sub.common.model.dto.StreamMessage;

public class DataServiceDemo {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        // Consumer group name
        String consumerGroup = "consumer_group";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // Get the real-time data service
        IDataService dataService = eosClient.getDataService();

        // Create the data handler function
        IDataHandler dataHandler = new IDataHandler(){
            public void dataRead(StreamMessage message) {
                System.out.println(message);
            }
        };

        // Establish connection with subscription ID and consumer group
        dataService.subscribe(dataHandler, subId, consumerGroup);
    }
}
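
The host, port, and keys in the sample above are hard-coded placeholders. Because the server address differs between instances, one option is to read these values from the environment. The following is a minimal sketch of that idea, not part of the EnOS SDK: the class `SubscriptionSettings` and the variable names `SUB_HOST`, `SUB_PORT`, `SUB_ACCESS_KEY`, and `SUB_SECRET_KEY` are hypothetical, and the fallback port 9001 is simply the value used in the sample.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper, not part of the EnOS SDK. The setting names
// SUB_HOST, SUB_PORT, SUB_ACCESS_KEY and SUB_SECRET_KEY are hypothetical.
final class SubscriptionSettings {
    final String host;
    final int port;
    final String accessKey;
    final String secretKey;

    private SubscriptionSettings(String host, int port, String accessKey, String secretKey) {
        this.host = host;
        this.port = port;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    // Builds the settings from a map such as the one returned by System.getenv().
    static SubscriptionSettings fromEnv(Map<String, String> env) {
        String host = required(env, "SUB_HOST");
        String portText = env.get("SUB_PORT");
        // 9001 is the port used in the sample above; your instance may differ.
        int port = (portText == null || portText.trim().isEmpty())
                ? 9001
                : Integer.parseInt(portText.trim());
        return new SubscriptionSettings(
                host, port, required(env, "SUB_ACCESS_KEY"), required(env, "SUB_SECRET_KEY"));
    }

    // Returns the trimmed value, or fails fast if the setting is absent or blank.
    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // A fixed map keeps the demo runnable; pass System.getenv() in a real application.
        Map<String, String> env = new HashMap<>();
        env.put("SUB_HOST", "subscription_server");
        env.put("SUB_ACCESS_KEY", "access_key");
        env.put("SUB_SECRET_KEY", "secret_key");
        SubscriptionSettings s = fromEnv(env);
        System.out.println(s.host + ":" + s.port);
    }
}
```

With this helper, the fields `host`, `port`, `accessKey`, and `secretKey` would replace the literal values passed to the `EosClient` constructor in the sample.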

Note

  • The host and port of the subscription server will vary with the cloud region and instance. To get the address and port of the subscription service for your cloud instance, log in to the EnOS Management Console and select Help > Environment Information.
  • Each subscription topic has 2 partitions. That is, each topic allows at most 2 data consumers at the same time.
  • Currently, a data consumer can consume 1 topic only.
  • By default, data is stored in the topic for 3 days.
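
To make the partition limit concrete, the following sketch models how two partitions could be shared among the clients of one consumer group. It is an illustration only and does not call the EnOS SDK: the class name, the client names, and the round-robin strategy are assumptions, and the service's actual assignment logic is not described in this topic. The point it shows is that a third client in the same group would receive no partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only, not part of the EnOS SDK.
final class PartitionAssignmentSketch {
    // Partition count per subscription topic, as stated in the note above.
    static final int PARTITIONS_PER_TOPIC = 2;

    // Assigns partitions 0..PARTITIONS_PER_TOPIC-1 to the given clients round-robin
    // (an assumed strategy). Clients beyond the partition count get an empty list.
    static Map<String, List<Integer>> assign(List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result;
        }
        for (int partition = 0; partition < PARTITIONS_PER_TOPIC; partition++) {
            String owner = consumers.get(partition % consumers.size());
            result.get(owner).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        // One client reads both partitions.
        System.out.println(assign(Arrays.asList("client-a")));
        // Two clients read one partition each.
        System.out.println(assign(Arrays.asList("client-a", "client-b")));
        // A third client has nothing left to read.
        System.out.println(assign(Arrays.asList("client-a", "client-b", "client-c")));
    }
}
```

Under these assumptions, running more than two clients in one consumer group adds no throughput, which matches the limit of two data consumers per topic.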

Code Sample for Consuming the Subscribed Alert Data

The following code sample is for consuming the subscribed asset alert data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
import com.envisioniot.sub.client.event.IAlertHandler;
import com.envisioniot.sub.client.event.IAlertService;
import com.envisioniot.sub.common.model.Alert;

public class AlertServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // Get the alert data service
        IAlertService alertService = eosClient.getAlertService();

        // Create the data handler function
        IAlertHandler alertHandler = new IAlertHandler (){
            @Override
            public void alertRead(Alert alert) {
                System.out.println(alert);
            }
        };

        // Establish connection with subscription ID
        alertService.subscribe(alertHandler, subId);
    }
}

Code Sample for Consuming the Subscribed Event Data

The following code sample is for consuming the subscribed asset event data with the default consumer group.

import com.envisioniot.sub.client.EosClient;
// Package path assumed to mirror the alert interfaces; adjust it if your SDK version differs.
import com.envisioniot.sub.client.event.IEventHandler;
import com.envisioniot.sub.client.event.IEventService;

public class EventServiceDemo1 {
    public static void main(String[] args) throws Exception {
        // Host of the data subscription service
        String host = "subscription_server";
        // Port of the data subscription service
        int port = 9001;
        // APP authentication (Generated when registering your APP)
        String accessKey = "access_key";
        // APP authentication
        String secretKey = "secret_key";

        // Subscription ID
        String subId = "subscription_id";

        EosClient eosClient = new EosClient(host, port, accessKey, secretKey, true);

        // Get the event data service
        IEventService eventService = eosClient.getEventService();

        // Create the data handler function
        IEventHandler eventHandler = new IEventHandler (){
            @Override
            public void eventRead(String event) {
                System.out.println(event);
            }
        };

        // Establish connection with subscription ID
        eventService.subscribe(eventHandler, subId);
    }
}

For Go SDK

The steps for installing the Go SDK and consuming the subscribed data are as follows. For more information, see Data Subscription Go SDK.

Configuring Data Subscription SDK for Go

  1. Obtain the source code of the Data Subscription SDK for Go.

    git clone https://github.com/EnvisionIot/edge-subscription-service-sdk-go
    
  2. Initialize the subscriber with the NewSubscriber function.

    import (
        "edge-subscription-go/subscribe"
    )
    
    func main(){
        edgeServiceIp := "127.0.0.1"
        edgeServicePort := 9150
        subTopicName := "DATASVC.SUB.group"
        subChannelName := "real_time_point"
        accessKey := "a-b-c"
        appSecret := "a-d-f"
        subId := "subTopicId"
        f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
        if err != nil{
            //init failed, return
            return
        }
        // f is used in the next step; the blank assignment avoids an unused-variable compile error
        _ = f
    }
    
  3. Define the message processing function.

    import (
        "edge-subscription-go/subscribe/record"
        "fmt"
    )
    
    func main(){
        ...
        functionRealTimePoint := func(records []record.SSRecordPoint) {
                for _, point := range records{
                    fmt.Printf("real time point value = %s\n", point.GetValue())
                }
        }
        ...
    }
    

Code Sample for Consuming Subscribed Real-time Data

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/record"
    "fmt"
)

func main(){
    //Initialize Subscriber
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "real_time_point"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //Define the Message Processing Function
    functionRealTimePoint := func(records []record.SSRecordPoint) {
        for _, point := range records{
            fmt.Printf("real time point value = %s\n", point.GetValue())
        }
    }
    //start
    _ = f.SubRealTimePoint(functionRealTimePoint)
}

Code Sample for Consuming the Subscribed Alert Data

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/alarm"
    "fmt"
)

func main(){
    //Initialize Subscriber
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
    subChannelName := "alarm_data"
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //Define the Message Processing Function
    functionAlarmData := func(records []alarm.AlarmRecord) {
        for _, point := range records{
            fmt.Printf("alarm data value = %s\n", point.GetValue())
        }
    }
    //start
    _ = f.SubAlarmData(functionAlarmData)
}

Code Sample for Consuming Subscribed Response of Setting Points

import (
    "edge-subscription-go/subscribe"
    // Import path inferred from the setpoint.SetMeasurepointResponsePoint type used below
    "edge-subscription-go/subscribe/setpoint"
    "fmt"
)

func main(){
    //Initialize Subscriber
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
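    // "alarm_data" appears to be carried over from the alert sample;
    // replace it with the channel name of your set-point response subscription
    subChannelName := "alarm_data"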
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //Define the Message Processing Function
    functionSetPointResponse := func(records []setpoint.SetMeasurepointResponsePoint) {
        for _, point := range records{
            fmt.Printf("set measurePoint input value = %s\n", point.GetInputData())
        }
    }
    //start
    _ = f.SubSetMeasurePointResponse(functionSetPointResponse)
}

Code Sample for Consuming Subscribed Results of Control Command

import (
    "edge-subscription-go/subscribe"
    "edge-subscription-go/subscribe/control"
    "fmt"
)

func main(){
    //Initialize Subscriber
    edgeServiceIp := "127.0.0.1"
    edgeServicePort := 9150
    subTopicName := "DATASVC.SUB.group"
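    // "alarm_data" appears to be carried over from the alert sample;
    // replace it with the channel name of your control response subscription
    subChannelName := "alarm_data"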
    accessKey := "accessKey"
    appSecret := "appSecret"
    subId := "subTopicId"
    f, err := subscribe.NewSubscriber(edgeServiceIp, edgeServicePort, subTopicName, subChannelName,accessKey, appSecret, subId)
    if err != nil{
        return
    }

    //Define the Message Processing Function
    functionControlResponse := func(records []control.ControlResponsePoint) {
        for _, point := range records{
            fmt.Printf("control response value = %s\n", point.GetInputData())
        }
    }
    //start
    _ = f.SubControlResponse(functionControlResponse)
}

For C SDK

The steps for installing the C SDK and consuming the subscribed data are as follows. For more information, see Data Subscription C SDK.

Installing and Compiling Data Subscription SDK for C

  1. To compile the SDK for C, you need CMake. If it is not installed yet, download and install CMake first.

  2. Select the mode to compile the SDK:

    To compile in Debug mode:

    cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=ON .
    make    # or use "make VERBOSE=1" to get more build information

    To compile in Release mode:

    cmake -D PLATFORM_DEFINE=linux_x86_normal_64_local -D DEBUG_MODE=OFF .
    make    # or use "make VERBOSE=1" to get more build information
    

Code Sample for Consuming the Subscribed Real-time Data

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    // Note: release log_info after use
    delete_log_info_box(log_info);
}

void print_data_subscribe_msg(struct DataSubscribeStruct *dss_ptr) {
    printf("---------------------->\n");
    if (dss_ptr == NULL) {
        printf("dss_ptr == NULL\n");
        return;
    }
    printf("dss_ptr->point_count=%d\n", dss_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < dss_ptr->point_count; ii++) {
        printf("*******>\n");
        printf("dss_ptr->points[%d].orgid=%s\n", ii, dss_ptr->points[ii].orgid);
        printf("dss_ptr->points[%d].modelid=%s\n", ii, dss_ptr->points[ii].modelid);
        printf("dss_ptr->points[%d].assetid=%s\n", ii, dss_ptr->points[ii].assetid);
        printf("dss_ptr->points[%d].collectdeviceid=%s\n", ii, dss_ptr->points[ii].collectdeviceid);
        printf("dss_ptr->points[%d].pointid=%s\n", ii, dss_ptr->points[ii].pointid);
        printf("dss_ptr->points[%d].time=%lld\n", ii, (long long int) (dss_ptr->points[ii].time));
        printf("dss_ptr->points[%d].value=%s\n", ii, dss_ptr->points[ii].value);
        printf("dss_ptr->points[%d].quality=%d\n", ii, dss_ptr->points[ii].quality);
        printf("dss_ptr->points[%d].dq=%lld\n", ii, (long long int) (dss_ptr->points[ii].dq));
        printf("dss_ptr->points[%d].modelpath=%s\n", ii, dss_ptr->points[ii].modelpath);
        printf("dss_ptr->points[%d].policytype=%s\n", ii, dss_ptr->points[ii].policytype);
        printf("dss_ptr->points[%d].signaltype=%s\n", ii, dss_ptr->points[ii].signaltype);
        printf("dss_ptr->points[%d].hasquality=%d\n", ii, dss_ptr->points[ii].hasquality);
        printf("dss_ptr->points[%d].datatype=%s\n", ii, dss_ptr->points[ii].datatype);
        printf("dss_ptr->points[%d].subdatatype=%s\n", ii, dss_ptr->points[ii].subdatatype);
        printf("dss_ptr->points[%d].attr=%s\n", ii, dss_ptr->points[ii].attr);
        printf("dss_ptr->points[%d].usingoem=%d\n", ii, dss_ptr->points[ii].usingoem);
        printf("dss_ptr->points[%d].oemtime=%lld\n", ii, (long long int) (dss_ptr->points[ii].oemtime));
        printf("dss_ptr->points[%d].pointtype=%d\n", ii, dss_ptr->points[ii].pointtype);
        printf("<*******\n");
    }

    printf("<----------------------\n\n");
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        // Note: release msg after use
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_DATA_SUBSCRIBE:
        case TOPIC_TYPE_DATA_SUBSCRIBE_ALL:
            print_data_subscribe_msg((struct DataSubscribeStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    // Note: release msg after use
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    // IP address to connect to
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    // Topic type to subscribe to. For real-time data subscription, use
    // TOPIC_TYPE_DATA_SUBSCRIBE or TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_DATA_SUBSCRIBE;
    // Port
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    // Topic name. Topics for real-time data subscription all start with DATASVC.SUB.
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SUB.APP.SUBTEST");
    // consumerGroup is optional. If it is NULL, the group "default" is used.
    // See the comments on new_data_service_ctx for details
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    // 1. init_edge_service_ctx: initializes global variables. Not thread-safe
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

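    // 2. set_log_level(EDGE_LOG_INFO): sets the log level (EDGE_LOG_INFO if not set).
    // Thread-safe; can be called at any time and takes effect immediately
    set_log_level(EDGE_LOG_INFO);
    // 3. set_log_writer(my_log_writer, user_ctx): sets the function the SDK calls
    // whenever it needs to log. If not set, logs go to the terminal. The writer must
    // release the data with delete_log_info_box after use. Not thread-safe; set it once at startup
    set_log_writer(my_log_writer, user_ctx);

    // 4. Call new_data_service_ctx to initialize the required context. Thread-safe.
    // With a single IP, data is subscribed from that IP whether it is the primary or the
    // standby node. With multiple IPs, the client switches automatically between them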
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // 5. Call data_service_ctx_start to start the related modules and begin
    // receiving data from the server. Not thread-safe
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

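    // Exit after running for 60 s
    edge_sleep(60000);

    // Remember to stop and delete the context when it is no longer needed

    // Release the ip_box when it is no longer needed
    delete_ip_box(ip_list);

    // 6. Call data_service_ctx_stop to pause receiving data. Calls to start and stop
    // must be paired: do not call stop without a prior start, and do not call start
    // again while already started. Not thread-safe
    data_service_ctx_stop(ctx);
    // 7. delete_data_service_ctx: releases the resources held by new_data_service_ctx.
    // Call it on exit, after stop. Not thread-safe
    delete_data_service_ctx(ctx);
    // 8. uninit_edge_service_ctx: releases the resources held by init_edge_service_ctx.
    // Call it on exit, after delete_data_service_ctx. Not thread-safe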
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}

Code Sample for Consuming the Subscribed Results of Control Command

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    // Note: release log_info after use
    delete_log_info_box(log_info);
}

void print_control_response_msg(struct ControlResponseStruct *crs_ptr) {
    printf("---------------------->\n");
    if (crs_ptr == NULL) {
        printf("crs_ptr == NULL\n");
        return;
    }
    printf("crs_ptr->point_count=%d\n", crs_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < crs_ptr->point_count; ii++) {
        printf("*******>\n");
        printf("crs_ptr->points[%d].requestid=%s\n", ii, crs_ptr->points[ii].requestid);
        printf("crs_ptr->points[%d].messageid=%s\n", ii, crs_ptr->points[ii].messageid);
        printf("crs_ptr->points[%d].requestmethod=%s\n", ii,
               crs_ptr->points[ii].requestmethod);
        printf("crs_ptr->points[%d].calltype=%s\n", ii, crs_ptr->points[ii].calltype);
        printf("crs_ptr->points[%d].controlchannelid=%s\n", ii,
               crs_ptr->points[ii].controlchannelid);
        printf("crs_ptr->points[%d].productkey=%s\n", ii, crs_ptr->points[ii].productkey);
        printf("crs_ptr->points[%d].devicekey=%s\n", ii, crs_ptr->points[ii].devicekey);
        printf("crs_ptr->points[%d].assetid=%s\n", ii, crs_ptr->points[ii].assetid);
        printf("crs_ptr->points[%d].servicename=%s\n", ii, crs_ptr->points[ii].servicename);
        printf("crs_ptr->points[%d].serviceid=%s\n", ii, crs_ptr->points[ii].serviceid);
        printf("crs_ptr->points[%d].callbackurl=%s\n", ii, crs_ptr->points[ii].callbackurl);
        printf("crs_ptr->points[%d].inputdata=%s\n", ii, crs_ptr->points[ii].inputdata);
        printf("crs_ptr->points[%d].outputdata=%s\n", ii, crs_ptr->points[ii].outputdata);
        printf("crs_ptr->points[%d].status=%lld\n", ii, (long long int) (crs_ptr->points[ii].status));
        printf("crs_ptr->points[%d].msg=%s\n", ii, crs_ptr->points[ii].msg);
        printf("crs_ptr->points[%d].submsg=%s\n", ii, crs_ptr->points[ii].submsg);
        printf("crs_ptr->points[%d].timeout=%lld\n", ii, (long long int) (crs_ptr->points[ii].timeout));
        printf("crs_ptr->points[%d].gmtservicerequest=%lld\n", ii,
               (long long int) (crs_ptr->points[ii].gmtservicerequest));
        printf("crs_ptr->points[%d].gmtservicereply=%lld\n", ii,
               (long long int) (crs_ptr->points[ii].gmtservicereply));
        printf("crs_ptr->points[%d].gmtdevicereply=%lld\n", ii,
               (long long int) (crs_ptr->points[ii].gmtdevicereply));
        printf("crs_ptr->points[%d].attr=%s\n", ii, crs_ptr->points[ii].attr);
        printf("<*******\n");
    }

    printf("<----------------------\n\n");
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        // Note: release msg after use
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_CONTROL_RESPONSE:
            print_control_response_msg((struct ControlResponseStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    // Note: release msg after use
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    // IP address to connect to
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    // Topic type to subscribe to. For control response data subscription, use
    // TOPIC_TYPE_CONTROL_RESPONSE or TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_CONTROL_RESPONSE;
    // Port
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    // Topic name. Topics for control response data subscription all start with DATASVC.CONTROL.
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.CONTROL.APP.SUBTEST");
    // consumerGroup is optional. If it is NULL, the group "default" is used.
    // See the comments on new_data_service_ctx for details
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    // 1. init_edge_service_ctx: initializes global variables. Not thread-safe
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

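    // 2. set_log_level(EDGE_LOG_INFO): sets the log level (EDGE_LOG_INFO if not set).
    // Thread-safe; can be called at any time and takes effect immediately
    set_log_level(EDGE_LOG_INFO);
    // 3. set_log_writer(my_log_writer, user_ctx): sets the function the SDK calls
    // whenever it needs to log. If not set, logs go to the terminal. The writer must
    // release the data with delete_log_info_box after use. Not thread-safe; set it once at startup
    set_log_writer(my_log_writer, user_ctx);

    // 4. Call new_data_service_ctx to initialize the required context. Thread-safe.
    // With a single IP, data is subscribed from that IP whether it is the primary or the
    // standby node. With multiple IPs, the client switches automatically between them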
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // 5. Call data_service_ctx_start to start the related modules and begin
    // receiving data from the server. Not thread-safe
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

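    // Exit after running for 60 s
    edge_sleep(60000);

    // Remember to stop and delete the context when it is no longer needed

    // Release the ip_box when it is no longer needed
    delete_ip_box(ip_list);

    // 6. Call data_service_ctx_stop to pause receiving data. Calls to start and stop
    // must be paired: do not call stop without a prior start, and do not call start
    // again while already started. Not thread-safe
    data_service_ctx_stop(ctx);
    // 7. delete_data_service_ctx: releases the resources held by new_data_service_ctx.
    // Call it on exit, after stop. Not thread-safe
    delete_data_service_ctx(ctx);
    // 8. uninit_edge_service_ctx: releases the resources held by init_edge_service_ctx.
    // Call it on exit, after delete_data_service_ctx. Not thread-safe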
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}

Code Sample for Consuming Subscribed Response of Setting Points

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    // Note: release log_info after use
    delete_log_info_box(log_info);
}

void print_set_measurepoint_response_msg(struct SetMeasurepointResponseStruct *smrs_ptr) {
    printf("---------------------->\n");
    if (smrs_ptr == NULL) {
        printf("smrs_ptr == NULL\n");
        return;
    }
    printf("smrs_ptr->point_count=%d\n", smrs_ptr->point_count);
    int ii = 0;
    for (ii = 0; ii < smrs_ptr->point_count; ii++) {
        printf("*******>\n");
        printf("smrs_ptr->points[%d].requestid=%s\n", ii, smrs_ptr->points[ii].requestid);
        printf("smrs_ptr->points[%d].orgid=%s\n", ii, smrs_ptr->points[ii].orgid);
        printf("smrs_ptr->points[%d].calltype=%s\n", ii, smrs_ptr->points[ii].calltype);
        printf("smrs_ptr->points[%d].setmeasurepointchannelid=%s\n", ii,
               smrs_ptr->points[ii].setmeasurepointchannelid);
        printf("smrs_ptr->points[%d].productkey=%s\n", ii, smrs_ptr->points[ii].productkey);
        printf("smrs_ptr->points[%d].devicekey=%s\n", ii, smrs_ptr->points[ii].devicekey);
        printf("smrs_ptr->points[%d].assetid=%s\n", ii, smrs_ptr->points[ii].assetid);
        printf("smrs_ptr->points[%d].measurepointid=%s\n", ii,
               smrs_ptr->points[ii].measurepointid);
        printf("smrs_ptr->points[%d].callbackurl=%s\n", ii, smrs_ptr->points[ii].callbackurl);
        printf("smrs_ptr->points[%d].inputdata=%s\n", ii, smrs_ptr->points[ii].inputdata);
        printf("smrs_ptr->points[%d].status=%lld\n", ii, (long long int) (smrs_ptr->points[ii].status));
        printf("smrs_ptr->points[%d].msg=%s\n", ii, smrs_ptr->points[ii].msg);
        printf("smrs_ptr->points[%d].submsg=%s\n", ii, smrs_ptr->points[ii].submsg);
        printf("smrs_ptr->points[%d].timeout=%lld\n", ii, (long long int) (smrs_ptr->points[ii].timeout));
        printf("smrs_ptr->points[%d].gmtsetmeasurepointrequest=%lld\n", ii,
               (long long int) (smrs_ptr->points[ii].gmtsetmeasurepointrequest));
        printf("smrs_ptr->points[%d].gmtsetmeasurepointreply=%lld\n", ii,
               (long long int) (smrs_ptr->points[ii].gmtsetmeasurepointreply));
        printf("smrs_ptr->points[%d].attr=%s\n", ii, smrs_ptr->points[ii].attr);
        printf("<*******\n");
    }

    printf("<----------------------\n\n");
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        // Release the message once you are done with it
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE:
            print_set_measurepoint_response_msg((struct SetMeasurepointResponseStruct *) (msg->msg));
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    // Release the message once you are done with it
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

    // IP address to connect to
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    // Topic type to subscribe to; for this subscription, use
    // TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE or TOPIC_TYPE_AUTO
    int topic_type = TOPIC_TYPE_SET_MEASUREPOINT_RESPONSE;
    // Port
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    // Topic name; topics for set-measurepoint response subscriptions all start with DATASVC.SET.
    snprintf(topic_name, sizeof(topic_name), "%s", "DATASVC.SET.APP.SUBTEST");
    // Consumer group (optional). If consumerGroup is NULL, it defaults to "default";
    // see the comments on new_data_service_ctx for details
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    // 1. init_edge_service_ctx: initializes global variables. Not thread-safe
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

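    // 2. set_log_level(EDGE_LOG_INFO): sets the log level (EDGE_LOG_INFO if not set).
    // Thread-safe; can be called at any time and takes effect immediately
    set_log_level(EDGE_LOG_INFO);
    // 3. set_log_writer(my_log_writer, user_ctx): sets the log function that the SDK calls
    // whenever it needs to log. If not set, logs are printed to the terminal. The log function
    // must release the data with delete_log_info_box after use. Not thread-safe; set it once at startup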
    set_log_writer(my_log_writer, user_ctx);

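    // 4. new_data_service_ctx: initializes the required context. Thread-safe.
    // If only one IP is given, data is subscribed from that IP whether it is the primary or the standby;
    // if several IPs are given, the SDK switches automatically according to the primary/standby status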
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // 5. data_service_ctx_start: starts the related modules and begins receiving data
    // from the server. Not thread-safe
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // Exit after running for 60 s
    edge_sleep(60000);

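    // Remember to stop and delete the context when it is no longer needed

    // The ip_box must be released when it is no longer needed
    delete_ip_box(ip_list);

    // 6. data_service_ctx_stop: pauses receiving data. Calls to start and stop must be paired:
    // do not call stop without a prior start, and do not call start again while already started.
    // Not thread-safe
    data_service_ctx_stop(ctx);
    // 7. delete_data_service_ctx: releases the resources held by new_data_service_ctx.
    // Call it on exit, after stop. Not thread-safe
    delete_data_service_ctx(ctx);
    // 8. uninit_edge_service_ctx: releases the resources held by init_edge_service_ctx.
    // Call it on exit, after delete_data_service_ctx. Not thread-safe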
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}
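
The error branches and the normal exit in main above repeat the same teardown calls in slightly different combinations. One way to keep the documented order (stop, then delete, then uninit) in a single place is a shared cleanup label. The following is a minimal sketch of that idiom only: the demo_* functions are hypothetical stand-ins that just record the call order, not SDK functions, and fail_at is an illustrative parameter for simulating a failure.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for the SDK calls: they only record the call order. */
static char call_log[128];

static void record(const char *name) {
    size_t used = strlen(call_log);
    snprintf(call_log + used, sizeof(call_log) - used, "%s;", name);
}

static int demo_init(void) { record("init"); return 0; }
static int demo_new_ctx(int fail) { record("new"); return fail ? -1 : 0; }
static int demo_start(int fail) { record("start"); return fail ? -1 : 0; }
static void demo_stop(void) { record("stop"); }
static void demo_delete(void) { record("delete"); }
static void demo_uninit(void) { record("uninit"); }

/* Returns the recorded call order of the most recent run. */
const char *lifecycle_log(void) {
    return call_log;
}

/* fail_at: 0 = no failure, 1 = context creation fails, 2 = start fails. */
int run_lifecycle(int fail_at) {
    int rc = -1;
    int inited = 0;
    int created = 0;
    int started = 0;

    call_log[0] = '\0';

    if (demo_init() < 0) {
        goto cleanup;
    }
    inited = 1;

    if (demo_new_ctx(fail_at == 1) < 0) {
        goto cleanup;
    }
    created = 1;

    if (demo_start(fail_at == 2) < 0) {
        goto cleanup;
    }
    started = 1;

    /* ... the application would receive data here ... */
    rc = 0;

cleanup:
    /* Tear down in reverse order, and only what was actually set up. */
    if (started) {
        demo_stop();
    }
    if (created) {
        demo_delete();
    }
    if (inited) {
        demo_uninit();
    }
    return rc;
}
```

For example, run_lifecycle(2) simulates a failed start. The recorded order is then init;new;start;delete;uninit; with no stop call, which mirrors the start-failure branch in the sample above.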

Code Sample for Consuming Customized Subscribed Data

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "edge_service_c_api/edge_service_c_api.h"

struct my_ctx {
    int aa;
    char bb[128];
};

void my_log_writer(struct LogInfoBox *log_info, void *user_ctx) {
    if (log_info == NULL) {
        return;
    }
    printf("[TEST-%s]:%s(file=%s, function=%s, line=%d)\n", log_info->module_name, log_info->log_str,
           log_info->file_path,
           log_info->func_name, log_info->line_num);

    // Release the log info once you are done with it
    delete_log_info_box(log_info);
}

void print_custom_msg(char *custom_ptr, int len) {
    printf("---------------------->\n");
    if (custom_ptr == NULL) {
        printf("custom_ptr == NULL\n");
        return;
    }

    int ii = 0;
    printf("hex:");
    for (ii = 0; ii < len; ii++) {
        printf("%hhx ", custom_ptr[ii]);
    }
    printf("\n");
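    // The custom payload is returned as-is and may not be NUL-terminated,
    // so limit the printed string to len bytes
    printf("str:%.*s\n", len, custom_ptr);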
    printf("<----------------------\n\n");
}

void connect_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:connected, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void close_callback(void *work_ctx, char *channel_id, void *user_ctx) {
    printf("[DATASERVICE]:closed, channel_id=%s(file=%s, function=%s, line=%d)\n", channel_id,
           __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);
    return;
}

void
msg_callback(void *work_ctx, char *channel_id, struct DataServiceMessage *msg, void *user_ctx) {
    printf("[DATASERVICE]:recv msg, channel_id=%s(file=%s, function=%s, line=%d)\n",
           channel_id, __FILE__, __FUNCTION__, __LINE__);
    struct my_ctx *x = (struct my_ctx *) (user_ctx);
    if (x == NULL) {
        // Release the message once you are done with it
        delete_data_service_msg(msg);
        return;
    }
    printf("[DATASERVICE]:user_ctx->aa=%d, user_ctx->bb=%s(file=%s, function=%s, line=%d)\n",
           x->aa, x->bb, __FILE__, __FUNCTION__, __LINE__);

    printf("[DATASERVICE]:msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
           msg->topic_type, __FILE__, __FUNCTION__, __LINE__);

    switch (msg->topic_type) {
        case TOPIC_TYPE_CUSTOM:
            print_custom_msg((char *) (msg->msg), msg->msg_len);
            break;
        default:
            printf("[DATASERVICE]:unsupported msg->topic_type=%d(file=%s, function=%s, line=%d)\n",
                   msg->topic_type, __FILE__, __FUNCTION__, __LINE__);
            break;
    }

    // Release the message once you are done with it
    delete_data_service_msg(msg);
    return;
}

int main(int argc, char *argv[]) {
    int ret = 0;
    struct IPBox *ip_list = NULL;
    char topic_name[60];
    memset(topic_name, 0, sizeof(topic_name));
    char consumerGroup[60];
    memset(consumerGroup, 0, sizeof(consumerGroup));

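    // IP address to connect to
    ip_list = add_ip_to_ip_box(ip_list, "127.0.0.1");
    // Topic type to subscribe to; choose TOPIC_TYPE_CUSTOM or TOPIC_TYPE_AUTO.
    // With the custom type, the subscribed data is not parsed and is returned to the caller as-is
    int topic_type = TOPIC_TYPE_CUSTOM;
    // Port
    int port = EDGE_DATASERVICE_DEFAULT_PORT;
    // Topic name; with the custom subscription type you can subscribe to any topic, including
    // real-time data, control responses, set-measurepoint responses, and other data
    snprintf(topic_name, sizeof(topic_name), "%s", "custom_topic");
    // Consumer group (optional). If consumerGroup is NULL, it defaults to "default";
    // see the comments on new_data_service_ctx for details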
    snprintf(consumerGroup, sizeof(consumerGroup), "%s", "default");

    struct DataServiceCtx *ctx = NULL;

    // 1. init_edge_service_ctx: initializes global variables. Not thread-safe
    ret = init_edge_service_ctx();
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:init_edge_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        return -1;
    }

    struct my_ctx *user_ctx = (struct my_ctx *) malloc(sizeof(struct my_ctx));
    if (user_ctx == NULL) {
        printf("[DATASERVICE_TEST]:user_ctx malloc error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        uninit_edge_service_ctx();
        return -1;
    }
    memset(user_ctx, 0, sizeof(struct my_ctx));
    user_ctx->aa = 10;
    snprintf(user_ctx->bb, sizeof(user_ctx->bb), "%s", "user_custom_info");

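    // 2. set_log_level(EDGE_LOG_INFO): sets the log level (EDGE_LOG_INFO if not set).
    // Thread-safe; can be called at any time and takes effect immediately
    set_log_level(EDGE_LOG_INFO);
    // 3. set_log_writer(my_log_writer, user_ctx): sets the log function that the SDK calls
    // whenever it needs to log. If not set, logs are printed to the terminal. The log function
    // must release the data with delete_log_info_box after use. Not thread-safe; set it once at startup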
    set_log_writer(my_log_writer, user_ctx);

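    // 4. new_data_service_ctx: initializes the required context. Thread-safe.
    // If only one IP is given, data is subscribed from that IP whether it is the primary or the standby;
    // if several IPs are given, the SDK switches automatically according to the primary/standby status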
    ctx = new_data_service_ctx(
            ip_list,
            port,
            "your accessKey",
            "your secretKey",
            topic_name,
            consumerGroup,
            topic_type,
            1,
            user_ctx,
            connect_callback,
            close_callback,
            msg_callback
    );

    if (ctx == NULL) {
        printf("[DATASERVICE_TEST]:new_data_service_ctx error(file=%s, function=%s, line=%d)\n", __FILE__, __FUNCTION__,
               __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // 5. data_service_ctx_start: starts the related modules and begins receiving data
    // from the server. Not thread-safe
    ret = data_service_ctx_start(ctx);
    if (ret < 0) {
        printf("[DATASERVICE_TEST]:data_service_ctx_start error(file=%s, function=%s, line=%d)\n", __FILE__,
               __FUNCTION__, __LINE__);
        delete_ip_box(ip_list);
        ip_list = NULL;
        free(user_ctx);
        user_ctx = NULL;
        delete_data_service_ctx(ctx);
        ctx = NULL;
        uninit_edge_service_ctx();
        return -1;
    }

    // Exit after running for 60 s
    edge_sleep(60000);

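    // Remember to stop and delete the context when it is no longer needed

    // The ip_box must be released when it is no longer needed
    delete_ip_box(ip_list);

    // 6. data_service_ctx_stop: pauses receiving data. Calls to start and stop must be paired:
    // do not call stop without a prior start, and do not call start again while already started.
    // Not thread-safe
    data_service_ctx_stop(ctx);
    // 7. delete_data_service_ctx: releases the resources held by new_data_service_ctx.
    // Call it on exit, after stop. Not thread-safe
    delete_data_service_ctx(ctx);
    // 8. uninit_edge_service_ctx: releases the resources held by init_edge_service_ctx.
    // Call it on exit, after delete_data_service_ctx. Not thread-safe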
    uninit_edge_service_ctx();

    free(user_ctx);

    return 0;
}
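
Because a TOPIC_TYPE_CUSTOM payload is handed back unparsed, it is probably safest to treat msg->msg as a buffer of msg->msg_len bytes rather than as a C string. The sketch below shows one way to inspect such a buffer without relying on a terminating NUL. The helper names format_hex and payload_is_text are hypothetical and are not part of the SDK; the code depends only on the C standard library.

```c
#include <stdio.h>
#include <stddef.h>
#include <string.h>

/* Formats len bytes of buf as two-digit hex values separated by spaces.
 * Returns the number of characters written (excluding the NUL terminator),
 * or -1 if the arguments are invalid or out is too small. */
int format_hex(const void *buf, size_t len, char *out, size_t out_size) {
    const unsigned char *p = (const unsigned char *) buf;
    size_t pos = 0;
    size_t i;

    if (out == NULL || out_size == 0 || (buf == NULL && len > 0)) {
        return -1;
    }
    out[0] = '\0';
    for (i = 0; i < len; i++) {
        /* Two hex digits per byte, plus one separator before every byte but the first. */
        size_t need = (i == 0) ? 2 : 3;
        if (pos + need + 1 > out_size) {
            return -1;
        }
        if (i > 0) {
            out[pos++] = ' ';
        }
        snprintf(out + pos, out_size - pos, "%02x", (unsigned int) p[i]);
        pos += 2;
    }
    return (int) pos;
}

/* Returns 1 if the first len bytes contain no NUL byte, 0 otherwise.
 * A payload that passes this check can be printed with "%.*s" and its length. */
int payload_is_text(const void *buf, size_t len) {
    if (buf == NULL) {
        return 0;
    }
    return memchr(buf, '\0', len) == NULL ? 1 : 0;
}
```

Inside msg_callback, such helpers could be called with msg->msg and msg->msg_len before deciding whether to print the payload as text or as hex.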