Copilot commented on code in PR #20128: URL: https://github.com/apache/kafka/pull/20128#discussion_r2236780174
########## core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala: ########## @@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness { } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER) } + @Test + def testMigrationZnodeWithNullValue(): Unit = { + val (controllerEpoch, stat) = zkClient.getControllerEpoch.get + var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion) + zkClient.retryRequestUntilConnected(CreateRequest( + MigrationZNode.path, + null, + zkClient.defaultAcls(MigrationZNode.path), + CreateMode.PERSISTENT)) + + migrationState = zkClient.getOrCreateMigrationState(migrationState) + + assertEquals(0, migrationState.migrationZkVersion()) Review Comment: [nitpick] The variable `migrationState` is declared as `var` only so that it can be reassigned once with the result of `getOrCreateMigrationState`. Consider declaring the initial value as `val` and binding that result to a second, differently named `val` to improve code clarity. 
```suggestion val migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion) zkClient.retryRequestUntilConnected(CreateRequest( MigrationZNode.path, null, zkClient.defaultAcls(MigrationZNode.path), CreateMode.PERSISTENT)) val updatedMigrationState = zkClient.getOrCreateMigrationState(migrationState) assertEquals(0, updatedMigrationState.migrationZkVersion()) ``` ########## core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala: ########## @@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness { } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER) } + @Test + def testMigrationZnodeWithNullValue(): Unit = { + val (controllerEpoch, stat) = zkClient.getControllerEpoch.get + var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion) Review Comment: [nitpick] The magic numbers (3000, 42, 100, 42) make the test hard to understand. Consider using named constants or variables to clarify what these values represent. 
```suggestion val brokerId = 3000 val leaderEpoch = 42 val controllerEpochValue = 100 val zkVersion = 42 var migrationState = new ZkMigrationLeadershipState(brokerId, leaderEpoch, controllerEpochValue, zkVersion, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion) ``` ########## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ########## @@ -1734,20 +1734,27 @@ class KafkaZkClient private[zk] ( val getDataResponse = retryRequestUntilConnected(getDataRequest) getDataResponse.resultCode match { case Code.OK => - MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime) + Option(getDataResponse.data) match { + case Some(data) => + MigrationZNode.decode(data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime) + case None => + createInitialMigrationState(initialState, removeFirst = true) + } case Code.NONODE => createInitialMigrationState(initialState) case _ => throw getDataResponse.resultException.get } } - private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = { - val createRequest = CreateRequest( + private def createInitialMigrationState(initialState: ZkMigrationLeadershipState, removeFirst: Boolean = false): ZkMigrationLeadershipState = { + val createOp = CreateOp( MigrationZNode.path, MigrationZNode.encode(initialState), defaultAcls(MigrationZNode.path), CreateMode.PERSISTENT) - val response = retryRequestUntilConnected(createRequest) + val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion) + val multi = MultiRequest((if (removeFirst) Some(deleteOp) else None).toSeq ++ Seq(createOp)) Review Comment: [nitpick] The conditional logic for building the MultiRequest operations is complex and could be simplified. Consider extracting this into a clearer variable assignment or using a more readable approach like building a list of operations. 
```suggestion val operations = if (removeFirst) Seq(deleteOp, createOp) else Seq(createOp) val multi = MultiRequest(operations) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org