Thanks again for the detailed info; it makes a lot of sense. One last question: can I create a checkpoint as soon as a job starts? In that case, the first record's offset would be in the checkpoint state, which would provide the "strong" guarantee you described. Did I miss anything? I read the code and realized that checkpointing happens in a separate thread, which is why there is no "initial" checkpoint today. But I want to know whether the idea would work in theory.
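A close approximation with the existing APIs is to shorten the checkpoint
interval so the first checkpoint completes shortly after startup, and to
retain checkpoints on cancellation so a later submission can restore from
them. A minimal sketch, assuming a Java DataStream job; the 10-second
interval, the job name, and the cleanup choice are illustrative
placeholders, not recommendations:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EarlyCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds so the first record's offset lands
        // in checkpoint state soon after the job starts.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        // Keep the last checkpoint when the job is cancelled, so a new
        // submission can restore from it (flink run -s <checkpoint-path>).
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... build the Kafka source -> sink pipeline here ...

        env.execute("early-checkpoint-job");
    }
}

This does not checkpoint at t=0, but it narrows the window during which no
offsets have been checkpointed to roughly one short interval.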
On Wed, Jan 5, 2022 at 11:36 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Also note that although this eventual consistency may not seem good
> enough, for 99.99% of the time the job runs smoothly without failure,
> and in that case the records it produces are correct. Only in the 0.01%
> case when the job fails will users see inconsistency, and only for a
> small period of time (one checkpoint interval). Users who can tolerate
> this 0.01% chance of inconsistency get very low latency and mostly
> correct data.
>
> On Thu, Jan 6, 2022 at 10:40, Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> Flink guarantees *eventual* consistency for systems without
>> transactions (by transaction I mean the ability to write a few records
>> and then commit them), or for systems with transactions when users
>> prefer latency over consistency. That is to say, everything produced by
>> Flink before a checkpoint is "not secured" (if you need *strong*
>> consistency). If a failure happens after a checkpoint C, then
>> everything produced by Flink after C should be ignored. Only the
>> success of checkpoint C + 1 guarantees that all these records are
>> consistent (up to C + 1).
>>
>> If you prefer strong consistency and can tolerate a latency of a few
>> minutes (the latency here is your checkpoint interval), you can try the
>> hive sink. When a streaming job writes to the hive sink, records become
>> visible in hive only after the next checkpoint completes (so it is ok
>> to process a record several times, as long as its corresponding
>> checkpoint hasn't completed).
>>
>> group-offsets does not help in your case. There actually is an option
>> to commit offsets to Kafka during each checkpoint, but Flink also
>> manages offsets in its own state, and if there is no checkpoint the
>> group offsets won't change.
>>
>> On Wed, Jan 5, 2022 at 13:18, Sharon Xie <sharon.xie...@gmail.com> wrote:
>>
>>> Hi Caizhi,
>>>
>>> Thank you for the quick response. Can you help me understand how
>>> reprocessing the data with the earliest starting-offset ensures
>>> exactly-once processing? First, the earliest offset could be way
>>> before the 1st record in my example, since the job originally started
>>> from the latest offset. Second, even if there are only two records in
>>> the topic, the 1st record was already processed, so I'd think it would
>>> be processed twice if the earliest offset is used.
>>>
>>> Another thing I found in the docs under start reading position
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>:
>>>
>>> > The default option value is group-offsets which indicates to consume
>>> > from last committed offsets in ZK / Kafka brokers.
>>>
>>> It seems that there is a way to resume processing from the
>>> "group-offsets", whose value would be the offset of the 1st record in
>>> my scenario. However, I couldn't make it work in my test. I'm using
>>> application mode deployment, so my guess is that the 2nd job (in a new
>>> cluster) internally uses a different Kafka consumer group id. Any
>>> ideas to make it work?
>>>
>>> Thanks,
>>> Sharon
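Following up on the consumer-group guess above: pinning the group id
explicitly makes every deployment commit to, and resume from, the same
group, even when application mode spins up a fresh cluster. A sketch,
assuming the DataStream KafkaSource builder; the broker address, topic,
and group id are placeholders (the rough SQL-connector equivalent is
setting 'properties.group.id' explicitly alongside 'scan.startup.mode' =
'group-offsets'):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")      // placeholder
        .setTopics("input-topic")                // placeholder
        // Pin the group id so a redeployed job (even in a new cluster)
        // resumes from the same consumer group.
        .setGroupId("my-stable-group-id")        // placeholder
        // Start from the group's committed offsets; fall back to the
        // earliest offset if the group has never committed.
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        // Commit offsets back to Kafka when a checkpoint completes.
        .setProperty("commit.offsets.on.checkpoint", "true")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

As noted above, the committed group offsets only advance when a checkpoint
completes, so this still depends on checkpointing being enabled.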
>>> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> This is a valid case. The starting-offset is the offset the Kafka
>>>> source reads from when the job starts *without a checkpoint*. That is
>>>> to say, if your job has been running for a while, completed several
>>>> checkpoints and then restarted, the Kafka source won't read from
>>>> starting-offset but from the offset recorded in the checkpoint.
>>>>
>>>> As you're only processing 2 records (which I guess takes less than a
>>>> second), no checkpoint has been completed (if you didn't change any
>>>> configuration, the default checkpointing interval is 10 minutes), so
>>>> the next time you start the same job it will still read from
>>>> starting-offset, which is the latest offset by your setting.
>>>>
>>>> If you would like to reprocess the second record, you can set the
>>>> starting-offset to earliest (which is the default setting). The first
>>>> record will also be reprocessed, but this is still valid because it
>>>> just updates the result for the first record (the same result as in
>>>> your previous execution).
>>>>
>>>> On Wed, Jan 5, 2022 at 02:56, Sharon Xie <sharon.xie...@gmail.com> wrote:
>>>>
>>>>> Can someone help me understand how Flink handles the following
>>>>> scenario?
>>>>>
>>>>> I have a job that reads from a source Kafka topic (starting-offset:
>>>>> latest) and writes to a sink Kafka topic with exactly-once
>>>>> execution. Let's say that there are 2 records in the source. The 1st
>>>>> one is processed without issue, and the job fails on the 2nd record
>>>>> due to a parsing error. I want to update the job with a fix for the
>>>>> 2nd record and resume processing from that record's offset.
>>>>>
>>>>> However, I can't find a way to stop the job with a savepoint,
>>>>> because the job is in a failed state. And if I just cancel the job
>>>>> without a savepoint, it will start from the new "latest" offset the
>>>>> next time I start it.
>>>>>
>>>>> Is this a valid case? If so, how do I handle it so that I can resume
>>>>> processing from the 2nd record's offset after I update the job?
>>>>>
>>>>> Thanks,
>>>>> Sharon
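For completeness, the "exactly-once" write to the sink Kafka in the
scenario above corresponds to a transactional producer: records are
written inside a Kafka transaction that commits together with the
checkpoint, so read_committed consumers only ever see checkpointed output
(the same visibility rule described for the hive sink). A sketch, assuming
the KafkaSink available from Flink 1.14; the broker, topic, and
transactional id prefix are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")    // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        // EXACTLY_ONCE uses Kafka transactions; uncommitted records from
        // a failed run are aborted on recovery.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Required for EXACTLY_ONCE; should be unique per job.
        .setTransactionalIdPrefix("my-job-tx")       // placeholder
        .build();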