Copilot commented on code in PR #20128:
URL: https://github.com/apache/kafka/pull/20128#discussion_r2236780174


##########
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala:
##########
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
     } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
   }
 
+  @Test
+  def testMigrationZnodeWithNullValue(): Unit = {
+    val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+    var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, 
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
+    zkClient.retryRequestUntilConnected(CreateRequest(
+      MigrationZNode.path,
+      null,
+      zkClient.defaultAcls(MigrationZNode.path),
+      CreateMode.PERSISTENT))
+    
+    migrationState = zkClient.getOrCreateMigrationState(migrationState)
+
+    assertEquals(0, migrationState.migrationZkVersion())

Review Comment:
   [nitpick] The variable `migrationState` is declared as `var` only because it 
is reassigned once, to the result of `getOrCreateMigrationState`. Consider 
declaring the initial state as `val` and binding that result to a second `val` 
with a different name to improve code clarity.
   ```suggestion
       val migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, 
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
       zkClient.retryRequestUntilConnected(CreateRequest(
         MigrationZNode.path,
         null,
         zkClient.defaultAcls(MigrationZNode.path),
         CreateMode.PERSISTENT))
       
       val updatedMigrationState = 
zkClient.getOrCreateMigrationState(migrationState)
   
       assertEquals(0, updatedMigrationState.migrationZkVersion())
   ```



##########
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala:
##########
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
     } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
   }
 
+  @Test
+  def testMigrationZnodeWithNullValue(): Unit = {
+    val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+    var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, 
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)

Review Comment:
   [nitpick] The magic numbers (3000, 42, 100, 42) make the test hard to 
understand. Consider using named constants or variables, named after the 
corresponding `ZkMigrationLeadershipState` constructor parameters, to clarify 
what these values represent.
   ```suggestion
       val brokerId = 3000
       val leaderEpoch = 42
       val controllerEpochValue = 100
       val zkVersion = 42
       var migrationState = new ZkMigrationLeadershipState(brokerId, 
leaderEpoch, controllerEpochValue, zkVersion, Time.SYSTEM.milliseconds(), -1, 
controllerEpoch, stat.getVersion)
   ```



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1734,20 +1734,27 @@ class KafkaZkClient private[zk] (
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
       case Code.OK =>
-        MigrationZNode.decode(getDataResponse.data, 
getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+        Option(getDataResponse.data) match {
+          case Some(data) =>
+            MigrationZNode.decode(data, getDataResponse.stat.getVersion, 
getDataResponse.stat.getMtime)
+          case None =>
+            createInitialMigrationState(initialState, removeFirst = true)
+        }
       case Code.NONODE =>
         createInitialMigrationState(initialState)
       case _ => throw getDataResponse.resultException.get
     }
   }
 
-  private def createInitialMigrationState(initialState: 
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
-    val createRequest = CreateRequest(
+  private def createInitialMigrationState(initialState: 
ZkMigrationLeadershipState, removeFirst: Boolean = false): 
ZkMigrationLeadershipState = {
+    val createOp = CreateOp(
       MigrationZNode.path,
       MigrationZNode.encode(initialState),
       defaultAcls(MigrationZNode.path),
       CreateMode.PERSISTENT)
-    val response = retryRequestUntilConnected(createRequest)
+    val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
+    val multi = MultiRequest((if (removeFirst) Some(deleteOp) else None).toSeq 
++ Seq(createOp))

Review Comment:
   [nitpick] The expression that builds the `MultiRequest` operations, 
`(if (removeFirst) Some(deleteOp) else None).toSeq ++ Seq(createOp)`, is harder 
to read than necessary. Consider building the sequence of operations in a named 
variable first and then passing that variable to `MultiRequest`.
   ```suggestion
       val operations = if (removeFirst) Seq(deleteOp, createOp) else 
Seq(createOp)
       val multi = MultiRequest(operations)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to