jsancio commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r989546700


##########
metadata/src/main/java/org/apache/kafka/image/ImageWriter.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+
+/**
+ * Writes out a metadata image.
+ */
+public interface ImageWriter extends AutoCloseable {
+    /**
+     * Write a record.
+     *
+     * @param version       The version of the record to write out.
+     *                      For convenience, this is an int.
+     * @param message       The message of the record to write out.
+     */
+    default void write(int version, ApiMessage message) {

Review Comment:
   Why is this convenient? Is this because we use the literal `0`, which has a
   type of `int`? If we replace the literal zero with a symbol that has a type
   of `short`, do we still need this convenience?
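
   For context, a compilable sketch of the narrowing rules in play (the method
   and constant names below are hypothetical, not from the PR): Java integer
   literals have type `int`, and method invocation conversion does not narrow
   constant expressions, so a `short` parameter cannot accept a bare `0`
   without a cast.

   ```java
   public class LiteralWidths {
       // Toy method (name assumed) taking the version as a short, mirroring
       // the short version field in ApiMessageAndVersion.
       static String writeShort(short version) {
           return "v" + version;
       }

       // The int overload under discussion: writeShort(0) would not compile,
       // because the literal 0 is an int and invocation does not narrow it.
       // Accepting an int lets callers pass a bare 0.
       static String write(int version) {
           return writeShort((short) version);
       }

       // The reviewer's alternative: a short-typed symbol can be passed to
       // writeShort directly, with no cast and no int overload.
       static final short V0 = 0;
   }
   ```

   If call sites used a `short`-typed symbol like `V0`, the `int` convenience
   overload would indeed be unnecessary.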



##########
metadata/src/main/java/org/apache/kafka/image/AclsImage.java:
##########
@@ -53,13 +49,15 @@ public Map<Uuid, StandardAcl> acls() {
         return acls;
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        // Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we should not write it if
+        // the output version is less than that. However, there is a problem: pre-production KRaft
+        // images didn't support FeatureLevelRecord, so we can't distinguish 3.2-IV0 from 3.0-IV1.
+        // The least bad way to resolve this is just to pretend that ACLs were in 3.0-IV1.

Review Comment:
   I don't fully understand this comment. Are you saying that Kafka doesn't
   support clusters that have some servers at software version 3.0.x and some
   servers, especially the active controller, at software version 3.2.x?
   
   So if the software version of this server is 3.2.x, do we need to assume
   that the software version of every server is 3.2.x?



##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -120,17 +116,17 @@ public AclsImage acls() {
         return acls;
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        MetadataVersion metadataVersion = features.metadataVersion();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
         // Features should be written out first so we can include the metadata.version at the
         // beginning of the snapshot
-        features.write(out);
-        cluster.write(out, metadataVersion);
-        topics.write(out);
-        configs.write(out);
-        clientQuotas.write(out);
-        producerIds.write(out);
-        acls.write(out);
+        features.write(writer, options);
+        cluster.write(writer, options);
+        topics.write(writer, options);
+        configs.write(writer, options);
+        clientQuotas.write(writer, options);
+        producerIds.write(writer, options);
+        acls.write(writer, options);
+        writer.freeze();

Review Comment:
   It feels awkward that this layer of code calls `freeze` while `close` is
   called by the layer above. I am leaning toward moving this call to `freeze`
   to the caller(s).
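
   A minimal sketch of the suggested shape, with toy names standing in for
   `ImageWriter` and `MetadataImage` (nothing below is the actual Kafka code):
   the image layer only emits records, and the caller both freezes and closes
   the writer, so the whole lifecycle lives in one layer.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class SketchWriter {
       final List<String> records = new ArrayList<>();
       boolean frozen = false;

       void write(String record) {
           if (frozen) throw new IllegalStateException("writer is frozen");
           records.add(record);
       }
       void freeze() { frozen = true; }
       void close()  { frozen = true; }
   }

   public class LifecycleSketch {
       // Stand-in for MetadataImage.write: emits records, never calls freeze().
       static void writeImage(SketchWriter writer) {
           writer.write("features");
           writer.write("acls");
       }

       public static List<String> run() {
           SketchWriter writer = new SketchWriter();
           writeImage(writer);
           writer.freeze();  // the caller decides when the image is complete
           writer.close();
           return writer.records;
       }
   }
   ```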



##########
metadata/src/main/java/org/apache/kafka/image/ImageWriterOptions.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.function.Consumer;
+
+
+/**
+ * The options to use when writing an image.
+ */
+public final class ImageWriterOptions {
+    public static class Builder {
+        private MetadataVersion metadataVersion = MetadataVersion.latest();
+        private Consumer<UnwritableMetadataException> lossHandler = e -> {
+            throw e;
+        };
+
+        public Builder setMetadataVersion(MetadataVersion metadataVersion) {
+            if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+                // When writing an image, all versions less than 3.3-IV0 are treated as 3.0-IV1.
+                // This is because those versions don't support FeatureLevelRecord.
+                setRawMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION);

Review Comment:
   I have a couple of questions about this.
   
   First, it looks like the controller only writes the metadata version to the
   log if it is at least the minimum bootstrap version, so this condition should
   never be true, right?
   ```
                // Write the metadata.version first
                if (!wroteVersion) {
                    if (metadataVersion.isAtLeast(minimumBootstrapVersion)) {
                        wroteVersion = true;
                        return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
                                .setName(MetadataVersion.FEATURE_NAME)
                                .setFeatureLevel(metadataVersion.featureLevel()),
                                FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
                    }
                }
   ```
   
   Second, does this mean that the user cannot incrementally enable features
   introduced between versions 3.0 and 3.3? They would have to set the version
   to 3.3. It doesn't look like the controller checks this when accepting
   metadata version updates. In other words, it looks like the user can set the
   version to 3.2, but it won't have an effect on the metadata version of the
   cluster. Is this accurate?



##########
metadata/src/main/java/org/apache/kafka/image/ImageWriterOptions.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.function.Consumer;
+
+
+/**
+ * The options to use when writing an image.
+ */
+public final class ImageWriterOptions {
+    public static class Builder {
+        private MetadataVersion metadataVersion = MetadataVersion.latest();
+        private Consumer<UnwritableMetadataException> lossHandler = e -> {
+            throw e;
+        };
+
+        public Builder setMetadataVersion(MetadataVersion metadataVersion) {
+            if (metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+                // When writing an image, all versions less than 3.3-IV0 are treated as 3.0-IV1.
+                // This is because those versions don't support FeatureLevelRecord.
+                setRawMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION);
+            } else {
+                setRawMetadataVersion(metadataVersion);
+            }
+            return this;
+        }
+
+        // Package-private for testing
+        Builder setRawMetadataVersion(MetadataVersion metadataVersion) {
+            this.metadataVersion = metadataVersion;
+            return this;
+        }
+
+        public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
+            this.lossHandler = lossHandler;
+            return this;
+        }
+
+        public ImageWriterOptions build() {
+            return new ImageWriterOptions(metadataVersion, lossHandler);
+        }
+    }
+
+    private final MetadataVersion metadataVersion;
+    private final Consumer<UnwritableMetadataException> lossHandler;
+
+    private ImageWriterOptions(
+        MetadataVersion metadataVersion,
+        Consumer<UnwritableMetadataException> lossHandler
+    ) {
+        this.metadataVersion = metadataVersion;
+        this.lossHandler = lossHandler;
+    }
+
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
+    public void handleLoss(String loss) {
+        lossHandler.accept(new UnwritableMetadataException(metadataVersion, loss));
+    }
+}

Review Comment:
   Minor: add a trailing newline character.



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -390,15 +389,18 @@ class BrokerMetadataListener(
   }
 
   class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
 -      extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] {
-    val records = new util.ArrayList[ApiMessageAndVersion]()
-    override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-      records.addAll(batch)
-    }
-
+      extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      _image.write(this)
-      future.complete(records)
+      val writer = new InMemoryImageWriter()
+      val options = new ImageWriterOptions.Builder().
+        setMetadataVersion(_image.features().metadataVersion()).
+        build()
+      try {
+        _image.write(writer, options)
+      } finally {
+        writer.close()
+      }

Review Comment:
   How about using Java's try-with-resources?
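
   As an illustration of the suggestion, here is the shape in Java with
   stand-in types (the names below are assumed, not the real classes; the
   reviewed file is Scala, where `scala.util.Using` in 2.13+ plays a similar
   role). try-with-resources guarantees `close()` runs on every exit path,
   replacing the explicit try/finally.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class FakeImageWriter implements AutoCloseable {
       final List<String> records = new ArrayList<>();
       boolean closed = false;

       void write(String record) { records.add(record); }
       @Override public void close() { closed = true; }
   }

   public class TryWithResourcesSketch {
       public static FakeImageWriter writeImage() {
           FakeImageWriter writer = new FakeImageWriter();
           try (writer) {             // Java 9+ form: close() always runs
               writer.write("record");
           }
           return writer;
       }
   }
   ```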



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -65,24 +64,62 @@ private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
-        if (!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
-            // Write out the metadata.version record first, and then the rest of the finalized features
-            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        if (options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+            writePreProductionVersion(writer, options);
+        } else {
+            writeProductionVersion(writer, options);
         }
+    }
+
+    private void writePreProductionVersion(ImageWriter writer, ImageWriterOptions options) {
+        // If the metadata version is older than 3.3-IV0, we can't represent any feature flags,
+        // because the FeatureLevel record is not supported.
+        if (!finalizedVersions.isEmpty()) {
+            List<String> features = new ArrayList<>(finalizedVersions.keySet());
+            features.sort(String::compareTo);
+            options.handleLoss("feature flag(s): " +
+                    features.stream().collect(Collectors.joining(", ")));
+        }
+        // With this old metadata version, we can't even represent the metadata version itself.
+        // However, we do want to put something into the snapshot that indicates the version,
+        // for human consumption. The following two records set a topic config on __cluster_metadata
+        // and then clear it, in order to accomplish that. (ConfigRecord goes back to the earliest
+        // stable KRaft releases.)
+        //
+        // Another reason we do this here is that it allows us to maintain the invariant that every
+        // MetadataImage generates at least one record when serialized -- even if the image is
+        // empty.
+        writer.write(0, new ConfigRecord().
+            setResourceType(ResourceType.TOPIC.code()).
+            setResourceName("__cluster_metadata").
+            setName("metadata.version").
+            setValue(options.metadataVersion().toString()));
+        writer.write(0, new ConfigRecord().
+            setResourceType(ResourceType.TOPIC.code()).
+            setResourceName("__cluster_metadata").
+            setName("metadata.version").
+            setValue(null));
+    }
 
+    private void writeProductionVersion(ImageWriter writer, ImageWriterOptions options) {
+        // It is important to write out the metadata.version record first, because it may have an
+        // impact on how we decode records that come after it.
+        //
+        // Note: it's important that this initial FeatureLevelRecord be written with version 0 and
+        // not any later version, so that any modern reader can process it.

Review Comment:
   Interesting. How about writing a similar comment in 
`FeatureLevelRecord.json` stating that 0 should always be a valid version? 
Actually, maybe this is true for all metadata records. Zero must always be a 
valid version for all metadata records.



##########
metadata/src/main/java/org/apache/kafka/image/AclsImage.java:
##########
@@ -53,13 +49,15 @@ public Map<Uuid, StandardAcl> acls() {
         return acls;
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        // Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we should not write it if
+        // the output version is less than that. However, there is a problem: pre-production KRaft
+        // images didn't support FeatureLevelRecord, so we can't distinguish 3.2-IV0 from 3.0-IV1.
+        // The least bad way to resolve this is just to pretend that ACLs were in 3.0-IV1.
         for (Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
             StandardAclWithId aclWithId = new StandardAclWithId(entry.getKey(), entry.getValue());
-            batch.add(new ApiMessageAndVersion(aclWithId.toRecord(), (short) 0));
+            writer.write(0, aclWithId.toRecord());

Review Comment:
   This comment applies to a lot of places.
   
   Should we change `0` to a symbol? This would make it a little easier to 
identify all of the places that need to change if a developers added a new 
metadata record version.
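
   A sketch of what the suggested symbol might look like, with hypothetical
   names (the real constant name and location would be decided in the PR):
   defining the record version once makes every write site grep-able, so a
   future version bump is a single, easy-to-find change.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class VersionSymbolSketch {
       // Hypothetical central constant; not a real Kafka identifier.
       static final short DEFAULT_RECORD_VERSION = 0;

       static final List<String> written = new ArrayList<>();

       // Toy stand-in for ImageWriter.write(version, message).
       static void write(short version, String message) {
           written.add(message + "@v" + version);
       }

       public static List<String> writeAcls() {
           written.clear();
           // Every call site references the symbol instead of a bare literal 0.
           write(DEFAULT_RECORD_VERSION, "AccessControlEntryRecord");
           write(DEFAULT_RECORD_VERSION, "FeatureLevelRecord");
           return written;
       }
   }
   ```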


