rondagostino commented on a change in pull request #10090: URL: https://github.com/apache/kafka/pull/10090#discussion_r573925637
########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ########## @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.metadata + +import java.util +import java.util.concurrent.TimeUnit +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.LogManager +import kafka.metrics.KafkaMetricsGroup +import kafka.server.{RaftReplicaManager, RequestHandlerHelper} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.metadata.MetadataRecordType._ +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.protocol.ApiMessage +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener} +import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} + +import scala.jdk.CollectionConverters._ + +object BrokerMetadataListener{ + val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs" + val MetadataBatchSizes = "MetadataBatchSizes" +} + +class BrokerMetadataListener(val brokerId: Int, + val time: Time, + val metadataCache: RaftMetadataCache, + val configRepository: 
LocalConfigRepository, + val groupCoordinator: GroupCoordinator, + val replicaManager: RaftReplicaManager, + val txnCoordinator: TransactionCoordinator, + val logManager: LogManager, + val threadNamePrefix: Option[String] + ) extends MetaLogListener with KafkaMetricsGroup { + val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ") + val log = logContext.logger(classOf[BrokerMetadataListener]) Review comment: `private`? ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ########## @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.metadata + +import java.util +import java.util.concurrent.TimeUnit +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.LogManager +import kafka.metrics.KafkaMetricsGroup +import kafka.server.{RaftReplicaManager, RequestHandlerHelper} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.metadata.MetadataRecordType._ +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.protocol.ApiMessage +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener} +import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} + +import scala.jdk.CollectionConverters._ + +object BrokerMetadataListener{ + val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs" + val MetadataBatchSizes = "MetadataBatchSizes" +} + +class BrokerMetadataListener(val brokerId: Int, + val time: Time, + val metadataCache: RaftMetadataCache, + val configRepository: LocalConfigRepository, + val groupCoordinator: GroupCoordinator, + val replicaManager: RaftReplicaManager, + val txnCoordinator: TransactionCoordinator, + val logManager: LogManager, + val threadNamePrefix: Option[String] + ) extends MetaLogListener with KafkaMetricsGroup { Review comment: Do any of these really need `val`? ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ########## @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.metadata + +import java.util +import java.util.concurrent.TimeUnit +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.LogManager +import kafka.metrics.KafkaMetricsGroup +import kafka.server.{RaftReplicaManager, RequestHandlerHelper} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.metadata.MetadataRecordType._ +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.protocol.ApiMessage +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener} +import org.apache.kafka.queue.{EventQueue, KafkaEventQueue} + +import scala.jdk.CollectionConverters._ + +object BrokerMetadataListener{ + val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs" + val MetadataBatchSizes = "MetadataBatchSizes" +} + +class BrokerMetadataListener(val brokerId: Int, + val time: Time, + val metadataCache: RaftMetadataCache, + val configRepository: LocalConfigRepository, + val groupCoordinator: GroupCoordinator, + val replicaManager: RaftReplicaManager, + val txnCoordinator: TransactionCoordinator, + val logManager: LogManager, + val threadNamePrefix: Option[String] + ) extends MetaLogListener with KafkaMetricsGroup { + val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ") + val log = logContext.logger(classOf[BrokerMetadataListener]) + logIdent = logContext.logPrefix() + + /** + * A histogram tracking the time in microseconds it 
took to process batches of events. + */ + private val batchProcessingTimeHist = newHistogram(BrokerMetadataListener.MetadataBatchProcessingTimeUs) + + /** + * A histogram tracking the sizes of batches that we have processed. + */ + private val metadataBatchSizeHist = newHistogram(BrokerMetadataListener.MetadataBatchSizes) + + /** + * The highest metadata offset that we've seen. Written only from the event queue thread. + */ + @volatile private var _highestMetadataOffset = -1L + + val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse("")) + + def highestMetadataOffset(): Long = _highestMetadataOffset + + /** + * Handle new metadata records. + */ + override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = { + eventQueue.append(new HandleCommitsEvent(lastOffset, records)) + } + + class HandleCommitsEvent(lastOffset: Long, + records: util.List[ApiMessage]) + extends EventQueue.FailureLoggingEvent(log) { + override def run(): Unit = { + if (isDebugEnabled) { + debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).") + } + val imageBuilder = + MetadataImageBuilder(brokerId, log, metadataCache.currentImage()) + val startNs = time.nanoseconds() + var index = 0 + metadataBatchSizeHist.update(records.size()) + records.iterator().asScala.foreach { case record => + try { + if (isTraceEnabled) { + trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1, + records.size(), record.toString())) + } + handleMessage(imageBuilder, record, lastOffset) + } catch { + case e: Exception => error(s"Unable to handle record ${index} in batch " + + s"ending at offset ${lastOffset}", e) + } Review comment: Perhaps not to resolve now, but I think error conditions like this might need more thought. Should the broker die? If so, how would the broker ever come back up if it can't get past a particular batch? 
But what is the impact of not processing the remaining messages in a batch, and is that always acceptable, or only acceptable in some cases? Do we need metrics to track these error events? It seems an error log message alone might not be sufficient. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org