jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r966471788
########## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ########## @@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.snapshot.SnapshotReader +import java.util.concurrent.atomic.AtomicBoolean + Review Comment: This comment applies to this code: ```scala try { delta.replay(highestMetadataOffset, epoch, messageAndVersion.message()) } catch { case e: Throwable => snapshotName match { case None => metadataLoadingFaultHandler.handleFault( s"Error replaying metadata log record at offset ${_highestOffset}", e) case Some(name) => metadataLoadingFaultHandler.handleFault( s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e) } } ``` I think this code attempts to read and replay the entire committed log. I wonder if this code should be more conservative if it encounters an error replaying a record and only read the current batch before updating the image. Note that this code is used for both snapshots and log segments. For snapshots, the entire snapshot needs to be in one `delta` update. ########## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ########## @@ -307,11 +322,18 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta - _image = _delta.apply() + try { + _image = _delta.apply() Review Comment: Note that it is possible for `_delta` to include a lot of batches maybe even the entire log. I wonder that if the broker encounters an error applying a delta we want to instead rewind, generate and apply a delta per record batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org