[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-28 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849991#comment-17849991
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] [~nikramakrishnan]  wdyt about the new pr changes ? 

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-20 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847989#comment-17847989
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] have a pr open. 

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++ 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-20 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847869#comment-17847869
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] thank you for the explanation. I will look into it.

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-20 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847829#comment-17847829
 ] 

Christo Lolov commented on KAFKA-16790:
---

I have assigned the Jira ticket to you, [~muralibasani]! If you manage to get 
this in until the 29th of May you will fix it in time for 3.8 :). Feel free to 
tag me as a reviewer on any PRs you raise

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-20 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847824#comment-17847824
 ] 

Christo Lolov commented on KAFKA-16790:
---

Heya [~muralibasani], thank you for picking this up on such a short notice!

Here's my reasoning. The real initialisation of a RemoteLogManager happens when 
its startup method is called (that is what in turn calls the configure methods 
on the RemoteStorageManager and the RemoteLogMetadataManager). You are correct 
that the createRemoteLogManager method is called earlier than the 
creation/running of the BrokerMetadataPublisher. However, that doesn't 
configure the underlying managers. Let me know if there is still confusion on 
what the problem is!

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-18 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847584#comment-17847584
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

After abit further digging

-> remoteLogManagerOpt = createRemoteLogManager() in BrokerServer.scala 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L207]
 is invoked before brokerMetadataPublisher is initialized

However, if the property REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable" is set to true and few other related 
properties, remoteLogManager is configured properly, else it is None.

So, imo we don't have to fix any.

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> 

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847392#comment-17847392
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

Even though brokerMetadataPublisher is instantiated with all publishers, and 
then rlm, seems like MetadataLoader#initializeNewPublishers calls 
BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already 
instantiated ?

So if BrokerMetadataPublisher#onMetadataUpdate is invoked when there are new 
publishers being created like 

DynamicConfigPublisher during brokerMetadataPublisher instance creation, then I 
think yea remoteLogManager is not initialized.

But seems like MetadataLoader#initializeNewPublishers calls 
BrokerMetadataPublisher#onMetadataUpdate, and by then rlm is already 
initialized.

 

It's possible that am wrong or I didn't understand this correctly.

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {

[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-17 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847384#comment-17847384
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] can I look into this issue ?

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++