Kinesis
Introduction
The Kinesis Smart Module is a Smart Module that directly forwards data collected from MODE Devices to your Kinesis Data Stream.
Preparation
Before you set up the Kinesis Smart Module, make sure that 1) the target Kinesis Data Stream has been created, and 2) an IAM Role has been created with an IAM policy that grants permission to put records into that Kinesis Data Stream.
The IAM Policy given to the IAM Role should look like the following:
IAM Policy template
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "KINESIS_DATA_STREAM_ARN"
    }
  ]
}
NOTE: Please replace KINESIS_DATA_STREAM_ARN with your actual Kinesis Data Stream ARN.
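If you generate the policy document from a script rather than editing the template by hand, the substitution can be sketched as follows. The helper name buildPolicy and the struct names are hypothetical, chosen only for illustration; the policy content is the template above.

```go
package main

import "encoding/json"

// statement and policy mirror the IAM Policy template shown above.
type statement struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// buildPolicy is a hypothetical helper that fills the given stream ARN
// into the template and returns the policy as indented JSON.
func buildPolicy(streamARN string) (string, error) {
	p := policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect:   "Allow",
			Action:   []string{"kinesis:PutRecord", "kinesis:PutRecords"},
			Resource: streamARN,
		}},
	}
	out, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

Calling buildPolicy with your stream's ARN returns a JSON document that can be pasted into the IAM console or passed to your own provisioning tooling.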
Setting up Kinesis Smart Module
ONE | In the Developer Console, navigate to the Smart Modules page and select +NEW. Click +ADD in the Kinesis section.
TWO | Fill out the "New Kinesis Module" form, then click Save.
- Module ID: Uniquely identifiable name used for getting access to the data in SDS from the REST API.
- Description: Description for the new Smart Module.
- Subscribe Events: (optional) The types of Events that you want to propagate to your Kinesis Data Stream. If you use "*", all Events occurring in your project will be forwarded.
- Bulk Data Label: (optional) Uniquely identifiable name used for uploading data over MQTT.
- Role ARN: The ARN of the AWS IAM Role that allows MODE to forward data into your Kinesis Data Stream.
- External ID: (optional) The External ID you specified when you created the IAM Role.
- Region: The AWS Region where your Kinesis Data Stream is located.
- Stream Name: The name of your Kinesis Data Stream.
- Use UUID: Flag that, when enabled, sets UUIDs as the partition keys.
You need to specify either the Subscribe Events field or the Bulk Data Label field.
How to send Kinesis Data Record
Send MODE Event as Kinesis Data Record
If you are not familiar with MODE Events, please first refer to the following pages to learn how to send Events.
MODE Event is a JSON object with the following structure:
| Field | Type | Description | Required? |
| --- | --- | --- | --- |
| eventType | string | Event type ID. | Yes |
| eventData | object | Event data. | Yes |
| timestamp | string | In RFC 3339 format. | Yes |
| homeId | integer | ID of home associated with this event. | Yes |
| originDeviceId | integer | ID of device that sent the event. | No (present only if event came from a device) |
| originDeviceClass | string | Class ID of device that sent the event. | No (present only if event came from a device) |
| originDeviceIp | string | IP address of device that sent the event. | No (present only if event came from a device) |
| originProjectKeyID | string | ID of project API key associated with the event. | No (present only if event came from a program using a project API key) |
| originProjectKeyName | string | Name of project API key associated with the event. | No (present only if event came from a program using a project API key) |
Each MODE Event corresponds to a Kinesis Data Record. That is, if you send a MODE Event and it is forwarded to your Kinesis Data Stream, that event is represented as a Kinesis Data Record.
NOTE: The types of Events that can be propagated to the Kinesis Data Stream are determined by the "Subscribe Events" setting in the Smart Module configuration.
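On the consuming side, the Event structure above could be mapped to a Go type as in the following sketch. It assumes that the Data of each Kinesis Data Record carries the Event as a JSON object with exactly the field names listed in the table; the type name modeEvent and the function decodeEvent are hypothetical and not part of any MODE SDK.

```go
package main

import (
	"encoding/json"
	"time"
)

// modeEvent mirrors the fields in the table above. Optional fields are
// pointers so that an absent field can be told apart from a zero value.
type modeEvent struct {
	EventType            string          `json:"eventType"`
	EventData            json.RawMessage `json:"eventData"`
	Timestamp            time.Time       `json:"timestamp"` // RFC 3339
	HomeID               int64           `json:"homeId"`
	OriginDeviceID       *int64          `json:"originDeviceId,omitempty"`
	OriginDeviceClass    *string         `json:"originDeviceClass,omitempty"`
	OriginDeviceIP       *string         `json:"originDeviceIp,omitempty"`
	OriginProjectKeyID   *string         `json:"originProjectKeyID,omitempty"`
	OriginProjectKeyName *string         `json:"originProjectKeyName,omitempty"`
}

// decodeEvent parses the Data of one Kinesis Data Record, assuming it
// holds a MODE Event serialized as JSON.
func decodeEvent(data []byte) (modeEvent, error) {
	var ev modeEvent
	err := json.Unmarshal(data, &ev)
	return ev, err
}
```

EventData is kept as raw JSON because its shape depends on the event type; a consumer would typically decode it in a second step once EventType is known.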
Send arbitrary data as Kinesis Data Record
You can send any kind of data that can be represented as a byte sequence over MQTT using the Bulk Data topic. Please read How to use MQTT with MODE if you are not familiar with it.
While the payload of the MQTT Bulk Data topic is an arbitrary byte sequence, you need to build the payload in the format specific to the Kinesis Smart Module.
The payload for the Bulk Data topic of the Kinesis Smart Module must be encoded as MessagePack.
The first part of the payload is a version string that identifies the data encoding format version; "1.0" is the only available version string. The rest of the payload is an arbitrary-length series of byte arrays. Each byte array corresponds to one Kinesis Data Record.
| Version | Data | .... | Data |
| --- | --- | --- | --- |
| "1.0" | byte array | .... | byte array |
The following code snippet is an example in Go that encodes an array of byte arrays into a MessagePack-encoded payload for the MQTT Bulk Data topic of the Kinesis Smart Module.
Example code in Go
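import (
	"bytes"

	// Assumption: the original does not name the MessagePack library; the
	// calls below match the API of github.com/vmihailenco/msgpack.
	"github.com/vmihailenco/msgpack/v5"
)

// encode packs the version string "1.0" followed by one byte array per
// Kinesis Data Record into a single MessagePack payload.
func encode(records [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	encoder := msgpack.NewEncoder(&buf)
	if err := encoder.EncodeString("1.0"); err != nil {
		return nil, err
	}
	for _, record := range records {
		if err := encoder.EncodeBytes(record); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}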
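To make the byte layout concrete, the following sketch builds the same kind of payload using only the Go standard library. It assumes the version string is written as a MessagePack fixstr and each record as a MessagePack bin value (bin 8, bin 16, or bin 32, depending on length), which is what MessagePack libraries typically emit for strings and byte arrays. The function name encodeByHand is hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"math"
)

// encodeByHand writes the version string "1.0" followed by one
// MessagePack bin value per record, without a MessagePack library.
func encodeByHand(records [][]byte) ([]byte, error) {
	var buf bytes.Buffer

	// fixstr: one byte 0xa0|length, then the UTF-8 bytes. "1.0" has length 3.
	buf.WriteByte(0xa0 | 3)
	buf.WriteString("1.0")

	for _, rec := range records {
		n := uint64(len(rec))
		switch {
		case n <= math.MaxUint8:
			buf.WriteByte(0xc4) // bin 8: 1-byte length
			buf.WriteByte(byte(n))
		case n <= math.MaxUint16:
			buf.WriteByte(0xc5) // bin 16: 2-byte big-endian length
			var l [2]byte
			binary.BigEndian.PutUint16(l[:], uint16(n))
			buf.Write(l[:])
		case n <= math.MaxUint32:
			buf.WriteByte(0xc6) // bin 32: 4-byte big-endian length
			var l [4]byte
			binary.BigEndian.PutUint32(l[:], uint32(n))
			buf.Write(l[:])
		default:
			return nil, errors.New("record too large for MessagePack bin 32")
		}
		buf.Write(rec)
	}
	return buf.Bytes(), nil
}
```

For a single record containing the two bytes "hi", the payload is the ten-byte-or-shorter sequence a3 31 2e 30 c4 02 68 69: the fixstr header and "1.0", then the bin 8 header, the length 2, and the record itself.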
What a Kinesis Data Record from MODE looks like
As described in the API Document, the essential components of a Kinesis Data Record are "Data" (an arbitrary byte sequence of up to 1 MB) and "PartitionKey" (an arbitrary string used to determine the shard to which the record is assigned).
"Data" is explained in the previous section. "PartitionKey" is assigned automatically by the Kinesis Smart Module. When "Use UUID" is enabled, a UUIDv4 value is set as the partition key of each record. When it is turned off, Device IDs or Home IDs are used as partition keys: the Device ID is used for Device Events and Bulk Data, while the Home ID is used for Home Events.
Troubleshooting
You can view the Kinesis Smart Module activities under the "LOGS" section of your Kinesis Smart Module configuration page.
Check if there are log entries at the ERROR level.
One typical error, especially in the early phase of development, is a data format error. Please check the "How to send Kinesis Data Record" section again.
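When chasing a data format error in Bulk Data payloads, it can help to check a payload locally before publishing it. The sketch below walks a payload and returns the records it contains. It assumes the version string is encoded as a MessagePack fixstr and every record as a bin 8, bin 16, or bin 32 value; whether the Smart Module also accepts other MessagePack encodings is not covered here. The function name splitPayload is hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// splitPayload checks the "1.0" version prefix and returns the byte
// arrays that follow it, one per Kinesis Data Record.
func splitPayload(p []byte) ([][]byte, error) {
	// "1.0" as a MessagePack fixstr: 0xa3 followed by the three characters.
	const prefix = "\xa31.0"
	if len(p) < len(prefix) || string(p[:len(prefix)]) != prefix {
		return nil, errors.New(`payload does not start with version string "1.0"`)
	}
	p = p[len(prefix):]

	var records [][]byte
	for len(p) > 0 {
		var hdr int // size in bytes of the length field
		switch p[0] {
		case 0xc4: // bin 8
			hdr = 1
		case 0xc5: // bin 16
			hdr = 2
		case 0xc6: // bin 32
			hdr = 4
		default:
			return nil, errors.New("expected a MessagePack bin value")
		}
		if len(p) < 1+hdr {
			return nil, errors.New("truncated length field")
		}
		var n uint64
		switch hdr {
		case 1:
			n = uint64(p[1])
		case 2:
			n = uint64(binary.BigEndian.Uint16(p[1:3]))
		case 4:
			n = uint64(binary.BigEndian.Uint32(p[1:5]))
		}
		p = p[1+hdr:]
		if uint64(len(p)) < n {
			return nil, errors.New("truncated record")
		}
		records = append(records, p[:n])
		p = p[n:]
	}
	return records, nil
}
```

Running your encoder's output through a check like this in a unit test catches a missing version string or a record written with the wrong type before the payload ever reaches MQTT.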
Another common error is caused by insufficient throughput capacity in your Kinesis Data Stream. This error can be fixed only by either 1) decreasing the frequency or amount of data emitted from devices, or 2) increasing the number of shards in your Kinesis Data Stream so that it gains more throughput capacity (see Kinesis Data Streams Quotas and Limits - Amazon Kinesis Data Streams for more details on this topic).
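As a rough aid for option 2, the following sketch estimates a shard count from an expected write load. It assumes the per-shard write limits of 1,000 records per second and 1 MB per second that apply to provisioned streams, and it assumes records are spread evenly across shards; check the AWS quotas page for the limits that apply to your stream. The function names are hypothetical.

```go
package main

// Assumed per-shard write limits for a provisioned Kinesis Data Stream.
const (
	shardRecordsPerSec = 1000
	shardBytesPerSec   = 1000 * 1000
)

// ceilDiv returns a divided by b, rounded up, for positive operands.
func ceilDiv(a, b int) int { return (a + b - 1) / b }

// requiredShards estimates how many shards a steady write load needs,
// taking whichever of the two limits (record count or bytes) is stricter.
func requiredShards(recordsPerSec, avgRecordBytes int) int {
	if recordsPerSec <= 0 || avgRecordBytes <= 0 {
		return 1 // a stream always has at least one shard
	}
	byCount := ceilDiv(recordsPerSec, shardRecordsPerSec)
	byBytes := ceilDiv(recordsPerSec*avgRecordBytes, shardBytesPerSec)
	if byBytes > byCount {
		return byBytes
	}
	return byCount
}
```

The estimate only holds when partition keys are well distributed. If "Use UUID" is off and a few devices produce most of the traffic, their records concentrate on a few shards and the stream can be throttled even with a nominally sufficient shard count.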