Do you know a step-by-step process to reproduce this problem?

-Ravi


On Wed 16 Oct, 2019, 17:40 Steven Nelson, <snel...@sourceallies.com> wrote:

> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>
> About half of my shards start over at the trim horizon. Why would some
> shard states appear to not exist in a savepoint? This seems like a big
> problem.
>
> -Steve
>
> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi,
>>
>> I am also facing the same problem. I am using Flink 1.9.0 and consuming
>> from a Kinesis source with a retention of 1 day. I am observing that when
>> the job is submitted with the "latest" initial stream position, the job
>> starts well and keeps processing data from all the shards for a very long
>> period of time without any lag. When the job fails, it also recovers well
>> from the last successful checkpointed state. But very rarely, when the job
>> fails and recovers from the last successful checkpointed state, I notice a
>> huge lag (1 day, i.e. the full retention) on one of the streams. I have
>> not yet been able to define a step-by-step process to reproduce this
>> issue.
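>>
>> For reference, this is roughly how the consumer is configured in my job.
>> This is a minimal sketch, not my actual code; the stream name, region and
>> schema are placeholders:
>>
>> import java.util.Properties;
>>
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
>> import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
>>
>> public class KinesisJobSketch {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         Properties consumerConfig = new Properties();
>>         // placeholder region
>>         consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
>>         // "latest" initial position; note this only applies on a fresh
>>         // start, not when restoring from a checkpoint/savepoint
>>         consumerConfig.setProperty(
>>             ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
>>
>>         // placeholder stream name and schema
>>         env.addSource(new FlinkKinesisConsumer<>(
>>                 "my-stream", new SimpleStringSchema(), consumerConfig))
>>            .print();
>>
>>         env.execute("kinesis-sketch");
>>     }
>> }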
>>
>> So far, I have gathered some more information by customizing the
>> FlinkKinesisConsumer to add extra log messages. I noticed that the number
>> of shard entries loaded from the checkpoint data during recovery is less
>> than the actual number of shards in the stream, even though the Kinesis
>> stream has a fixed number of shards.
>>
>> I added one line of debug logging at line 408 to print the size of the
>> variable "sequenceNumsToRestore", which is populated with the shard
>> details from the checkpoint data.
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
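>>
>> The debug line I added is essentially this (a sketch; the message text is
>> mine, while "sequenceNumsToRestore" and LOG are existing fields of the
>> linked class):
>>
>>     if (sequenceNumsToRestore != null) {
>>         // how many shard entries did we actually get back from the checkpoint?
>>         LOG.debug("Restored state contains {} shard entries",
>>             sequenceNumsToRestore.size());
>>     }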
>>
>> In this consumer class, when the "run" method is called, it does the
>> following (see the simplified sketch after this list):
>>
>>    - it discovers the shards of the Kinesis stream and selects those
>>    shards that this subtask should schedule
>>    - then it iterates over the discovered shards one by one and checks
>>    whether each shard's state is available in the recovered state
>>    "sequenceNumsToRestore"
>>    
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>    - if it is available, it schedules that shard with the recovered state
>>    - if it is not available in the state, it schedules that shard with
>>    "EARLIEST_SEQUENCE_NUMBER"
>>    
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
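>>
>> In code, the scheduling decision looks roughly like this (a simplified
>> paraphrase of the logic at the links above, not the verbatim source;
>> "shardMetadataOf" stands in for the metadata conversion done in the real
>> code):
>>
>>     for (StreamShardHandle shard : shardsAssignedToThisSubtask) {
>>         SequenceNumber restored =
>>             sequenceNumsToRestore.get(shardMetadataOf(shard));
>>         if (restored != null) {
>>             // shard found in the restored state: resume where we left off
>>             fetcher.registerNewSubscribedShardState(
>>                 new KinesisStreamShardState(
>>                     shardMetadataOf(shard), shard, restored));
>>         } else {
>>             // shard NOT found in the restored state: fall back to the
>>             // earliest position in the shard -- the "trim horizon" restart
>>             fetcher.registerNewSubscribedShardState(
>>                 new KinesisStreamShardState(
>>                     shardMetadataOf(shard), shard,
>>                     SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
>>         }
>>     }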
>>
>> In my case, the number of shard entries recovered from the checkpoint
>> data is less than the actual number of shards, which results in the
>> missing shards being scheduled from the earliest stream position.
>> I suspect that the checkpoint is somehow missing state for some of
>> the shards. But if that were the case, I would expect the checkpoint
>> itself to have failed.
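>>
>> For context, the snapshot side (a simplified paraphrase of snapshotState()
>> in the same class, not the verbatim source) just writes out whatever shard
>> states the fetcher is currently tracking:
>>
>>     sequenceNumsStateForCheckpoint.clear();
>>     HashMap<StreamShardMetadata, SequenceNumber> snapshot =
>>         fetcher.snapshotState();
>>     for (Map.Entry<StreamShardMetadata, SequenceNumber> entry :
>>             snapshot.entrySet()) {
>>         // one entry per shard the fetcher knows about; a shard that was
>>         // never registered with the fetcher would simply not appear here
>>         sequenceNumsStateForCheckpoint.add(
>>             Tuple2.of(entry.getKey(), entry.getValue()));
>>     }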
>>
>> Any further information that could help resolve this issue would be
>> highly appreciated.
>>
>> Regards,
>> Ravi
>>
>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Steven
>>>
>>> If the savepoint/checkpoint was restored successfully, I think this
>>> might be because the shard was not discovered in the previous run; in
>>> that case it would be consumed from the beginning. Please refer to the
>>> implementation here: [1]
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Steven Nelson <snel...@sourceallies.com>
>>> *Sent:* Wednesday, October 16, 2019 4:31
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>
>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>
>>> We have extended data retention on the Kinesis stream, which gives us 7
>>> days of data.
>>>
>>> We have found that when a savepoint/checkpoint is restored, the Kinesis
>>> consumer appears to restart from the start of the stream. The
>>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>>> metric reports to Prometheus that the consumer is 7 days behind when the
>>> process starts back up from a savepoint.
>>>
>>> We have some logs that say:
>>>
>>> Subtask 3 will start consuming seeded shard 
>>> StreamShardHandle{streamName='TheStream',
>>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>>> 220651847300296034902031972006537199616,EndingHashKey:
>>> 223310303291865866647839586127097888767},SequenceNumberRange:
>>> {StartingSequenceNumber:
>>> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
>>> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>>
>>> This seems to indicate that this shard is starting from the beginning
>>> of the stream,
>>>
>>> and some logs that say:
>>> Subtask 3 will start consuming seeded shard StreamShardHandle
>>> {streamName='TheStream', shard='{ShardId:
>>> shardId-000000000087,HashKeyRange: {StartingHashKey:
>>> 231285671266575361885262428488779956224,EndingHashKey:
>>> 233944127258145193631070042609340645375},SequenceNumberRange:
>>> {StartingSequenceNumber:
>>> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
>>> number 49599841594208637293623823226010128300928335129272649074 with
>>> ShardConsumer 21
>>>
>>> This shard seems to be resuming from a specific point.
>>>
>>> I am assuming that this might be caused by no data having been available
>>> on the shard for the entire stream retention (possible with this
>>> application stage). Is this the expected behavior? I had thought it
>>> would checkpoint with the most recent sequence number, regardless of
>>> whether it got data or not.
>>>
>>> -Steve
>>>
>>>
>>>
