Re: flink Kinesis Consumer Connected but not consuming

2021-08-18 Danny Cranmer
Hey Tarun,

Your application looks OK and should work. One thing I did notice, although
I cannot imagine it is the issue unless the region is not being set
correctly, is where your consumer properties come from:

   - getKafkaConsumerProperties()

Make sure you are setting the correct region
(AWSConfigConstants.AWS_REGION) in those properties. If that is OK, please
check the Flink dashboard to confirm that the following metrics are being
reported for this operator:

   - millisBehindLatest
   - numberOfAggregatedRecordsPerFetch
   - numberOfDeaggregatedRecordsPerFetch
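
A minimal sketch of the region check, assuming the properties are built by a helper of your own. KinesisRegionCheck, consumerProperties and requireRegion are hypothetical names, and the literal key "aws.region" stands in for AWSConfigConstants.AWS_REGION so that the example runs with the JDK alone. "us-east-1" is only a placeholder; the value must be the region the stream lives in.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Flink API. "aws.region" is the value
// of AWSConfigConstants.AWS_REGION in the Flink Kinesis connector.
class KinesisRegionCheck {
    static final String AWS_REGION = "aws.region";

    // Builds consumer properties with the region set; rejects blank input.
    static Properties consumerProperties(String region) {
        if (region == null || region.isBlank()) {
            throw new IllegalArgumentException("AWS region must be set, e.g. us-east-1");
        }
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region.trim());
        return props;
    }

    // Returns the configured region, failing fast if the key is absent or blank.
    static String requireRegion(Properties props) {
        String region = props.getProperty(AWS_REGION);
        if (region == null || region.isBlank()) {
            throw new IllegalStateException("Missing " + AWS_REGION + " in consumer properties");
        }
        return region;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("us-east-1");
        System.out.println(requireRegion(props)); // prints us-east-1
    }
}
```

In the real job the key would come from AWSConfigConstants.AWS_REGION, and the same check could run on whatever getKafkaConsumerProperties() returns before the source is created.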
Thanks,



flink Kinesis Consumer Connected but not consuming

2021-08-17 tarun joshi
Hey All,

I am running Flink in Docker containers (image tag
flink:scala_2.11-java11) on EC2.

The Kinesis connector is able to connect, but nothing is being consumed.

My commands to start the JobManager and TaskManager:

    docker run \
      --rm \
      --volume /root/:/root/ \
      --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
      --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
      --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
      --name=jobmanager \
      --network flink-network \
      --publish 8081:8081 \
      flink:scala_2.11-java11 jobmanager &

    docker run \
      --rm \
      --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
      --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
      --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
      --name=taskmanager_0 \
      --network flink-network \
      flink:scala_2.11-java11 taskmanager &

2021-08-17 22:38:01,106 INFO
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='', shard='{ShardId: shardId-,HashKeyRange:
{StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144}
,SequenceNumberRange: {StartingSequenceNumber:
49600280467722672235426674687631661510244124728928239618,}}'}, starting
state set as sequence number LATEST_SEQUENCE_NUM

(The same line appears for each shard consumer.)

2021-08-17 22:38:01,107 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
[] - Subtask 0 will start consuming seeded shard
StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
34028236692093846346337460743176821144},SequenceNumberRange:
{StartingSequenceNumber:
49600280467722672235426674687631661510244124728928239618,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0
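
Both log lines show the shard consumers starting from LATEST_SEQUENCE_NUM, i.e. the tip of each shard, so only records written after the job starts are read. One way to rule out a starting-position problem is to start from TRIM_HORIZON, the oldest record the stream still retains. A minimal sketch, assuming the literal key "flink.stream.initpos" (the value of ConsumerConfigConstants.STREAM_INITIAL_POSITION) and a local enum standing in for ConsumerConfigConstants.InitialPosition so that it runs without the connector on the classpath; InitialPositionExample and its methods are hypothetical names.

```java
import java.util.Properties;

// Sketch mirroring the Flink Kinesis connector's initial-position setting.
// The enum is a local stand-in for ConsumerConfigConstants.InitialPosition.
class InitialPositionExample {
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    // Copies the base properties and sets where shard consumers should start.
    static Properties withInitialPosition(Properties base, InitialPosition position) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(STREAM_INITIAL_POSITION, position.name());
        return props;
    }

    // Mirrors the connector's default: LATEST when the key is not set.
    static InitialPosition effectivePosition(Properties props) {
        return InitialPosition.valueOf(
            props.getProperty(STREAM_INITIAL_POSITION, InitialPosition.LATEST.name()));
    }

    public static void main(String[] args) {
        Properties props = withInitialPosition(new Properties(), InitialPosition.TRIM_HORIZON);
        System.out.println(effectivePosition(props)); // prints TRIM_HORIZON
    }
}
```

In the real job this would be props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"). The initial position only applies when the job starts without restored state; a job restored from a checkpoint or savepoint resumes from the stored sequence numbers instead.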

My program is a simple test that reads a DataStream from Kinesis:

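// Source arguments: stream name, deserializer, consumer properties
// (AWS region, credentials, starting position, ...).
FlinkKinesisConsumer<String> kinesisConsumer =
    new FlinkKinesisConsumer<>(
        "", new SimpleStringSchema(),
        getKafkaConsumerProperties());
env.addSource(kinesisConsumer).print();

env.execute("Read files in streaming fashion");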

Other facts:

   1. I can see data flowing continuously into our Kinesis stream in the
   Monitoring tab of the AWS console.
   2. I had authorization issues accessing Kinesis in our AWS
   infrastructure, but I resolved them by moving into the same security
   group as the Kinesis deployment and creating a role with full access to
   Kinesis.
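
The role in point 2 only helps if the connector actually resolves it. A minimal sketch, assuming the role is attached to the EC2 instance: setting "aws.credentials.provider" (the value of AWSConfigConstants.AWS_CREDENTIALS_PROVIDER) to AUTO asks the connector to use the AWS SDK default credentials chain, which can pick up instance-profile credentials. As far as I know AUTO is also the fallback when no provider or keys are configured, so setting it explicitly mainly documents intent. CredentialsExample and withDefaultCredentialsChain are hypothetical names.

```java
import java.util.Properties;

// Sketch only: the literal key mirrors AWSConfigConstants.AWS_CREDENTIALS_PROVIDER
// so that the example runs without the Flink Kinesis connector on the classpath.
class CredentialsExample {
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    // Copies the base properties and selects the default AWS credentials chain,
    // which includes credentials from the EC2 instance profile (the attached role).
    static Properties withDefaultCredentialsChain(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withDefaultCredentialsChain(new Properties());
        System.out.println(props.getProperty(AWS_CREDENTIALS_PROVIDER)); // prints AUTO
    }
}
```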


Any pointers are really appreciated!

Thanks,
Tarun