[ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357298#comment-16357298 ]
ASF GitHub Bot commented on KAFKA-6362: --------------------------------------- hachikuji closed pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator URL: https://github.com/apache/kafka/pull/4326 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1d84f847cd8..2f7fd58a66f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1058,7 +1058,7 @@ public void assign(Collection<TopicPartition> partitions) { // make sure the offsets of topic partitions the consumer is unsubscribing from // are committed since there will be no following rebalance - this.coordinator.maybeAutoCommitOffsetsNow(); + this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); this.subscriptions.assignFromUser(new HashSet<>(partitions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5c1e60eee82..d7c1ce9966e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -528,6 +528,7 @@ public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offs public void onSuccess(Void value) { pendingAsyncCommits.decrementAndGet(); doCommitOffsetsAsync(offsets, callback); + 
client.pollNoWakeup(); } @Override @@ -623,20 +624,10 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, return false; } - private void maybeAutoCommitOffsetsAsync(long now) { - if (autoCommitEnabled) { - if (coordinatorUnknown()) { - this.nextAutoCommitDeadline = now + retryBackoffMs; - } else if (now >= nextAutoCommitDeadline) { - this.nextAutoCommitDeadline = now + autoCommitIntervalMs; - doAutoCommitOffsetsAsync(); - } - } - } - - public void maybeAutoCommitOffsetsNow() { - if (autoCommitEnabled && !coordinatorUnknown()) + public void maybeAutoCommitOffsetsAsync(long now) { + if (autoCommitEnabled && now >= nextAutoCommitDeadline) { doAutoCommitOffsetsAsync(); + } } private void doAutoCommitOffsetsAsync() { @@ -650,8 +641,11 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage()); if (exception instanceof RetriableException) nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); + else + nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs; } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); + nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs; } } }); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 76301a71bea..c49339b6525 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1625,6 +1625,25 @@ public void testHeartbeatThreadClose() throws Exception { assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId)); } + @Test + public void 
testAutoCommitAfterCoordinatorBackToService() { + ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); + subscriptions.assignFromUser(Collections.singleton(t1p)); + subscriptions.seek(t1p, 100L); + + coordinator.coordinatorDead(); + assertTrue(coordinator.coordinatorUnknown()); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); + + // async commit offset should find coordinator + time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen + coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); + assertFalse(coordinator.coordinatorUnknown()); + assertEquals(subscriptions.committed(t1p).offset(), 100L); + } + private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement, final boolean autoCommit, final boolean leaveGroup) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > auto commit not work since coordinatorUnknown() is always true. 
> --------------------------------------------------------------- > > Key: KAFKA-6362 > URL: https://issues.apache.org/jira/browse/KAFKA-6362 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 0.10.2.1 > Reporter: Renkai Ge > Assignee: huxihx > Priority: Major > > {code} > [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, > 11.192.73.66:3002] > check.crcs = true > client.id = > connections.max.idle.ms = 540000 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = tcprtdetail_flink > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 300000 > max.poll.records = 500 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = 
SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1 > [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : > e89bffd6b2eff799 > {code} > My Kafka Java client cannot auto commit. After adding some debug logging, I found that > the coordinatorUnknown() function in > [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604] > always returns true, and nextAutoCommitDeadline just increases > infinitely. Should there be a lookupCoordinator() after line 604, as in > [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? After > I added lookupCoordinator() next to line 604, the consumer could auto commit > offsets properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)