[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-07 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1285857919


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -144,8 +145,9 @@ public class RemoteLogManager implements Closeable {
 
 private final ConcurrentHashMap 
leaderOrFollowerTasks = new ConcurrentHashMap<>();
 
-// topic ids that are received on leadership changes, this map is cleared 
on stop partitions
-private final ConcurrentMap topicPartitionIds = new 
ConcurrentHashMap<>();
+// topic ids that are received on leadership changes & on stop partitions 
are stored in this cache
+private final MetadataCache metaDataCache;

Review Comment:
   Got it. Will revise accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-07 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1285865061


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 public void stopPartitions(TopicPartition topicPartition, boolean delete) {
 if (delete) {
 // Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+Map mapping = metaDataCache.topicNamesToIds();
+Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
   Got it. So can I say for `stopPartition` and `onLeadershipChange`, there's 
no need to update the metadatacache at all ? Is there any extra logic that 
should be addded into these 2 APIs ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-12 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1292378582


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 public void stopPartitions(TopicPartition topicPartition, boolean delete) {
 if (delete) {
 // Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+Map mapping = metaDataCache.topicNamesToIds();
+Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
   @clolov Thanks a lot for your comment. I've incorporated your comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-12 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1292378925


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -280,13 +285,18 @@ public RemoteStorageManager storageManager() {
 return remoteLogStorageManager;
 }
 
+public MetadataCache metadataCache() {
+return metaDataCache;
+}
+
 private Stream filterPartitions(Set partitions) {
 // We are not specifically checking for internal topics etc here as 
`log.remoteLogEnabled()` already handles that.
 return partitions.stream().filter(partition -> 
partition.log().exists(UnifiedLog::remoteLogEnabled));
 }
 
 private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
-Uuid previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
+Map mapping = metaDataCache.topicNamesToIds();

Review Comment:
   Thanks. I've removed this logic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-16 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1296606546


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -802,41 +817,25 @@ private void verifyInCache(TopicIdPartition... 
topicIdPartitions) {
 });
 }
 
-private void verifyNotInCache(TopicIdPartition... topicIdPartitions) {
-Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> {
-assertThrows(KafkaException.class, () ->
-
remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(),
 0, 0L));
-});
-}
-
 @Test
 void testTopicIdCacheUpdates() throws RemoteStorageException {

Review Comment:
   Sure. Removed the test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-16 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1296606986


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3600,7 +3600,10 @@ class ReplicaManagerTest {
   "clusterId",
   time,
   _ => Optional.of(mockLog),
-  brokerTopicStats)
+  brokerTopicStats,
+  mock(classOf[MetadataCache])

Review Comment:
   Sure. Moved the `mock(classOf[MetadataCache])` as variable now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-23 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303025695


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -378,7 +372,7 @@ public void stopPartitions(Set 
topicPartitions,
 remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
 if (delete) {
 // NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
-topicPartitions.forEach(topicIdByPartitionMap::remove);
+//topicPartitions.forEach(topicIdByPartitionMap::remove);

Review Comment:
   Thanks. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-23 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303026451


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -285,19 +288,15 @@ public RemoteStorageManager storageManager() {
 return remoteLogStorageManager;
 }
 
+public MetadataCache metadataCache() {
+return metadataCache;

Review Comment:
   Yes agree that we don't really need this. I've removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-23 Thread via GitHub


Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303027263


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3624,14 +3624,18 @@ class ReplicaManagerTest {
 val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
 val mockLog = mock(classOf[UnifiedLog])
 val brokerTopicStats = new 
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+val mockMetadataCache = mock(classOf[MetadataCache])
 val remoteLogManager = new RemoteLogManager(
   remoteLogManagerConfig,
   0,
   TestUtils.tempRelativeDir("data").getAbsolutePath,
   "clusterId",
   time,
   _ => Optional.of(mockLog),
-  brokerTopicStats)
+  brokerTopicStats,
+  mockMetadataCache
+)
+
when(remoteLogManager.metadataCache().getTopicId(topic)).thenReturn(topicId)

Review Comment:
   Thanks. I've changed to use `mockMetadataCache` instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org