Hi!

Flink guarantees *eventual* consistency for systems without transactions
(by transaction I mean a system supporting writing a few records then
commit), or with transactions but users prefer latency than consistency.
That is to say, everything produced by Flink before a checkpoint is "not
secured" (if you need *strong* consistency). If a failure happens after a
checkpoint C, then everything produced by Flink after C should be ignored.
Only the success of C + 1 guarantees that all these records are now
consistent (up to C + 1).

If you prefer strong consistency and can tolerate latency of a few minutes
(the latency here is your checkpoint interval), you can try hive sink. When
writing to hive sink in a streaming job, records are only visible in hive
after the next checkpoint completes (so it is ok to process a record
several times, as long as its corresponding checkpoint hasn't completed).

group-offsets does not help in your case. There actually is an option to
commit offsets to Kafka during each checkpoint but Flink will also manage
offsets in its own state. If there is no checkpoint then group offsets
won't change.

Sharon Xie <sharon.xie...@gmail.com> 于2022年1月5日周三 13:18写道:

> Hi Caizhi,
>
> Thank you for the quick response. Can you help me understand how
> reprocessing the data with the earliest starting-offset ensures exactly
> once processing? 1st, the earliest offset could be way beyond the 1st
> record in my example since the first time the job started from the latest
> offset. 2nd, even if there are only two records in the topic, the 1st
> record was already processed before so I'd think the 1st record would be
> processed twice if the earliest offset is used.
>
> Another thing I found from the doc start reading position
> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>
>
> >The default option value is group-offsets which indicates to consume from
> last committed offsets in ZK / Kafka brokers.
>
> It seems that there is a way to resume processing from the
> "group-offsets" where its value would be the offset of the 1st record in my
> scenario. However, I can't make it work based on my test. I'm using
> application mode deployment so my guess is that the 2nd job (in a new
> cluster) internally has a different kafka consumer group id. Any ideas to
> make it work?
>
>
> Thanks,
> Sharon
>
> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> This is a valid case. This starting-offset is the offset for Kafka source
>> to read from when the job starts *without checkpoint*. That is to say,
>> if your job has been running for a while, completed several checkpoints and
>> then restarted, Kafka source won't read from starting-offset, but from the
>> offset logged in the checkpoint.
>>
>> As you're only processing 2 records (which I guess takes less than a
>> second) no checkpoint has been completed (if you didn't change any
>> configurations the default checkpointing interval is 10 minutes), so the
>> next time you start the same job it will still read from starting-offset,
>> which is the latest offset by your setting.
>>
>> If you would like to reprocess the second record you can set the
>> starting-offset to earliest (which is the default setting). The first
>> record will also be reprocessed but this is still valid because it is just
>> updating the result for the first record (which is the same as your
>> previous execution).
>>
>> Sharon Xie <sharon.xie...@gmail.com> 于2022年1月5日周三 02:56写道:
>>
>>> Can someone help me understand how Flink deals with the following
>>> scenario?
>>>
>>> I have a job that reads from a source Kafka (starting-offset: latest)
>>> and writes to a sink Kafka with exactly-once execution. Let's say that I
>>> have 2 records in the source. The 1st one is processed without issue and
>>> the job fails when the 2nd record is processed due to a parsing error. I
>>> want to update the job with a fix for the 2nd record and resume processing
>>> from the offset of the 2nd record.
>>>
>>> However, I can't find a way to stop the job with a savepoint because the
>>> job is in a failed state. If I just cancel the job without a savepoint, the
>>> job will start from the new "latest" offset next time I start it.
>>>
>>> Is this a valid case? If so, how to handle this case so that I can
>>> resume processing from the 2nd record's offset after I update the job?
>>>
>>>
>>> Thanks,
>>> Sharon
>>>
>>>

Reply via email to