This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ccbf1634af MINOR: Metadata image test improvements (#15373)
4ccbf1634af is described below

commit 4ccbf1634afb063615616b5995ef279a063fbeab
Author: Alyssa Huang <ahu...@confluent.io>
AuthorDate: Thu Mar 28 03:22:02 2024 -0700

    MINOR: Metadata image test improvements (#15373)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>
---
 .../controller/ProducerIdControlManagerTest.java   |   9 +-
 .../apache/kafka/image/ClientQuotasImageTest.java  |  23 +++--
 .../org/apache/kafka/image/ClusterImageTest.java   | 104 +++++++++++++++++++--
 .../org/apache/kafka/image/FeaturesImageTest.java  |  43 ++++++++-
 .../org/apache/kafka/image/ImageDowngradeTest.java |  33 +++++++
 .../apache/kafka/image/ProducerIdsImageTest.java   |  12 ++-
 .../org/apache/kafka/image/ScramImageTest.java     |  16 +++-
 7 files changed, 219 insertions(+), 21 deletions(-)

diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index ffe6a7b38ca..d8c3770f85a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -105,7 +105,14 @@ public class ProducerIdControlManagerTest {
                     .setBrokerEpoch(100)
                     .setNextProducerId(40));
         }, "Producer ID range must only increase");
-        range = producerIdControlManager.generateNextProducerId(1, 
100).response();
+        assertThrows(RuntimeException.class, () -> {
+            producerIdControlManager.replay(
+                new ProducerIdsRecord()
+                    .setBrokerId(2)
+                    .setBrokerEpoch(100)
+                    .setNextProducerId(42));
+        }, "Producer ID range must only increase");
+        range = producerIdControlManager.generateNextProducerId(3, 
100).response();
         assertEquals(42, range.firstProducerId());
 
         // Gaps in the ID range are okay.
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
index 8d1a5883cc4..9b6a8fda9d0 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,27 +52,31 @@ public class ClientQuotasImageTest {
 
     static {
         Map<ClientQuotaEntity, ClientQuotaImage> entities1 = new HashMap<>();
-        Map<String, String> fooUser = new HashMap<>();
-        fooUser.put(ClientQuotaEntity.USER, "foo");
-        Map<String, Double> fooUserQuotas = new HashMap<>();
-        fooUserQuotas.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 
123.0);
+        Map<String, String> fooUser = 
Collections.singletonMap(ClientQuotaEntity.USER, "foo");
+        Map<String, Double> fooUserQuotas = 
Collections.singletonMap(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 
123.0);
         entities1.put(new ClientQuotaEntity(fooUser), new 
ClientQuotaImage(fooUserQuotas));
         Map<String, String> barUserAndIp = new HashMap<>();
         barUserAndIp.put(ClientQuotaEntity.USER, "bar");
         barUserAndIp.put(ClientQuotaEntity.IP, "127.0.0.1");
-        Map<String, Double> barUserAndIpQuotas = new HashMap<>();
-        
barUserAndIpQuotas.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 456.0);
-        entities1.put(new ClientQuotaEntity(barUserAndIp),
-            new ClientQuotaImage(barUserAndIpQuotas));
+        Map<String, Double> barUserAndIpQuotas = 
Collections.singletonMap(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 
456.0);
+        entities1.put(new ClientQuotaEntity(barUserAndIp), new 
ClientQuotaImage(barUserAndIpQuotas));
         IMAGE1 = new ClientQuotasImage(entities1);
 
         DELTA1_RECORDS = new ArrayList<>();
+        // remove quota
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
                 setEntity(Arrays.asList(
                     new 
EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("bar"),
                     new 
EntityData().setEntityType(ClientQuotaEntity.IP).setEntityName("127.0.0.1"))).
                 setKey(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG).
                 setRemove(true), 
CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        // alter quota
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+            setEntity(Arrays.asList(
+                new 
EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
+            setKey(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG).
+            setValue(234.0), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        // add quota to entity with existing quota
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
             setEntity(Arrays.asList(
                 new 
EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
@@ -83,7 +88,7 @@ public class ClientQuotasImageTest {
 
         Map<ClientQuotaEntity, ClientQuotaImage> entities2 = new HashMap<>();
         Map<String, Double> fooUserQuotas2 = new HashMap<>();
-        fooUserQuotas2.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 
123.0);
+        fooUserQuotas2.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 
234.0);
         fooUserQuotas2.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 
999.0);
         entities2.put(new ClientQuotaEntity(fooUser), new 
ClientQuotaImage(fooUserQuotas2));
         IMAGE2 = new ClientQuotasImage(entities2);
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 7d121aaf466..820d5a83fa8 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -21,6 +21,11 @@ import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import 
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
+import 
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
 import org.apache.kafka.common.metadata.RegisterControllerRecord;
 import 
org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpoint;
 import 
org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpointCollection;
@@ -48,7 +53,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.BROKER_REGISTRATION_CHANGE_RECORD;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -56,13 +63,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @Timeout(value = 40)
 public class ClusterImageTest {
 
-    public final static ClusterImage IMAGE1;
+    public static final ClusterImage IMAGE1;
 
     static final List<ApiMessageAndVersion> DELTA1_RECORDS;
 
-    final static ClusterDelta DELTA1;
+    static final ClusterDelta DELTA1;
 
-    final static ClusterImage IMAGE2;
+    static final ClusterImage IMAGE2;
+
+    static final List<ApiMessageAndVersion> DELTA2_RECORDS;
+
+    static final ClusterDelta DELTA2;
+
+    static final ClusterImage IMAGE3;
 
     static {
         Map<Integer, BrokerRegistration> map1 = new HashMap<>();
@@ -88,7 +101,7 @@ public class ClusterImageTest {
             setId(2).
             setEpoch(123).
             setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")).
-            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9094))).
             setSupportedFeatures(Collections.emptyMap()).
             setRack(Optional.of("arack")).
             setFenced(false).
@@ -104,14 +117,18 @@ public class ClusterImageTest {
         IMAGE1 = new ClusterImage(map1, cmap1);
 
         DELTA1_RECORDS = new ArrayList<>();
+        // unfence b0
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
             setId(0).setEpoch(1000), 
UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+        // fence b1
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
             setId(1).setEpoch(1001), 
FENCE_BROKER_RECORD.highestSupportedVersion()));
+        // mark b0 in controlled shutdown
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
             setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
-            
BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
-            (short) 0));
+                
BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+            BROKER_REGISTRATION_CHANGE_RECORD.highestSupportedVersion()));
+        // unregister b2
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
UnregisterBrokerRecord().
             setBrokerId(2).setBrokerEpoch(123),
             (short) 0));
@@ -160,6 +177,67 @@ public class ClusterImageTest {
                 new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"localhost", 19093))).
             setSupportedFeatures(Collections.emptyMap()).build());
         IMAGE2 = new ClusterImage(map2, cmap2);
+
+        DELTA2_RECORDS = new ArrayList<>(DELTA1_RECORDS);
+        // fence b0
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+            setId(0).setEpoch(1000), 
FENCE_BROKER_RECORD.highestSupportedVersion()));
+        // unfence b1
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
+            setId(1).setEpoch(1001), 
UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+        // mark b0 as not in controlled shutdown
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
+            setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
+                BrokerRegistrationInControlledShutdownChange.NONE.value()),
+            BROKER_REGISTRATION_CHANGE_RECORD.highestSupportedVersion()));
+        // re-register b2
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
+            
setBrokerId(2).setIsMigratingZkBroker(true).setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
+            setBrokerEpoch(1002).setEndPoints(new BrokerEndpointCollection(
+                Arrays.asList(new 
BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
+                    setPort(9094).setSecurityProtocol((short) 0)).iterator())).
+            setFeatures(new BrokerFeatureCollection(
+                Collections.singleton(new BrokerFeature().
+                    setName(MetadataVersion.FEATURE_NAME).
+                    
setMinSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel()).
+                    
setMaxSupportedVersion(MetadataVersion.IBP_3_6_IV0.featureLevel())).iterator())).
+            setRack("rack3"),
+            REGISTER_BROKER_RECORD.highestSupportedVersion()));
+
+        DELTA2 = new ClusterDelta(IMAGE2);
+        RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
+
+        Map<Integer, BrokerRegistration> map3 = new HashMap<>();
+        map3.put(0, new BrokerRegistration.Builder().
+            setId(0).
+            setEpoch(1000).
+            setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9092))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(true).
+            setInControlledShutdown(true).build());
+        map3.put(1, new BrokerRegistration.Builder().
+            setId(1).
+            setEpoch(1001).
+            setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9093))).
+            setSupportedFeatures(Collections.singletonMap("foo", 
VersionRange.of((short) 1, (short) 3))).
+            setRack(Optional.empty()).
+            setFenced(false).
+            setInControlledShutdown(false).build());
+        map3.put(2, new BrokerRegistration.Builder().
+            setId(2).
+            setEpoch(1002).
+            setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
+            setListeners(Arrays.asList(new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "localhost", 9094))).
+            setSupportedFeatures(Collections.singletonMap("metadata.version",
+                VersionRange.of(MetadataVersion.IBP_3_3_IV3.featureLevel(), 
MetadataVersion.IBP_3_6_IV0.featureLevel()))).
+            setRack(Optional.of("rack3")).
+            setFenced(true).
+            setIsMigratingZkBroker(true).build());
+
+        IMAGE3 = new ClusterImage(map3, cmap2);
     }
 
     @Test
