This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 17e6bba20b47835e832eb59a150af49a68d1fff0
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon Dec 12 09:52:06 2022 -0800

    MINOR: Introduce MetadataProvenance and ImageReWriter (#12964)
    
    Introduce MetadataProvenance to encapsulate the three-tuple of (offset, epoch, timestamp) that is
    associated with each MetadataImage, as well as each on-disk snapshot. Also introduce a builder
    for MetadataDelta.
    
    Remove offset and epoch tracking from MetadataDelta. We do not really need to know this information
    until we are creating the final MetadataImage object. Therefore, this bookkeeping should be done by
    the metadata loading code, not inside the delta code, like the other bookkeeping. This simplifies a
    lot of tests, as well as simplifying RecordTestUtils. It also makes more sense for snapshots, where
    the offset and epoch are the same for every record.
    
    Add ImageReWriter, an ImageWriter that applies records to a MetadataDelta. This is useful when you
    need to create a MetadataDelta object that holds the contents of a MetadataImage. This will be
    used in the new image loader code (coming soon).
    
    Add ImageWriterOptionsTest to test ImageWriterOptions.
    
    Reviewers: David Arthur <[email protected]>
---
 .../server/metadata/BrokerMetadataListener.scala   | 19 +++--
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  6 +-
 .../unit/kafka/server/MetadataCacheTest.scala      | 25 +++---
 .../server/ReplicaManagerConcurrencyTest.scala     |  8 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  5 +-
 .../metadata/BrokerMetadataSnapshotterTest.scala   | 18 +++--
 .../java/org/apache/kafka/image/MetadataDelta.java | 51 +++++-------
 .../java/org/apache/kafka/image/MetadataImage.java | 26 ++++--
 .../org/apache/kafka/image/MetadataProvenance.java | 92 ++++++++++++++++++++++
 .../apache/kafka/image/MetadataVersionChange.java  | 77 ++++++++++++++++++
 .../image/MetadataVersionChangeException.java      | 37 +++++++++
 .../apache/kafka/image/writer/ImageReWriter.java   | 59 ++++++++++++++
 .../kafka/image/writer/ImageWriterOptions.java     | 17 +++-
 .../org/apache/kafka/image/ImageDowngradeTest.java |  4 +-
 .../org/apache/kafka/image/MetadataImageTest.java  | 34 ++++----
 .../kafka/image/MetadataVersionChangeTest.java     | 63 +++++++++++++++
 .../kafka/image/writer/ImageReWriterTest.java      | 63 +++++++++++++++
 .../kafka/image/writer/ImageWriterOptionsTest.java | 52 ++++++++++++
 .../org/apache/kafka/metadata/RecordTestUtils.java | 48 -----------
 19 files changed, 558 insertions(+), 146 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 064f89e5917..21fc126691f 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -22,13 +22,14 @@ import java.util.concurrent.{CompletableFuture, TimeUnit}
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
+
 import scala.compat.java8.OptionConverters._
 
 
@@ -79,11 +80,19 @@ class BrokerMetadataListener(
    */
   @volatile var _highestOffset = -1L
 
+  /**
+   * The highest metadata epoch that we've seen.  Written only from the event 
queue thread.
+   */
+  private var _highestEpoch = -1
+
   /**
    * The highest metadata log time that we've seen. Written only from the 
event queue thread.
    */
   private var _highestTimestamp = -1L
 
+  private def provenance(): MetadataProvenance =
+    new MetadataProvenance(_highestOffset, _highestEpoch, _highestTimestamp)
+
   /**
    * The current broker metadata image. Accessed only from the event queue 
thread.
    */
@@ -181,7 +190,7 @@ class BrokerMetadataListener(
     snapshotter.foreach { snapshotter =>
       if (metadataFaultOccurred.get()) {
         trace("Not starting metadata snapshot since we previously had an 
error")
-      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply(), reason)) {
+      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply(provenance()), reason)) {
         _bytesSinceLastSnapshot = 0L
       }
     }
@@ -259,7 +268,7 @@ class BrokerMetadataListener(
     while (iterator.hasNext) {
       val batch = iterator.next()
 
-      val epoch = lastCommittedEpoch.getOrElse(batch.epoch())
+      _highestEpoch = lastCommittedEpoch.getOrElse(batch.epoch())
       _highestTimestamp = 
lastAppendTimestamp.getOrElse(batch.appendTimestamp())
 
       var index = 0
@@ -270,7 +279,7 @@ class BrokerMetadataListener(
         }
         _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + 
index)
         try {
-          delta.replay(highestMetadataOffset, epoch, 
messageAndVersion.message())
+          delta.replay(messageAndVersion.message())
         } catch {
           case e: Throwable => snapshotName match {
             case None => metadataLoadingFaultHandler.handleFault(
@@ -341,7 +350,7 @@ class BrokerMetadataListener(
   private def publish(publisher: MetadataPublisher): Unit = {
     val delta = _delta
     try {
-      _image = _delta.apply()
+      _image = _delta.apply(provenance())
     } catch {
       case t: Throwable =>
         // If we cannot apply the delta, this publish event will throw and we 
will not publish a new image.
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 54b1156ccb1..d9b2e286956 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -23,7 +23,7 @@ import 
kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
@@ -41,13 +41,13 @@ class ZkMigrationIntegrationTest {
     var offset = 0
     def accept(batch: java.util.List[ApiMessageAndVersion]): Unit = {
       batch.forEach(message => {
-        metadataDelta.replay(offset, 0, message.message())
+        metadataDelta.replay(message.message())
         offset += 1
       })
     }
 
     def verify(verifier: MetadataImage => Unit): Unit = {
-      val image = metadataDelta.apply()
+      val image = metadataDelta.apply(new MetadataProvenance(offset, 0, 0))
       verifier.apply(image)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d92c76f7118..7dadd5bf759 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -32,9 +32,8 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, 
PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
BrokerEndpointCollection}
-import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage}
+import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
 
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
@@ -65,7 +64,7 @@ object MetadataCacheTest {
         // contains no brokers, but which contains the previous partitions.
         val image = c.currentImage()
         val partialImage = new MetadataImage(
-          new RaftOffsetAndEpoch(100, 10),
+          new MetadataProvenance(100L, 10, 1000L),
           image.features(),
           ClusterImage.EMPTY,
           image.topics(),
@@ -73,7 +72,7 @@ object MetadataCacheTest {
           image.clientQuotas(),
           image.producerIds(),
           image.acls())
-        val delta = new MetadataDelta(partialImage)
+        val delta = new MetadataDelta.Builder().setImage(partialImage).build()
 
         def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
           val endpoints = new BrokerEndpointCollection()
@@ -100,7 +99,7 @@ object MetadataCacheTest {
             setFenced(fenced)
         }
         request.liveBrokers().iterator().asScala.foreach { brokerInfo =>
-          delta.replay(100, 10, toRecord(brokerInfo))
+          delta.replay(toRecord(brokerInfo))
         }
 
         def toRecords(topic: UpdateMetadataTopicState): Seq[ApiMessage] = {
@@ -125,9 +124,9 @@ object MetadataCacheTest {
           results
         }
         request.topicStates().forEach { topic =>
-          toRecords(topic).foreach(delta.replay(100, 10, _))
+          toRecords(topic).foreach(delta.replay)
         }
-        c.setImage(delta.apply())
+        c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)))
       }
       case _ => throw new RuntimeException("Unsupported cache type")
     }
@@ -646,12 +645,12 @@ class MetadataCacheTest {
   def testIsBrokerFenced(): Unit = {
     val metadataCache = MetadataCache.kRaftMetadataCache(0)
 
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val delta = new MetadataDelta.Builder().build()
     delta.replay(new RegisterBrokerRecord()
       .setBrokerId(0)
       .setFenced(false))
 
-    metadataCache.setImage(delta.apply())
+    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
 
     assertFalse(metadataCache.isBrokerFenced(0))
 
@@ -659,7 +658,7 @@ class MetadataCacheTest {
       .setBrokerId(0)
       .setFenced(1.toByte))
 
-    metadataCache.setImage(delta.apply())
+    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
 
     assertTrue(metadataCache.isBrokerFenced(0))
   }
@@ -668,12 +667,12 @@ class MetadataCacheTest {
   def testIsBrokerInControlledShutdown(): Unit = {
     val metadataCache = MetadataCache.kRaftMetadataCache(0)
 
-    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    val delta = new MetadataDelta.Builder().build()
     delta.replay(new RegisterBrokerRecord()
       .setBrokerId(0)
       .setInControlledShutdown(false))
 
-    metadataCache.setImage(delta.apply())
+    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
 
     assertFalse(metadataCache.isBrokerShuttingDown(0))
 
@@ -681,7 +680,7 @@ class MetadataCacheTest {
       .setBrokerId(0)
       .setInControlledShutdown(1.toByte))
 
-    metadataCache.setImage(delta.apply())
+    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
 
     assertTrue(metadataCache.isBrokerShuttingDown(0))
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 651451afad4..559485ff4dd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -349,7 +349,7 @@ class ReplicaManagerConcurrencyTest {
     override def doWork(): Unit = {
       channel.poll() match {
         case InitializeEvent =>
-          val delta = new MetadataDelta(latestImage)
+          val delta = new MetadataDelta.Builder().setImage(latestImage).build()
           brokerIds.foreach { brokerId =>
             delta.replay(new RegisterBrokerRecord()
               .setBrokerId(brokerId)
@@ -357,14 +357,14 @@ class ReplicaManagerConcurrencyTest {
             )
           }
           topic.initialize(delta)
-          latestImage = delta.apply()
+          latestImage = delta.apply(latestImage.provenance())
           metadataCache.setImage(latestImage)
           replicaManager.applyDelta(delta.topicsDelta, latestImage)
 
         case AlterIsrEvent(future, topicPartition, leaderAndIsr) =>
-          val delta = new MetadataDelta(latestImage)
+          val delta = new MetadataDelta.Builder().setImage(latestImage).build()
           val updatedLeaderAndIsr = topic.alterIsr(topicPartition, 
leaderAndIsr, delta)
-          latestImage = delta.apply()
+          latestImage = delta.apply(latestImage.provenance())
           future.complete(updatedLeaderAndIsr)
           replicaManager.applyDelta(delta.topicsDelta, latestImage)
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3d28f885b27..3273824ffa4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -54,10 +54,9 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, 
TopicPartition, Uuid}
-import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage, 
TopicsDelta, TopicsImage}
+import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, 
ProducerIdsImage, TopicsDelta, TopicsImage}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -4111,7 +4110,7 @@ class ReplicaManagerTest {
 
   private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
     new MetadataImage(
-      new RaftOffsetAndEpoch(100, 10),
+      new MetadataProvenance(100L, 10, 1000L),
       FeaturesImage.EMPTY,
       ClusterImageTest.IMAGE1,
       topicsImage,
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index 6849a0923b3..de3c71b04e8 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, MetadataProvenance}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.util.SnapshotReason
 import org.apache.kafka.queue.EventQueue
@@ -55,7 +55,7 @@ class BrokerMetadataSnapshotterTest {
       RecordsSnapshotWriter.createWithHeader(
         () => {
           Optional.of(
-            new MockRawSnapshotWriter(offsetAndEpoch, 
consumeSnapshotBuffer(committedOffset, committedEpoch))
+            new MockRawSnapshotWriter(offsetAndEpoch, 
consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
           )
         },
         1024,
@@ -67,7 +67,11 @@ class BrokerMetadataSnapshotterTest {
       ).asScala
     }
 
-    def consumeSnapshotBuffer(committedOffset: Long, committedEpoch: 
Int)(buffer: ByteBuffer): Unit = {
+    def consumeSnapshotBuffer(
+      committedOffset: Long,
+      committedEpoch: Int,
+      lastContainedLogTime: Long
+    )(buffer: ByteBuffer): Unit = {
       val delta = new MetadataDelta(MetadataImage.EMPTY)
       val memoryRecords = MemoryRecords.readableRecords(buffer)
       val batchIterator = memoryRecords.batchIterator()
@@ -78,11 +82,11 @@ class BrokerMetadataSnapshotterTest {
             val recordBuffer = record.value().duplicate()
             val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
               new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
-            delta.replay(committedOffset, committedEpoch, 
messageAndVersion.message())
+            delta.replay(messageAndVersion.message())
           })
         }
       }
-      image.complete(delta.apply())
+      image.complete(delta.apply(new MetadataProvenance(committedOffset, 
committedEpoch, lastContainedLogTime)))
     }
   }
 
@@ -101,8 +105,8 @@ class BrokerMetadataSnapshotterTest {
       val reasons = Set(SnapshotReason.UNKNOWN)
 
       snapshotter.eventQueue.append(blockingEvent)
-      assertTrue(snapshotter.maybeStartSnapshot(10000L, 
MetadataImageTest.IMAGE1, reasons))
-      assertFalse(snapshotter.maybeStartSnapshot(11000L, 
MetadataImageTest.IMAGE2, reasons))
+      assertTrue(snapshotter.maybeStartSnapshot(2000L, 
MetadataImageTest.IMAGE1, reasons))
+      assertFalse(snapshotter.maybeStartSnapshot(4000L, 
MetadataImageTest.IMAGE2, reasons))
       blockingEvent.latch.countDown()
       assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
     } finally {
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index a82216769d0..3d5ee9821ae 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -34,26 +34,29 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.Optional;
 
 
 /**
  * A change to the broker metadata image.
- *
- * This class is thread-safe.
  */
 public final class MetadataDelta {
-    private final MetadataImage image;
+    public static class Builder {
+        private MetadataImage image = MetadataImage.EMPTY;
+
+        public Builder setImage(MetadataImage image) {
+            this.image = image;
+            return this;
+        }
 
-    private long highestOffset;
+        public MetadataDelta build() {
+            return new MetadataDelta(image);
+        }
+    }
 
-    private int highestEpoch;
+    private final MetadataImage image;
 
     private FeaturesDelta featuresDelta = null;
 
@@ -71,8 +74,6 @@ public final class MetadataDelta {
 
     public MetadataDelta(MetadataImage image) {
         this.image = image;
-        this.highestOffset = image.highestOffsetAndEpoch().offset();
-        this.highestEpoch = image.highestOffsetAndEpoch().epoch();
     }
 
     public MetadataImage image() {
@@ -152,19 +153,7 @@ public final class MetadataDelta {
         }
     }
 
-    public void read(long highestOffset, int highestEpoch, 
Iterator<List<ApiMessageAndVersion>> reader) {
-        while (reader.hasNext()) {
-            List<ApiMessageAndVersion> batch = reader.next();
-            for (ApiMessageAndVersion messageAndVersion : batch) {
-                replay(highestOffset, highestEpoch, 
messageAndVersion.message());
-            }
-        }
-    }
-
-    public void replay(long offset, int epoch, ApiMessage record) {
-        highestOffset = offset;
-        highestEpoch = epoch;
-
+    public void replay(ApiMessage record) {
         MetadataRecordType type = MetadataRecordType.fromId(record.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
@@ -223,13 +212,11 @@ public final class MetadataDelta {
     }
 
     public void replay(RegisterBrokerRecord record) {
-        if (clusterDelta == null) clusterDelta = new 
ClusterDelta(image.cluster());
-        clusterDelta.replay(record);
+        getOrCreateClusterDelta().replay(record);
     }
 
     public void replay(UnregisterBrokerRecord record) {
-        if (clusterDelta == null) clusterDelta = new 
ClusterDelta(image.cluster());
-        clusterDelta.replay(record);
+        getOrCreateClusterDelta().replay(record);
     }
 
     public void replay(TopicRecord record) {
@@ -308,7 +295,7 @@ public final class MetadataDelta {
         getOrCreateAclsDelta().finishSnapshot();
     }
 
-    public MetadataImage apply() {
+    public MetadataImage apply(MetadataProvenance provenance) {
         FeaturesImage newFeatures;
         if (featuresDelta == null) {
             newFeatures = image.features();
@@ -352,7 +339,7 @@ public final class MetadataDelta {
             newAcls = aclsDelta.apply();
         }
         return new MetadataImage(
-            new OffsetAndEpoch(highestOffset, highestEpoch),
+            provenance,
             newFeatures,
             newCluster,
             newTopics,
@@ -366,9 +353,7 @@ public final class MetadataDelta {
     @Override
     public String toString() {
         return "MetadataDelta(" +
-            "highestOffset=" + highestOffset +
-            ", highestEpoch=" + highestEpoch +
-            ", featuresDelta=" + featuresDelta +
+            "featuresDelta=" + featuresDelta +
             ", clusterDelta=" + clusterDelta +
             ", topicsDelta=" + topicsDelta +
             ", configsDelta=" + configsDelta +
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 18c836ee3bd..36db5dab1de 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -31,7 +31,7 @@ import java.util.Objects;
  */
 public final class MetadataImage {
     public final static MetadataImage EMPTY = new MetadataImage(
-        new OffsetAndEpoch(0, 0),
+        MetadataProvenance.EMPTY,
         FeaturesImage.EMPTY,
         ClusterImage.EMPTY,
         TopicsImage.EMPTY,
@@ -40,7 +40,7 @@ public final class MetadataImage {
         ProducerIdsImage.EMPTY,
         AclsImage.EMPTY);
 
-    private final OffsetAndEpoch highestOffsetAndEpoch;
+    private final MetadataProvenance provenance;
 
     private final FeaturesImage features;
 
@@ -57,7 +57,7 @@ public final class MetadataImage {
     private final AclsImage acls;
 
     public MetadataImage(
-        OffsetAndEpoch highestOffsetAndEpoch,
+        MetadataProvenance provenance,
         FeaturesImage features,
         ClusterImage cluster,
         TopicsImage topics,
@@ -66,7 +66,7 @@ public final class MetadataImage {
         ProducerIdsImage producerIds,
         AclsImage acls
     ) {
-        this.highestOffsetAndEpoch = highestOffsetAndEpoch;
+        this.provenance = provenance;
         this.features = features;
         this.cluster = cluster;
         this.topics = topics;
@@ -86,8 +86,16 @@ public final class MetadataImage {
             acls.isEmpty();
     }
 
+    public MetadataProvenance provenance() {
+        return provenance;
+    }
+
     public OffsetAndEpoch highestOffsetAndEpoch() {
-        return highestOffsetAndEpoch;
+        return new OffsetAndEpoch(provenance.offset(), provenance.epoch());
+    }
+
+    public long offset() {
+        return provenance.offset();
     }
 
     public FeaturesImage features() {
@@ -135,7 +143,7 @@ public final class MetadataImage {
     public boolean equals(Object o) {
         if (o == null || !o.getClass().equals(this.getClass())) return false;
         MetadataImage other = (MetadataImage) o;
-        return highestOffsetAndEpoch.equals(other.highestOffsetAndEpoch) &&
+        return provenance.equals(other.provenance) &&
             features.equals(other.features) &&
             cluster.equals(other.cluster) &&
             topics.equals(other.topics) &&
@@ -147,7 +155,8 @@ public final class MetadataImage {
 
     @Override
     public int hashCode() {
-        return Objects.hash(highestOffsetAndEpoch,
+        return Objects.hash(
+            provenance,
             features,
             cluster,
             topics,
@@ -159,7 +168,8 @@ public final class MetadataImage {
 
     @Override
     public String toString() {
-        return "MetadataImage(highestOffsetAndEpoch=" + highestOffsetAndEpoch +
+        return "MetadataImage(" +
+            "provenance=" + provenance +
             ", features=" + features +
             ", cluster=" + cluster +
             ", topics=" + topics +
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
new file mode 100644
index 00000000000..3e65d3cfb80
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+import java.util.Objects;
+
+
+/**
+ * Information about the source of a metadata image.
+ */
+public final class MetadataProvenance {
+    public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, 
-1, -1L);
+
+    private final long offset;
+    private final int epoch;
+    private final long lastContainedLogTimeMs;
+
+    public MetadataProvenance(
+        long offset,
+        int epoch,
+        long lastContainedLogTimeMs
+    ) {
+        this.offset = offset;
+        this.epoch = epoch;
+        this.lastContainedLogTimeMs = lastContainedLogTimeMs;
+    }
+
+    public OffsetAndEpoch offsetAndEpoch() {
+        return new OffsetAndEpoch(offset, epoch);
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public int epoch() {
+        return epoch;
+    }
+
+    public long lastContainedLogTimeMs() {
+        return lastContainedLogTimeMs;
+    }
+
+    /**
+     * Returns the name that a snapshot with this provenance would have.
+     */
+    public String snapshotName() {
+        return String.format("snapshot %020d-%010d", offset, epoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(this.getClass())) return false;
+        MetadataProvenance other = (MetadataProvenance) o;
+        return offset == other.offset &&
+            epoch == other.epoch &&
+            lastContainedLogTimeMs == other.lastContainedLogTimeMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(offset,
+            epoch,
+            lastContainedLogTimeMs);
+    }
+
+    @Override
+    public String toString() {
+        return "MetadataProvenance(" +
+            "offset=" + offset +
+            ", epoch=" + epoch +
+            ", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
+            ")";
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java
new file mode 100644
index 00000000000..d9b43a306e5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.Objects;
+
+
+/**
+ * A change in the MetadataVersion.
+ */
+public final class MetadataVersionChange {
+    private final MetadataVersion oldVersion;
+    private final MetadataVersion newVersion;
+
+    public MetadataVersionChange(
+            MetadataVersion oldVersion,
+            MetadataVersion newVersion
+    ) {
+        this.oldVersion = Objects.requireNonNull(oldVersion);
+        this.newVersion = Objects.requireNonNull(newVersion);
+    }
+
+    public MetadataVersion oldVersion() {
+        return oldVersion;
+    }
+
+    public MetadataVersion newVersion() {
+        return newVersion;
+    }
+
+    public boolean isUpgrade() {
+        return oldVersion.isLessThan(newVersion);
+    }
+
+    public boolean isDowngrade() {
+        return newVersion.isLessThan(oldVersion);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(this.getClass())) return false;
+        MetadataVersionChange other = (MetadataVersionChange) o;
+        return oldVersion.equals(other.oldVersion) &&
+                newVersion.equals(other.newVersion);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(oldVersion,
+                newVersion);
+    }
+
+    @Override
+    public String toString() {
+        return "MetadataVersionChange(" +
+                "oldVersion=" + oldVersion +
+                ", newVersion=" + newVersion +
+                ")";
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChangeException.java
 
b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChangeException.java
new file mode 100644
index 00000000000..a4c931f77b7
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/image/MetadataVersionChangeException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+
+/**
+ * Indicates that the metadata version has changed.
+ */
+public final class MetadataVersionChangeException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final MetadataVersionChange change;
+
+    public MetadataVersionChangeException(MetadataVersionChange change) {
+        super("The metadata version is changing from " + change.oldVersion() + " to " +
+                change.newVersion());
+        this.change = change;
+    }
+
+    public MetadataVersionChange change() {
+        return change;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
new file mode 100644
index 00000000000..42a0aaa93a1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+
+/**
+ * ImageReWriter writes a metadata image out to another metadata image.
+ *
+ * There are a few reasons why you might want to do this. One is to obtain a MetadataDelta
+ * object which contains everything in the image. Another is to translate an image from
+ * one metadata version to another.
+ */
+public class ImageReWriter implements ImageWriter {
+    private final MetadataDelta delta;
+    private boolean closed = false;
+    private MetadataImage image = null;
+
+    public ImageReWriter(MetadataDelta delta) {
+        this.delta = delta;
+    }
+
+    @Override
+    public void write(ApiMessageAndVersion record) {
+        if (closed) throw new ImageWriterClosedException();
+        delta.replay(record.message());
+    }
+
+    @Override
+    public void close(boolean complete) {
+        if (closed) return;
+        closed = true;
+        if (complete) {
+            image = delta.apply(delta.image().provenance());
+        }
+    }
+
+    public MetadataImage image() {
+        return image;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java
index 635a6c77c40..0d4550932bc 100644
--- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image.writer;
 
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.function.Consumer;
@@ -27,11 +28,19 @@ import java.util.function.Consumer;
  */
 public final class ImageWriterOptions {
     public static class Builder {
-        private MetadataVersion metadataVersion = MetadataVersion.latest();
+        private MetadataVersion metadataVersion;
         private Consumer<UnwritableMetadataException> lossHandler = e -> {
             throw e;
         };
 
+        public Builder() {
+            this.metadataVersion = MetadataVersion.latest();
+        }
+
+        public Builder(MetadataImage image) {
+            this.metadataVersion = image.features().metadataVersion();
+        }
+
         public Builder setMetadataVersion(MetadataVersion metadataVersion) {
             if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
                 // When writing an image, all versions less than 3.3-IV0 are treated as 3.0-IV1.
@@ -43,12 +52,16 @@ public final class ImageWriterOptions {
             return this;
         }
 
-        // Package-private for testing
+        // Visible for testing
         public Builder setRawMetadataVersion(MetadataVersion metadataVersion) {
             this.metadataVersion = metadataVersion;
             return this;
         }
 
+        public MetadataVersion metadataVersion() {
+            return metadataVersion;
+        }
+
         public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
             this.lossHandler = lossHandler;
             return this;
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index 5a7c2e6c0b6..86eca51b4c1 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -133,9 +133,9 @@ public class ImageDowngradeTest {
         List<ApiMessageAndVersion> expectedOutputs
     ) {
         MockLossConsumer lossConsumer = new MockLossConsumer(metadataVersion);
-        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataDelta delta = new MetadataDelta.Builder().build();
         RecordTestUtils.replayAll(delta, inputs);
-        MetadataImage image = delta.apply();
+        MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
         RecordListWriter writer = new RecordListWriter();
         image.write(writer, new ImageWriterOptions.Builder().
                 setRawMetadataVersion(metadataVersion).
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 290214369cc..be21a87bd69 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -37,7 +36,7 @@ public class MetadataImageTest {
 
     static {
         IMAGE1 = new MetadataImage(
-            new OffsetAndEpoch(100, 4),
+            new MetadataProvenance(100, 4, 2000),
             FeaturesImageTest.IMAGE1,
             ClusterImageTest.IMAGE1,
             TopicsImageTest.IMAGE1,
@@ -46,17 +45,19 @@ public class MetadataImageTest {
             ProducerIdsImageTest.IMAGE1,
             AclsImageTest.IMAGE1);
 
-        DELTA1 = new MetadataDelta(IMAGE1);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, FeaturesImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, ClusterImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, TopicsImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, ConfigurationsImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, ClientQuotasImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, ProducerIdsImageTest.DELTA1_RECORDS);
-        RecordTestUtils.replayAll(DELTA1, 200, 5, AclsImageTest.DELTA1_RECORDS);
+        DELTA1 = new MetadataDelta.Builder().
+                setImage(IMAGE1).
+                build();
+        RecordTestUtils.replayAll(DELTA1, FeaturesImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, ClusterImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, TopicsImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, ConfigurationsImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS);
+        RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS);
 
         IMAGE2 = new MetadataImage(
-            new OffsetAndEpoch(200, 5),
+            new MetadataProvenance(200, 5, 4000),
             FeaturesImageTest.IMAGE2,
             ClusterImageTest.IMAGE2,
             TopicsImageTest.IMAGE2,
@@ -78,7 +79,7 @@ public class MetadataImageTest {
 
     @Test
     public void testApplyDelta1() throws Throwable {
-        assertEquals(IMAGE2, DELTA1.apply());
+        assertEquals(IMAGE2, DELTA1.apply(IMAGE2.provenance()));
     }
 
     @Test
@@ -88,13 +89,10 @@ public class MetadataImageTest {
 
     private void testToImageAndBack(MetadataImage image) throws Throwable {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new ImageWriterOptions.Builder().build());
+        image.write(writer, new ImageWriterOptions.Builder(image).build());
         MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
-        RecordTestUtils.replayAll(delta,
-                image.highestOffsetAndEpoch().offset(),
-                image.highestOffsetAndEpoch().epoch(),
-                writer.records());
-        MetadataImage nextImage = delta.apply();
+        RecordTestUtils.replayAll(delta, writer.records());
+        MetadataImage nextImage = delta.apply(image.provenance());
         assertEquals(image, nextImage);
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
new file mode 100644
index 00000000000..bef49adc37d
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class MetadataVersionChangeTest {
+    private static final Logger log = LoggerFactory.getLogger(MetadataVersionChangeTest.class);
+
+    private final static MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
+        new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
+
+    private final static MetadataVersionChange CHANGE_3_3_IV0_TO_3_0_IV1 =
+        new MetadataVersionChange(IBP_3_3_IV0, IBP_3_0_IV1);
+
+    @Test
+    public void testIsUpgrade() throws Throwable {
+        assertTrue(CHANGE_3_0_IV1_TO_3_3_IV0.isUpgrade());
+        assertFalse(CHANGE_3_3_IV0_TO_3_0_IV1.isUpgrade());
+    }
+
+    @Test
+    public void testIsDowngrade() throws Throwable {
+        assertFalse(CHANGE_3_0_IV1_TO_3_3_IV0.isDowngrade());
+        assertTrue(CHANGE_3_3_IV0_TO_3_0_IV1.isDowngrade());
+    }
+
+    @Test
+    public void testMetadataVersionChangeExceptionToString() throws Throwable {
+        assertEquals("org.apache.kafka.image.MetadataVersionChangeException: The metadata " +
+            "version is changing from 3.0-IV1 to 3.3-IV0",
+                new MetadataVersionChangeException(CHANGE_3_0_IV1_TO_3_3_IV0).toString());
+        assertEquals("org.apache.kafka.image.MetadataVersionChangeException: The metadata " +
+            "version is changing from 3.3-IV0 to 3.0-IV1",
+                new MetadataVersionChangeException(CHANGE_3_3_IV0_TO_3_0_IV1).toString());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
new file mode 100644
index 00000000000..640924fe076
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageReWriterTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class ImageReWriterTest {
+    @Test
+    public void testWrite() {
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        ImageReWriter writer = new ImageReWriter(delta);
+        writer.write(testRecord(0));
+        writer.write(testRecord(1));
+        writer.close(true);
+        assertEquals(2, delta.getOrCreateTopicsDelta().changedTopics().size());
+        assertEquals(2, writer.image().topics().topicsById().size());
+    }
+
+    @Test
+    public void testCloseWithoutFreeze() {
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        ImageReWriter writer = new ImageReWriter(delta);
+        writer.close();
+        assertNull(writer.image());
+    }
+
+    @Test
+    public void testWriteAfterClose() {
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        ImageReWriter writer = new ImageReWriter(delta);
+        writer.close(true);
+        assertThrows(ImageWriterClosedException.class, () ->
+                writer.write(0, new TopicRecord().
+                        setName("foo").
+                        setTopicId(Uuid.fromString("3B134hrsQgKtz8Sp6QBIfg"))));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
new file mode 100644
index 00000000000..07ddd4ff8cb
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class ImageWriterOptionsTest {
+    @Test
+    public void testDefaultLossHandler() {
+        ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+        assertEquals("stuff", assertThrows(UnwritableMetadataException.class,
+                () -> options.handleLoss("stuff")).loss());
+    }
+
+    @Test
+    public void testSetMetadataVersion() {
+        for (int i = MetadataVersion.MINIMUM_KRAFT_VERSION.ordinal();
+                 i < MetadataVersion.VERSIONS.length;
+                 i++) {
+            MetadataVersion version = MetadataVersion.VERSIONS[i];
+            ImageWriterOptions.Builder options = new ImageWriterOptions.Builder().
+                    setMetadataVersion(version);
+            if (i < MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.ordinal()) {
+                assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, options.metadataVersion());
+            } else {
+                assertEquals(version, options.metadataVersion());
+            }
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 7fd2a29d5b1..04da77b0fc3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Message;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
-import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.internals.MemoryBatchReader;
@@ -57,14 +56,6 @@ public class RecordTestUtils {
      */
     public static void replayAll(Object target,
                                  List<ApiMessageAndVersion> recordsAndVersions) {
-        if (target instanceof MetadataDelta) {
-            MetadataDelta delta = (MetadataDelta) target;
-            replayAll(delta,
-                    delta.image().highestOffsetAndEpoch().offset(),
-                    delta.image().highestOffsetAndEpoch().epoch(),
-                    recordsAndVersions);
-            return;
-        }
         for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
             ApiMessage record = recordAndVersion.message();
             try {
@@ -96,26 +87,6 @@ public class RecordTestUtils {
         }
     }
 
-    /**
-     * Replay a list of records to the metadata delta.
-     *
-     * @param delta the metadata delta on which to replay the records
-     * @param highestOffset highest offset from the list of records
-     * @param highestEpoch highest epoch from the list of records
-     * @param recordsAndVersions list of records
-     */
-    public static void replayAll(
-        MetadataDelta delta,
-        long highestOffset,
-        int highestEpoch,
-        List<ApiMessageAndVersion> recordsAndVersions
-    ) {
-        for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
-            ApiMessage record = recordAndVersion.message();
-            delta.replay(highestOffset, highestEpoch, record);
-        }
-    }
-
     /**
      * Replay a list of record batches.
      *
@@ -129,25 +100,6 @@ public class RecordTestUtils {
         }
     }
 
-    /**
-     * Replay a list of record batches to the metadata delta.
-     *
-     * @param delta the metadata delta on which to replay the records
-     * @param highestOffset highest offset from the list of record batches
-     * @param highestEpoch highest epoch from the list of record batches
-     * @param batches list of batches of records
-     */
-    public static void replayAllBatches(
-        MetadataDelta delta,
-        long highestOffset,
-        int highestEpoch,
-        List<List<ApiMessageAndVersion>> batches
-    ) {
-        for (List<ApiMessageAndVersion> batch : batches) {
-            replayAll(delta, highestOffset, highestEpoch, batch);
-        }
-    }
-
     /**
      * Materialize the output of an iterator into a set.
      *

Reply via email to