Introduction


Stream Data Store (SDS) Smart Module is a scalable stream data storage service. SDS provides an unlimited number of data streams in which you can store any kind of data generated by your IoT devices. An SDS Smart Module can contain multiple Streams. The atomic unit of data in an SDS Stream is called a Data Packet, which consists of 1) a byte sequence and 2) a packet index within the stream. IoT devices send their data to the SDS Smart Module over the MQTT protocol, and your business application retrieves the uploaded data via a REST API by specifying the range of data it is interested in.

Setting up Stream Data Store Smart Module


ONE | In the Developer Console, navigate to the SMART MODULE LIST page and select +NEW. Click +Add on the section for Stream Data Store.

Screenshot - Console Smart Modules Add Stream Data Store

TWO | Fill out the "New Stream Data Store Module" form.

Screenshot - Console Smart Modules Add Stream Data Store
  • Module ID: Uniquely identifiable name used for getting access to the data in SDS from the REST API.
  • Description: Description for the new Smart Module.
  • Bulk Data Label: Uniquely identifiable name used for uploading the data by MQTT.
  • Stream timeout: The session between SDS and your IoT device will be aborted if there are no activities for this duration of time.
  • Data retention: The data packets and metadata packets can be automatically deleted depending on this value. By default, data is retained indefinitely.

Data structure of Stream Data Store


When you create a Stream Data Store smart module in your project, the smart module maintains the structures shown in the following diagram.

Concept diagram of Stream Data Store

In a Stream Data Store smart module, you can create an arbitrary number of Streams (the bands in orange color).

In the Stream, there are Data Packets (yellow circles), which hold your application's data. Each Data Packet is a tuple of 1) a packet index and 2) the actual data, which is essentially an arbitrary byte sequence. Alongside the Data Packets, a Stream can hold additional information in so-called Metadata Packets (green callouts). A Metadata Packet is a JSON object that contains additional information about the corresponding Data Packet.

The group of Packets (Data Packets and Metadata Packets) enclosed by the curly braces in the diagram is called a Session. When your Device sends data to the Stream, it needs to open and close the Stream, much as you would open and close a file on a Unix filesystem. A Session is created automatically when the Stream is opened and ends when the Stream is closed. That is, the first Data Packet in a Session is the first one the Device sent after opening the Stream, and the last Data Packet in the Session is the one the Device sent just before closing the Stream.

You can get access to those data structures via the REST API. Please refer to the API Documentation for the details.

Collecting Data


To send data to SDS, you need to use MQTT (please see How to use MQTT with MODE first if you are not familiar with it). The basic workflow consists of the following three steps:

  1. Open Stream
  2. Send Data Packets (and Metadata Packets)
  3. Close Stream

Just like writing to a file on a Unix filesystem, your IoT device needs to open an SDS Stream before it can write Data Packets to it. Once a device has opened the Stream, other Devices cannot open it for writing until the device that opened it closes it.

For steps 1 and 3, opening and closing the Stream, you work with the Bulk Data Request / Response MQTT topics, which are special topics that enable request-response communication.

For step 2, sending Data Packets, you work with the Bulk Data MQTT topic, which is a special topic for sending binary data.

In the following subsections, we will explain the details of the workflow.

Opening SDS Stream

The Bulk Data Request topic is used to open an SDS Stream.

The topic is in the format:

/devices/DEVICE_ID/bulkData/BULK_DATA_LABEL/request

The payload of the Request JSON object consists of the following three data fields:

  • requestType: must be "open"
  • streamName: arbitrary string which uniquely identifies your stream
  • openFlag: 0 or 1; 0 is for normal open, 1 is for opening the stream which is already opened by another device

The entire request JSON to be published to the Bulk Data Request topic (e.g. /devices/1234/bulkData/mySDS/request) looks like the following example:

{
  "requestID": "24020E73-68E1-40AC-A8F1-763D77591ED7",
  "bulkDataLabel": "mySDS",
  "payload": {
    "requestType": "open",
    "streamName": "device1234:log-stream",
    "openFlag": 0
  }
}
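The open request above can be assembled programmatically. The following is a minimal sketch using only the Go standard library; the type and function names (openPayload, bulkDataRequest, requestTopic, newOpenRequest) are hypothetical, and publishing the resulting bytes is left to whichever MQTT client you use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// openPayload mirrors the three payload fields listed above.
type openPayload struct {
	RequestType string `json:"requestType"`
	StreamName  string `json:"streamName"`
	OpenFlag    int    `json:"openFlag"`
}

// bulkDataRequest mirrors the envelope shown in the example JSON.
type bulkDataRequest struct {
	RequestID     string      `json:"requestID"`
	BulkDataLabel string      `json:"bulkDataLabel"`
	Payload       openPayload `json:"payload"`
}

// requestTopic builds /devices/DEVICE_ID/bulkData/BULK_DATA_LABEL/request.
func requestTopic(deviceID, label string) string {
	return fmt.Sprintf("/devices/%s/bulkData/%s/request", deviceID, label)
}

// newOpenRequest returns the JSON body for a normal open (openFlag 0).
// The caller chooses requestID; the example above uses a UUID string.
func newOpenRequest(requestID, label, streamName string) ([]byte, error) {
	return json.Marshal(bulkDataRequest{
		RequestID:     requestID,
		BulkDataLabel: label,
		Payload: openPayload{
			RequestType: "open",
			StreamName:  streamName,
			OpenFlag:    0,
		},
	})
}
```

Calling requestTopic("1234", "mySDS") and newOpenRequest with the values from the example yields the topic and a compact form of the JSON shown above.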

The corresponding response is to be published back to the Bulk Data Response topic that the Device should subscribe to.

The topic is in the format:

/devices/DEVICE_ID/bulkData/BULK_DATA_LABEL/response

The response payload is also a JSON object, which consists of the following two data fields:

  • status: the outcome of the request.
  • packetIndex: the packet index with which the Device has to start sending data packets.

The status field should take one of the following values:

  • OK: stream is successfully opened.
  • ERR_LOCKED_SESSION: can't open stream because another Device has already opened the same stream.
  • ERR_UNKNOWN_REQUEST_TYPE: requestType field in the Request is incorrect.

The entire Response JSON to be published to the Bulk Data Response topic (e.g. /devices/1234/bulkData/mySDS/response) that corresponds to the above Request JSON example will look like below.

{
  "requestID": "24020E73-68E1-40AC-A8F1-763D77591ED7",
  "status": "OK",
  "payload": {
    "status": "OK",
    "packetIndex": 0
  }
}

In this example Response, you can tell that the stream "device1234:log-stream" was created for the first time because the payload.packetIndex field in the Response JSON is set to 0. The next time the Stream is opened, the payload.packetIndex field in the Response to the stream open Request will be different: it will be the packet index of the last Data Packet sent plus one.

Sending SDS Data Packets

Once a device has successfully opened the stream, it can start sending actual data packets.

A Data Packet should be represented by a sequence of MessagePack-format data consisting of the following four elements:

  • ext format: 0
  • str format: stream name
  • int format: packet index
  • bin format: data

Data Packet representation in MessagePack uses an extension type that must be "0". A custom encoder is likely needed to encode a Data Packet structure into MessagePack.

The following snippet is an example written in Go (with github.com/vmihailenco/msgpack) that defines a Data Packet structure which can be encoded as MessagePack.

Example code in Go

import (
    "bytes"

    "github.com/vmihailenco/msgpack"
)

type DataPacket struct {
    StreamName  string
    PacketIndex uint32
    Data        []byte
}

var _ msgpack.Marshaler = DataPacket{}

func init() {
    msgpack.RegisterExt(int8(0), DataPacket{})
}

func (d DataPacket) MarshalMsgpack() ([]byte, error) {
    var buf bytes.Buffer
    enc := msgpack.NewEncoder(&buf)

    if err := enc.EncodeString(d.StreamName); err != nil {
        return nil, err
    }
    if err := enc.EncodeUint32(d.PacketIndex); err != nil {
        return nil, err
    }
    if err := enc.EncodeBytes(d.Data); err != nil {
        return nil, err
    }

    return buf.Bytes(), nil
}

How you create data packets depends on your application. You can send a single JSON object that represents your Device status at a certain point in time in one data packet. You can send unstructured log text from your device. You can also send a series of video data across multiple data packets.

However, the following general rules apply when working with SDS.

  • The first Data Packet after the Stream is opened should be indexed with the value in the packetIndex field of the Response payload JSON object.
  • The packet index of each subsequent Data Packet should be greater than that of the previous one.
  • It's the client's responsibility to increment the packet indexes. The indexes don't have to be consecutive, as long as each index is greater than the previous one. Note that if the same packet index is attached to different data, the data sent later will overwrite the data sent previously.
  • The data payload size in a Data Packet should be less than 1 MiB.

The Data Packets need to be sent on the Bulk Data publish topic of MODE MQTT. In each MQTT Message for Bulk Data to be published, you can send multiple data packets.

Bulk Data payload is an arbitrary byte sequence, so you can send the sequence of the Data Packet byte representation described above.

Adding Metadata Packets when you send Data Packets

While you are sending Data Packets you can also send Metadata Packet(s).

The structure of a Metadata Packet sent over the MQTT connection is similar to the Data Packet structure. The Metadata Packet is a tuple of the following four elements, encoded with MessagePack:

  • ext format: 1
  • str format: stream name
  • int format: packet index
  • str format: json string

Metadata Packet representation in MessagePack uses an extension type that must be "1". Again, a custom encoder is likely needed to encode a Metadata Packet structure into MessagePack.

The following snippet is an example written in Go (with github.com/vmihailenco/msgpack) that defines a Metadata Packet structure which can be encoded as MessagePack.

Example code in Go

import (
    "bytes"
    "encoding/json"

    "github.com/vmihailenco/msgpack"
)

type MetadataPacket struct {
    StreamName  string
    PacketIndex uint32
    Content     map[string]interface{}
}

var _ msgpack.Marshaler = MetadataPacket{}

func init() {
    msgpack.RegisterExt(int8(1), MetadataPacket{})
}

func (m MetadataPacket) MarshalMsgpack() ([]byte, error) {
    var buf bytes.Buffer
    enc := msgpack.NewEncoder(&buf)

    if err := enc.EncodeString(m.StreamName); err != nil {
        return nil, err
    }
    if err := enc.EncodeUint32(m.PacketIndex); err != nil {
        return nil, err
    }
    jsonBytes, err := json.Marshal(m.Content)
    if err != nil {
        return nil, err
    }
    if err := enc.EncodeString(string(jsonBytes)); err != nil {
        return nil, err
    }

    return buf.Bytes(), nil
}

Metadata Packets and Data Packets can be mixed in the same payload of an individual MQTT Message on the Bulk Data topic.
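To make the byte layout of the two packet kinds concrete, here is a dependency-free sketch that writes the MessagePack formats by hand. It assumes that the body of the ext value is simply the concatenation of the encoded stream name, packet index, and data (or JSON string), which is how the field lists above read. All function names are hypothetical, and this is an illustration of the layout, not a replacement for a MessagePack library.

```go
package main

import "encoding/json"

// appendSized appends a format byte followed by n as a 1-, 2- or 4-byte
// big-endian length, choosing the smallest width that fits.
func appendSized(b []byte, f8, f16, f32 byte, n int) []byte {
	switch {
	case n < 1<<8:
		return append(b, f8, byte(n))
	case n < 1<<16:
		return append(b, f16, byte(n>>8), byte(n))
	default:
		return append(b, f32, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	}
}

// appendStr appends s in MessagePack str format (fixstr, str 8/16/32).
func appendStr(b []byte, s string) []byte {
	if len(s) < 32 {
		b = append(b, 0xa0|byte(len(s)))
	} else {
		b = appendSized(b, 0xd9, 0xda, 0xdb, len(s))
	}
	return append(b, s...)
}

// appendUint32 appends v in MessagePack uint 32 format.
func appendUint32(b []byte, v uint32) []byte {
	return append(b, 0xce, byte(v>>24), byte(v>>16), byte(v>>8), byte(v))
}

// appendBin appends data in MessagePack bin 8/16/32 format.
func appendBin(b, data []byte) []byte {
	return append(appendSized(b, 0xc4, 0xc5, 0xc6, len(data)), data...)
}

// appendExt wraps body in a MessagePack ext value of the given type.
// Bodies here are at least 8 bytes, so only fixext 8 and fixext 16 can apply.
func appendExt(b []byte, typ int8, body []byte) []byte {
	switch len(body) {
	case 8:
		b = append(b, 0xd7)
	case 16:
		b = append(b, 0xd8)
	default:
		b = appendSized(b, 0xc7, 0xc8, 0xc9, len(body))
	}
	return append(append(b, byte(typ)), body...)
}

// encodeDataPacket encodes a Data Packet as ext type 0 (layout assumed).
func encodeDataPacket(stream string, index uint32, data []byte) []byte {
	body := appendBin(appendUint32(appendStr(nil, stream), index), data)
	return appendExt(nil, 0, body)
}

// encodeMetadataPacket encodes a Metadata Packet as ext type 1 (layout assumed).
func encodeMetadataPacket(stream string, index uint32, content map[string]interface{}) ([]byte, error) {
	js, err := json.Marshal(content)
	if err != nil {
		return nil, err
	}
	body := appendStr(appendUint32(appendStr(nil, stream), index), string(js))
	return appendExt(nil, 1, body), nil
}
```

A mixed Bulk Data payload is then just the encoded packets appended one after another, for example the result of encodeDataPacket followed by the result of encodeMetadataPacket for the same packet index.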

Closing SDS Stream

After all necessary Data Packets have been sent, it's time to close the Stream. Just like opening a Stream, closing a Stream uses the Bulk Data Request topic of MODE MQTT.

The payload structure for the request JSON object consists of the following three data fields:

  • requestType: must be "close"
  • streamName: arbitrary string which identifies your stream
  • packetIndex: packet index of the last Data Packet the Device has sent

As mentioned in the previous section, the packetIndex field should be the one that is attached to the last Data Packet sent.

The entire request JSON to be published to the Bulk Data Request topic (e.g. /devices/1234/bulkData/mySDS/request) which corresponds to the open Stream Request looks like the following:

{
  "requestID": "1B93967B-5DF2-4098-94EF-409DAC6866B9",
  "bulkDataLabel": "mySDS",
  "payload": {
    "requestType": "close",
    "streamName": "device1234:log-stream",
    "lastPacketIndex": 10
  }
}

The corresponding response is published to the Bulk Data Response topic that the device should subscribe to. The response payload is also a JSON object which consists of just one data field:

  • status: outcome of the request.

The status field should take one of the following values:

  • OK: stream is successfully closed.
  • ERR_PERMISSION_DENIED: error that occurs when the device that tries to close the Stream is different from the one that has opened that Stream.
  • ERR_INVALID_CLOSE_PACKET_INDEX: error that occurs when the packetIndex is invalid.
  • ERR_UNKNOWN_REQUEST_TYPE: error that occurs when requestType field in the Request is incorrect.

The entire Response JSON to be sent to the Bulk Data Response topic (e.g. /devices/1234/bulkData/mySDS/response) that corresponds to the above Request JSON example looks like below.

{
  "requestID": "1B93967B-5DF2-4098-94EF-409DAC6866B9",
  "status": "OK",
  "payload": {
    "status": "OK",
  }
}
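The close exchange can be sketched in the same way as the open exchange. Note that the field list above calls the index field packetIndex while the example Request JSON uses lastPacketIndex; this sketch follows the example JSON, and that choice is an assumption you should check against the API documentation. The names closePayload, closeRequest, newCloseRequest and checkCloseResponse are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// closePayload mirrors the payload of the example close Request.
type closePayload struct {
	RequestType string `json:"requestType"`
	StreamName  string `json:"streamName"`
	// Assumption: the key is taken from the example JSON, not the field list.
	LastPacketIndex uint32 `json:"lastPacketIndex"`
}

// closeRequest mirrors the envelope of the example close Request.
type closeRequest struct {
	RequestID     string       `json:"requestID"`
	BulkDataLabel string       `json:"bulkDataLabel"`
	Payload       closePayload `json:"payload"`
}

// newCloseRequest returns the JSON body that closes streamName after the
// Data Packet with index lastIndex.
func newCloseRequest(requestID, label, streamName string, lastIndex uint32) ([]byte, error) {
	return json.Marshal(closeRequest{
		RequestID:     requestID,
		BulkDataLabel: label,
		Payload:       closePayload{RequestType: "close", StreamName: streamName, LastPacketIndex: lastIndex},
	})
}

// closeErrors describes the non-OK statuses listed above.
var closeErrors = map[string]string{
	"ERR_PERMISSION_DENIED":          "stream was opened by a different device",
	"ERR_INVALID_CLOSE_PACKET_INDEX": "packet index in the close request is invalid",
	"ERR_UNKNOWN_REQUEST_TYPE":       "requestType in the request is incorrect",
}

// checkCloseResponse returns nil when payload.status is "OK" and a
// descriptive error otherwise.
func checkCloseResponse(raw []byte) error {
	var r struct {
		Payload struct {
			Status string `json:"status"`
		} `json:"payload"`
	}
	if err := json.Unmarshal(raw, &r); err != nil {
		return fmt.Errorf("decode close response: %w", err)
	}
	if r.Payload.Status == "OK" {
		return nil
	}
	if msg, ok := closeErrors[r.Payload.Status]; ok {
		return fmt.Errorf("%s: %s", r.Payload.Status, msg)
	}
	return fmt.Errorf("unexpected status %q", r.Payload.Status)
}
```

A device would publish the bytes from newCloseRequest to its Bulk Data Request topic and pass the message received on the Response topic to checkCloseResponse.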

Retrieving data


You can retrieve the stored Data Packets via the MODE REST API.

To download the Data Packets, make an HTTP GET request to the Data Packets endpoint. Please note the following:

  • After successful authentication and authorization, the Data Packets endpoint will redirect you to another server which actually serves the Data Packets. So, please make sure your HTTP client follows the redirection.
  • The actual Data Packets server sends Data Packets using the "chunked" transfer encoding scheme, which means the Data Packets are delivered to your HTTP client gradually, in chunks.
  • Each data chunk consists of Data Packets encoded with MessagePack.

The following snippet is an example written in Go (with github.com/vmihailenco/msgpack) that retrieves Data Packets using the REST API.

Example code in Go

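import (
    "context"
    "errors"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strconv"

    "github.com/vmihailenco/msgpack"
)

type Packet struct {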
    StreamName  string
    PacketIndex uint32
    Data        []byte
}

var _ msgpack.CustomDecoder = &Packet{}

func (p *Packet) DecodeMsgpack(dec *msgpack.Decoder) error {
    return dec.DecodeMulti(&p.StreamName, &p.PacketIndex, &p.Data)
}

func RetrieveDataPacket(
    ctx context.Context,
    homeID uint64,
    smartModuleID string,
    streamName string,
    start int,
    stop int,
    apiKey string,
) ([]Packet, error) {
    reqURL := url.URL{
        Scheme: "https",
        Host:   "api.tinkermode.com",
    }
    reqURL.Path = fmt.Sprintf("/homes/%d/smartModules/%s/streams/%s/data", homeID, smartModuleID, streamName)
    params := url.Values{}
    params.Add("start", strconv.Itoa(start))
    params.Add("stop", strconv.Itoa(stop))
    reqURL.RawQuery = params.Encode()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL.String(), nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }
    req.Header.Set("Authorization", fmt.Sprintf("ModeCloud %s", apiKey))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to get data from API: %w", err)
    }
    defer func() {
        _ = resp.Body.Close()
    }()

    // Go's HTTP client automatically follows the redirect and handles the chunked response.
    if resp.StatusCode != http.StatusOK {
        // resp.Status already includes the numeric code, e.g. "404 Not Found".
        return nil, fmt.Errorf("unexpected response status: %s", resp.Status)
    }

    dec := msgpack.NewDecoder(resp.Body)
    var ps []Packet
    for {
        var p Packet
        if err := dec.Decode(&p); err != nil {
            if !errors.Is(err, io.EOF) {
                return nil, fmt.Errorf("failed to decode: %w", err)
            }
            return ps, nil
        }
        ps = append(ps, p)
    }
}

For more details, please refer to the full REST API Documentation for SDS.