cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663277869
##########
File path:
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -70,230 +58,188 @@ class BrokerMetadataListener(
*/
@volatile private var _highestMetadataOffset = -1L
+ /**
+ * The current broker metadata image. Accessed only from the event queue
thread.
+ */
+ private var _image = MetadataImage.EMPTY
+
+ /**
+ * The current metadata delta. Accessed only from the event queue thread.
+ */
+ private var _delta = new MetadataDelta(_image)
+
+ /**
+ * The object to use to publish new metadata changes, or None if this
listener has not
+ * been activated yet.
+ */
+ private var _publisher: Option[MetadataPublisher] = None
+
+ /**
+ * The event queue which runs this listener.
+ */
val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
+ /**
+ * Returns the highest metadata-offset. Thread-safe.
+ */
def highestMetadataOffset(): Long = _highestMetadataOffset
/**
* Handle new metadata records.
*/
- override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
{
+ override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
eventQueue.append(new HandleCommitsEvent(reader))
+
+ class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
+ extends EventQueue.FailureLoggingEvent(log) {
+ override def run(): Unit = {
+ val results = try {
+ val loadResults = loadBatches(_delta, reader)
+ if (isDebugEnabled) {
+ debug(s"Loaded new commits: ${loadResults}")
+ }
+ loadResults
+ } finally {
+ reader.close()
+ }
+ maybePublish(results.highestMetadataOffset)
+ }
}
/**
* Handle metadata snapshots
*/
- override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]):
Unit = {
- // Loading snapshot on the broker is currently not supported.
- reader.close();
- throw new UnsupportedOperationException(s"Loading snapshot
(${reader.snapshotId()}) is not supported")
- }
-
- // Visible for testing. It's useful to execute events synchronously in order
- // to make tests deterministic. This object is responsible for closing the
reader.
- private[metadata] def execCommits(batchReader:
BatchReader[ApiMessageAndVersion]): Unit = {
- new HandleCommitsEvent(batchReader).run()
- }
+ override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]):
Unit =
+ eventQueue.append(new HandleSnapshotEvent(reader))
- class HandleCommitsEvent(
- reader: BatchReader[ApiMessageAndVersion]
- ) extends EventQueue.FailureLoggingEvent(log) {
+ class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
+ extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
- try {
- while (reader.hasNext()) {
- apply(reader.next())
- }
+ val results = try {
+ info(s"Loading snapshot
${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+ _delta = new MetadataDelta(_image) // Discard any previous deltas.
+ val loadResults = loadBatches(_delta, reader)
+ _delta.finishSnapshot()
Review comment:
Yeah, I thought about a few other names -- "remove unreferenced", maybe? --
but none of them seem great...
I added some more JavaDoc to `MetadataDelta#finishSnapshot`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]