Hi Aljoscha,

Thank you for the help and reply.

1. I think we have finally pinpointed the root cause of this issue:
when partitions are assigned manually (i.e. with the assign() API instead
of the subscribe() API), the client will not try to rediscover the
coordinator if it dies [1]. This no longer seems to be an issue as of
Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x
branch and packaging it with our Flink application, we haven't seen this
issue re-occur so far.
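For context, the difference between the two assignment paths looks roughly
like this - a minimal sketch, not runnable without a broker, and the broker
address, group id, and topic name are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("group.id", "my-group");             // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // subscribe(): group-managed assignment. The group coordinator hands
        // out partitions, and the client keeps rediscovering the coordinator
        // across failures as part of the group protocol.
        // consumer.subscribe(Collections.singletonList("my-topic"));

        // assign(): manual assignment, which is the path the Flink connector
        // uses. Before the KAFKA-6362 fix, a consumer on this path would not
        // try to rediscover a dead coordinator, so offset commits stalled.
        consumer.assign(
            Collections.singletonList(new TopicPartition("my-topic", 0)));

        consumer.close();
    }
}
```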

2. GROUP_OFFSETS is in fact the default startup mode when checkpointing is
not enabled - that's why I was a bit surprised that this problem was
reported so many times.
To follow up on the question of whether resuming from GROUP_OFFSETS is
useful: there are definitely use cases where users don't want to use
checkpointing (e.g. due to resource constraints, storage cost
considerations, etc.) but still want to avoid a certain amount of data
loss. Most of our analytics use cases fall into this category.
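That setup - checkpointing disabled, resuming from Kafka-committed group
offsets with auto-commit - can be sketched with the legacy
FlinkKafkaConsumer roughly as below. This is a hedged sketch requiring a
running Kafka cluster; the broker address, group id, topic, and commit
interval are placeholders:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: no env.enableCheckpointing(...) call, so checkpointing
        // stays disabled and Flink does not commit offsets itself.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
        props.setProperty("group.id", "analytics-group");       // placeholder
        // With checkpointing off, offset commits fall back to the Kafka
        // client's periodic auto-commit:
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer<String> source =
            new FlinkKafkaConsumer<>("my-topic",
                new SimpleStringSchema(), props);
        // GROUP_OFFSETS is the default anyway; stated explicitly here so a
        // restarted job resumes from the last committed group offsets.
        source.setStartFromGroupOffsets();

        env.addSource(source).print();
        env.execute("resume-from-group-offsets");
    }
}
```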


--
Rong


[1] https://issues.apache.org/jira/browse/KAFKA-6362
[2] https://github.com/apache/kafka/pull/4326


On Wed, Mar 11, 2020 at 10:16 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> On 09.03.20 06:10, Rong Rong wrote:
> > - Is this feature (disabling checkpoint and restarting job from Kafka
> > committed GROUP_OFFSET) not supported?
>
> I believe the Flink community never put much (any?) effort into this
> because the Flink Kafka Consumer does its own offset handling. Starting
> from the committed offsets should work fine, though, the default startup
> mode is even StartupMode.GROUP_OFFSETS.
>
> > - How does Flink-Kafka actually handles auto-commit to coordinator given
> > the fact that Flink ignores the coordinator assignments and uses
> > self-assigning partitions instead?
>
> I think we don't do anything for this case; the Kafka Consumer code will
> do the committing if 'enable.auto.commit' is set. I don't know how this
> will play with our code because we disable the automatic group handling.
>
> Do you think letting Kafka do the auto-committing is ever useful? If you
> have a Flink job that does checkpoints, you will get correct offset
> committing and you can start a job from the committed offsets. In what
> cases would you want to use the built-in Kafka offset committing?
>
> Best,
> Aljoscha
>
