Thanks again for the detailed info. It makes a lot of sense.

One last question: can we create a checkpoint as soon as a job starts? In
that case the first record's offset would already be in the checkpoint
state, and that would provide the "strong" guarantee you described. Did I
miss anything? I read the code and saw that checkpointing happens in a
separate thread, which is why there is no "initial" checkpoint today. But
I want to know whether the idea would work in theory.
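
The closest workaround I can think of is simply shortening the checkpoint
interval so that the first checkpoint completes very soon after the job
starts. A minimal sketch below, assuming there is no knob to force a
checkpoint right at startup; the interval and names are just placeholders:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EarlyCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds with exactly-once semantics, so the
        // first checkpoint (and the source offsets it records) is taken
        // shortly after the job starts.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        // Allow checkpoints to follow each other without a forced pause.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0L);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("early-checkpoint-sketch");
    }
}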

On Wed, Jan 5, 2022 at 11:36 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Also note that although this eventual consistency may seem weak, for
> 99.99% of the time the job runs smoothly without failure and the records
> it produces are correct. Only in the 0.01% case where the job fails will
> users see inconsistency, and only for a short period of time (about one
> checkpoint interval). If users can tolerate that small chance of
> inconsistency, they get very low latency and mostly correct data.
>
> On Thu, Jan 6, 2022 at 10:40 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> Flink guarantees *eventual* consistency for systems without transactions
>> (by transaction I mean a system that supports writing a few records and
>> then committing them), or for systems with transactions when users prefer
>> latency over consistency. That is to say, everything produced by Flink
>> since the last completed checkpoint is "not secured" (if you need
>> *strong* consistency). If a failure happens after a checkpoint C, then
>> everything produced by Flink after C should be ignored. Only the success
>> of checkpoint C + 1 guarantees that those records are now consistent (up
>> to C + 1).
>>
>> If you prefer strong consistency and can tolerate a latency of a few
>> minutes (the latency here is your checkpoint interval), you can try the
>> Hive sink. When writing to the Hive sink in a streaming job, records only
>> become visible in Hive after the next checkpoint completes (so it is fine
>> to process a record several times, as long as its corresponding
>> checkpoint hasn't completed).
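>>
>> As a rough illustration only (not tested here), a streaming insert into
>> a Hive table via the Table API could look like the fragment below; the
>> catalog name, conf dir and table names are placeholders, and the exact
>> behavior depends on your Flink/Hive versions:
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>
>> TableEnvironment tEnv = TableEnvironment.create(
>>     EnvironmentSettings.newInstance().inStreamingMode().build());
>> // Register a Hive catalog (name, database and conf dir are placeholders).
>> tEnv.registerCatalog("myhive",
>>     new HiveCatalog("myhive", "default", "/etc/hive/conf"));
>> tEnv.useCatalog("myhive");
>>
>> // Rows written by this streaming INSERT only become visible to Hive
>> // readers after the checkpoint containing them completes.
>> tEnv.executeSql(
>>     "INSERT INTO hive_sink_table SELECT id, payload FROM kafka_source_table");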
>>
>> group-offsets does not help in your case. There is actually an option to
>> commit offsets back to Kafka on each checkpoint, but Flink still manages
>> offsets in its own state. If no checkpoint completes, the group offsets
>> won't change.
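>>
>> For reference, a sketch of how this looks with the DataStream KafkaSource
>> builder (broker address, topic and group id are placeholders; please
>> double-check the property name against your connector version):
>>
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.connector.kafka.source.KafkaSource;
>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>
>> KafkaSource<String> source = KafkaSource.<String>builder()
>>     .setBootstrapServers("broker:9092")
>>     .setTopics("input-topic")
>>     .setGroupId("my-job")
>>     // Start from this group's committed offsets (Flink still keeps the
>>     // authoritative offsets in its own checkpointed state).
>>     .setStartingOffsets(OffsetsInitializer.committedOffsets())
>>     // Commit the checkpointed offsets back to Kafka when a checkpoint completes.
>>     .setProperty("commit.offsets.on.checkpoint", "true")
>>     .setValueOnlyDeserializer(new SimpleStringSchema())
>>     .build();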
>>
>> On Wed, Jan 5, 2022 at 1:18 PM Sharon Xie <sharon.xie...@gmail.com> wrote:
>>
>>> Hi Caizhi,
>>>
>>> Thank you for the quick response. Can you help me understand how
>>> reprocessing the data from the earliest starting-offset ensures
>>> exactly-once processing? First, the earliest offset could be far before
>>> the 1st record in my example, since the job originally started from the
>>> latest offset. Second, even if there are only two records in the topic,
>>> the 1st record was already processed before, so I'd think the 1st record
>>> would be processed twice if the earliest offset is used.
>>>
>>> Another thing I found in the docs, under start reading position
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>
>>>
>>> >The default option value is group-offsets which indicates to consume
>>> from last committed offsets in ZK / Kafka brokers.
>>>
>>> It seems there is a way to resume processing from the group offsets,
>>> which in my scenario would be the offset of the 1st record. However, I
>>> can't make it work based on my test. I'm using application-mode
>>> deployment, so my guess is that the 2nd job (in a new cluster) internally
>>> uses a different Kafka consumer group id. Any ideas on how to make it
>>> work?
>>>
>>>
>>> Thanks,
>>> Sharon
>>>
>>> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> This is a valid case. The starting-offset is the offset the Kafka source
>>>> reads from when the job starts *without a checkpoint*. That is to say,
>>>> if your job has been running for a while, completed several checkpoints
>>>> and then restarted, the Kafka source won't read from the starting-offset
>>>> but from the offsets recorded in the checkpoint.
>>>>
>>>> As you're only processing 2 records (which I guess takes less than a
>>>> second), no checkpoint has been completed (if you didn't change any
>>>> configuration, the default checkpointing interval is 10 minutes), so the
>>>> next time you start the same job it will still read from the
>>>> starting-offset, which is the latest offset according to your setting.
>>>>
>>>> If you would like to reprocess the second record, you can set the
>>>> starting-offset to earliest (which is the default setting). The first
>>>> record will also be reprocessed, but this is still valid because it just
>>>> updates the result for the first record (which is the same as in your
>>>> previous execution).
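>>>>
>>>> A minimal sketch of that setting with the DataStream KafkaSource builder
>>>> (broker, topic and group id are placeholders):
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> import org.apache.flink.connector.kafka.source.KafkaSource;
>>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>>>
>>>> KafkaSource<String> source = KafkaSource.<String>builder()
>>>>     .setBootstrapServers("broker:9092")
>>>>     .setTopics("input-topic")
>>>>     .setGroupId("my-job")
>>>>     // Re-read the topic from the beginning; already-processed records
>>>>     // are simply read and processed again.
>>>>     .setStartingOffsets(OffsetsInitializer.earliest())
>>>>     .setValueOnlyDeserializer(new SimpleStringSchema())
>>>>     .build();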
>>>>
>>>> On Wed, Jan 5, 2022 at 2:56 AM Sharon Xie <sharon.xie...@gmail.com> wrote:
>>>>
>>>>> Can someone help me understand how Flink deals with the following
>>>>> scenario?
>>>>>
>>>>> I have a job that reads from a source Kafka topic (starting-offset:
>>>>> latest) and writes to a sink Kafka topic with exactly-once execution.
>>>>> Let's say there are 2 records in the source. The 1st one is processed
>>>>> without issue, and the job fails while processing the 2nd record due to
>>>>> a parsing error. I want to update the job with a fix for the 2nd record
>>>>> and resume processing from the offset of the 2nd record.
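>>>>>
>>>>> (For context, by "exactly-once" I mean a transactional Kafka sink
>>>>> roughly like the sketch below; the broker, topic and id prefix are
>>>>> placeholders:)
>>>>>
>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>> import org.apache.flink.connector.base.DeliveryGuarantee;
>>>>> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
>>>>> import org.apache.flink.connector.kafka.sink.KafkaSink;
>>>>>
>>>>> KafkaSink<String> sink = KafkaSink.<String>builder()
>>>>>     .setBootstrapServers("broker:9092")
>>>>>     .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>>>>>         .setTopic("output-topic")
>>>>>         .setValueSerializationSchema(new SimpleStringSchema())
>>>>>         .build())
>>>>>     // Transactional writes: output only becomes readable for
>>>>>     // read_committed consumers once the owning checkpoint completes.
>>>>>     .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>>>>>     .setTransactionalIdPrefix("my-job")
>>>>>     .build();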
>>>>>
>>>>> However, I can't find a way to stop the job with a savepoint because
>>>>> the job is in a failed state. If I just cancel the job without a
>>>>> savepoint, the job will start from the new "latest" offset next time I
>>>>> start it.
>>>>>
>>>>> Is this a valid case? If so, how can I handle it so that I can resume
>>>>> processing from the 2nd record's offset after I update the job?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Sharon
>>>>>
>>>>>
