jsancio commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r993773605

##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -179,8 +184,13 @@ class BrokerMetadataListener(
     snapshotter.foreach { snapshotter =>
       if (metadataFaultOccurred.get()) {
         trace("Not starting metadata snapshot since we previously had an error")
-      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply(), reason)) {
-        _bytesSinceLastSnapshot = 0L
+      } else {
+        val newProvenance = new ImageProvenance("generated snapshot",
+          _highestOffset, _highestEpoch, _highestTimestamp)
+        val newImage = _delta.apply(newProvenance)

Review Comment:
   Minor, but how about `snapshotImage` instead of `newImage`?

##########
metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.image.ImageProvenance;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+
+/**
+ * ImageReWriter writes a metadata image out to another metadata image.
+ *
+ * There are a few reasons why you might want to do this. One is to obtain a MetadataDelta
+ * object which contains everything in the image. Another is to translate an image from
+ * one metadata version to another.
+ */
+public class ImageReWriter implements ImageWriter {
+    private final String newSource;
+    private final MetadataDelta delta;
+    private boolean closed = false;
+    private MetadataImage image = null;
+
+    public ImageReWriter(
+        String newSource,
+        MetadataDelta delta
+    ) {

Review Comment:
   Hmm. Maybe I missed it, but it doesn't look like this class is used anywhere. I was interested in its usage because it wasn't clear to me when it would be useful to pass in the delta, instead of starting with an empty delta from an empty or non-empty image.

##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -390,14 +404,18 @@ class BrokerMetadataListener(
   }
   class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
-    extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] {
-    val records = new util.ArrayList[ApiMessageAndVersion]()
-    override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-      records.addAll(batch)
-    }
-
+    extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      _image.write(this)
+      val records = new util.ArrayList[ApiMessageAndVersion]
+      val writer = new RecordListWriter(records)
+      val options = new ImageWriterOptions.Builder().
+        setMetadataVersion(_image.features().metadataVersion()).

Review Comment:
   This comment also applies to its use in the snapshotter. Outside of tests, it looks like this option is always set to the metadata version in the metadata image. What do you think about changing the API so that setting the metadata version is optional? If the metadata version is not set, then `MetadataImage.write` would use the version contained in the image.
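
A minimal sketch of the optional-version suggestion above, assuming hypothetical stand-ins: `WriterOptions` and `Image` are illustrative classes, not the PR's `ImageWriterOptions` or `MetadataImage`, and the metadata version is modelled as a plain string rather than `MetadataVersion`. It only shows the fallback rule: use the requested version if one was set, otherwise the version stored in the image.

```java
import java.util.Optional;

public class OptionalVersionSketch {
    // Hypothetical stand-in for ImageWriterOptions: the version may be left unset.
    static final class WriterOptions {
        private final Optional<String> metadataVersion;

        private WriterOptions(Optional<String> metadataVersion) {
            this.metadataVersion = metadataVersion;
        }

        Optional<String> metadataVersion() {
            return metadataVersion;
        }

        static final class Builder {
            private Optional<String> metadataVersion = Optional.empty();

            Builder setMetadataVersion(String version) {
                this.metadataVersion = Optional.of(version);
                return this;
            }

            WriterOptions build() {
                return new WriterOptions(metadataVersion);
            }
        }
    }

    // Hypothetical stand-in for MetadataImage: it knows its own metadata version.
    static final class Image {
        private final String metadataVersion;

        Image(String metadataVersion) {
            this.metadataVersion = metadataVersion;
        }

        // Version to write with: the requested one if set, else the image's own.
        String effectiveVersion(WriterOptions options) {
            return options.metadataVersion().orElse(metadataVersion);
        }
    }

    public static void main(String[] args) {
        Image image = new Image("3.3-IV0"); // illustrative version label
        WriterOptions unset = new WriterOptions.Builder().build();
        WriterOptions pinned = new WriterOptions.Builder().setMetadataVersion("3.0-IV1").build();
        System.out.println(image.effectiveVersion(unset));
        System.out.println(image.effectiveVersion(pinned));
    }
}
```

With this shape, callers such as the listener and the snapshotter would not need to pass the image's own version back in, while tests could still pin an older version explicitly.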

##########
metadata/src/main/java/org/apache/kafka/image/AclsImage.java:
##########
@@ -53,13 +49,15 @@ public Map<Uuid, StandardAcl> acls() {
         return acls;
     }
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        // Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we should not write it if
+        // the output version is less than that. However, there is a problem: pre-production KRaft
+        // images didn't support FeatureLevelRecord, so we can't distinguish 3.2-IV0 from 3.0-IV1.
+        // The least bad way to resolve this is just to pretend that ACLs were in 3.0-IV1.

Review Comment:
   I see. Kafka would only write ACL records to the snapshot if they existed in the log originally and got replayed to the `AclsImage`. The question then is downgrade. Are you saying that downgrading the `metadata.version` doesn't remove ACL records?

##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -65,24 +64,43 @@ private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        List<ApiMessageAndVersion> batch = new ArrayList<>();
-        if (!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
-            // Write out the metadata.version record first, and then the rest of the finalized features
-            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        if (options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+            writePreProductionVersion(writer, options);
+        } else {
+            writeProductionVersion(writer, options);
         }
+    }
+
+    private void writePreProductionVersion(ImageWriter writer, ImageWriterOptions options) {

Review Comment:
   Instead of `writePreProductionVersion` and `writeProductionVersion`, can we use more focused names? How about `handleFeatureLevelNotSupported` and `writeFeatureLevels`, respectively?

##########
metadata/src/main/java/org/apache/kafka/image/writer/ImageWriter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+
+/**
+ * Writes out a metadata image.
+ */
+public interface ImageWriter {

Review Comment:
   It looks like this interface can extend `AutoCloseable`.
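
A minimal sketch of what extending `AutoCloseable` would buy, assuming a hypothetical `SketchImageWriter` interface and `ListWriter` implementation (neither is the PR's `ImageWriter`; the methods of the real interface are not shown in the hunk above, so the `write(String)` signature here is purely illustrative). Narrowing `close()` to throw no checked exception lets callers use try-with-resources without a `catch` block.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCloseableWriterSketch {
    // Hypothetical writer interface; records are plain strings for illustration.
    interface SketchImageWriter extends AutoCloseable {
        void write(String record);

        // Narrowed from AutoCloseable.close(): no checked exception is declared.
        @Override
        void close();
    }

    // Hypothetical in-memory writer that collects records into a list.
    static final class ListWriter implements SketchImageWriter {
        final List<String> records = new ArrayList<>();
        boolean closed = false;

        @Override
        public void write(String record) {
            if (closed) {
                throw new IllegalStateException("writer is closed");
            }
            records.add(record);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        ListWriter writer = new ListWriter();
        // try-with-resources calls close() even if write() throws.
        try (SketchImageWriter w = writer) {
            w.write("record-1");
            w.write("record-2");
        }
        System.out.println(writer.records.size() + " records, closed=" + writer.closed);
    }
}
```

The design choice is that the writer's lifetime is tied to a lexical scope, so a forgotten `close()` call cannot leave a writer open on an error path.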

##########
metadata/src/test/java/org/apache/kafka/metadata/MockRandom.java:
##########
@@ -15,16 +15,26 @@
  * limitations under the License.
  */
-package org.apache.kafka.metadata;
+package org.apache.kafka.server.util;

Review Comment:
   This package doesn't match the file path. Did you mean to move this file to a different project and package?

##########
metadata/src/main/java/org/apache/kafka/image/ImageProvenance.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import java.util.Objects;
+
+
+/**
+ * The source of a metadata image.
+ */
+public final class ImageProvenance {
+    public static final ImageProvenance EMPTY = new ImageProvenance("the empty image",
+        0L,
+        0,
+        0L);
+
+    private final String source;
+    private final long highestOffset;
+    private final int highestEpoch;
+    private final long highestTimestamp;
+
+    public ImageProvenance(
+        String source,
+        long highestOffset,
+        int highestEpoch,
+        long highestTimestamp

Review Comment:
   Can we document all of these fields? I specifically don't understand how to use the `source` field.

--
This is an automated message from the Apache Git Service.