AWS - Kinesis - Getting Started




Introduction

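This post covers the following topics: publishing to and subscribing from a Kinesis data stream, communicating across AWS accounts, and putting everything together.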

Publish/Subscribe to Kinesis Data Stream

Publishing data to a Kinesis data stream is relatively easy once the stream exists: the producer below calls put_record every 10 seconds, sending one small text record each time.

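import os
import time

import boto3

# Select the credentials profile and region before the first boto3 client is created.
os.environ['AWS_PROFILE'] = "<your-producer-profile>"
os.environ['AWS_DEFAULT_REGION'] = '<your-region>'

DATA_STREAM = "DataStreamTest"  # the stream must already exist

client = boto3.client('kinesis')

count = 0
while True:
    count += 1
    # Every record uses the same partition key, so all records land in the same shard.
    response = client.put_record(
        StreamName=DATA_STREAM,
        Data="test-data-{}".format(count).encode("utf-8"),
        PartitionKey="test",
    )

    print(response)  # includes the ShardId and SequenceNumber assigned to the record
    time.sleep(10)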
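The producer always sends PartitionKey "test", which decides the shard each record goes to. As a sketch of how that mapping works, assuming the documented Kinesis behavior (MD5 of the partition key read as a 128-bit integer, then matched against each shard's HashKeyRange, unless ExplicitHashKey is set), the helpers below are hypothetical and not part of boto3:

```python
import hashlib


def hash_key_for(partition_key):
    """Return the 128-bit integer Kinesis derives from a partition key.

    Assumes the documented mapping: MD5 over the key's UTF-8 bytes, read as an
    unsigned big-endian integer.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def shard_for(partition_key, shards):
    """Return the ShardId whose HashKeyRange contains the key's hash.

    `shards` has the shape of the 'Shards' list in a describe_stream or
    list_shards response (hash keys are decimal strings). Pass only open
    shards: after resharding, closed parent shards keep their old ranges.
    """
    hash_key = hash_key_for(partition_key)
    for shard in shards:
        key_range = shard["HashKeyRange"]
        if int(key_range["StartingHashKey"]) <= hash_key <= int(key_range["EndingHashKey"]):
            return shard["ShardId"]
    raise ValueError("no shard covers hash key {}".format(hash_key))
```

For example, passing `client.describe_stream(StreamName=DATA_STREAM)['StreamDescription']['Shards']` as `shards` would show which shard the constant key "test" writes to. Because the key never changes, every record goes to that one shard, which is why the consumer can get away with reading a single shard.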

Subscribing to the data stream takes more work, because the consumer has to track extra state: it must pick a shard, obtain a shard iterator for it, and then keep following the NextShardIterator returned by every get_records call.

import os
import time

import boto3

os.environ['AWS_PROFILE'] = "<your-consumer-profile>"
os.environ['AWS_DEFAULT_REGION'] = '<your-region>'

DATA_STREAM = "DataStreamTest"

kinesisClient = boto3.client('kinesis')

response = kinesisClient.describe_stream(StreamName=DATA_STREAM)
# Assuming there is only one shard.
shardId = response['StreamDescription']['Shards'][0]['ShardId']

# LATEST: start just after the most recent record, i.e. read only new data.
shardIterator = kinesisClient.get_shard_iterator(StreamName=DATA_STREAM,
                                                 ShardId=shardId,
                                                 ShardIteratorType='LATEST')['ShardIterator']

# Each get_records call returns the iterator for the next call. It is missing
# only once the shard has been closed and fully read.
while shardIterator is not None:
    recordResponse = kinesisClient.get_records(ShardIterator=shardIterator, Limit=2)
    for record in recordResponse['Records']:
        print(record['Data'].decode('utf-8'))  # Data is returned as bytes
    shardIterator = recordResponse.get('NextShardIterator')
    time.sleep(20)
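The consumer reads only the first shard ("Assuming there is only one shard."). A minimal sketch of reading every shard, assuming the client is a boto3 Kinesis client (any object with the same methods works), follows list_shards pagination and drains each shard from TRIM_HORIZON until MillisBehindLatest reaches 0. The function names and the `max_calls`/`pause` parameters are hypothetical:

```python
import time


def list_all_shards(client, stream_name):
    """Return every shard of the stream, following list_shards pagination."""
    response = client.list_shards(StreamName=stream_name)
    shards = list(response["Shards"])
    while response.get("NextToken"):
        # When NextToken is supplied, StreamName must be left out of the call.
        response = client.list_shards(NextToken=response["NextToken"])
        shards.extend(response["Shards"])
    return shards


def drain_shard(client, stream_name, shard_id, limit=100, max_calls=10, pause=0.25):
    """Read a shard from its oldest record until caught up (or max_calls is hit)."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    payloads = []
    for _ in range(max_calls):
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        # Record payloads come back as bytes; the producer wrote UTF-8 text.
        payloads.extend(r["Data"].decode("utf-8") for r in response["Records"])
        iterator = response.get("NextShardIterator")
        # No next iterator: the shard is closed and fully read.
        # MillisBehindLatest == 0: we have reached the tip of the shard.
        if iterator is None or response.get("MillisBehindLatest", 0) == 0:
            break
        # Pause between calls to stay under the per-shard GetRecords rate limit.
        time.sleep(pause)
    return payloads


def read_all_shards(client, stream_name, **kwargs):
    """Map each ShardId to the payloads read from that shard."""
    return {
        shard["ShardId"]: drain_shard(client, stream_name, shard["ShardId"], **kwargs)
        for shard in list_all_shards(client, stream_name)
    }
```

With a real client this would be called as `read_all_shards(boto3.client('kinesis'), DATA_STREAM)`. A single GetRecords call can return an empty batch even when older data exists, which is why the sketch keeps following the iterator instead of stopping after one call.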

Communication Across AWS Accounts

It may happen that the producer and consumer live in different AWS accounts. In this case, the consumer can assume an IAM role through the AWS Security Token Service (STS). Conceptually, the setup has two parts:

The request part is done in the consumer code, which assumes a Kinesis role. The role is defined in the producer account because it is the "product" the producer offers. For example, in the diagram below, the producer account defines a KinesisReadOnlyRole, which means the producer "offers" read-only access to its data stream. Anyone who knows this role (i.e. its ARN) can request the data by assuming the role, but the request succeeds only if the role approves it.

The approval part is done in the "Trust relationships" section of the role, where the producer grants access to the requester. The requester is identified by the consumer AWS account ARN (for example, arn:aws:iam::<consumer-account-id>:root).

Diagram: Assume_Role.png (the consumer account assuming KinesisReadOnlyRole in the producer account)
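The approval side can be sketched as IAM policy documents, built here as Python dicts so they can be printed with json.dumps and pasted into the console. This is a minimal example assuming the consumer only calls DescribeStream, ListShards, GetShardIterator and GetRecords; the function names and account IDs are placeholders. Note that in a cross-account setup the calling identity in the consumer account also needs its own permission to call sts:AssumeRole on the role:

```python
import json


def trust_policy(consumer_account_id):
    """Trust relationship of the role in the PRODUCER account.

    Lets principals from the consumer account call sts:AssumeRole on this role.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "arn:aws:iam::{}:root".format(consumer_account_id)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def read_only_kinesis_policy(region, producer_account_id, stream_name):
    """Permissions policy attached to the same role: read access to one stream."""
    stream_arn = "arn:aws:kinesis:{}:{}:stream/{}".format(region, producer_account_id, stream_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            }
        ],
    }


def consumer_assume_policy(role_arn):
    """Identity policy in the CONSUMER account allowing its user/role to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": "sts:AssumeRole", "Resource": role_arn}],
    }


# Placeholder account ID; replace with the real consumer account.
print(json.dumps(trust_policy("111122223333"), indent=2))
```

Narrowing `Principal` to a specific IAM role ARN in the consumer account, instead of the account root, restricts who can assume the role further.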

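To assume the role, the consumer calls the AssumeRole API of the AWS Security Token Service (STS) and builds a Kinesis client from the temporary credentials it returns.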

import boto3

# This STS client uses the consumer account's own credentials.
stsClient = boto3.client("sts")
stsResponse = stsClient.assume_role(
    RoleArn="<TheKinesisRoleDefinedInProducerAccount>",
    RoleSessionName="KinesisConsumer"
)

# Still prints the consumer account: stsClient was created with the original credentials.
print("[Consumer] AWS account ID: {}".format(stsClient.get_caller_identity()["Account"]))

# Temporary credentials issued for the assumed role.
newSessionId = stsResponse["Credentials"]["AccessKeyId"]
newSessionKey = stsResponse["Credentials"]["SecretAccessKey"]
newSessionToken = stsResponse["Credentials"]["SessionToken"]

# A Kinesis client that acts as the role in the producer account.
kinesisClient = boto3.client(
    'kinesis',
    region_name='us-east-1',  # the region where the producer's stream lives
    aws_access_key_id=newSessionId,
    aws_secret_access_key=newSessionKey,
    aws_session_token=newSessionToken
)

Behind the scenes, STS receives the consumer's identity and the requested role, then checks the role's trust relationship in the producer account. If the consumer account is trusted, STS returns temporary credentials (an access key ID, a secret access key, and a session token) to the consumer code. These credentials expire after a limited time (one hour by default). The consumer then uses them to create a Kinesis client that can read the data stream in the producer account.
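The flow just described can be wrapped in a small helper. This is a sketch, assuming `sts_client` is a boto3 STS client; `assume_role_client_kwargs` is a hypothetical name. It maps the Credentials fields onto the keyword arguments boto3.client accepts and also returns the expiration time, which the original snippet ignores:

```python
def assume_role_client_kwargs(sts_client, role_arn, session_name, region, duration_seconds=3600):
    """Call STS AssumeRole and turn the temporary credentials into boto3.client kwargs.

    Returns (kwargs, expiration). With boto3, expiration is a timezone-aware
    datetime after which the credentials stop working.
    """
    creds = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        DurationSeconds=duration_seconds,  # capped by the role's maximum session duration
    )["Credentials"]
    kwargs = {
        "region_name": region,
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }
    return kwargs, creds["Expiration"]
```

Usage would look like `kwargs, expires = assume_role_client_kwargs(boto3.client("sts"), ROLE_ARN, "KinesisConsumer", "us-east-1")` followed by `boto3.client("kinesis", **kwargs)`. As a sanity check, `boto3.client("sts", **kwargs).get_caller_identity()["Arn"]` should report an assumed-role ARN in the producer account rather than the consumer's identity.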

Put Everything Together

Producer Code:

import os
import time

import boto3

os.environ['AWS_PROFILE'] = "ProducerProfile"
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

DATA_STREAM = "DataStreamTest"

client = boto3.client('kinesis')

count = 0
while True:
    count += 1
    # Constant partition key: every record goes to the same shard.
    response = client.put_record(
        StreamName=DATA_STREAM,
        Data="test-data-{}".format(count).encode("utf-8"),
        PartitionKey="test",
    )

    print(response)
    time.sleep(10)

Consumer Code:

import os
import time

import boto3

os.environ['AWS_PROFILE'] = "ConsumerProfile"
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

DATA_STREAM = "DataStreamTest"

# Assume the read-only role offered by the producer account.
stsClient = boto3.client("sts")
stsResponse = stsClient.assume_role(
    RoleArn="arn:aws:iam::663710591058:role/KinesisReadAccessForOtherAccountRole",
    RoleSessionName="KinesisConsumer"
)

print("[Consumer] AWS account ID: {}".format(stsClient.get_caller_identity()["Account"]))

newSessionId = stsResponse["Credentials"]["AccessKeyId"]
newSessionKey = stsResponse["Credentials"]["SecretAccessKey"]
newSessionToken = stsResponse["Credentials"]["SessionToken"]

# These temporary credentials expire (one hour by default); after that the
# calls below fail until the role is assumed again.
kinesisClient = boto3.client(
    'kinesis',
    region_name='us-east-1',
    aws_access_key_id=newSessionId,
    aws_secret_access_key=newSessionKey,
    aws_session_token=newSessionToken
)

response = kinesisClient.describe_stream(StreamName=DATA_STREAM)
# Assuming there is only one shard.
shardId = response['StreamDescription']['Shards'][0]['ShardId']

shardIterator = kinesisClient.get_shard_iterator(StreamName=DATA_STREAM,
                                                 ShardId=shardId,
                                                 ShardIteratorType='LATEST')['ShardIterator']

# Process each batch before moving on to the next iterator.
while shardIterator is not None:
    recordResponse = kinesisClient.get_records(ShardIterator=shardIterator, Limit=2)
    for record in recordResponse['Records']:
        print(record['Data'].decode('utf-8'))
    shardIterator = recordResponse.get('NextShardIterator')
    time.sleep(20)
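The consumer loop runs forever, but the credentials returned by AssumeRole expire, so a long-running consumer eventually has to assume the role again. One way to handle this, sketched below under the assumption that you supply a factory returning `(client, expiration)`, is a small wrapper that rebuilds the client shortly before expiry. `RefreshingClient` and `make_kinesis_client` are hypothetical names, not boto3 APIs:

```python
import datetime


class RefreshingClient:
    """Hold a client built from temporary credentials and rebuild it before they expire.

    `factory` is a zero-argument callable returning (client, expiration), where
    expiration is a timezone-aware datetime. `clock` returns the current time.
    """

    def __init__(self, factory, margin=datetime.timedelta(minutes=5), clock=None):
        self._factory = factory
        self._margin = margin
        self._clock = clock or (lambda: datetime.datetime.now(datetime.timezone.utc))
        self._client, self._expiration = factory()

    def get(self):
        # Re-assume the role once we are within `margin` of the expiration time.
        if self._clock() >= self._expiration - self._margin:
            self._client, self._expiration = self._factory()
        return self._client


# Usage sketch (hypothetical factory; ROLE_ARN is a placeholder):
# def make_kinesis_client():
#     creds = boto3.client("sts").assume_role(
#         RoleArn=ROLE_ARN, RoleSessionName="KinesisConsumer")["Credentials"]
#     client = boto3.client("kinesis", region_name="us-east-1",
#                           aws_access_key_id=creds["AccessKeyId"],
#                           aws_secret_access_key=creds["SecretAccessKey"],
#                           aws_session_token=creds["SessionToken"])
#     return client, creds["Expiration"]
#
# refreshing = RefreshingClient(make_kinesis_client)
# recordResponse = refreshing.get().get_records(ShardIterator=shardIterator, Limit=2)
```

Injecting the clock keeps the refresh logic testable without AWS; in production the default UTC clock compares correctly against the timezone-aware Expiration that boto3 returns.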

----- END -----
