Hey Tarun,
Your application looks OK and should work. I did notice one thing, though I cannot imagine it is an issue unless you are not setting the region correctly:

- getKafkaConsumerProperties(): make sure you are setting the correct region (AWSConfigConstants.AWS_REGION) in the properties.

If this is OK, please check the Flink dashboard to ensure the following metrics are flowing for this operator:

- millisBehindLatest
- numberOfAggregatedRecords
- numberOfDeaggregatedRecords

Thanks,

On Wed, Aug 18, 2021 at 2:33 AM tarun joshi <1985.ta...@gmail.com> wrote:

> Hey All,
>
> I am running Flink in Docker containers (image tag:
> flink:scala_2.11-java11) on EC2.
>
> I am able to connect to a Kinesis connector, but nothing is being
> consumed.
>
> My commands to start the JobManager and TaskManager:
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> The logs show:
>
> 2021-08-17 22:38:01,106 INFO
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
> Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='<My
> Stream Name>', shard='{ShardId: shardId-000000000000,HashKeyRange:
> {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144}
> ,SequenceNumberRange: {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'}, starting
> state set as sequence number LATEST_SEQUENCE_NUM
>
> and this for each shard consumer:
>
> 2021-08-17 22:38:01,107 INFO
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
> [] - Subtask 0 will start consuming seeded shard
> StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
> shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> 34028236692093846346337460743176821144},SequenceNumberRange:
> {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'} from
> sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
>
> My program is a simple test of a DataStream from Kinesis:
>
> FlinkKinesisConsumer<String> kinesisConsumer =
>     new FlinkKinesisConsumer<>(
>         "<My-StreamName>", new SimpleStringSchema(),
>         getKafkaConsumerProperties());
> env.addSource(kinesisConsumer).print();
>
> env.execute("Read files in streaming fashion");
>
> Other facts:
>
> 1. I can see data flowing into our Kinesis stream continuously in the
>    Monitoring tab of the AWS console.
> 2. I was facing authorization issues when accessing Kinesis in our AWS
>    infra, but I resolved that by moving into the same security group as
>    the Kinesis deployment and creating a role with full access to
>    Kinesis.
>
> Any pointers are really appreciated!
>
> Thanks,
> Tarun
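P.S. For reference, the region setup I had in mind looks roughly like the
sketch below. It is only an illustration: the helper name
getConsumerProperties and the region us-east-1 are placeholders, and I am
using the literal property keys ("aws.region", "flink.stream.initpos") that
AWSConfigConstants.AWS_REGION and
ConsumerConfigConstants.STREAM_INITIAL_POSITION resolve to in the Kinesis
connector, so the snippet compiles without connector dependencies. Note your
logs show LATEST_SEQUENCE_NUM, so the consumer only sees records produced
after the job starts; setting the initial position to TRIM_HORIZON replays
existing records, which can help verify the pipeline end to end.

```java
import java.util.Properties;

public class KinesisConsumerProps {

    // Hypothetical helper mirroring getKafkaConsumerProperties() from the
    // thread. In real code, prefer the connector constants
    // (AWSConfigConstants.AWS_REGION, ConsumerConfigConstants.STREAM_INITIAL_POSITION)
    // over the literal keys used here for illustration.
    public static Properties getConsumerProperties() {
        Properties props = new Properties();
        // Must match the region the Kinesis stream lives in (placeholder).
        props.setProperty("aws.region", "us-east-1");
        // Defaults to LATEST; TRIM_HORIZON replays records already in the
        // stream, useful when checking whether anything is consumed at all.
        props.setProperty("flink.stream.initpos", "TRIM_HORIZON");
        return props;
    }

    public static void main(String[] args) {
        Properties p = getConsumerProperties();
        System.out.println(p.getProperty("aws.region"));
        System.out.println(p.getProperty("flink.stream.initpos"));
    }
}
```

These properties would then be passed as the third argument to the
FlinkKinesisConsumer constructor, exactly as in your snippet above.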