This is an automated email from the ASF dual-hosted git repository.

soarez pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ace33b47fc KAFKA-16757: Fix broker re-registration issues around MV 
3.7-IV2 (#15945)
8ace33b47fc is described below

commit 8ace33b47fc078d89a104043b12ca95f6e1da637
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Sat Jun 1 15:51:39 2024 -0700

    KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 (#15945)
    
    When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend 
the broker registration, so that the controller can record the storage 
directories. The current code for doing this has several problems, however. One 
is that it tends to trigger even in cases where we don't actually need it. 
Another is that when re-registering the broker, the broker is marked as fenced.
    
    This PR moves the handling of the re-registration case out of 
BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration 
code there will only trigger in the case where the broker sees an existing 
registration for itself with no directories set.  This is much more targeted 
than the original code.
    
    Additionally, in ClusterControlManager, when re-registering the same 
broker, we now preserve its fencing and shutdown state, rather than clearing 
those. (There isn't any good reason re-registering the same broker should clear 
these things... this was purely an oversight.) Note that we can tell the broker 
is "the same" because it has the same IncarnationId.
    
    Reviewers: Gaurav Narula <gaurav_naru...@apple.com>, Igor Soarez 
<soa...@apple.com>
---
 .../kafka/server/BrokerLifecycleManager.scala      |   6 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   8 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  16 ---
 .../kafka/server/BrokerLifecycleManagerTest.scala  |   2 +-
 .../metadata/BrokerMetadataPublisherTest.scala     | 101 +-------------
 .../kafka/controller/ClusterControlManager.java    |   7 +
 .../image/publisher/BrokerRegistrationTracker.java | 136 +++++++++++++++++++
 .../publisher/BrokerRegistrationTrackerTest.java   | 151 +++++++++++++++++++++
 8 files changed, 306 insertions(+), 121 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 5f3fdc81887..51bc16fb09d 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -264,11 +264,11 @@ class BrokerLifecycleManager(
       new OfflineDirBrokerFailureEvent(directory))
   }
 
-  def handleKraftJBODMetadataVersionUpdate(): Unit = {
-    eventQueue.append(new KraftJBODMetadataVersionUpdateEvent())
+  def resendBrokerRegistrationUnlessZkMode(): Unit = {
+    eventQueue.append(new ResendBrokerRegistrationUnlessZkModeEvent())
   }
 
-  private class KraftJBODMetadataVersionUpdateEvent extends EventQueue.Event {
+  private class ResendBrokerRegistrationUnlessZkModeEvent extends 
EventQueue.Event {
     override def run(): Unit = {
       if (!isZkBroker) {
         registered = false
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 112a03c50a9..64a4fd7474a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, 
GroupCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.group.{CoordinatorRecord, 
GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, 
CoordinatorRecordSerde}
-import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, 
NodeToControllerChannelManager}
@@ -139,6 +139,8 @@ class BrokerServer(
 
   var brokerMetadataPublisher: BrokerMetadataPublisher = _
 
+  var brokerRegistrationTracker: BrokerRegistrationTracker = _
+
   val brokerFeatures: BrokerFeatures = 
BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)
 
   def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
@@ -482,6 +484,10 @@ class BrokerServer(
         lifecycleManager
       )
       metadataPublishers.add(brokerMetadataPublisher)
+      brokerRegistrationTracker = new 
BrokerRegistrationTracker(config.brokerId,
+        logManager.directoryIdsSet.toList.asJava,
+        () => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
+      metadataPublishers.add(brokerRegistrationTracker)
 
       // Register parts of the broker that can be reconfigured via dynamic 
configs.  This needs to
       // be done before we publish the dynamic configs, so that we don't miss 
anything.
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 048a665757b..ee7bfa2157e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
-import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.fault.FaultHandler
 
 import java.util.concurrent.CompletableFuture
@@ -129,21 +128,6 @@ class BrokerMetadataPublisher(
         debug(s"Publishing metadata at offset $highestOffsetAndEpoch with 
$metadataVersionLogMsg.")
       }
 
-      Option(delta.featuresDelta()).foreach { featuresDelta =>
-        featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
-          info(s"Updating metadata.version to 
${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
-          val currentMetadataVersion = 
delta.image().features().metadataVersion()
-          if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) 
&& metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
-            info(
-              s"""Resending BrokerRegistration with existing incarnation-id to 
inform the
-                 |controller about log directories in the broker following 
metadata update:
-                 |previousMetadataVersion: 
${delta.image().features().metadataVersion()}
-                 |newMetadataVersion: 
$metadataVersion""".stripMargin.linesIterator.mkString(" ").trim)
-            brokerLifecycleManager.handleKraftJBODMetadataVersionUpdate()
-          }
-        }
-      }
-
       // Apply topic deltas.
       Option(delta.topicsDelta()).foreach { topicsDelta =>
         try {
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 34f9d139a03..b0162dc6358 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -285,7 +285,7 @@ class BrokerLifecycleManagerTest {
     assertEquals(1000L, manager.brokerEpoch)
 
     // Trigger JBOD MV update
-    manager.handleKraftJBODMetadataVersionUpdate()
+    manager.resendBrokerRegistrationUnlessZkMode()
 
     // Accept new registration, response sets epoch to 1200
     nextRegistrationRequest(1200L)
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index c2926c3b67d..26f4fb3daee 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
-import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, MetadataProvenance}
@@ -43,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertNotNull, assertTrue
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
-import org.mockito.Mockito.{clearInvocations, doThrow, mock, times, verify, 
verifyNoInteractions}
+import org.mockito.Mockito.{doThrow, mock, verify}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -221,102 +220,4 @@ class BrokerMetadataPublisherTest {
 
     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }
-
-  @Test
-  def 
testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(): 
Unit = {
-    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
-    val metadataCache = new KRaftMetadataCache(0)
-    val logManager = mock(classOf[LogManager])
-    val replicaManager = mock(classOf[ReplicaManager])
-    val groupCoordinator = mock(classOf[GroupCoordinator])
-    val faultHandler = mock(classOf[FaultHandler])
-    val brokerLifecycleManager = mock(classOf[BrokerLifecycleManager])
-
-    val metadataPublisher = new BrokerMetadataPublisher(
-      config,
-      metadataCache,
-      logManager,
-      replicaManager,
-      groupCoordinator,
-      mock(classOf[TransactionCoordinator]),
-      mock(classOf[DynamicConfigPublisher]),
-      mock(classOf[DynamicClientQuotaPublisher]),
-      mock(classOf[ScramPublisher]),
-      mock(classOf[DelegationTokenPublisher]),
-      mock(classOf[AclPublisher]),
-      faultHandler,
-      faultHandler,
-      brokerLifecycleManager,
-    )
-
-    var image = MetadataImage.EMPTY
-    var delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-
-    // We first upgrade metadata version to 3_6_IV2
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()))
-    var newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This should NOT trigger broker reregistration
-    verifyNoInteractions(brokerLifecycleManager)
-
-    // We then upgrade to IBP_3_7_IV2
-    image = newImage
-    delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()))
-    newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This SHOULD trigger a broker registration
-    verify(brokerLifecycleManager, 
times(1)).handleKraftJBODMetadataVersionUpdate()
-    clearInvocations(brokerLifecycleManager)
-
-    // Finally upgrade to IBP_3_8_IV0
-    image = newImage
-    delta = new MetadataDelta.Builder()
-      .setImage(image)
-      .build()
-    delta.replay(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()))
-    newImage = delta.apply(new MetadataProvenance(200, 4, 3000))
-
-    metadataPublisher.onMetadataUpdate(delta, newImage,
-      LogDeltaManifest.newBuilder()
-        .provenance(MetadataProvenance.EMPTY)
-        .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
-        .numBatches(1)
-        .elapsedNs(100)
-        .numBytes(42)
-        .build())
-
-    // This should NOT trigger broker reregistration
-    verify(brokerLifecycleManager, 
times(0)).handleKraftJBODMetadataVersionUpdate()
-
-    metadataPublisher.close()
-  }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 0974c31d1b2..8b9c5b19eae 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -408,6 +408,13 @@ public class ClusterControlManager {
             setBrokerEpoch(brokerEpoch).
             setRack(request.rack()).
             setEndPoints(listenerInfo.toBrokerRegistrationRecord());
+
+        if (existing != null && 
request.incarnationId().equals(existing.incarnationId())) {
+            log.info("Amending registration of broker {}", request.brokerId());
+            record.setFenced(existing.fenced());
+            record.setInControlledShutdown(existing.inControlledShutdown());
+        }
+
         for (BrokerRegistrationRequestData.Feature feature : 
request.features()) {
             record.features().add(processRegistrationFeature(brokerId, 
finalizedFeatures, feature));
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
 
b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
new file mode 100644
index 00000000000..51ac2bdfa4b
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+    private final Logger log;
+    private final int id;
+    private final Runnable refreshRegistrationCallback;
+
+    /**
+     * Create the tracker.
+     *
+     * @param id                            The ID of this broker.
+     * @param targetDirectories             The directories managed by this 
broker.
+     * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+     */
+    public BrokerRegistrationTracker(
+        int id,
+        List<Uuid> targetDirectories,
+        Runnable refreshRegistrationCallback
+    ) {
+        this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+            logger(BrokerRegistrationTracker.class);
+        this.id = id;
+        this.refreshRegistrationCallback = refreshRegistrationCallback;
+    }
+
+    @Override
+    public String name() {
+        return "BrokerRegistrationTracker(id=" + id + ")";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        boolean checkBrokerRegistration = false;
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Metadata version change is present: {}",
+                        delta.metadataVersionChanged());
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (delta.clusterDelta() != null) {
+            if (delta.clusterDelta().changedBrokers().get(id) != null) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Broker change is present: {}",
+                        delta.clusterDelta().changedBrokers().get(id));
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (checkBrokerRegistration) {
+            if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+                    delta.clusterDelta().broker(id))) {
+                refreshRegistrationCallback.run();
+            }
+        }
+    }
+
+    /**
+     * Check if the current broker registration needs to be refreshed.
+     *
+     * @param metadataVersion   The current metadata version.
+     * @param registration      The current broker registration, or null if 
there is none.
+     * @return                  True only if we should refresh.
+     */
+    boolean brokerRegistrationNeedsRefresh(
+        MetadataVersion metadataVersion,
+        BrokerRegistration registration
+    ) {
+        // If there is no existing registration, the BrokerLifecycleManager 
must still be sending it.
+        // So we don't need to do anything yet.
+        if (registration == null) {
+            log.debug("No current broker registration to check.");
+            return false;
+        }
+        // Check to see if the directory list has changed.  Note that this 
check could certainly be
+        // triggered spuriously. For example, if the broker's directory list 
has been changed in the
+        // past, and we are in the process of replaying that change log, we 
will end up here.
+        // That's fine because resending the broker registration does not 
cause any problems. And,
+        // of course, as soon as a snapshot is made, we will no longer need to 
worry about those
+        // old metadata log entries being replayed on startup.
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) &&
+                registration.directories().isEmpty()) {
+            log.info("Current directory set is empty, but MV supports JBOD. 
Resending " +
+                    "broker registration.");
+            return true;
+        }
+        log.debug("Broker registration does not need to be resent.");
+        return false;
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
new file mode 100644
index 00000000000..855a96cd8aa
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTrackerTest {
+    static final Uuid INCARNATION_ID = 
Uuid.fromString("jyjLbk31Tpa53pFrU9Y-Ng");
+
+    static final Uuid A = Uuid.fromString("Ahw3vXfnThqeZbb7HD1w6Q");
+
+    static final Uuid B = Uuid.fromString("BjOacT0OTNqIvUWIlKhahg");
+
+    static final Uuid C = Uuid.fromString("CVHi_iv2Rvy5_1rtPdasfg");
+
+    static class BrokerRegistrationTrackerTestContext {
+        AtomicInteger numCalls = new AtomicInteger(0);
+        BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1,
+                Arrays.asList(B, A), () -> numCalls.incrementAndGet());
+
+        MetadataImage image = MetadataImage.EMPTY;
+
+        void onMetadataUpdate(MetadataDelta delta) {
+            MetadataProvenance provenance = new MetadataProvenance(0, 0, 0);
+            image = delta.apply(provenance);
+            LogDeltaManifest manifest = new LogDeltaManifest.Builder().
+                provenance(provenance).
+                leaderAndEpoch(LeaderAndEpoch.UNKNOWN).
+                numBatches(1).
+                elapsedNs(1).
+                numBytes(1).
+                build();
+            tracker.onMetadataUpdate(delta, image, manifest);
+        }
+
+        MetadataDelta newDelta() {
+            return new MetadataDelta.Builder().
+                setImage(image).
+                build();
+        }
+    }
+
+    @Test
+    public void testTrackerName() {
+        BrokerRegistrationTrackerTestContext ctx  = new 
BrokerRegistrationTrackerTestContext();
+        assertEquals("BrokerRegistrationTracker(id=1)", ctx.tracker.name());
+    }
+
+    @Test
+    public void testMetadataVersionUpdateWithoutRegistrationDoesNothing() {
+        BrokerRegistrationTrackerTestContext ctx  = new 
BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        assertEquals(0, ctx.numCalls.get());
+    }
+
+    @Test
+    public void testBrokerUpdateWithoutNewMvDoesNothing() {
+        BrokerRegistrationTrackerTestContext ctx  = new 
BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList(A, B, C)));
+        ctx.onMetadataUpdate(delta);
+        assertEquals(0, ctx.numCalls.get());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBrokerUpdateWithNewMv(boolean jbodMv) {
+        BrokerRegistrationTrackerTestContext ctx  = new 
BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList()));
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(jbodMv ? 
MetadataVersion.IBP_3_7_IV2.featureLevel() :
+                MetadataVersion.IBP_3_7_IV1.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        if (jbodMv) {
+            assertEquals(1, ctx.numCalls.get());
+        } else {
+            assertEquals(0, ctx.numCalls.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBrokerUpdateWithNewMvWithTwoDeltas(boolean jbodMv) {
+        BrokerRegistrationTrackerTestContext ctx  = new 
BrokerRegistrationTrackerTestContext();
+        MetadataDelta delta = ctx.newDelta();
+        delta.replay(new RegisterBrokerRecord().
+            setBrokerId(1).
+            setIncarnationId(INCARNATION_ID).
+            setLogDirs(Arrays.asList()));
+        ctx.onMetadataUpdate(delta);
+        // No calls are made because MetadataVersion is 3.0-IV1 initially
+        assertEquals(0, ctx.numCalls.get());
+
+        delta = ctx.newDelta();
+        delta.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(jbodMv ? 
MetadataVersion.IBP_3_7_IV2.featureLevel() :
+                MetadataVersion.IBP_3_7_IV1.featureLevel()));
+        ctx.onMetadataUpdate(delta);
+        if (jbodMv) {
+            assertEquals(1, ctx.numCalls.get());
+        } else {
+            assertEquals(0, ctx.numCalls.get());
+        }
+    }
+}

Reply via email to