Re: [PR] KAFKA-16120: Handle removed replicas during KRaft migration [kafka]

2024-01-12 Thread via GitHub


splett2 commented on code in PR #15184:
URL: https://github.com/apache/kafka/pull/15184#discussion_r1450726015


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -677,6 +677,88 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testPartitionReassignmentInDualWrite(zkCluster: ClusterInstance): Unit = {
+    // Create a topic in ZK mode
+    val topicName = "test"
+    var admin = zkCluster.createAdminClient()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getControllerId.contains(3000),
+        "Timed out waiting for KRaft controller to take over",
+        3)
+
+      // Create a topic with replicas on brokers 0, 1, 2
+      log.info("Create new topic with AdminClient")
+      admin = zkCluster.createAdminClient()
+      val newTopics = new util.ArrayList[NewTopic]()
+      val replicaAssignment = Collections.singletonMap(Integer.valueOf(0), Seq(0, 1, 2).map(int2Integer).asJava)
+      newTopics.add(new NewTopic(topicName, replicaAssignment))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val topicPartition = new TopicPartition(topicName, 0)
+
+      // Verify the changes made to KRaft are seen in ZK
+      verifyTopicPartitionMetadata(topicName, Seq(topicPartition), zkClient)
+
+      // Reassign replicas to brokers 1, 2, 3 and wait for reassignment to complete
+      admin.alterPartitionReassignments(Collections.singletonMap(topicPartition,
+        Optional.of(new NewPartitionReassignment(Seq(1, 2, 3).map(int2Integer).asJava)))).all().get()
+
+      admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all.get()

Review Comment:
   Oddly, the reassignment times out if I do not run leader election. I took a look at the `state.change.logger` logs and saw messages showing the reassignment-generated LISR being ignored because the broker already had the same leader epoch.
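   
   For reference, one way to observe this from the test side is to poll the Admin API for in-flight reassignments. The helper below is hypothetical (`ReassignmentCheck` and its 30-second timeout are made up for illustration, not part of this PR); it just shows the kind of check I mean:
   
   ```scala
   import java.util.Collections
   import java.util.concurrent.TimeUnit
   import org.apache.kafka.clients.admin.Admin
   import org.apache.kafka.common.TopicPartition
   
   object ReassignmentCheck {
     // Returns true while the controller still reports the reassignment for `tp` as in flight.
     def stillPending(admin: Admin, tp: TopicPartition): Boolean = {
       val pending = admin
         .listPartitionReassignments(Collections.singleton(tp))
         .reassignments()
         .get(30, TimeUnit.SECONDS)
       pending.containsKey(tp)
     }
   }
   ```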
   
   I suspect this is because the leader epoch bump logic in KRaft is different from the leader epoch bump logic in ZK (and the handling of LISR requests and topic deltas differs in the ReplicaManager). Something along the lines of:
   
   In KRaft mode, when we start a reassignment we don't bump the leader epoch. In ZK mode, when we add new replicas we _do_ bump the leader epoch. When a broker handles an LISR, it ignores any LISR carrying its current leader epoch, so the LISR generated when KRaft initiates the reassignment is ignored.
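   
   To make that concrete, here is a rough sketch (my own simplification for illustration, not the actual `ReplicaManager`/`Partition` code) of the epoch comparison in question:
   
   ```scala
   // Simplified model of a ZK-mode broker's handling of a LeaderAndIsr (LISR) update:
   // the new state is applied only if its leader epoch is strictly newer than the one
   // the broker already has. A reassignment started by the KRaft controller does not
   // bump the epoch, so the resulting LISR is treated as stale and dropped.
   final case class PartitionState(leaderEpoch: Int, replicas: Seq[Int])
   
   object LeaderAndIsrCheck {
     def maybeApply(current: PartitionState, incoming: PartitionState): PartitionState =
       if (incoming.leaderEpoch > current.leaderEpoch)
         incoming // newer epoch: accept the new leader/replica state
       else
         current  // same (or older) epoch: ignore it, so the new replica set never lands
   }
   ```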
   
   I will file a separate JIRA once I get to the bottom of it.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
