cmccabe commented on code in PR #12776:
URL: https://github.com/apache/kafka/pull/12776#discussion_r1011313984


##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.MetadataVersionChange;
+import org.apache.kafka.image.MetadataVersionChangeException;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.image.writer.ImageReWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The MetadataLoader follows changes provided by a RaftClient, and packages them into metadata
+ * deltas and images that can be consumed by publishers.
+ *
+ * The Loader maintains its own thread, which is used to make all callbacks into publishers. If a
+ * publisher A is installed before B, A will receive all callbacks before B. This is also true if
+ * A and B are installed as part of a list [A, B].
+ *
+ * Publishers should not modify any data structures passed to them.
+ *
+ * It is possible to change the list of publishers dynamically over time. Whenever a new publisher is
+ * added, it receives a catch-up delta which contains the full state. Any publisher installed when the
+ * loader is closed will itself be closed.
+ */
+public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
+    public static class Builder {
+        private int nodeId = -1;
+        private Time time = Time.SYSTEM;
+        private LogContext logContext = null;
+        private String threadNamePrefix = "";
+        private FaultHandler faultHandler = null;
+        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
+            private volatile long lastAppliedOffset = -1L;
+
+            @Override
+            public void updateBatchProcessingTime(long elapsedNs) { }
+
+            @Override
+            public void updateBatchSize(int size) { }
+
+            @Override
+            public void updateLastAppliedImageProvenance(MetadataProvenance 
provenance) {
+                this.lastAppliedOffset = provenance.offset();
+            }
+
+            @Override
+            public long lastAppliedOffset() {
+                return lastAppliedOffset;
+            }
+
+            @Override
+            public void close() throws Exception { }
+        };
+
+        public Builder setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
+        public Builder setFaultHandler(FaultHandler faultHandler) {
+            this.faultHandler = faultHandler;
+            return this;
+        }
+
+        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) 
{
+            this.metrics = metrics;
+            return this;
+        }
+
+        public MetadataLoader build() {
+            if (logContext == null) {
+                logContext = new LogContext("[MetadataLoader " + nodeId + "] 
");
+            }
+            if (faultHandler == null) throw new RuntimeException("You must set 
a fault handler.");
+            return new MetadataLoader(
+                time,
+                logContext,
+                threadNamePrefix,
+                faultHandler,
+                metrics);
+        }
+    }
+
+    /**
+     * The log4j logger for this loader.
+     */
+    private final Logger log;
+
+    /**
+     * The clock used by this loader.
+     */
+    private final Time time;
+
+    /**
+     * The fault handler to use if metadata loading fails.
+     */
+    private final FaultHandler faultHandler;
+
+    /**
+     * Callbacks for updating metrics.
+     */
+    private final MetadataLoaderMetrics metrics;
+
+    /**
+     * The publishers which should receive cluster metadata updates.
+     */
+    private final List<MetadataPublisher> publishers;
+
+    /**
+     * The current leader and epoch.
+     */
+    private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;

Review Comment:
   It would be nice to keep this, to minimize divergence.



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.MetadataVersionChange;
+import org.apache.kafka.image.MetadataVersionChangeException;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.image.writer.ImageReWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The MetadataLoader follows changes provided by a RaftClient, and packages them into metadata
+ * deltas and images that can be consumed by publishers.
+ *
+ * The Loader maintains its own thread, which is used to make all callbacks into publishers. If a
+ * publisher A is installed before B, A will receive all callbacks before B. This is also true if
+ * A and B are installed as part of a list [A, B].
+ *
+ * Publishers should not modify any data structures passed to them.
+ *
+ * It is possible to change the list of publishers dynamically over time. Whenever a new publisher is
+ * added, it receives a catch-up delta which contains the full state. Any publisher installed when the
+ * loader is closed will itself be closed.
+ */
+public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
+    public static class Builder {
+        private int nodeId = -1;
+        private Time time = Time.SYSTEM;
+        private LogContext logContext = null;
+        private String threadNamePrefix = "";
+        private FaultHandler faultHandler = null;
+        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
+            private volatile long lastAppliedOffset = -1L;
+
+            @Override
+            public void updateBatchProcessingTime(long elapsedNs) { }
+
+            @Override
+            public void updateBatchSize(int size) { }
+
+            @Override
+            public void updateLastAppliedImageProvenance(MetadataProvenance 
provenance) {
+                this.lastAppliedOffset = provenance.offset();
+            }
+
+            @Override
+            public long lastAppliedOffset() {
+                return lastAppliedOffset;
+            }
+
+            @Override
+            public void close() throws Exception { }
+        };
+
+        public Builder setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
+        public Builder setFaultHandler(FaultHandler faultHandler) {
+            this.faultHandler = faultHandler;
+            return this;
+        }
+
+        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) 
{
+            this.metrics = metrics;
+            return this;
+        }
+
+        public MetadataLoader build() {
+            if (logContext == null) {
+                logContext = new LogContext("[MetadataLoader " + nodeId + "] 
");
+            }
+            if (faultHandler == null) throw new RuntimeException("You must set 
a fault handler.");
+            return new MetadataLoader(
+                time,
+                logContext,
+                threadNamePrefix,
+                faultHandler,
+                metrics);
+        }
+    }
+
+    /**
+     * The log4j logger for this loader.
+     */
+    private final Logger log;
+
+    /**
+     * The clock used by this loader.
+     */
+    private final Time time;
+
+    /**
+     * The fault handler to use if metadata loading fails.
+     */
+    private final FaultHandler faultHandler;
+
+    /**
+     * Callbacks for updating metrics.
+     */
+    private final MetadataLoaderMetrics metrics;
+
+    /**
+     * The publishers which should receive cluster metadata updates.
+     */
+    private final List<MetadataPublisher> publishers;
+
+    /**
+     * The current leader and epoch.
+     */
+    private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;

Review Comment:
   It would be nice to keep this, to minimize divergence (although I don't feel strongly).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to