[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847384#comment-17847384 ]
Muralidhar Basani commented on KAFKA-16790: ------------------------------------------- [~christo_lolov] can I look into this issue? > Calls to RemoteLogManager are made before it is configured > ---------------------------------------------------------- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft > Affects Versions: 3.8.0 > Reporter: Christo Lolov > Priority: Major > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2). However, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect: we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. > (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The problem can be reproduced by applying the following changes: > {code:java} > config/kraft/broker.properties | 10 ++++++++++ > .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++++++- > core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++++- > 3 files changed, 22 insertions(+), 2 deletions(-)diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 > # The interval at which log segments are checked 
to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=300000 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. > +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { > // The endpoint for remote log metadata manager to connect to > private Optional<EndPoint> endpoint = Optional.empty(); > private boolean closed = false; > + private boolean up = false; > > /** > * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { > // in connecting to the brokers or remote storages. 
> configureRSM(); > configureRLMM(); > + up = true; > } > > public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { > public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, > Set<Partition> partitionsBecomeFollower, > Map<String, Uuid> topicIds) { > - LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > + if (!up) { > + LOGGER.error("NullPointerException"); > + return; > + } > + LOGGER.error("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > > Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = > filterPartitions(partitionsBecomeLeader) > .collect(Collectors.toMap( > diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala > b/core/src/main/scala/kafka/server/ReplicaManager.scala > index 35499430d6..bd3f41c3d6 100644 > --- a/core/src/main/scala/kafka/server/ReplicaManager.scala > +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala > @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig, > */ > def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { > // Before taking the lock, compute the local changes > + stateChangeLogger.error("ROBIN") > val localChanges = delta.localChanges(config.nodeId) > val metadataVersion = newImage.features().metadataVersion() > > @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig, > replicaFetcherManager.shutdownIdleFetcherThreads() > replicaAlterLogDirsManager.shutdownIdleFetcherThreads() > > - remoteLogManager.foreach(rlm => > rlm.onLeadershipChange(leaderChangedPartitions.asJava, > followerChangedPartitions.asJava, localChanges.topicIds())) > + remoteLogManager.foreach(rlm => { > + stateChangeLogger.error("JOKER") > + rlm.onLeadershipChange(leaderChangedPartitions.asJava, > followerChangedPartitions.asJava, localChanges.topicIds()) > 
+ }) > } > > if (metadataVersion.isDirectoryAssignmentSupported) { {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)