cmccabe commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r661674972
########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ########## @@ -70,230 +58,191 @@ class BrokerMetadataListener( */ @volatile private var _highestMetadataOffset = -1L + /** + * The current broker metadata image. Accessed only from the event queue thread. + */ + private var _image = MetadataImage.EMPTY + + /** + * The current metadata delta. Accessed only from the event queue thread. + */ + private var _delta = new MetadataDelta(_image) + + /** + * The object to use to publish new metadata changes, or None if this listener has not + * been activated yet. + */ + private var _publisher: Option[MetadataPublisher] = None + + /** + * The event queue which runs this listener. + */ val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse("")) + /** + * Returns the highest metadata-offset. Thread-safe. + */ def highestMetadataOffset(): Long = _highestMetadataOffset /** * Handle new metadata records. */ - override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = { + override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = eventQueue.append(new HandleCommitsEvent(reader)) + + class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion]) + extends EventQueue.FailureLoggingEvent(log) { + override def run(): Unit = { + val results = try { + val loadResults = loadBatches(_delta, reader) + if (isDebugEnabled) { + debug(s"Loaded new commits: ${loadResults}") + } + loadResults + } finally { + reader.close() + } + maybePublish(results.highestMetadataOffset) + } } /** * Handle metadata snapshots */ - override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = { - // Loading snapshot on the broker is currently not supported. - reader.close(); - throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported") - } - - // Visible for testing. It's useful to execute events synchronously in order - // to make tests deterministic. This object is responsible for closing the reader. - private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = { - new HandleCommitsEvent(batchReader).run() - } + override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = + eventQueue.append(new HandleSnapshotEvent(reader)) - class HandleCommitsEvent( - reader: BatchReader[ApiMessageAndVersion] - ) extends EventQueue.FailureLoggingEvent(log) { + class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion]) + extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { - try { - while (reader.hasNext()) { - apply(reader.next()) - } + val results = try { + info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.") + _delta = new MetadataDelta(_image) // Discard any previous deltas. + val loadResults = loadBatches(_delta, reader) + _delta.finishSnapshot() + info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " + + s"${loadResults}") + loadResults } finally { reader.close() } + maybePublish(results.highestMetadataOffset) } + } - private def apply(batch: Batch[ApiMessageAndVersion]): Unit = { - val records = batch.records - val lastOffset = batch.lastOffset + case class BatchLoadResults(numBatches: Int, + numRecords: Int, + elapsedUs: Long, + highestMetadataOffset: Long) { + override def toString(): String = { + s"${numBatches} batch(es) with ${numRecords} record(s) ending at offset " + + s"${highestMetadataOffset} in ${elapsedUs} microseconds" + } + } - if (isDebugEnabled) { - debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).") - } - val imageBuilder = - MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) - val startNs = time.nanoseconds() + private def loadBatches(delta: MetadataDelta, + iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = { + val startTimeNs = time.nanoseconds() + var numBatches = 0 + var numRecords = 0 + var newHighestMetadataOffset = _highestMetadataOffset + while (iterator.hasNext()) { + val batch = iterator.next() var index = 0 - metadataBatchSizeHist.update(records.size()) - records.iterator().asScala.foreach { record => - try { - if (isTraceEnabled) { - trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1, - records.size(), record.toString)) - } - handleMessage(imageBuilder, record.message, lastOffset) - } catch { - case e: Exception => error(s"Unable to handle record $index in batch " + - s"ending at offset $lastOffset", e) - } - index = index + 1 - } - if (imageBuilder.hasChanges) { - val newImage = imageBuilder.build() + batch.records().forEach { messageAndVersion => + newHighestMetadataOffset = batch.lastOffset() if (isTraceEnabled) { - trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}") - } else if (isDebugEnabled) { - debug(s"Metadata batch $lastOffset: creating new metadata image") - } - metadataCache.image(newImage) - } else if (isDebugEnabled) { - debug(s"Metadata batch $lastOffset: no new metadata image required.") - } - if (imageBuilder.hasPartitionChanges) { - if (isDebugEnabled) { - debug(s"Metadata batch $lastOffset: applying partition changes") + trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1, + batch.records().size(), messageAndVersion.message().toString())) } - replicaManager.handleMetadataRecords(imageBuilder, lastOffset, - RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _)) - } else if (isDebugEnabled) { - debug(s"Metadata batch $lastOffset: no partition changes found.") + delta.replay(messageAndVersion.message()) + numRecords += 1 + index += 1 } - _highestMetadataOffset = lastOffset - val endNs = time.nanoseconds() - val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS) - debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " + - "microseconds.") - batchProcessingTimeHist.update(deltaUs) + metadataBatchSizeHist.update(batch.records().size()) + numBatches = numBatches + 1 } + _highestMetadataOffset = newHighestMetadataOffset + val endTimeNs = time.nanoseconds() + val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS) + batchProcessingTimeHist.update(elapsedUs) + BatchLoadResults(numBatches, numRecords, elapsedUs, newHighestMetadataOffset) } - private def handleMessage(imageBuilder: MetadataImageBuilder, - record: ApiMessage, - lastOffset: Long): Unit = { - val recordType = try { - fromId(record.apiKey()) - } catch { - case e: Exception => throw new RuntimeException("Unknown metadata record type " + - s"${record.apiKey()} in batch ending at offset ${lastOffset}.") - } - - record match { - case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec) - case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec) - case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec) - case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec) - case rec: TopicRecord => handleTopicRecord(imageBuilder, rec) - case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec) - case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec) - case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec) - case rec: ConfigRecord => handleConfigRecord(rec) - case rec: ClientQuotaRecord => handleClientQuotaRecord(imageBuilder, rec) - case rec: ProducerIdsRecord => handleProducerIdRecord(rec) - case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType") - } + def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = { + val event = new StartPublishingEvent(publisher) + eventQueue.append(event) + event.future } - def handleRegisterBrokerRecord(imageBuilder: MetadataImageBuilder, - record: RegisterBrokerRecord): Unit = { - val broker = MetadataBroker(record) - imageBuilder.brokersBuilder().add(broker) - } - - def handleUnregisterBrokerRecord(imageBuilder: MetadataImageBuilder, - record: UnregisterBrokerRecord): Unit = { - imageBuilder.brokersBuilder().remove(record.brokerId()) - } - - def handleTopicRecord(imageBuilder: MetadataImageBuilder, - record: TopicRecord): Unit = { - imageBuilder.partitionsBuilder().addUuidMapping(record.name(), record.topicId()) - } + class StartPublishingEvent(publisher: MetadataPublisher) + extends EventQueue.FailureLoggingEvent(log) { + val future = new CompletableFuture[Void]() - def handlePartitionRecord(imageBuilder: MetadataImageBuilder, - record: PartitionRecord): Unit = { - imageBuilder.topicIdToName(record.topicId()) match { - case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}") - case Some(name) => - val partition = MetadataPartition(name, record) - imageBuilder.partitionsBuilder().set(partition) + override def run(): Unit = { + _publisher = Some(publisher) + log.info(s"Starting to publish metadata events at offset ${_highestMetadataOffset}.") + try { + maybePublish(_highestMetadataOffset) + future.complete(null) + } catch { + case e: Throwable => future.completeExceptionally(e) + } } } - def handleConfigRecord(record: ConfigRecord): Unit = { - val t = ConfigResource.Type.forId(record.resourceType()) - if (t == ConfigResource.Type.UNKNOWN) { - throw new RuntimeException("Unable to understand config resource type " + - s"${Integer.valueOf(record.resourceType())}") + private def maybePublish(newHighestMetadataOffset: Long): Unit = { + _publisher match { + case None => // Nothing to do + case Some(publisher) => { + val delta = _delta + _image = _delta.apply() + _delta = new MetadataDelta(_image) + publisher.publish(newHighestMetadataOffset, delta, _image) + } } - val resource = new ConfigResource(t, record.resourceName()) - configRepository.setConfig(resource, record.name(), record.value()) - } - - def handlePartitionChangeRecord(imageBuilder: MetadataImageBuilder, - record: PartitionChangeRecord): Unit = { - imageBuilder.partitionsBuilder().handleChange(record) } - def handleFenceBrokerRecord(imageBuilder: MetadataImageBuilder, - record: FenceBrokerRecord): Unit = { - // TODO: add broker epoch to metadata cache, and check it here. - imageBuilder.brokersBuilder().changeFencing(record.id(), fenced = true) + override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = { + // TODO: cache leaderAndEpoch so we can use the epoch in broker-initiated snapshots. } - def handleUnfenceBrokerRecord(imageBuilder: MetadataImageBuilder, - record: UnfenceBrokerRecord): Unit = { - // TODO: add broker epoch to metadata cache, and check it here. - imageBuilder.brokersBuilder().changeFencing(record.id(), fenced = false) + override def beginShutdown(): Unit = { + eventQueue.beginShutdown("beginShutdown", new ShutdownEvent()) } - def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder, - record: RemoveTopicRecord): Unit = { - imageBuilder.topicIdToName(record.topicId()) match { - case None => - throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}") - - case Some(topicName) => - info(s"Processing deletion of topic $topicName with id ${record.topicId}") - val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId()) - groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq, RequestLocal.NoCaching) - configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName)) + class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) { + override def run(): Unit = { + removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs) + removeMetric(BrokerMetadataListener.MetadataBatchSizes) } } - def handleClientQuotaRecord(imageBuilder: MetadataImageBuilder, - record: ClientQuotaRecord): Unit = { - // TODO add quotas to MetadataImageBuilder - clientQuotaManager.handleQuotaRecord(record) + def close(): Unit = { + beginShutdown() + eventQueue.close() } - def handleProducerIdRecord(record: ProducerIdsRecord): Unit = { - // This is a no-op since brokers get their producer ID blocks directly from the controller via - // AllocateProducerIds RPC response + // VisibleForTesting + private[kafka] def getImageRecords(): CompletableFuture[util.List[ApiMessageAndVersion]] = { + val future = new CompletableFuture[util.List[ApiMessageAndVersion]]() + eventQueue.append(new GetImageRecordsEvent(future)) + future } - class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) - extends EventQueue.FailureLoggingEvent(log) { - override def run(): Unit = { - val imageBuilder = - MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) - imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala) - metadataCache.image(imageBuilder.build()) + class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]]) + extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] { + val records = new util.ArrayList[ApiMessageAndVersion]() + override def accept(batch: util.List[ApiMessageAndVersion]): Unit = { + if (batch == null) { Review comment: You're right, there is no need for the null here. Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org