mumrah commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632616511



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<ElectLeadersResponseData>
             electLeaders(ElectLeadersRequestData request) {
-        return appendWriteEvent("electLeaders", request.timeoutMs(),
+        // If topicPartitions is null, we will try to trigger a new leader election on
+        // all partitions (!).  But if it's empty, there is nothing to do.
+        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
+            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
+        }
+        return appendWriteEvent("electLeaders",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),

Review comment:
       Is this related to timeouts, or is it just a different fix?
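
       For context on the question: the hunk above changes two things at once. It adds an early return for an empty `topicPartitions` list, and it passes an absolute deadline in nanoseconds instead of the raw `request.timeoutMs()`. The sketch below shows the two pieces in isolation. The class and the helper names `deadlineNs` and `isNoOp` are hypothetical (they are not in the PR), and a list of strings stands in for the request's topic partitions. Whether the two changes are related is for the PR author to say.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ElectLeadersSketch {
    // Hypothetical helper: turn a relative timeout in milliseconds into an
    // absolute deadline on the same monotonic clock that produced nowNs.
    static long deadlineNs(long nowNs, int timeoutMs) {
        return nowNs + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Hypothetical helper: null means "every partition", while an empty
    // list means there is nothing to elect and the call can return early.
    static boolean isNoOp(List<String> topicPartitions) {
        return topicPartitions != null && topicPartitions.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(deadlineNs(1_000L, 5));                      // 5001000
        System.out.println(isNoOp(null));                               // false
        System.out.println(isNoOp(Collections.<String>emptyList()));    // true
    }
}
```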

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
         boolean uncleanOk = electionTypeIsUnclean(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
-        for (TopicPartitions topic : request.topicPartitions()) {
-            ReplicaElectionResult topicResults =
-                new ReplicaElectionResult().setTopic(topic.topic());
-            response.replicaElectionResults().add(topicResults);
-            for (int partitionId : topic.partitions()) {
-                ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
-                topicResults.partitionResult().add(new PartitionResult().
-                    setPartitionId(partitionId).
-                    setErrorCode(error.error().code()).
-                    setErrorMessage(error.message()));
+        if (request.topicPartitions() == null) {
+            // If topicPartitions is null, we try to elect a new leader for every partition.
+            // There are some obvious issues with this wire protocol.  For example, what
+            // if we have too many partitions to fit the results in a single RPC?  Or what
+            // if we generate too many records to fit in a single batch?  This behavior

Review comment:
       Do we need all the records in a single batch, or could we create a batch for each topic (including its partitions)?
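
       To make the per-topic alternative concrete, here is a minimal sketch of grouping election records into one batch per topic. Everything in it is hypothetical: `Record` stands in for the controller's real record type and `batchByTopic` is an illustrative name, not an API from the PR. Note that splitting the records this way gives up applying the whole request as a single atomic batch; whether that is acceptable here is not settled in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerTopicBatchSketch {
    // Hypothetical stand-in for a metadata record produced by one election.
    static final class Record {
        final String topic;
        final int partition;

        Record(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Group records into one batch per topic, keeping topics in the order
    // they first appear and records in their original order within a topic.
    static List<List<Record>> batchByTopic(List<Record> records) {
        Map<String, List<Record>> byTopic = new LinkedHashMap<>();
        for (Record r : records) {
            byTopic.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r);
        }
        return new ArrayList<>(byTopic.values());
    }

    public static void main(String[] args) {
        List<Record> records = Arrays.asList(
            new Record("a", 0), new Record("b", 0), new Record("a", 1));
        for (List<Record> batch : batchByTopic(records)) {
            System.out.println(batch.get(0).topic + ": " + batch.size() + " record(s)");
        }
    }
}
```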

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
     public void close() throws InterruptedException {
         queue.close();
     }
+
+    // VisibleForTesting
+    CountDownLatch pause() {
+        final CountDownLatch latch = new CountDownLatch(1);
+        appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    // VisibleForTesting
+    Time time() {
+        return time;
+    }

Review comment:
       Do we need this? Can't we just use the Time we pass into the constructor in tests? Not a big deal really, just wondering.
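
       A sketch of what the suggestion could look like: the test keeps its own reference to the clock it hands to the constructor, so no accessor on the controller is needed. The `Clock` interface, `ManualClock`, and `Controller` below are hypothetical stand-ins written for this illustration; they are not the real `Time`, mock clock, or `QuorumController` classes.

```java
public class InjectedClockSketch {
    // Hypothetical stand-in for the clock abstraction passed to the constructor.
    interface Clock {
        long nanoseconds();
    }

    // Hypothetical manually advanced clock, as a test might use.
    static final class ManualClock implements Clock {
        private long nowNs = 0L;

        @Override
        public long nanoseconds() {
            return nowNs;
        }

        void advanceNs(long deltaNs) {
            nowNs += deltaNs;
        }
    }

    // Hypothetical component that only reads the clock it was constructed with.
    static final class Controller {
        private final Clock clock;

        Controller(Clock clock) {
            this.clock = clock;
        }

        long deadlineNs(long timeoutNs) {
            return clock.nanoseconds() + timeoutNs;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();          // the test owns the clock
        Controller controller = new Controller(clock);  // and injects it
        clock.advanceNs(100L);                          // no controller.time() accessor needed
        System.out.println(controller.deadlineNs(50L)); // 150
    }
}
```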

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<AlterPartitionReassignmentsResponseData>
             alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
-        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
+        }
+        return appendWriteEvent("alterPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
-        CompletableFuture<ListPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        return appendReadEvent("listPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });

Review comment:
       nit: we can do without the curly braces here and above. However, these will soon be replaced by the actual impl.
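
       One caveat on the nit: in Java, `throw` is a statement rather than an expression, so a lambda whose body only throws has to use a block body with braces. A brace-free form is possible only by routing the throw through a method call. The sketch below illustrates both forms; the class and the helpers `unsupported` and `throwsUnsupported` are hypothetical and written for this example, and `Supplier` stands in for whatever functional interface the controller's event methods actually take.

```java
import java.util.function.Supplier;

public class ThrowingLambdaSketch {
    // Hypothetical helper: wrapping the throw in a method makes it usable
    // as the expression body of a lambda.
    static <T> T unsupported() {
        throw new UnsupportedOperationException();
    }

    // Hypothetical helper: reports whether invoking the supplier throws
    // UnsupportedOperationException.
    static boolean throwsUnsupported(Supplier<?> supplier) {
        try {
            supplier.get();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Block body: the braces are required because `throw` is a statement.
        Supplier<String> withBraces = () -> {
            throw new UnsupportedOperationException();
        };
        // Expression body: compiles because the body is a method call.
        Supplier<String> withoutBraces = () -> unsupported();

        System.out.println(throwsUnsupported(withBraces));    // true
        System.out.println(throwsUnsupported(withoutBraces)); // true
    }
}
```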




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

