cmccabe commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r991501134


##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -120,17 +116,17 @@ public AclsImage acls() {
         return acls;
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        MetadataVersion metadataVersion = features.metadataVersion();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
         // Features should be written out first so we can include the metadata.version at the beginning of the
         // snapshot
-        features.write(out);
-        cluster.write(out, metadataVersion);
-        topics.write(out);
-        configs.write(out);
-        clientQuotas.write(out);
-        producerIds.write(out);
-        acls.write(out);
+        features.write(writer, options);
+        cluster.write(writer, options);
+        topics.write(writer, options);
+        configs.write(writer, options);
+        clientQuotas.write(writer, options);
+        producerIds.write(writer, options);
+        acls.write(writer, options);
+        writer.freeze();

Review Comment:
   I have revised this code so that there is just a close function which takes 
a boolean. We call it here.
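
   A minimal sketch of what "just a close function which takes a boolean" could look like. The types below (SketchImageWriter, SketchRecordListWriter, CloseSketch) are hypothetical stand-ins, not the PR's actual classes. The meaning of the flag (true = the whole image was written) and the choice to discard records on an incomplete close are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PR's ImageWriter; the real interface may differ.
interface SketchImageWriter extends AutoCloseable {
    void write(String record);

    // Assumption: complete == true means every section of the image was written.
    void close(boolean complete);

    // Closing without an explicit flag is treated as an incomplete write.
    @Override
    default void close() {
        close(false);
    }
}

// Collects records in memory, loosely modelled on the RecordListWriter named in the diff.
final class SketchRecordListWriter implements SketchImageWriter {
    private final List<String> records = new ArrayList<>();
    private boolean closed = false;
    private boolean complete = false;

    @Override
    public void write(String record) {
        if (closed) {
            throw new IllegalStateException("writer is already closed");
        }
        records.add(record);
    }

    @Override
    public void close(boolean complete) {
        if (closed) {
            return; // closing twice is a no-op
        }
        closed = true;
        this.complete = complete;
        if (!complete) {
            records.clear(); // assumption: a partial image is discarded
        }
    }

    boolean isComplete() {
        return complete;
    }

    List<String> records() {
        return records;
    }
}

final class CloseSketch {
    // Mirrors the shape of MetadataImage.write: write each section, then close(true).
    static SketchRecordListWriter writeAll(List<String> sections) {
        SketchRecordListWriter writer = new SketchRecordListWriter();
        for (String section : sections) {
            writer.write(section);
        }
        writer.close(true);
        return writer;
    }
}
```

   With this shape, a caller that fails partway through can use try-with-resources and fall through to close(), which marks the image as incomplete.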



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -65,24 +64,43 @@ private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
-        if (!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
-            // Write out the metadata.version record first, and then the rest of the finalized features
-            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        if (options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+            writePreProductionVersion(writer, options);
+        } else {
+            writeProductionVersion(writer, options);
         }
+    }
+
+    private void writePreProductionVersion(ImageWriter writer, ImageWriterOptions options) {

Review Comment:
   ok



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -390,14 +404,18 @@ class BrokerMetadataListener(
   }
 
   class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
-      extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] {
-    val records = new util.ArrayList[ApiMessageAndVersion]()
-    override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-      records.addAll(batch)
-    }
-
+      extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      _image.write(this)
+      val records = new util.ArrayList[ApiMessageAndVersion]
+      val writer = new RecordListWriter(records)
+      val options = new ImageWriterOptions.Builder().
+        setMetadataVersion(_image.features().metadataVersion()).

Review Comment:
   Let's defer this until later. There could be some complexity in doing the
fallback.



##########
metadata/src/main/java/org/apache/kafka/image/ImageWriterOptions.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.function.Consumer;
+
+
+/**
+ * The options to use when writing an image.
+ */
+public final class ImageWriterOptions {
+    public static class Builder {
+        private MetadataVersion metadataVersion = MetadataVersion.latest();
+        private Consumer<UnwritableMetadataException> lossHandler = e -> {
+            throw e;
+        };
+
+        public Builder setMetadataVersion(MetadataVersion metadataVersion) {
+            if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+                // When writing an image, all versions less than 3.3-IV0 are treated as 3.0-IV1.
+                // This is because those versions don't support FeatureLevelRecord.
+                setRawMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION);
+            } else {
+                setRawMetadataVersion(metadataVersion);
+            }
+            return this;
+        }
+
+        // Package-private for testing
+        Builder setRawMetadataVersion(MetadataVersion metadataVersion) {
+            this.metadataVersion = metadataVersion;
+            return this;
+        }
+
+        public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
+            this.lossHandler = lossHandler;
+            return this;
+        }
+
+        public ImageWriterOptions build() {
+            return new ImageWriterOptions(metadataVersion, lossHandler);
+        }
+    }
+
+    private final MetadataVersion metadataVersion;
+    private final Consumer<UnwritableMetadataException> lossHandler;
+
+    private ImageWriterOptions(
+        MetadataVersion metadataVersion,
+        Consumer<UnwritableMetadataException> lossHandler
+    ) {
+        this.metadataVersion = metadataVersion;
+        this.lossHandler = lossHandler;
+    }
+
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
+    public void handleLoss(String loss) {
+        lossHandler.accept(new UnwritableMetadataException(metadataVersion, loss));
+    }
+}

Review Comment:
   ok
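
   For readers following the lossHandler in the diff above, here is a rough sketch of the two ways such a handler can be used: the default rethrows, while a lenient caller can record the loss and keep writing. SketchOptions and SketchUnwritableException are hypothetical stand-ins that take a plain version string. They are not the PR's ImageWriterOptions or UnwritableMetadataException, and the loss text is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in; unchecked so that a Consumer lambda can rethrow it.
final class SketchUnwritableException extends RuntimeException {
    SketchUnwritableException(String version, String loss) {
        super("cannot represent " + loss + " at metadata version " + version);
    }
}

// Hypothetical stand-in for the options object: a target version plus a loss handler.
final class SketchOptions {
    private final String version;
    private final Consumer<SketchUnwritableException> lossHandler;

    SketchOptions(String version, Consumer<SketchUnwritableException> lossHandler) {
        this.version = version;
        this.lossHandler = lossHandler;
    }

    // Called by a writer when some metadata cannot be expressed at the target version.
    void handleLoss(String loss) {
        lossHandler.accept(new SketchUnwritableException(version, loss));
    }
}

final class LossHandlerSketch {
    // Strict mode, like the default in the diff: the handler rethrows the exception.
    static boolean strictModeThrows() {
        SketchOptions strict = new SketchOptions("3.0-IV1", e -> {
            throw e;
        });
        try {
            strict.handleLoss("an example record type");
            return false;
        } catch (SketchUnwritableException e) {
            return true;
        }
    }

    // Lenient mode: the handler records each loss so that writing can continue.
    static List<String> lenientModeCollects() {
        List<String> losses = new ArrayList<>();
        SketchOptions lenient = new SketchOptions("3.0-IV1", e -> losses.add(e.getMessage()));
        lenient.handleLoss("an example record type");
        lenient.handleLoss("another example record type");
        return losses;
    }
}
```

   The design keeps the decision about whether losing metadata is fatal with the caller rather than with each image class.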



##########
metadata/src/main/java/org/apache/kafka/image/AclsImage.java:
##########
@@ -53,13 +49,15 @@ public Map<Uuid, StandardAcl> acls() {
         return acls;
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        // Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we should not write it if
+        // the output version is less than that. However, there is a problem: pre-production KRaft
+        // images didn't support FeatureLevelRecord, so we can't distinguish 3.2-IV0 from 3.0-IV1.
+        // The least bad way to resolve this is just to pretend that ACLs were in 3.0-IV1.

Review Comment:
   yes



##########
metadata/src/main/java/org/apache/kafka/image/ImageProvenance.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import java.util.Objects;
+
+
+/**
+ * The source of a metadata image.
+ */
+public final class ImageProvenance {
+    public static final ImageProvenance EMPTY = new ImageProvenance("the empty image",
+            0L,
+            0,
+            0L);
+
+    private final String source;
+    private final long highestOffset;
+    private final int highestEpoch;
+    private final long highestTimestamp;
+
+    public ImageProvenance(
+        String source,
+        long highestOffset,
+        int highestEpoch,
+        long highestTimestamp

Review Comment:
   I removed this in the latest revision.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
