Do you know a step-by-step process to reproduce this problem?

-Ravi
On Wed 16 Oct, 2019, 17:40 Steven Nelson <snel...@sourceallies.com> wrote:

I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.

About half my shards start over at trim horizon. Why would some shard states appear to be missing from a savepoint? This seems like a big problem.

-Steve

On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:

Hi,

I am also facing the same problem. I am using Flink 1.9.0 and consuming from a Kinesis source with a retention of 1 day. When the job is submitted with the "latest" initial stream position, it starts well and keeps processing data from all shards for a very long period of time without any lag. When the job fails, it usually also recovers well from the last successful checkpointed state. But very rarely, when the job fails and recovers from the last successful checkpoint, I notice a huge lag (1 day, i.e. the full retention) on one of the streams. I have not yet found a step-by-step process to reproduce this issue.

So far I have gathered some more information by customizing the FlinkKinesisConsumer to add extra log messages. I noticed that the number of shards loaded from the checkpoint data during recovery is less than the actual number of shards in the stream, even though my Kinesis stream has a fixed number of shards.

I added one line of debug logging at line 408 to print the size of the variable "sequenceNumsToRestore", which is populated with shard details from the checkpoint data:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408

In this consumer class, when the "run" method is called, it does the following (a simplified sketch of this decision follows the list):

- it discovers shards from the Kinesis stream and selects all the shards that this subtask should consume
- then it iterates over the discovered shards one by one and checks whether that shard's state is available in the recovered state "sequenceNumsToRestore"
  https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
- if it is available, it schedules that shard with the recovered state
- if it is not available in the state, it schedules that shard with "EARLIEST_SEQUENCE_NUMBER"
  https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
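To make that decision concrete, here is a minimal sketch of the seeding logic described above. The types, method names, and the values in main are placeholders for illustration (the real code lives in FlinkKinesisConsumer.run() and uses the connector's StreamShardHandle and SequenceNumber classes, not plain Strings):

    import java.util.List;
    import java.util.Map;

    public class ShardSeedingSketch {

        // Sentinel meaning "start from the oldest record still available in the shard".
        static final String EARLIEST_SEQUENCE_NUM = "EARLIEST_SEQUENCE_NUM";

        /**
         * @param discoveredShardIds    shards discovered from Kinesis for this subtask
         * @param sequenceNumsToRestore shard -> sequence number restored from the checkpoint
         */
        static void seedShards(List<String> discoveredShardIds,
                               Map<String, String> sequenceNumsToRestore) {
            for (String shardId : discoveredShardIds) {
                String restored = sequenceNumsToRestore.get(shardId);
                if (restored != null) {
                    // Shard was present in the restored state: resume from its saved position.
                    registerShard(shardId, restored);
                } else {
                    // Shard missing from the restored state: treated as newly discovered and
                    // seeded from the earliest available record, i.e. a full-retention replay.
                    registerShard(shardId, EARLIEST_SEQUENCE_NUM);
                }
            }
        }

        static void registerShard(String shardId, String startingSequenceNumber) {
            System.out.println("Seeding shard " + shardId + " from " + startingSequenceNumber);
        }

        public static void main(String[] args) {
            // Illustrative values only: one shard has restored state, one does not.
            seedShards(
                List.of("shardId-000000000083", "shardId-000000000087"),
                Map.of("shardId-000000000087", "49599841594208637293623823226010128300928335129272649074"));
        }
    }

If the restored map is missing entries for shards that did exist before the failure, this is exactly the path that produces the "from sequence number EARLIEST_SEQUENCE_NUM" log lines quoted later in this thread.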
As in my case the number of shards recovered from the checkpoint data is less than the actual number of shards, the missing shards are scheduled with the earliest stream position. I suspect that the checkpoint is somehow missing state for some of the shards, but if that were the case the checkpoint should have failed.

Any further information to help resolve this issue would be highly appreciated...

Regards,
Ravi

On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:

Hi Steven,

If you restored the savepoint/checkpoint successfully, I think this might be because the shard was not discovered in the previous run, and therefore it is consumed from the beginning. Please refer to the implementation here: [1]

[1] https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307

Best,
Yun Tang

From: Steven Nelson <snel...@sourceallies.com>
Sent: Wednesday, October 16, 2019 4:31
To: user <user@flink.apache.org>
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of data.

We have found that when a savepoint/checkpoint is restored, it appears to restart the Kinesis consumer from the start of the stream. The flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest property reports to Prometheus that it is behind by 7 days when the process starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey: 223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the stream.

And some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'} from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming that this might be caused by no data being available on the shard for the entire stream retention (possible with this application stage). Is this the expected behavior? I had thought it would checkpoint with the most recent sequence number, regardless of whether it received data or not.

-Steve
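For context, a minimal sketch of how a Kinesis source with the "latest" initial stream position discussed in this thread is typically wired up. The stream name is taken from the logs above; the region and job name are placeholders, and this is an illustrative example rather than the reporters' actual job:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
            // The initial position only applies to shards with no restored state; shards that
            // are present in a restored checkpoint/savepoint resume from their saved sequence numbers.
            props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

            env.addSource(new FlinkKinesisConsumer<>("TheStream", new SimpleStringSchema(), props))
               .print();

            env.execute("kinesis-consumer-sketch");
        }
    }

This is why the behavior in the thread only shows up on restore: on a fresh start every shard is seeded from "LATEST", but on restore any shard not found in the recovered state falls back to the earliest available record.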