@@ -186,6 +264,20 @@ public class ClusterImageTest {
         testToImage(IMAGE2);
     }
 
+    @Test
+    public void testApplyDelta2() {
+        assertEquals(IMAGE3, DELTA2.apply());
+        // check image2 + delta2 = image3, since records for image2 + delta2 
might differ from records from image3
+        List<ApiMessageAndVersion> records = getImageRecords(IMAGE2);
+        records.addAll(DELTA2_RECORDS);
+        testToImage(IMAGE3, records);
+    }
+
+    @Test
+    public void testImage3RoundTrip() {
+        testToImage(IMAGE3);
+    }
+
     private static void testToImage(ClusterImage image) {
         testToImage(image, Optional.empty());
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index bc5149a2172..99e97d64757 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -45,23 +45,28 @@ public class FeaturesImageTest {
     public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
     final static FeaturesDelta DELTA1;
     final static FeaturesImage IMAGE2;
+    final static List<ApiMessageAndVersion> DELTA2_RECORDS;
+    final static FeaturesDelta DELTA2;
+    final static FeaturesImage IMAGE3;
 
     static {
         Map<String, Short> map1 = new HashMap<>();
         map1.put("foo", (short) 2);
         map1.put("bar", (short) 1);
-        map1.put("baz", (short) 8);
         IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), 
ZkMigrationState.NONE);
 
         DELTA1_RECORDS = new ArrayList<>();
+        // change feature level
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
             setName("foo").setFeatureLevel((short) 3),
             (short) 0));
+        // remove feature
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
             setName("bar").setFeatureLevel((short) 0),
             (short) 0));
+        // add feature
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-            setName("baz").setFeatureLevel((short) 0),
+            setName("baz").setFeatureLevel((short) 8),
             (short) 0));
 
         DELTA1 = new FeaturesDelta(IMAGE1);
@@ -69,7 +74,27 @@ public class FeaturesImageTest {
 
         Map<String, Short> map2 = new HashMap<>();
         map2.put("foo", (short) 3);
+        map2.put("baz", (short) 8);
         IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), 
ZkMigrationState.NONE);
+
+        DELTA2_RECORDS = new ArrayList<>();
+        // remove all features
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("foo").setFeatureLevel((short) 0),
+            (short) 0));
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("baz").setFeatureLevel((short) 0),
+            (short) 0));
+        // add feature back with different feature level
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("bar").setFeatureLevel((short) 1),
+            (short) 0));
+
+        DELTA2 = new FeaturesDelta(IMAGE2);
+        RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
+
+        Map<String, Short> map3 = Collections.singletonMap("bar", (short) 1);
+        IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting(), 
ZkMigrationState.NONE);
     }
 
     @Test
@@ -96,6 +121,20 @@ public class FeaturesImageTest {
         testToImage(IMAGE2);
     }
 
+    @Test
+    public void testImage3RoundTrip() {
+        testToImage(IMAGE3);
+    }
+
+    @Test
+    public void testApplyDelta2() {
+        assertEquals(IMAGE3, DELTA2.apply());
+        // check image2 + delta2 = image3, since records for image2 + delta2 
might differ from records from image3
+        List<ApiMessageAndVersion> records = getImageRecords(IMAGE2);
+        records.addAll(DELTA2_RECORDS);
+        testToImage(IMAGE3, records);
+    }
+
     private static void testToImage(FeaturesImage image) {
         testToImage(image, Optional.empty());
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index 55323c67713..4a29779e0ac 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -128,6 +128,39 @@ public class ImageDowngradeTest {
                         TEST_RECORDS.get(1)));
     }
 
