Hi Congxian,

Thank you for your reply. As I shared in my previous mail, in my case the
last successful checkpoint is missing details for some of the shards. I am
not doing any upscaling or downscaling of Kinesis shards; I always run
with a fixed number of shards, so there is no possibility of new shard
discovery causing such a problem.

Thanks,
Ravi



On Fri 22 Nov, 2019, 02:53 Congxian Qiu, <qcx978132...@gmail.com> wrote:

> Hi
>
> For idle shards, I think restoring from the previously unconsumed data is
> ok, because Flink did not consume any data before, but for non-idle shards
> this is problematic. From my experience with other connectors, could you
> check whether the "error" shards are newly split? Maybe the newly split
> shards were not contained in the checkpoint.
>
> Best,
> Congxian
>
>
> On Thu, Oct 17, 2019 at 2:19 AM Steven Nelson <snel...@sourceallies.com> wrote:
>
>> In my situation I believe it's because we have idle shards (it's a
>> testing environment). I dug into the connector code and it looks like it
>> only updates the shard state when a record is processed or when the shard
>> hits SHARD_END. So, for an idle shard it would never get a checkpointed
>> state. I guess this is okay since in production we won't have idle shards,
>> but it might be better to send through an empty record that doesn't get
>> emitted but does trigger a state update.
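>>
>> To make what I mean concrete, here is a tiny, self-contained toy model of
>> that loop (entirely my own illustration of the behavior described above,
>> not the connector's real classes or method names):
>>
>>     import java.util.List;
>>
>>     // Toy model: the shard state advances only when a record is emitted
>>     // or when the shard is fully consumed; an idle, open shard keeps its
>>     // old (possibly absent) state forever.
>>     class IdleShardSketch {
>>         static final String SHARD_END = "SHARD_END";
>>
>>         static String consumeOnce(List<String> recordSeqNums,
>>                                   String restoredSeqNum,
>>                                   boolean shardClosed) {
>>             String state = restoredSeqNum;   // may be null if never checkpointed
>>             for (String seq : recordSeqNums) {
>>                 state = seq;                 // advances only on emitted records
>>             }
>>             if (shardClosed) {
>>                 state = SHARD_END;           // ...or when the shard is closed
>>             }
>>             return state;                    // idle + open shard -> still null
>>         }
>>
>>         public static void main(String[] args) {
>>             // no records and shard still open -> nothing new to checkpoint
>>             System.out.println(consumeOnce(List.of(), null, false)); // null
>>         }
>>     }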
>>
>> -Steve
>>
>>
>> On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Do you know a step-by-step process to reproduce this problem?
>>>
>>> -Ravi
>>>
>>>
>>> On Wed 16 Oct, 2019, 17:40 Steven Nelson, <snel...@sourceallies.com>
>>> wrote:
>>>
>>>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>>>
>>>> About half my shards start over at trim horizon. Why would some shard
>>>> statuses appear to not exist in a savepoint? This seems like a big
>>>> problem.
>>>>
>>>> -Steve
>>>>
>>>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am also facing the same problem. I am using Flink 1.9.0 and
>>>>> consuming from a Kinesis source with a retention of 1 day. I am observing
>>>>> that when the job is submitted with the "latest" initial stream position,
>>>>> the job starts well and keeps on processing data from all the shards for a
>>>>> very long period of time without any lag. When the job fails, it also
>>>>> recovers well from the last successful checkpointed state. But I am also
>>>>> experiencing, very rarely, that when the job fails and recovers from the
>>>>> last successful checkpointed state, I notice a huge lag (1 day, as per
>>>>> retention) on one of the streams. I have not yet found a step-by-step
>>>>> process to reproduce this issue.
>>>>>
>>>>> So far, from my analysis, I have gathered some more information by
>>>>> customizing the FlinkKinesisConsumer to add an additional log message. I
>>>>> noticed that the number of shard details loaded from the checkpoint data
>>>>> during recovery is less than the actual number of shards in the stream. I
>>>>> have a fixed number of shards in the Kinesis stream.
>>>>>
>>>>> I added one line of debug log at line 408 to print the size of the
>>>>> variable "sequenceNumsToRestore", which is populated with the shard
>>>>> details from the checkpoint data.
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>>>>>
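>>>>> The added line looks roughly like this (paraphrased; the exact message
>>>>> text is my own, only the size it prints matters):
>>>>>
>>>>>     // illustrative only -- added locally around FlinkKinesisConsumer.java:408
>>>>>     LOG.debug("Restored sequence numbers for {} shards from checkpoint: {}",
>>>>>             sequenceNumsToRestore == null ? 0 : sequenceNumsToRestore.size(),
>>>>>             sequenceNumsToRestore);
>>>>>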
>>>>> In this consumer class, when the "run" method is called, it does the
>>>>> following (a simplified sketch is shown after the list):
>>>>>
>>>>>    - it discovers the shards of the Kinesis stream and selects all those
>>>>>    shards which this subtask should schedule
>>>>>    - then it iterates over the discovered shards one by one and checks
>>>>>    whether that shard's state is available in the recovered state
>>>>>    "sequenceNumsToRestore"
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>>>>    - if it is available, it schedules that shard with the recovered
>>>>>    state
>>>>>    - if it is not available in the state, it schedules that shard with
>>>>>    "EARLIEST_SEQUENCE_NUMBER"
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
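>>>>>
>>>>> Here is a simplified sketch of that restore logic (my own toy rendering
>>>>> of what the linked code does, not the actual Flink source; the helper
>>>>> names are illustrative only):
>>>>>
>>>>>     import java.util.List;
>>>>>     import java.util.Map;
>>>>>
>>>>>     class RestoreSketch {
>>>>>         static final String EARLIEST = "EARLIEST_SEQUENCE_NUMBER";
>>>>>
>>>>>         // For each discovered shard, resume from the checkpointed
>>>>>         // sequence number if present, otherwise seed it from the
>>>>>         // earliest position -- which is the "1 day lag" symptom I see.
>>>>>         static void registerShards(List<String> discoveredShardIds,
>>>>>                                    Map<String, String> sequenceNumsToRestore,
>>>>>                                    Map<String, String> registeredShards) {
>>>>>             for (String shardId : discoveredShardIds) {
>>>>>                 if (sequenceNumsToRestore != null
>>>>>                         && sequenceNumsToRestore.containsKey(shardId)) {
>>>>>                     registeredShards.put(shardId, sequenceNumsToRestore.get(shardId));
>>>>>                 } else {
>>>>>                     registeredShards.put(shardId, EARLIEST);
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>     }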
>>>>>
>>>>> In my case, the number of shard details recovered from the checkpoint
>>>>> data is less than the actual number of shards, which results in
>>>>> scheduling the missing shards with the earliest stream position.
>>>>> I suspect that somehow the checkpoint is missing state for some of the
>>>>> shards. But if this were the case, then that checkpoint should have
>>>>> failed.
>>>>>
>>>>> Any further information to resolve this issue would be highly
>>>>> appreciated...
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>>>>>
>>>>>> Hi Steven
>>>>>>
>>>>>> If you restored the savepoint/checkpoint successfully, I think this
>>>>>> might be because the shard wasn't discovered in the previous run, and it
>>>>>> would therefore be consumed from the beginning. Please refer to the
>>>>>> implementation here: [1]
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>> ------------------------------
>>>>>> *From:* Steven Nelson <snel...@sourceallies.com>
>>>>>> *Sent:* Wednesday, October 16, 2019 4:31
>>>>>> *To:* user <user@flink.apache.org>
>>>>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>>>>
>>>>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>>>>
>>>>>> We have extended data retention on the Kinesis stream, which gives us
>>>>>> 7 days of data.
>>>>>>
>>>>>> We have found that when a savepoint/checkpoint is restored, it appears
>>>>>> to restart the Kinesis Consumer from the start of the stream. The
>>>>>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>>>>>> metric reports to Prometheus that we are behind by 7 days when the
>>>>>> process starts back up from a savepoint.
>>>>>>
>>>>>> We have some logs that say:
>>>>>>
>>>>>> Subtask 3 will start consuming seeded shard 
>>>>>> StreamShardHandle{streamName='TheStream',
>>>>>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>>>>>> 220651847300296034902031972006537199616,EndingHashKey:
>>>>>> 223310303291865866647839586127097888767},SequenceNumberRange:
>>>>>> {StartingSequenceNumber:
>>>>>> 49597946220601502339755334362523522663986150244033234226,}}'} from 
>>>>>> sequence
>>>>>> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>>>>>
>>>>>> This seems to indicate that this shard is starting from the beginning
>>>>>> of the stream.
>>>>>>
>>>>>> And we have some logs that say:
>>>>>> Subtask 3 will start consuming seeded shard StreamShardHandle
>>>>>> {streamName=' TheStream ', shard='{ShardId:
>>>>>> shardId-000000000087,HashKeyRange: {StartingHashKey:
>>>>>> 231285671266575361885262428488779956224,EndingHashKey:
>>>>>> 233944127258145193631070042609340645375},SequenceNumberRange:
>>>>>> {StartingSequenceNumber:
>>>>>> 49597946220690705320549456855089665537076743690057155954,}}'} from 
>>>>>> sequence
>>>>>> number 49599841594208637293623823226010128300928335129272649074 with
>>>>>> ShardConsumer 21
>>>>>>
>>>>>> This shard seems to be resuming from a specific point.
>>>>>>
>>>>>> I am assuming that this might be caused by no data ever arriving on
>>>>>> the shard (possible with this application stage). Is this the expected
>>>>>> behavior? I had thought it would checkpoint with the most recent
>>>>>> sequence number, regardless of whether it got data or not.
>>>>>>
>>>>>> -Steve
>>>>>>
>>>>>>
>>>>>>
