[ 
https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553954#comment-16553954
 ] 

ASF GitHub Bot commented on KAFKA-7194:
---------------------------------------

rajinisivaram closed pull request #5417: KAFKA-7194; Fix buffer underflow if 
onJoinComplete is retried after failure
URL: https://github.com/apache/kafka/pull/5417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b5c7a66e100..53834fb81df 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -200,9 +200,8 @@ public AbstractCoordinator(LogContext logContext,
                                                                  Map<String, 
ByteBuffer> allMemberMetadata);
 
     /**
-     * Invoked when a group member has successfully joined a group. If this 
call is woken up (i.e.
-     * if the invocation raises {@link 
org.apache.kafka.common.errors.WakeupException}), then it
-     * will be retried on the next call to {@link #ensureActiveGroup()}.
+     * Invoked when a group member has successfully joined a group. If this 
call fails with an exception,
+     * then it will be retried using the same assignment state on the next 
call to {@link #ensureActiveGroup()}.
      *
      * @param generation The generation that was joined
      * @param memberId The identifier for the local member in the group
@@ -418,7 +417,9 @@ boolean joinGroupIfNeeded(final long timeoutMs, final long 
startTimeMs) {
             }
 
             if (future.succeeded()) {
-                onJoinComplete(generation.generationId, generation.memberId, 
generation.protocol, future.value());
+                // Duplicate the buffer in case `onJoinComplete` does not 
complete and needs to be retried.
+                ByteBuffer memberAssignment = future.value().duplicate();
+                onJoinComplete(generation.generationId, generation.memberId, 
generation.protocol, memberAssignment);
 
                 // We reset the join group future only after the completion 
callback returns. This ensures
                 // that if the callback is woken up, we will retry it on the 
next joinGroupIfNeeded.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ea6d47249eb..f9b77e921cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -269,10 +269,10 @@ protected void onJoinComplete(int generation,
             this.joinedSubscription = newJoinedSubscription;
         }
 
-        // update the metadata and enforce a refresh to make sure the fetcher 
can start
-        // fetching data in the next iteration
+        // Update the metadata to include the full group subscription. The 
leader will trigger a rebalance
+        // if there are any metadata changes affecting any of the consumed 
partitions (whether or not this
+        // instance is subscribed to the topics).
         this.metadata.setTopics(subscriptions.groupSubscription());
-        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new 
TimeoutException();
 
         // give the assignor a chance to update internal state based on the 
received assignment
         assignor.onAssignment(assignment);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7c2638cf012..ba392c6f4cb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -874,6 +874,57 @@ public boolean matches(AbstractRequest body) {
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), 
subscriptions.assignedPartitions());
     }
 
+    @Test
+    public void testWakeupFromAssignmentCallback() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
+
+        final String topic = "topic1";
+        TopicPartition partition = new TopicPartition(topic, 0);
+        final String consumerId = "follower";
+        Set<String> topics = Collections.singleton(topic);
+        MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                boolean raiseWakeup = this.assignedCount == 0;
+                super.onPartitionsAssigned(partitions);
+
+                if (raiseWakeup)
+                    throw new WakeupException();
+            }
+        };
+
+        subscriptions.subscribe(topics, rebalanceListener);
+        metadata.setTopics(topics);
+
+        // we only have metadata for one topic initially
+        metadata.update(TestUtils.singletonCluster(topic, 1), 
Collections.emptySet(), time.milliseconds());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // prepare initial rebalance
+        partitionAssignor.prepare(singletonMap(consumerId, 
Collections.singletonList(partition)));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE));
+        
client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), 
Errors.NONE));
+
+
+        // The first call to poll should raise the exception from the 
rebalance listener
+        try {
+            coordinator.poll(Long.MAX_VALUE);
+            fail("Expected exception thrown from assignment callback");
+        } catch (WakeupException e) {
+        }
+
+        // The second call should retry the assignment callback and succeed
+        coordinator.poll(Long.MAX_VALUE);
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(2, rebalanceListener.assignedCount);
+    }
+
     @Test
     public void testRebalanceAfterTopicUnavailableWithSubscribe() {
         unavailableTopicTest(false, false, Collections.<String>emptySet());
@@ -1901,7 +1952,7 @@ private JoinGroupResponse joinGroupLeaderResponse(int 
generationId,
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, 
String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, 
partitionAssignor.name(), memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap());
+                Collections.emptyMap());
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> 
partitions, Errors error) {
@@ -1914,7 +1965,7 @@ private OffsetCommitResponse 
offsetCommitResponse(Map<TopicPartition, Errors> re
     }
 
     private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError, 
Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap());
+        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors 
partitionLevelError, String metadata, long offset) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error deserializing assignment after rebalance
> ----------------------------------------------
>
>                 Key: KAFKA-7194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7194
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Konstantine Karantasis
>            Assignee: Jason Gustafson
>            Priority: Major
>
> A simple sink connector task is failing in a test with the following 
> exception: 
> {noformat}
> [2018-07-02 12:31:13,200] ERROR WorkerSinkTask{id=verifiable-sink-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:353)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:338)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748){noformat}
>  
> After dumping the consumer offsets partition that this consumer group 
> is writing to, using the command: 
> {noformat}
> bin/kafka-dump-log.sh --offsets-decoder --files ./00000000000000000000.log 
> {noformat}
> we get: 
> {noformat}
> Dumping ./00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1530534673177 isvalid: true keysize: 27 
> valuesize: 217 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":"range","generationId":1,"assignment":"{consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4=[test-0]}"}
> offset: 1 position: 314 CreateTime: 1530534673206 isvalid: true keysize: 27 
> valuesize: 32 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":null,"generationId":2,"assignment":"{}"}{noformat}
>  
> Since the broker seems to send a non-empty response to the consumer, there's 
> a chance that the response buffer is consumed more than once at some point 
> when parsing the response in the client. 
> Here's what kafka-request.log shows the broker sending to the client in the 
> `SYNC_GROUP` response whose parsing triggers the error: 
> {noformat}
> [2018-07-02 12:31:13,185] DEBUG Completed 
> request:RequestHeader(apiKey=SYNC_GROUP, apiVersion=2, clientId=consumer-4, 
> correlationId=5) -- 
> {group_id=connect-verifiable-sink,generation_id=1,member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,group_assignment=[{member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,member_assignment=java.nio.HeapByteBuffer[pos=0
>  lim=24 
> cap=24]}]},response:{throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0
>  lim=24 cap=24]} from connection 
> 172.31.40.44:9092-172.31.35.189:49191-25;totalTime:8.904,requestQueueTime:0.063,localTime:8.558,remoteTime:0.0,throttleTime:0.03,responseQueueTime:0.037,sendTime:0.245,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger){noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to