Hi Piotr, thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these properties, which are passed to the
FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=30000
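
For reference, a minimal sketch of how those four properties are built in Java. The class and method names here are made up for illustration, and the consumer construction is shown only as a comment because it needs the Flink Kafka 0.10 connector on the classpath; "my-topic" and SimpleStringSchema are placeholders, not what our job actually uses.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
class ConsumerPropsSketch {

    // Builds exactly the four properties listed above; everything else
    // is left to the Kafka client and Flink connector defaults.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("bootstrap.servers", "my-kafka-host:9092");
        props.setProperty("group.id", "my_group");
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        // With the connector available, the consumer would be created roughly as:
        //   new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        // (topic name and schema are placeholders).
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Note that neither enable.auto.commit nor auto.commit.interval.ms is set here, so the defaults apply.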

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
auto.commit.interval.ms

We rely on the default behaviour of the Flink Kafka consumer. It seems
to commit quite often, roughly every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with a debugger that enableCommitOnCheckpoints=true
is apparently the default.

I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
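
As far as I understand the connector documentation, the commit mode follows from three flags: whether checkpointing is on, whether commit-on-checkpoints is on, and whether Kafka auto-commit is on. The sketch below restates that rule in plain Java. It is not Flink's source; the class, enum and method names are my own, and the defaults I assume for enable.auto.commit (true) and auto.commit.interval.ms (5000) are the Kafka client defaults.

```java
import java.util.Properties;

// Illustrative restatement of the documented rule, not Flink's actual code.
class OffsetCommitSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Checkpointing enabled: offsets are committed on completed checkpoints
    // (unless that is switched off). Checkpointing disabled: committing is
    // left to the Kafka client's periodic auto-commit, if it is enabled.
    static Mode resolve(boolean autoCommit, boolean commitOnCheckpoints, boolean checkpointing) {
        if (checkpointing) {
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        return autoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    // Assumed defaults: enable.auto.commit=true, auto.commit.interval.ms=5000.
    static boolean autoCommitEnabled(Properties props) {
        boolean enabled = Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        long intervalMs = Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
        return enabled && intervalMs > 0;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("group.id", "my_group");
        // Our setup: no checkpointing, commit-on-checkpoints left at true.
        System.out.println(resolve(autoCommitEnabled(props), true, false)); // prints KAFKA_PERIODIC
    }
}
```

With our settings this gives KAFKA_PERIODIC, which matches what I saw in the debugger, and a 5000 ms default interval would explain the commits roughly every 5 seconds.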

So I guess you're right that this bug doesn't seem to be in Flink itself? I
wonder if it's a known issue in the Kafka client library.

I also took a thread dump on one of the task managers in this broken state,
but I couldn't spot anything obvious when comparing the threads to a dump
from a job where offsets are being committed. Anyway, I've saved the thread
dump in case there's something specific to look for.

Sharing the full logs of the job & task managers would be a bit of a hassle,
because I don't have an automatic way to obfuscate the logs so that I can be
sure nothing sensitive is left. Anyway, there isn't really anything else
to share. As I wrote: "As you can see, it didn't log
anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
> auto.commit.interval.ms
> - did you set setCommitOffsetsOnCheckpoints() ?
>
> Please also refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> and especially this part:
>
> > Note that the Flink Kafka Consumer does not rely on the committed
> offsets for fault tolerance guarantees. The committed offsets are only a
> means to expose the consumer’s progress for monitoring purposes.
>
> Can you post full logs from all TaskManagers/JobManager and can you
> say/estimate when the committing broke/stopped? Did you check Kafka logs
> for any errors?
>
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
> Especially since in your case the offset commit failure is preceded by a
> Kafka coordinator failure.
>
> Piotrek
>
>
> On 8 Jun 2018, at 10:05, Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi,
>
> We have a Flink stream job that uses the Flink Kafka consumer. Normally it
> commits consumer offsets to Kafka.
>
> However this stream ended up in a state where it's otherwise working just
> fine, but it isn't committing offsets to Kafka any more. The job keeps
> writing correct aggregation results to the sink, though. At the time of
> writing this, the job has been running 14 hours without committing offsets.
>
> Below is an extract from taskmanager.log. As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are
> the last lines so far.
>
> Could you help check if this is a known bug, possibly already fixed, or
> something new?
>
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
>
> Cheers,
> Juho
>
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
>
>
>
