Hi Zachary,

How did you configure the Kafka connector to commit offsets
(periodically, or on checkpoint)? One explanation for the graphs you shared
is that you enabled periodic offset committing. If such an automatic commit
happens between two checkpoints and you later fall back to the earlier
checkpoint, the next periodic commit can make the committed offset appear
to drop. Note that Flink does not rely on the offsets committed to the
Kafka broker for fault tolerance; it stores the actual offsets in its
internal state.
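
For reference, here is a minimal sketch of how the two commit modes are
selected in the Flink 1.10 Kafka connector. The topic name, group id,
broker address, and checkpoint interval below are placeholders, not taken
from your setup:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class OffsetCommitExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Offsets can only be committed on completed checkpoints if
            // checkpointing is enabled in the first place.
            env.enableCheckpointing(60_000); // placeholder interval

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "my-group");             // placeholder

            FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(
                    "my-topic", new SimpleStringSchema(), props); // placeholder topic

            // With checkpointing enabled, offsets are committed back to Kafka
            // only when a checkpoint completes (the default, shown explicitly).
            // These committed offsets are informational for monitoring only;
            // on recovery Flink restores from the offsets in its own state.
            consumer.setCommitOffsetsOnCheckpoints(true);

            // With checkpointing disabled, the connector instead falls back to
            // the Kafka client's periodic auto-commit:
            // props.setProperty("enable.auto.commit", "true");
            // props.setProperty("auto.commit.interval.ms", "5000");

            env.addSource(consumer).print();
            env.execute("offset-commit-example");
        }
    }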

To make sure I understand the scenario, let me try to summarize it.
Periodically you restart your infrastructure and then resume the Flink job
from the latest checkpoint. You did this on the 19th of April. Then on the
27th of April you created a savepoint from the job you had restarted on the
19th and which had been running fine since then, and you submitted a new
job resuming from this savepoint. All of a sudden, this new job started to
consume data from Kafka starting from the 19th of April. Is this correct?
If it happened as described, then the Flink job seems not to have made
much progress since you restarted it. Without the logs it is really hard
to tell what the cause could be.
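
For completeness, resuming with the CLI from a retained checkpoint looks
like the following sketch (bucket, job id, checkpoint number, and jar name
are placeholders):

    # -s accepts a savepoint path or a retained checkpoint directory
    ./bin/flink run -s s3://my-bucket/checkpoints/<job-id>/chk-42 -d my-job.jar

If the restore succeeds, the consumer starts from the offsets stored in
that checkpoint, regardless of what was last committed to the Kafka broker.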

Cheers,
Till

On Wed, Apr 28, 2021 at 4:36 PM Zachary Manno <zachary.ma...@capitalone.com>
wrote:

> Hello,
> I am running Flink version 1.10 on Amazon EMR. We use RocksDB/S3 for
> state. I have "Persist Checkpoints Externally" enabled. Periodically I must
> tear down the current infrastructure and bring it back up. To do this, I
> terminate the EMR cluster, bring up a fresh one, and then I resume the
> Flink job from the latest checkpoint path in S3 using the "-s" method here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html#resuming-from-savepoints
>
> I last did this operation on April 19. Then, on April 27 I deployed a new
> version of the code only, using savepointing. This caused a production
> incident because it turned out that on April 19th one of the Kafka
> partition offsets was somehow not committed correctly during the resume
> from the checkpoint. When the new code was deployed on the 27th, a backfill of
> Kafka messages came in from the 19th to the 27th which caused the issue.
>
> I am attaching screenshots of Datadog metrics for the Kafka consumer
> metrics Flink provides. One graph is:
>
> ".KafkaConsumer.topic.{topic_name}.partition.{partition_number}.committedOffsets"
>
> And the next is:
>
> "KafkaConsumer.topic.{topic_name}.partition.{partition_number}.currentOffsets"
>
>
> The light blue line is partition 3 and that is the one that caused the
> issue. Does anyone have any insight into what could have happened? And what
> I can do to prevent this in the future? Unfortunately since the EMR was
> terminated I cannot provide the full logs. I am able to search for keywords
> or get sections since we have external Splunk logging but cannot get full
> files.
>
> Thanks for any help!!
>
> [image: Committed_Offsets.png]
> [image: Current_Offsets.png]
>
