Hey Tarun,


Your application looks OK and should work. I did notice the following,
although I cannot imagine it is an issue unless you are not setting the
region correctly:

   - getKafkaConsumerProperties()

Make sure you are setting the correct region
(AWSConfigConstants.AWS_REGION) in the properties. If that is fine, please
check the Flink dashboard to confirm that the following metrics are being
reported for this operator:

   - millisBehindLatest
   - numberOfAggregatedRecords
   - numberOfDeaggregatedRecords
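For reference, here is a minimal sketch of what Kinesis-specific consumer properties could look like. It assumes the default AWS credential chain and a stream in us-east-1; both are placeholders, so substitute your own values. The helper name getKinesisConsumerProperties() is hypothetical. The key strings are the values behind AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER and ConsumerConfigConstants.STREAM_INITIAL_POSITION, written out as literals only so the sketch compiles without flink-connector-kinesis on the classpath. In a real job, reference the constants instead.

```java
import java.util.Properties;
import java.util.Set;

/**
 * Sketch of consumer properties for FlinkKinesisConsumer.
 * The key strings are the values of AWSConfigConstants.AWS_REGION,
 * AWSConfigConstants.AWS_CREDENTIALS_PROVIDER and
 * ConsumerConfigConstants.STREAM_INITIAL_POSITION from flink-connector-kinesis;
 * a real job should reference those constants instead of literals.
 */
public class KinesisConsumerProps {

    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // AT_TIMESTAMP is left out because it needs an extra timestamp property.
    static final Set<String> SUPPORTED_POSITIONS = Set.of("LATEST", "TRIM_HORIZON");

    // Hypothetical stand-in for getKafkaConsumerProperties().
    static Properties getKinesisConsumerProperties(String region, String initialPosition) {
        if (region == null || region.isBlank()) {
            throw new IllegalArgumentException("aws.region must name the stream's region, e.g. us-east-1");
        }
        if (initialPosition == null || !SUPPORTED_POSITIONS.contains(initialPosition)) {
            throw new IllegalArgumentException("unsupported initial position: " + initialPosition);
        }
        Properties props = new Properties();
        // Must be the region the Kinesis stream actually lives in.
        props.setProperty(AWS_REGION, region);
        // AUTO (the connector default) uses the AWS default credential chain,
        // which can pick up an EC2 instance role.
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        // LATEST (the connector default) reads only records written after the job starts;
        // TRIM_HORIZON starts from the oldest record still retained in each shard.
        props.setProperty(STREAM_INITIAL_POSITION, initialPosition);
        return props;
    }

    public static void main(String[] args) {
        Properties props = getKinesisConsumerProperties("us-east-1", "TRIM_HORIZON");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties would go in as the third argument of the FlinkKinesisConsumer constructor, in place of getKafkaConsumerProperties(). Your log shows the consumer starting from LATEST_SEQUENCE_NUM, which is the default. A short test run with TRIM_HORIZON may help separate a "nothing arrives" problem from a timing one, because it also reads records that were written before the job started.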

Thanks,

On Wed, Aug 18, 2021 at 2:33 AM tarun joshi <1985.ta...@gmail.com> wrote:

> Hey All,
>
> I am running Flink in Docker containers (image tag
> flink:scala_2.11-java11) on EC2.
>
> I am able to connect to Kinesis with the Kinesis connector, but nothing is
> being consumed.
>
> My commands to start the JobManager and TaskManager:
>
>     docker run \
>       --rm \
>       --volume /root/:/root/ \
>       --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>       --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>       --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>       --name=jobmanager \
>       --network flink-network \
>       --publish 8081:8081 \
>       flink:scala_2.11-java11 jobmanager &
>
>     docker run \
>       --rm \
>       --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>       --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>       --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>       --name=taskmanager_0 \
>       --network flink-network \
>       flink:scala_2.11-java11 taskmanager &
>
> 2021-08-17 22:38:01,106 INFO
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
> Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='<My
> Stream Name>', shard='{ShardId: shardId-000000000000,HashKeyRange:
> {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144}
> ,SequenceNumberRange: {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'}, starting
> state set as sequence number LATEST_SEQUENCE_NUM
>
> ... and the same for each shard consumer, followed by:
>
> 2021-08-17 22:38:01,107 INFO
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
> [] - Subtask 0 will start consuming seeded shard
> StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
> shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> 34028236692093846346337460743176821144},SequenceNumberRange:
> {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'} from
> sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
>
> My program is a simple test that reads a DataStream from Kinesis:
>
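> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> FlinkKinesisConsumer<String> kinesisConsumer =
>         new FlinkKinesisConsumer<>(
>                 "<My-Stream-Name>", new SimpleStringSchema(),
>                 getKafkaConsumerProperties());
> env.addSource(kinesisConsumer).print();
>
> env.execute("Read files in streaming fashion");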
>
> Other facts:
>
>
>    1. I can see data continuously flowing into our Kinesis stream in the
>    Monitoring tab of the AWS console.
>    2. I was facing authorization issues when accessing Kinesis in our AWS
>    infrastructure, but I resolved them by moving into the same security
>    group as the Kinesis deployment and creating a role with full access to
>    Kinesis.
>
>
> Any pointers are really appreciated!
>
> Thanks,
> Tarun
>
