Hi Piotr,

Thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to the FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=30000

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms

We have whatever the default behaviour of the Flink Kafka consumer is. It seems to commit quite often, something like every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with a debugger, and apparently enableCommitOnCheckpoints=true is the default. I also checked with the debugger that offsetCommitMode=KAFKA_PERIODIC.

So I guess you're right that this bug doesn't seem to be in Flink itself? I wonder if it's a known issue in the Kafka client library.

I also took a thread dump on one of the task managers in this broken state, but I couldn't spot anything obvious when comparing the threads to a dump from a job where offsets are being committed. Anyway, I've saved the thread dump in case there's something specific to look for.

Sharing the full logs of the job & task managers would be a bit of a hassle, because I don't have an automatic way to obfuscate the logs so that I'm sure there isn't anything sensitive left. Anyway, there isn't anything else to share really. I wrote: "As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms
> - did you set setCommitOffsetsOnCheckpoints() ?
>
> Please also refer to https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration , especially this part:
>
> > Note that the Flink Kafka Consumer does not rely on the committed offsets for fault tolerance guarantees. The committed offsets are only a means to expose the consumer’s progress for monitoring purposes.
>
> Can you post full logs from all TaskManagers/JobManager, and can you say/estimate when the committing broke/stopped? Did you check the Kafka logs for any errors?
>
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
> Especially since in your case the offset committing failure is preceded by a Kafka coordinator failure.
>
> Piotrek
>
> On 8 Jun 2018, at 10:05, Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi,
>
> We have a Flink stream job that uses the Flink Kafka consumer. Normally it commits consumer offsets to Kafka.
>
> However, this stream ended up in a state where it's otherwise working just fine, but it isn't committing offsets to Kafka any more. The job keeps writing correct aggregation results to the sink, though. At the time of writing this, the job has been running 14 hours without committing offsets.
>
> Below is an extract from taskmanager.log. As you can see, it didn't log anything until ~2018-06-07 22:08. Also that's where the log ends; these are the last lines so far.
>
> Could you help check if this is a known bug, possibly already fixed, or something new?
>
> I'm using a self-built Flink package 1.5-SNAPSHOT, Flink commit 8395508b0401353ed07375e22882e7581d46ac0e, which is not super old.
>
> Cheers,
> Juho
>
> 2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
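To see why the debugger reports offsetCommitMode=KAFKA_PERIODIC for this configuration, here is a minimal Java sketch of the decision, under stated assumptions. It assumes the Flink Kafka consumer (1.4/1.5 era) ignores Kafka's auto-commit setting whenever checkpointing is enabled. Otherwise, it assumes the consumer falls back to Kafka's enable.auto.commit (default true) and auto.commit.interval.ms (default 5000 ms, treated here as "disabled" when not positive). The class names OffsetCommitModeSketch and OffsetCommitModeDemo are hypothetical, and the enum only mirrors the mode names; this is an illustration, not Flink's actual source.

```java
import java.util.Properties;

/** Hypothetical mirror of the three offset-commit modes discussed in the thread. */
enum OffsetCommitMode {
    DISABLED,        // nothing is committed back to Kafka
    ON_CHECKPOINTS,  // offsets are committed when a Flink checkpoint completes
    KAFKA_PERIODIC   // the Kafka client's own auto-commit commits periodically
}

/** Sketch only: reconstructs the mode selection under the assumptions stated above. */
final class OffsetCommitModeSketch {

    private OffsetCommitModeSketch() {}

    /** Assumption: Kafka's defaults apply when the keys are missing from the properties. */
    static boolean isAutoCommitEnabled(Properties props) {
        boolean enabled = Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        long intervalMs = Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
        return enabled && intervalMs > 0;
    }

    static OffsetCommitMode fromConfiguration(
            boolean autoCommitEnabled, boolean commitOnCheckpoints, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing on, only the commit-on-checkpoints flag matters.
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's auto-commit.
        return autoCommitEnabled ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}

class OffsetCommitModeDemo {
    public static void main(String[] args) {
        // The four properties Juho listed; no auto-commit keys are set explicitly.
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("bootstrap.servers", "my-kafka-host:9092");
        props.setProperty("group.id", "my_group");
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        OffsetCommitMode mode = OffsetCommitModeSketch.fromConfiguration(
                OffsetCommitModeSketch.isAutoCommitEnabled(props),
                true,   // enableCommitOnCheckpoints default, as seen in the debugger
                false); // checkpointing is not enabled for this job
        System.out.println(mode); // prints KAFKA_PERIODIC
    }
}
```

Under these assumptions the job's commits would be issued by the Kafka client's auto-commit every ~5 seconds rather than by Flink. That fits the observed commit cadence and the fact that the failing "Auto-commit of offsets" warnings come from Kafka's ConsumerCoordinator.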