Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Yun Tang
Hi Steven

If the savepoint/checkpoint was restored successfully, I think this might be 
because the shard wasn't discovered in the previous run; in that case it would 
be consumed from the beginning. Please refer to the implementation here: [1]

[1] 
https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307

Best
Yun Tang

From: Steven Nelson 
Sent: Wednesday, October 16, 2019 4:31
To: user 
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of 
data.

We have found that when a savepoint/checkpoint is restored, the Kinesis 
consumer appears to restart from the start of the stream. The 
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
 metric reports to Prometheus that it is 7 days behind when the process 
starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-0083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey: 223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the 
stream.

And some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-0087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'} from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming this might be caused by no data ever arriving on the shard 
(which is possible with this application stage). Is this the expected 
behavior? I had thought the consumer would checkpoint the most recent 
sequence number regardless of whether it received data.

-Steve




Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi,

I am also facing the same problem. I am using Flink 1.9.0 and consuming
from a Kinesis source with a retention of 1 day. I am observing that when
the job is submitted with the "latest" initial stream position, the job
starts well and keeps processing data from all the shards for very long
periods of time without any lag. When the job fails, it also usually
recovers well from the last successful checkpointed state. But very rarely,
when the job fails and recovers from the last successful checkpointed
state, I notice a huge lag (1 day, as per the retention) on one of the
streams. I have not yet found a step-by-step process to reproduce this
issue.

So far, by customizing the FlinkKinesisConsumer to add extra log messages,
I have gathered some more information: the number of shard details loaded
from the checkpoint data during recovery is less than the actual number of
shards in the stream. The Kinesis stream has a fixed number of shards.

I added one line of debug logging at line 408 to print the size of the
variable "sequenceNumsToRestore", which is populated with shard details
from the checkpoint data.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
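
For reference, the added statement was along these lines (the exact wording
is mine; LOG and sequenceNumsToRestore already exist in the class):

// Hypothetical debug line added in FlinkKinesisConsumer around line 408,
// after sequenceNumsToRestore has been populated from the restored state:
LOG.debug("Restored {} shard sequence number entries from checkpoint state: {}",
        sequenceNumsToRestore.size(), sequenceNumsToRestore);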

In this consumer class, when the "run" method is called, it does the
following (a simplified sketch of this decision logic follows the list):

   - it discovers shards from the Kinesis stream and selects those shards
   that the subtask should schedule
   - then it iterates over the discovered shards one by one and checks
   whether each shard's state is available in the recovered state
   "sequenceNumsToRestore"
   
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
   - if it is available, it schedules that shard with the recovered state
   - if it is not available in the state, it schedules that shard with
   "EARLIEST_SEQUENCE_NUMBER"
   
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
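
The sketch below paraphrases that decision in self-contained form; the type
names are illustrative stand-ins, not the actual connector classes:

import java.util.Map;

// Simplified paraphrase of the restore decision in FlinkKinesisConsumer#run.
public class RestoreDecisionSketch {

    static final String SENTINEL_EARLIEST = "EARLIEST_SEQUENCE_NUM";

    // For each shard discovered at startup: resume from the restored
    // sequence number if the checkpoint contained state for the shard,
    // otherwise fall back to the earliest position (trim horizon).
    static String startingPositionFor(String shardId,
                                      Map<String, String> sequenceNumsToRestore) {
        if (sequenceNumsToRestore != null
                && sequenceNumsToRestore.containsKey(shardId)) {
            // Shard was present in the checkpoint: resume where we left off.
            return sequenceNumsToRestore.get(shardId);
        }
        // Shard not found in the restored state (e.g. never discovered
        // before, or missing for another reason, as suspected in this
        // thread): consume it from the beginning of the retention window.
        return SENTINEL_EARLIEST;
    }
}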

In my case, the number of shard details recovered from the checkpoint data
is less than the actual number of shards, which results in the missing
shards being scheduled with the earliest stream position.
I suspect that the checkpoint is somehow missing state for some of the
shards. But if that were the case, the checkpoint should have failed.

Any further information to resolve this issue would be highly appreciated...

Regards,
Ravi


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Steven Nelson
I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.

About half my shards start over at the trim horizon. Why would some shard
states appear to be missing from a savepoint? This seems like a big problem.

-Steve


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Ravi Bhushan Ratnakar
Do you know a step-by-step process to reproduce this problem?

-Ravi



Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Steven Nelson
In my situation I believe it's because we have idle shards (it's a testing
environment). I dug into the connector code, and it looks like it only
updates the shard state when a record is processed or when the shard hits
SHARD_END. So an idle shard never gets a checkpointed state. I guess this
is okay, since in production we won't have idle shards, but it might be
better to send through an empty record that doesn't get emitted but does
trigger a state update.
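
A rough sketch of that idea, with illustrative names (ShardState and Fetch
are stand-ins, not the connector's actual API): after a poll that returns
no records, the consumer would still advance the shard's checkpointed
position instead of leaving it untouched.

final class IdleShardAdvanceSketch {

    static final class ShardState {
        String lastSequenceNumber; // the value that gets checkpointed
    }

    static final class Fetch {
        java.util.List<String> recordSequenceNumbers; // empty for an idle shard
        String positionReached; // position the poll advanced to, if known
    }

    // Advance the checkpointable state even when the poll returned nothing.
    static void onFetch(ShardState state, Fetch fetch) {
        if (!fetch.recordSequenceNumbers.isEmpty()) {
            // Normal path: state advances as records are emitted downstream.
            state.lastSequenceNumber = fetch.recordSequenceNumbers
                    .get(fetch.recordSequenceNumbers.size() - 1);
        } else if (fetch.positionReached != null) {
            // Idle-shard path: record the position reached without emitting
            // anything, so a restore does not fall back to the trim horizon.
            state.lastSequenceNumber = fetch.positionReached;
        }
    }
}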

-Steve



Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Congxian Qiu
Hi

For idle shards, I think restoring from the previously unconsumed data is
okay, because Flink did not consume any data from them before; but for
non-idle shards this is problematic. From my experience with other
connectors, could you check whether the "error" shards are newly split?
Maybe the newly split shards were not contained in the checkpoint.
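
One way to check this is to diff the stream's current shards against the
shard IDs seen in the restored state. The sketch below uses the AWS SDK for
Java (v1); loadRestoredShardIdsFromLogs is a placeholder for extracting
shard IDs from the kind of debug logging described earlier in the thread,
not a real API, and "TheStream" is the stream name from the logs above:

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.HashSet;
import java.util.Set;

public class ShardCoverageCheck {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Collect all current shard IDs, following pagination.
        Set<String> currentShardIds = new HashSet<>();
        String nextToken = null;
        do {
            ListShardsRequest req = new ListShardsRequest();
            if (nextToken == null) {
                req.setStreamName("TheStream");
            } else {
                req.setNextToken(nextToken); // must not be combined with StreamName
            }
            ListShardsResult res = kinesis.listShards(req);
            for (Shard shard : res.getShards()) {
                currentShardIds.add(shard.getShardId());
            }
            nextToken = res.getNextToken();
        } while (nextToken != null);

        // Shard IDs found in the restored state (placeholder source).
        Set<String> restoredShardIds = loadRestoredShardIdsFromLogs();

        Set<String> missing = new HashSet<>(currentShardIds);
        missing.removeAll(restoredShardIds);
        System.out.println("Shards with no restored state: " + missing);
    }

    // Placeholder: parse the restore/debug log output for shard IDs.
    static Set<String> loadRestoredShardIdsFromLogs() {
        return new HashSet<>();
    }
}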

Best,
Congxian



Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Ravi Bhushan Ratnakar
Hi Congxian,

Thank you for your reply. As I shared in my previous mail, in my case the
last successful checkpoint is missing details for some of the shards. I am
not doing any up-scaling or down-scaling of Kinesis shards; I always run
with a fixed number of shards, so there is no possibility that new shard
discovery could cause this problem.

Thanks,
Ravi


