Hi Caizhi,

Thank you for the quick response. Can you help me understand how
reprocessing the data from the earliest offset ensures exactly-once
processing? First, the earliest offset could be well before the 1st
record in my example, since the job originally started from the latest
offset. Second, even if there are only two records in the topic, the 1st
record was already processed, so I'd expect it to be processed twice if
the earliest offset is used.

Another thing I found in the docs, under "Start Reading Position":
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>

>The default option value is group-offsets which indicates to consume from
last committed offsets in ZK / Kafka brokers.

It seems that there is a way to resume processing from "group-offsets",
whose value would be the offset of the 1st record in my scenario.
However, I couldn't get it to work in my test. I'm using application mode
deployment, so my guess is that the 2nd job (in a new cluster) internally
uses a different Kafka consumer group id. Any ideas on how to make it work?
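
To make the question concrete, here is a toy Python model (not Flink code)
of how I currently understand the start position to be chosen. The function
name, its parameters, and the offsets 0 / 101 / 102 are made up for
illustration, and raising an error when the group has no committed offset
is my assumption rather than documented behavior.

```python
from typing import Optional


def resolve_start_offset(
    startup_mode: str,
    earliest: int,
    latest: int,
    restored_offset: Optional[int] = None,
    committed_offset: Optional[int] = None,
) -> int:
    """Toy model: pick the offset a (re)started job would read from."""
    # An offset restored from a checkpoint/savepoint wins over the startup mode.
    if restored_offset is not None:
        return restored_offset
    if startup_mode == "earliest-offset":
        return earliest
    if startup_mode == "latest-offset":
        return latest
    if startup_mode == "group-offsets":
        # Assumption: without a committed offset for this group id there is
        # nothing to resume from, so the model raises instead of guessing.
        if committed_offset is None:
            raise LookupError("no committed offset for this consumer group")
        return committed_offset
    raise ValueError(f"unknown startup mode: {startup_mode}")


# Hypothetical topic: oldest retained offset 0, end of log at 102,
# and the failing 2nd record sits at offset 101.
print(resolve_start_offset("latest-offset", 0, 102))    # skips the 2nd record
print(resolve_start_offset("earliest-offset", 0, 102))  # replays everything
print(resolve_start_offset("group-offsets", 0, 102, committed_offset=101))
```

If this model is right, "group-offsets" only helps when the restarted job
uses the same group id and an offset was actually committed before the
failure, which is the part I can't confirm.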


Thanks,
Sharon

On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This is a valid case. This starting-offset is the offset for Kafka source
> to read from when the job starts *without checkpoint*. That is to say, if
> your job has been running for a while, completed several checkpoints and
> then restarted, Kafka source won't read from starting-offset, but from the
> offset logged in the checkpoint.
>
> As you're only processing 2 records (which I guess takes less than a
> second) no checkpoint has been completed (if you didn't change any
> configurations the default checkpointing interval is 10 minutes), so the
> next time you start the same job it will still read from starting-offset,
> which is the latest offset by your setting.
>
> If you would like to reprocess the second record you can set the
> starting-offset to earliest (which is the default setting). The first
> record will also be reprocessed but this is still valid because it is just
> updating the result for the first record (which is the same as your
> previous execution).
>
> Sharon Xie <sharon.xie...@gmail.com> wrote on Wed, Jan 5, 2022 at 02:56:
>
>> Can someone help me understand how Flink deals with the following
>> scenario?
>>
>> I have a job that reads from a source Kafka (starting-offset: latest) and
>> writes to a sink Kafka with exactly-once execution. Let's say that I have 2
>> records in the source. The 1st one is processed without issue and the job
>> fails when the 2nd record is processed due to a parsing error. I want to
>> update the job with a fix for the 2nd record and resume processing from the
>> offset of the 2nd record.
>>
>> However, I can't find a way to stop the job with a savepoint because the
>> job is in a failed state. If I just cancel the job without a savepoint, the
>> job will start from the new "latest" offset next time I start it.
>>
>> Is this a valid case? If so, how to handle this case so that I can resume
>> processing from the 2nd record's offset after I update the job?
>>
>>
>> Thanks,
>> Sharon
>>
>>
