This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 7d79341ab0e KAFKA-14304 Use boolean for ZK migrating brokers in 
RPC/record (#13103)
7d79341ab0e is described below

commit 7d79341ab0ee3f461eb311f9af888fb8c5c1c924
Author: David Arthur <[email protected]>
AuthorDate: Wed Jan 11 14:36:56 2023 -0500

    KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (#13103)
    
    With the new broker epoch validation logic introduced in #12998, we no 
longer need the ZK broker epoch to be sent to the KRaft controller. This patch 
removes that epoch and replaces it with a boolean.
    
    Another small fix is included in this patch for controlled shutdown in 
migration mode. Previously, if a ZK broker was in migration mode, it would 
always try to do controlled shutdown via BrokerLifecycleManager. Since there is 
no ordering dependency between bringing up ZK brokers and the KRaft quorum 
during migration, a ZK broker could be running in migration mode, but talking 
to a ZK controller. A small check was added to see if the current controller is 
ZK or KRaft before decided whi [...]
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../common/requests/BrokerRegistrationRequest.java |  2 +-
 .../common/message/BrokerRegistrationRequest.json  |  4 +-
 .../kafka/server/BrokerLifecycleManager.scala      | 16 +-------
 .../src/main/scala/kafka/server/BrokerServer.scala |  3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  6 +--
 .../server/KafkaServerKRaftRegistrationTest.scala  | 33 +++++++++++++++-
 .../kafka/server/BrokerLifecycleManagerTest.scala  | 10 ++---
 .../server/BrokerRegistrationRequestTest.scala     |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 44 ++++++++++++++++++++++
 .../kafka/controller/ClusterControlManager.java    |  6 +--
 .../apache/kafka/metadata/BrokerRegistration.java  | 34 ++++++++---------
 .../common/metadata/RegisterBrokerRecord.json      |  4 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |  8 ++--
 14 files changed, 121 insertions(+), 61 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
index 9c6e57c6bb0..18d6a070d05 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
@@ -36,7 +36,7 @@ public class BrokerRegistrationRequest extends 
AbstractRequest {
 
         @Override
         public short oldestAllowedVersion() {
-            if (data.migratingZkBrokerEpoch() != -1) {
+            if (data.isMigratingZkBroker()) {
                 return (short) 1;
             } else {
                 return (short) 0;
diff --git 
a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json 
b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 22b0edbc64c..98658a3f04a 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -52,7 +52,7 @@
     },
     { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": 
"0+",
       "about": "The rack which this broker is in." },
-    { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", 
"default": "-1",
-      "about": "If the required configurations for ZK migration are present, 
this value is set to the ZK broker epoch" }
+    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", 
"default": "false",
+      "about": "If the required configurations for ZK migration are present, 
this value is set to true" }
   ]
 }
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index f560a8eafc3..dd3f39b156f 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -55,8 +55,7 @@ class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
   val threadNamePrefix: Option[String],
-  val isZkBroker: Boolean,
-  val zkBrokerEpochSupplier: () => Long
+  val isZkBroker: Boolean
 ) extends Logging {
 
   val logContext = new LogContext(s"[BrokerLifecycleManager 
id=${config.nodeId}] ")
@@ -291,20 +290,9 @@ class BrokerLifecycleManager(
         setMinSupportedVersion(range.min()).
         setMaxSupportedVersion(range.max()))
     }
-    val migrationZkBrokerEpoch: Long = {
-      if (isZkBroker) {
-        val zkBrokerEpoch: Long = 
Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
-        if (zkBrokerEpoch < 0) {
-          throw new IllegalStateException("Trying to sending 
BrokerRegistration in migration Zk " +
-            "broker without valid zk broker epoch")
-        }
-        zkBrokerEpoch
-      } else
-        -1
-    }
     val data = new BrokerRegistrationRequestData().
         setBrokerId(nodeId).
-        setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
+        setIsMigratingZkBroker(isZkBroker).
         setClusterId(_clusterId).
         setFeatures(features).
         setIncarnationId(incarnationId).
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3bf4154f535..82ecbc22010 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -183,8 +183,7 @@ class BrokerServer(
       lifecycleManager = new BrokerLifecycleManager(config,
         time,
         threadNamePrefix,
-        isZkBroker = false,
-        () => -1)
+        isZkBroker = false)
 
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4fad17cd4d4..791e516a443 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2128,8 +2128,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       }
       if (migrationEnabled) {
         if (zkConnect == null) {
-          throw new ConfigException(s"Missing required configuration 
`${KafkaConfig.ZkConnectProp}` which has no default value. " +
-            s"`${KafkaConfig.ZkConnectProp}` is required because 
`${KafkaConfig.MigrationEnabledProp}  is set to true.")
+          throw new ConfigException(s"If using 
`${KafkaConfig.MigrationEnabledProp}` in KRaft mode, 
`${KafkaConfig.ZkConnectProp}` must also be set.")
         }
       }
     }
@@ -2155,6 +2154,11 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
         throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, 
${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
       }
     }
+    def validateNonEmptyQuorumVotersForMigration(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using 
${KafkaConfig.MigrationEnabledProp}, ${KafkaConfig.QuorumVotersProp} must 
contain a parseable set of voters.")
+      }
+    }
     def validateControlPlaneListenerEmptyForKRaft(): Unit = {
       require(controlPlaneListenerName.isEmpty,
         s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
@@ -2237,7 +2241,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     } else {
       // ZK-based
       if (migrationEnabled) {
-        validateNonEmptyQuorumVotersForKRaft()
+        validateNonEmptyQuorumVotersForMigration()
         require(controllerListenerNames.nonEmpty,
           s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when 
running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
         require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot 
enable ZooKeeper migration without setting " +
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2144fb5aa73..3f8ac4ccad9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -371,8 +371,7 @@ class KafkaServer(
           lifecycleManager = new BrokerLifecycleManager(config,
             time,
             threadNamePrefix,
-            isZkBroker = true,
-            () => kafkaController.brokerEpoch)
+            isZkBroker = true)
 
           // If the ZK broker is in migration mode, start up a RaftManager to 
learn about the new KRaft controller
           val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, 
zkMetaProperties.brokerId)
@@ -788,7 +787,7 @@ class KafkaServer(
 
       _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
-      if (config.migrationEnabled && lifecycleManager != null) {
+      if (config.migrationEnabled && lifecycleManager != null && 
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
         // TODO KAFKA-14447 Only use KRaft controlled shutdown (when in 
migration mode)
         // For now we'll send the heartbeat with WantShutDown set so the KRaft 
controller can see a broker
         // shutting down without waiting for the heartbeat to time out.
@@ -802,7 +801,6 @@ class KafkaServer(
           case e: Throwable =>
             error("Got unexpected exception waiting for controlled shutdown 
future", e)
         }
-        // TODO fix this ^
       }
 
       val shutdownSucceeded = 
doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
diff --git 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
 
b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index f961cd4507e..d6f39c76f38 100644
--- 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.Assertions.{assertThrows, fail}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{Tag, Timeout}
 
@@ -85,4 +85,35 @@ class KafkaServerKRaftRegistrationTest {
       kraftCluster.close()
     }
   }
+
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_3_IV0)
+  def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): 
Unit = {
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+
+      // Enable migration configs and restart brokers
+      val props = kraftCluster.controllerClientProperties()
+      val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+      
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
voters)
+      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
+      
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp,
 "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      assertThrows(classOf[IllegalArgumentException], () => 
zkCluster.rollingBrokerRestart())
+    } finally {
+      zkCluster.stop()
+      kraftCluster.close()
+    }
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 1a7569a9871..304c987d3ae 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false)
     manager.close()
   }
 
   @Test
   def testCreateStartAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false)
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
@@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testSuccessfulRegistration(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest {
   def testRegistrationTimeout(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
     val controllerNode = new Node(3000, "localhost", 8021)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false)
     context.controllerNodeProvider.node.set(controllerNode)
     def newDuplicateRegistrationResponse(): Unit = {
       context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testControlledShutdown(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, 
None, isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index d7fbf644c83..09c74d4e155 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
       .setBrokerId(brokerId)
       .setClusterId(clusterId)
       .setIncarnationId(Uuid.randomUuid())
-      .setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
+      .setIsMigratingZkBroker(zkEpoch.isDefined)
       .setFeatures(features)
 
     Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1983eb1b0ed..5cb6c666523 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1635,4 +1635,48 @@ class KafkaConfigTest {
       errorMessage
     )
   }
+
+  @Test
+  def testMigrationEnabledZkMode(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port 
= TestUtils.MockZkPort)
+    props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
+    assertEquals(
+      "If using zookeeper.metadata.migration.enable, controller.quorum.voters 
must contain a parseable set of voters.",
+      assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.QuorumVotersProp, "3000@localhost:9093")
+    assertEquals(
+      "requirement failed: controller.listener.names must not be empty when 
running in ZooKeeper migration mode: []",
+      assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    KafkaConfig.fromProps(props)
+
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, 
MetadataVersion.IBP_3_3_IV0.version())
+    assertEquals(
+      "requirement failed: Cannot enable ZooKeeper migration without setting 
'inter.broker.protocol.version' to 3.4 or higher",
+      assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props)).getMessage)
+
+    props.remove(KafkaConfig.MigrationEnabledProp)
+    assertEquals(
+      "requirement failed: controller.listener.names must be empty when not 
running in KRaft mode: [CONTROLLER]",
+      assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props)).getMessage)
+
+    props.remove(KafkaConfig.ControllerListenerNamesProp)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testMigrationEnabledKRaftMode(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
+
+    assertEquals(
+      "If using `zookeeper.metadata.migration.enable` in KRaft mode, 
`zookeeper.connect` must also be set.",
+      assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    KafkaConfig.fromProps(props)
+  }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 7a58433789b..98981b4c1f6 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -336,13 +336,13 @@ public class ClusterControlManager {
             }
         }
 
-        if (request.migratingZkBrokerEpoch() != -1 && 
!zkRegistrationAllowed()) {
+        if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
             throw new BrokerIdNotRegisteredException("Controller does not 
support registering ZK brokers.");
         }
 
         RegisterBrokerRecord record = new RegisterBrokerRecord().
             setBrokerId(brokerId).
-            setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
+            setIsMigratingZkBroker(request.isMigratingZkBroker()).
             setIncarnationId(request.incarnationId()).
             setBrokerEpoch(brokerEpoch).
             setRack(request.rack());
@@ -426,7 +426,7 @@ public class ClusterControlManager {
                 new BrokerRegistration(brokerId, record.brokerEpoch(),
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced(),
-                    record.inControlledShutdown(), 
BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
+                    record.inControlledShutdown(), 
record.isMigratingZkBroker()));
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
             heartbeatManager.register(brokerId, record.fenced());
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 3f87d2830ab..0ece4dd94de 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -65,7 +65,7 @@ public class BrokerRegistration {
     private final Optional<String> rack;
     private final boolean fenced;
     private final boolean inControlledShutdown;
-    private final Optional<Long> migratingZkBrokerEpoch;
+    private final boolean isMigratingZkBroker;
 
     // Visible for testing
     public BrokerRegistration(int id,
@@ -77,7 +77,7 @@ public class BrokerRegistration {
                               boolean fenced,
                               boolean inControlledShutdown) {
         this(id, epoch, incarnationId, listenersToMap(listeners), 
supportedFeatures, rack,
-            fenced, inControlledShutdown, Optional.empty());
+            fenced, inControlledShutdown, false);
     }
 
     public BrokerRegistration(int id,
@@ -88,9 +88,9 @@ public class BrokerRegistration {
                               Optional<String> rack,
                               boolean fenced,
                               boolean inControlledShutdown,
-                              Optional<Long> migratingZkBrokerEpoch) {
+                              boolean isMigratingZkBroker) {
         this(id, epoch, incarnationId, listenersToMap(listeners), 
supportedFeatures, rack,
-            fenced, inControlledShutdown, migratingZkBrokerEpoch);
+            fenced, inControlledShutdown, isMigratingZkBroker);
     }
 
     // Visible for testing
@@ -102,7 +102,7 @@ public class BrokerRegistration {
                               Optional<String> rack,
                               boolean fenced,
                               boolean inControlledShutdown) {
-        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, 
fenced, inControlledShutdown, Optional.empty());
+        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, 
fenced, inControlledShutdown, false);
     }
 
     public BrokerRegistration(int id,
@@ -113,7 +113,7 @@ public class BrokerRegistration {
                               Optional<String> rack,
                               boolean fenced,
                               boolean inControlledShutdown,
-                              Optional<Long> migratingZkBrokerEpoch) {
+                              boolean isMigratingZkBroker) {
         this.id = id;
         this.epoch = epoch;
         this.incarnationId = incarnationId;
@@ -131,7 +131,7 @@ public class BrokerRegistration {
         this.rack = rack;
         this.fenced = fenced;
         this.inControlledShutdown = inControlledShutdown;
-        this.migratingZkBrokerEpoch = migratingZkBrokerEpoch;
+        this.isMigratingZkBroker = isMigratingZkBroker;
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -155,7 +155,7 @@ public class BrokerRegistration {
             Optional.ofNullable(record.rack()),
             record.fenced(),
             record.inControlledShutdown(),
-            zkBrokerEpoch(record.migratingZkBrokerEpoch()));
+            record.isMigratingZkBroker());
     }
 
     public int id() {
@@ -199,11 +199,7 @@ public class BrokerRegistration {
     }
 
     public boolean isMigratingZkBroker() {
-        return migratingZkBrokerEpoch.isPresent();
-    }
-
-    public Optional<Long> migratingZkBrokerEpoch() {
-        return migratingZkBrokerEpoch;
+        return isMigratingZkBroker;
     }
 
     public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
@@ -222,9 +218,9 @@ public class BrokerRegistration {
             }
         }
 
-        if (migratingZkBrokerEpoch.isPresent()) {
+        if (isMigratingZkBroker) {
             if (options.metadataVersion().isMigrationSupported()) {
-                
registrationRecord.setMigratingZkBrokerEpoch(migratingZkBrokerEpoch.get());
+                registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
             } else {
                 options.handleLoss("the isMigratingZkBroker state of one or 
more brokers");
             }
@@ -253,7 +249,7 @@ public class BrokerRegistration {
     @Override
     public int hashCode() {
         return Objects.hash(id, epoch, incarnationId, listeners, 
supportedFeatures,
-            rack, fenced, inControlledShutdown, migratingZkBrokerEpoch);
+            rack, fenced, inControlledShutdown, isMigratingZkBroker);
     }
 
     @Override
@@ -268,7 +264,7 @@ public class BrokerRegistration {
             other.rack.equals(rack) &&
             other.fenced == fenced &&
             other.inControlledShutdown == inControlledShutdown &&
-            other.migratingZkBrokerEpoch.equals(migratingZkBrokerEpoch);
+            other.isMigratingZkBroker == isMigratingZkBroker;
     }
 
     @Override
@@ -289,7 +285,7 @@ public class BrokerRegistration {
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
         bld.append(", inControlledShutdown=").append(inControlledShutdown);
-        bld.append(", 
migratingZkBrokerEpoch=").append(migratingZkBrokerEpoch.orElse(-1L));
+        bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
         bld.append(")");
         return bld.toString();
     }
@@ -313,7 +309,7 @@ public class BrokerRegistration {
             rack,
             newFenced,
             newInControlledShutdownChange,
-            migratingZkBrokerEpoch
+            isMigratingZkBroker
         );
     }
 }
diff --git 
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index f8de544127c..66b740573d5 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -22,8 +22,8 @@
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker id." },
-    { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "2+", 
"default": "-1",
-      "about": "The ZK broker epoch if this record is for a ZK broker. 
Otherwise, -1" },
+    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", 
"default": "false",
+      "about": "True if the broker is a ZK broker in migration mode. 
Otherwise, false" },
     { "name": "IncarnationId", "type": "uuid", "versions": "0+",
       "about": "The incarnation ID of the broker process" },
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 50516fbfccd..262c8513381 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -59,7 +59,7 @@ public class BrokerRegistrationTest {
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, 
"localhost", 9093)),
             Stream.of(new SimpleEntry<>("metadata.version", 
VersionRange.of((short) 7, (short) 7)))
                 .collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue)),
-            Optional.empty(), false, true, Optional.of(10L)));
+            Optional.empty(), false, true, true));
 
     @Test
     public void testValues() {
@@ -90,19 +90,19 @@ public class BrokerRegistrationTest {
             "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
-            "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
migratingZkBrokerEpoch=-1)",
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
isMigratingZkBroker=false)",
             REGISTRATIONS.get(1).toString());
         assertEquals("BrokerRegistration(id=2, epoch=0, " +
             "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 
2-3}, " +
-            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, 
migratingZkBrokerEpoch=-1)",
+            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=false)",
             REGISTRATIONS.get(2).toString());
         assertEquals("BrokerRegistration(id=3, epoch=0, " +
             "incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9093)], 
supportedFeatures={metadata.version: 7}, " +
-            "rack=Optional.empty, fenced=false, inControlledShutdown=true, 
migratingZkBrokerEpoch=10)",
+            "rack=Optional.empty, fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=true)",
             REGISTRATIONS.get(3).toString());
     }
 

Reply via email to