hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968897177


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
             }
         }
 
+        Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+        isLeader = false;
+        subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;
+        timer.update();
+
+        if (exception.isPresent()) {
+            throw new KafkaException("User rebalance callback throws an error", exception.get());
+        }
+        return true;
+    }
+
+    private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+        SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+        if (generation == Generation.NO_GENERATION.generationId ||
+                memberId.equals(Generation.NO_GENERATION.memberId)) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // Revoke all partitions
+        if (protocol == RebalanceProtocol.EAGER) {
+            partitions.addAll(subscriptions.assignedPartitions());
+            return partitions;
+        }
+
+        // only revoke those partitions that are not in the subscription any more.
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            // Delay the partition revocation because we don't revoke the already owned partitions
+            return partitions;
+        }
+
+        log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);

Review Comment:
   I think this code is dead? Why don't we use a `switch` like we had before? 
Then the compiler can help us ensure we handle new cases.
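
As a minimal sketch of the `switch` shape suggested here: the `Protocol` enum, the `hasValidGeneration` flag, and the string partition names below are simplified stand-ins (assumptions for illustration), not the actual Kafka types. The branches mirror the diff above, with the unreachable `log.warn` fallthrough replaced by an explicit failure.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

class RevocationSketch {
    // Hypothetical stand-in for the RebalanceProtocol enum used in the diff.
    enum Protocol { EAGER, COOPERATIVE }

    // Hypothetical stand-in for getPartitionsToRevoke; partitions are plain
    // strings here instead of TopicPartition objects.
    static SortedSet<String> partitionsToRevoke(Protocol protocol,
                                                boolean hasValidGeneration,
                                                SortedSet<String> assigned) {
        SortedSet<String> partitions = new TreeSet<>();
        if (!hasValidGeneration) {
            // No valid generation or member id: give up everything we own.
            partitions.addAll(assigned);
            return partitions;
        }

        switch (protocol) {
            case EAGER:
                // Eager rebalancing revokes all assigned partitions up front.
                partitions.addAll(assigned);
                return partitions;
            case COOPERATIVE:
                // Revocation is delayed, so nothing is revoked at this point.
                return partitions;
        }
        // Reached only if a new enum constant is added without a matching case.
        throw new IllegalStateException("Unhandled protocol: " + protocol);
    }

    public static void main(String[] args) {
        SortedSet<String> assigned = new TreeSet<>(Arrays.asList("topic-0", "topic-1"));
        System.out.println(partitionsToRevoke(Protocol.EAGER, true, assigned));
        System.out.println(partitionsToRevoke(Protocol.COOPERATIVE, true, assigned));
    }
}
```

One caveat on the design choice: for a classic `switch` statement like this, a missing enum case is typically flagged by IDE inspections or static-analysis tooling rather than rejected by `javac` itself. A `switch` expression (Java 14 and later) is required to be exhaustive, so there the compiler does reject an unhandled constant.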



