dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1152957342


##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -861,8 +872,9 @@ public void testEarlyControllerResults() throws Throwable {
     }
 
     @Disabled // TODO: need to fix leader election in LocalLog.
-    @Test
-    public void testMissingInMemorySnapshot() throws Exception {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)

Review Comment:
   ditto? (Same question as for `testBalancePartitionLeaders`: why are we doing this change?)



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -343,8 +348,9 @@ public void testFenceMultipleBrokers() throws Throwable {
         }
     }
 
-    @Test
-    public void testBalancePartitionLeaders() throws Throwable {
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)

Review Comment:
   Why are we doing this change?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -283,7 +283,7 @@ class DefaultAlterPartitionManager(
         val partitionData = new AlterPartitionRequestData.PartitionData()
           .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
-          .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(item.leaderAndIsr.isr.map(Integer.valueOf).asJava))

Review Comment:
   nit: It seems that `item.leaderAndIsr.isr.map(Integer.valueOf).asJava` 
creates an intermediate collection whose only use is to be copied into a new 
collection by `newIsrToSimpleNewIsrWithBrokerEpochs`. Should we combine the two 
steps and create the `BrokerState` objects directly here?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -847,15 +849,15 @@ public void testShrinkAndExpandIsr() throws Exception {
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, 
generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERED);

Review Comment:
   nit: How about having `isrWithDefaultEpoch(0, 1)` instead? That way we could 
remove the `asList`.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -950,41 +952,52 @@ public void testInvalidAlterPartitionRequests() throws 
Exception {
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1, 3), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, 
generateIsrWithTestDefaultEpoch(asList(0, 1, 3)), 
LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest1);
         assertAlterPartitionResponse(invalidIsrResult1, topicIdPartition, 
Errors.INVALID_REQUEST);
 
         // Invalid ISR (does not include leader 0)
         PartitionData invalidIsrRequest2 = newAlterPartition(
-            replicationControl, topicIdPartition, asList(1, 2), 
LeaderRecoveryState.RECOVERED);
+            replicationControl, topicIdPartition, 
generateIsrWithTestDefaultEpoch(asList(1, 2)), LeaderRecoveryState.RECOVERED);
         ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRequest2);
         assertAlterPartitionResponse(invalidIsrResult2, topicIdPartition, 
Errors.INVALID_REQUEST);
 
         // Invalid ISR length and recovery state
         PartitionData invalidIsrRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, 
generateIsrWithTestDefaultEpoch(asList(0, 1)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult 
= sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidIsrRecoveryRequest);
         assertAlterPartitionResponse(invalidIsrRecoveryResult, 
topicIdPartition, Errors.INVALID_REQUEST);
 
         // Invalid recovery state transition from RECOVERED to RECOVERING
         PartitionData invalidRecoveryRequest = newAlterPartition(
-            replicationControl, topicIdPartition, asList(0), 
LeaderRecoveryState.RECOVERING);
+            replicationControl, topicIdPartition, 
generateIsrWithTestDefaultEpoch(asList(0)), LeaderRecoveryState.RECOVERING);
         ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = 
sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidRecoveryRequest);
         assertAlterPartitionResponse(invalidRecoveryResult, topicIdPartition, 
Errors.INVALID_REQUEST);
+
+        // Stale ISR broker epoch request.

Review Comment:
   It seems that this unit test is mostly about testing `INVALID_REQUEST` cases. 
Should we just remove this stale-epoch case? It seems to validate exactly what 
`testAlterPartitionShouldRejectBrokersWithStaleEpoch` already validates, no?



##########
clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class AlterPartitionRequestTest {
+    String topic = "test-topic";
+    Uuid topicId = Uuid.randomUuid();
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testBuildAlterPartitionRequest(short version) {
+        AlterPartitionRequestData request = new AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(1);
+
+        TopicData topicData = new TopicData()
+            .setTopicId(topicId)
+            .setTopicName(topic);
+
+        List<BrokerState> newIsrWithBrokerEpoch = new LinkedList<>();
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(1).setBrokerEpoch(1001));
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(2).setBrokerEpoch(1002));
+        newIsrWithBrokerEpoch.add(new 
BrokerState().setBrokerId(3).setBrokerEpoch(1003));
+
+        topicData.partitions().add(new PartitionData()
+            .setPartitionIndex(0)
+            .setLeaderEpoch(1)
+            .setPartitionEpoch(10)
+            .setNewIsrWithEpochs(newIsrWithBrokerEpoch));
+
+        request.topics().add(topicData);
+
+        AlterPartitionRequest.Builder builder = new 
AlterPartitionRequest.Builder(request, version > 1);
+        AlterPartitionRequest alterPartitionRequest = builder.build(version);
+        assertEquals(1, alterPartitionRequest.data().topics().size());
+        assertEquals(1, 
alterPartitionRequest.data().topics().get(0).partitions().size());
+        PartitionData partitionData = 
alterPartitionRequest.data().topics().get(0).partitions().get(0);
+        assertEquals(version >= 3, partitionData.newIsr().isEmpty());
+        assertEquals(version < 3, partitionData.newIsrWithEpochs().isEmpty());

Review Comment:
   nit: Could we put those two in the if/else below? That would be clearer, I 
think.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -311,7 +313,7 @@ void createPartitions(int count, String name,
         void registerBrokers(Integer... brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(brokerId + 
100).setBrokerId(brokerId).setRack(null);
+                    
setBrokerEpoch(generateTestDefaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);

Review Comment:
   nit: How about `defaultBrokerEpoch`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to