lucasbru commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417523313


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java:
##########
@@ -331,34 +331,35 @@ public AsyncKafkaConsumerTestBuilder(Optional<GroupInformation> groupInfo, boole
             super(groupInfo, enableAutoCommit, enableAutoTick);
             String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances(
-                    config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                    config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
             );
             Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
             this.fetchCollector = spy(new FetchCollector<>(logContext,
-                    metadata,
-                    subscriptions,
-                    fetchConfig,
-                    deserializers,
-                    metricsManager,
-                    time));
+                metadata,
+                subscriptions,
+                fetchConfig,
+                deserializers,

Review Comment:
   It looks like your formatter settings differ slightly from those of whoever
wrote this code. I would avoid whitespace-only changes like these unless the
existing formatting is obviously unclean.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer) {
+        if (!groupMetadata.isPresent())
+            return;
+
+        maybeAutoCommitSync(timer);
+        timer.update();
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+
+        try {
+            // If the consumer is in a group, we will pause and revoke all assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            throw new KafkaException("User rebalance callback throws an error", exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+                commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {

Review Comment:
   I may be missing some context, but I'm not yet sure what this function is
achieving. If this is for KAFKA-15276, maybe we can implement it as part of
that ticket, because as it stands this function mostly confuses me.
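
   For reference, here is how I read the intended close sequence from the
Javadoc on `prepareShutdown`: commit first, then revoke, both within one
timeout budget, and always clear the assignment afterwards. This is only a
rough, self-contained sketch of that ordering. `ShutdownSketch`, the `steps`
list and the use of `IllegalStateException` are hypothetical stand-ins, not
Kafka APIs and not the code in this PR. It also keeps the cause as a
`Throwable` rather than casting it to `Exception`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the close path; none of these names are Kafka APIs.
class ShutdownSketch {
    // Records which steps ran, so the ordering is observable.
    final List<String> steps = new ArrayList<>();

    void prepareShutdown(Runnable commit, CompletableFuture<Void> revocation, long timeoutMs) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;

        // Step 1: auto-commit. A failure is recorded but not propagated.
        try {
            commit.run();
            steps.add("commit");
        } catch (RuntimeException e) {
            steps.add("commit-failed");
        }

        // Step 2: wait for the revocation callback using whatever time is left.
        long remainingMs = Math.max(0L, deadlineMs - System.currentTimeMillis());
        try {
            revocation.get(remainingMs, TimeUnit.MILLISECONDS);
            steps.add("revoke");
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the callback's own failure as the cause.
            throw new IllegalStateException("Rebalance callback failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while revoking", e);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out while revoking", e);
        } finally {
            // Always drop the assignment, mirroring the finally block in the diff.
            steps.add("clear-assignment");
        }
    }

    public static void main(String[] args) {
        ShutdownSketch sketch = new ShutdownSketch();
        sketch.prepareShutdown(() -> { }, CompletableFuture.completedFuture(null), 1000L);
        System.out.println(sketch.steps);
    }
}
```

   If that matches what `onLeavePrepare` is meant to support, a short comment
on the method saying so would already help.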



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -436,10 +434,15 @@ public void testCommitAsyncLeaderEpochUpdate() {
         topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
 
         consumer.assign(Arrays.asList(t0, t1));
+        consumer.seek(t0, 10);
+        consumer.seek(t1, 20);
 
         CompletableFuture<Void> commitFuture = new CompletableFuture<>();
         commitFuture.complete(null);
 
+        prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE);
+        // TODO: The log shows NPE thrown from the CommitRequestManager, which is caused by the use of mock.

Review Comment:
   I think this is fine as long as the test isn't failing, since the problem
should go away once we move to mocks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