+    /**
+     * Test downgrading to a MetadataVersion that doesn't support ZK migration.
+     */
+    @Test
+    public void testPreZkMigrationSupportVersion() {
+        writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV3,
+            Arrays.asList(
+                "the isMigratingZkBroker state of one or more brokers"),
+            Arrays.asList(
+                metadataVersionRecord(MetadataVersion.IBP_3_4_IV0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().
+                    setBrokerId(123).
+                    
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
+                    setBrokerEpoch(456).
+                    setRack(null).
+                    setFenced(false).
+                    setInControlledShutdown(true).
+                    setIsMigratingZkBroker(true), (short) 2),
+                TEST_RECORDS.get(0),
+                TEST_RECORDS.get(1)),
+            Arrays.asList(
+                metadataVersionRecord(MetadataVersion.IBP_3_3_IV3),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().
+                    setBrokerId(123).
+                    
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
+                    setBrokerEpoch(456).
+                    setRack(null).
+                    setFenced(false).
+                    setInControlledShutdown(true), (short) 1),
+                TEST_RECORDS.get(0),
+                TEST_RECORDS.get(1)));
+    }
+
     @Test
     void testDirectoryAssignmentState() {
         MetadataVersion outputMetadataVersion = MetadataVersion.IBP_3_7_IV0;
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index 738582fc108..32d42efda7a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -53,12 +53,20 @@ public class ProducerIdsImageTest {
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
             setBrokerId(3).
             setBrokerEpoch(100).
-            setNextProducerId(789), (short) 0));
+            setNextProducerId(780), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
+            setBrokerId(3).
+            setBrokerEpoch(100).
+            setNextProducerId(785), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
+            setBrokerId(2).
+            setBrokerEpoch(100).
+            setNextProducerId(800), (short) 0));
 
         DELTA1 = new ProducerIdsDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
-        IMAGE2 = new ProducerIdsImage(789);
+        IMAGE2 = new ProducerIdsImage(800);
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
index 038a5c956c3..26f860579e0 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
@@ -85,10 +85,15 @@ public class ScramImageTest {
         IMAGE1 = new ScramImage(image1mechanisms);
 
         DELTA1_RECORDS = new ArrayList<>();
+        // remove all sha512 credentials
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveUserScramCredentialRecord().
+            setName("alpha").
+            setMechanism(SCRAM_SHA_512.type()), (short) 0));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveUserScramCredentialRecord().
             setName("gamma").
             setMechanism(SCRAM_SHA_512.type()), (short) 0));
         ScramCredentialData secondAlpha256Credential = 
randomScramCredentialData(random);
+        // add sha256 credential
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
UserScramCredentialRecord().
                 setName("alpha").
                 setMechanism(SCRAM_SHA_256.type()).
@@ -96,6 +101,15 @@ public class ScramImageTest {
                 setStoredKey(secondAlpha256Credential.storedKey()).
                 setServerKey(secondAlpha256Credential.serverKey()).
                 setIterations(secondAlpha256Credential.iterations()), (short) 
0));
+        // add sha512 credential re-using name
+        ScramCredentialData secondAlpha512Credential = 
randomScramCredentialData(random);
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
UserScramCredentialRecord().
+            setName("alpha").
+            setMechanism(SCRAM_SHA_512.type()).
+            setSalt(secondAlpha512Credential.salt()).
+            setStoredKey(secondAlpha512Credential.storedKey()).
+            setServerKey(secondAlpha512Credential.serverKey()).
+            setIterations(secondAlpha512Credential.iterations()), (short) 0));
         DELTA1 = new ScramDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
@@ -107,7 +121,7 @@ public class ScramImageTest {
         image2mechanisms.put(SCRAM_SHA_256, image2sha256);
 
         Map<String, ScramCredentialData> image2sha512 = new HashMap<>();
-        image2sha512.put("alpha", image1sha512.get("alpha"));
+        image2sha512.put("alpha", secondAlpha512Credential);
         image2mechanisms.put(SCRAM_SHA_512, image2sha512);
 
         IMAGE2 = new ScramImage(image2mechanisms);

Reply via email to