[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357298#comment-16357298
 ] 

ASF GitHub Bot commented on KAFKA-6362:
---------------------------------------

hachikuji closed pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync 
should try to discover coordinator
URL: https://github.com/apache/kafka/pull/4326
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f847cd8..2f7fd58a66f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public void assign(Collection<TopicPartition> partitions) 
{
 
                 // make sure the offsets of topic partitions the consumer is 
unsubscribing from
                 // are committed since there will be no following rebalance
-                this.coordinator.maybeAutoCommitOffsetsNow();
+                
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", 
Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60eee82..d7c1ce9966e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public void commitOffsetsAsync(final Map<TopicPartition, 
OffsetAndMetadata> offs
                 public void onSuccess(Void value) {
                     pendingAsyncCommits.decrementAndGet();
                     doCommitOffsetsAsync(offsets, callback);
+                    client.pollNoWakeup();
                 }
 
                 @Override
@@ -623,20 +624,10 @@ public boolean commitOffsetsSync(Map<TopicPartition, 
OffsetAndMetadata> offsets,
         return false;
     }
 
-    private void maybeAutoCommitOffsetsAsync(long now) {
-        if (autoCommitEnabled) {
-            if (coordinatorUnknown()) {
-                this.nextAutoCommitDeadline = now + retryBackoffMs;
-            } else if (now >= nextAutoCommitDeadline) {
-                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-                doAutoCommitOffsetsAsync();
-            }
-        }
-    }
-
-    public void maybeAutoCommitOffsetsNow() {
-        if (autoCommitEnabled && !coordinatorUnknown())
+    public void maybeAutoCommitOffsetsAsync(long now) {
+        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
             doAutoCommitOffsetsAsync();
+        }
     }
 
     private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception
                     log.warn("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() 
+ retryBackoffMs, nextAutoCommitDeadline);
+                    else
+                        nextAutoCommitDeadline = time.milliseconds() + 
autoCommitIntervalMs;
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets 
{}", offsets);
+                    nextAutoCommitDeadline = time.milliseconds() + 
autoCommitIntervalMs;
                 }
             }
         });
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a71bea..c49339b6525 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public void testHeartbeatThreadClose() throws Exception {
             assertFalse("Heartbeat thread active after close", 
threads[i].getName().contains(groupId));
     }
 
+    @Test
+    public void testAutoCommitAfterCoordinatorBackToService() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        subscriptions.seek(t1p, 100L);
+
+        coordinator.coordinatorDead();
+        assertTrue(coordinator.coordinatorUnknown());
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, 
Errors.NONE)));
+
+        // async commit offset should find coordinator
+        time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto 
commit does happen
+        coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+        assertFalse(coordinator.coordinatorUnknown());
+        assertEquals(subscriptions.committed(t1p).offset(), 100L);
+    }
+
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean 
useGroupManagement,
                                                                final boolean 
autoCommit,
                                                                final boolean 
leaveGroup) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> auto commit not work since coordinatorUnknown() is always true.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: Renkai Ge
>            Assignee: huxihx
>            Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>       auto.commit.interval.ms = 5000
>       auto.offset.reset = latest
>       bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>       check.crcs = true
>       client.id = 
>       connections.max.idle.ms = 540000
>       enable.auto.commit = true
>       exclude.internal.topics = true
>       fetch.max.bytes = 52428800
>       fetch.max.wait.ms = 500
>       fetch.min.bytes = 1
>       group.id = tcprtdetail_flink
>       heartbeat.interval.ms = 3000
>       interceptor.classes = null
>       key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>       max.partition.fetch.bytes = 1048576
>       max.poll.interval.ms = 300000
>       max.poll.records = 500
>       metadata.max.age.ms = 300000
>       metric.reporters = []
>       metrics.num.samples = 2
>       metrics.recording.level = INFO
>       metrics.sample.window.ms = 30000
>       partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>       receive.buffer.bytes = 65536
>       reconnect.backoff.ms = 50
>       request.timeout.ms = 305000
>       retry.backoff.ms = 100
>       sasl.jaas.config = null
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       sasl.kerberos.min.time.before.relogin = 60000
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       sasl.mechanism = GSSAPI
>       security.protocol = PLAINTEXT
>       send.buffer.bytes = 131072
>       session.timeout.ms = 10000
>       ssl.cipher.suites = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       ssl.endpoint.identification.algorithm = null
>       ssl.key.password = null
>       ssl.keymanager.algorithm = SunX509
>       ssl.keystore.location = null
>       ssl.keystore.password = null
>       ssl.keystore.type = JKS
>       ssl.protocol = TLS
>       ssl.provider = null
>       ssl.secure.random.implementation = null
>       ssl.trustmanager.algorithm = PKIX
>       ssl.truststore.location = null
>       ssl.truststore.password = null
>       ssl.truststore.type = JKS
>       value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
> e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto commit. After adding some debug logging, I found that 
> the coordinatorUnknown() function in 
> [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
>  always returns true, and nextAutoCommitDeadline just increases 
> infinitely. Should there be a lookupCoordinator() call after line 604, like in 
> [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
>  After I added lookupCoordinator() next to line 604, the consumer could auto commit 
> offsets properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to