Hi Steven

If the savepoint/checkpoint was restored successfully, I think this might be 
because the shard was not discovered in the previous run; a shard that is 
missing from the restored state is treated as newly discovered and consumed 
from the beginning. Please refer to the implementation here: [1]

[1] 
https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
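
To illustrate the idea, here is a very simplified, hypothetical sketch in Java 
(the class and method names below are stand-ins, not the connector's actual 
API): on restore, a shard that is present in the restored state resumes from 
its saved sequence number, while a shard that is missing from that state is 
seeded with the EARLIEST sentinel and therefore read from the start of its 
retention window.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the restore-time seeding decision; names are
    // simplified stand-ins for the logic in the linked FlinkKinesisConsumer code.
    public class RestoreSeedingSketch {

        static final String SENTINEL_EARLIEST_SEQUENCE_NUM = "EARLIEST_SEQUENCE_NUM";

        /** Decide the starting sequence number for each discovered shard. */
        static Map<String, String> seedShards(
                Iterable<String> discoveredShardIds,
                Map<String, String> restoredState) { // shardId -> checkpointed seq. num.

            Map<String, String> startingPositions = new HashMap<>();
            for (String shardId : discoveredShardIds) {
                String restoredSeqNum = restoredState.get(shardId);
                if (restoredSeqNum != null) {
                    // Shard was known at checkpoint time: resume where we left off.
                    startingPositions.put(shardId, restoredSeqNum);
                } else {
                    // Shard not found in the restored state (e.g. never discovered in
                    // the previous run): treated as new and consumed from the beginning.
                    startingPositions.put(shardId, SENTINEL_EARLIEST_SEQUENCE_NUM);
                }
            }
            return startingPositions;
        }
    }

If that is what happened in your job, the two log lines you quoted match the 
two branches above: shardId-000000000083 was seeded with EARLIEST_SEQUENCE_NUM, 
while shardId-000000000087 resumed from a concrete restored sequence number.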

Best
Yun Tang
________________________________
From: Steven Nelson <snel...@sourceallies.com>
Sent: Wednesday, October 16, 2019 4:31
To: user <user@flink.apache.org>
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of 
data.

We have found that when a savepoint/checkpoint is restored, the Kinesis 
consumer appears to restart from the start of the stream. The 
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
metric reports to Prometheus that it is 7 days behind when the process starts 
back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard 
StreamShardHandle{streamName='TheStream', shard='{ShardId: 
shardId-000000000083,HashKeyRange: {StartingHashKey: 
220651847300296034902031972006537199616,EndingHashKey: 
223310303291865866647839586127097888767},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220601502339755334362523522663986150244033234226,}}'} from sequence 
number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the 
stream.

We also have some logs that say:
Subtask 3 will start consuming seeded shard 
StreamShardHandle{streamName='TheStream', shard='{ShardId: 
shardId-000000000087,HashKeyRange: 
{StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 
233944127258145193631070042609340645375},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220690705320549456855089665537076743690057155954,}}'} from sequence 
number 49599841594208637293623823226010128300928335129272649074 with 
ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming this might be caused by the shard having received no data at all 
(which is possible for this application stage). Is this the expected behavior? 
I had thought it would checkpoint the most recent sequence number regardless of 
whether it received any data or not.

-Steve

