[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1285857919

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -144,8 +145,9 @@ public class RemoteLogManager implements Closeable {
     private final ConcurrentHashMap leaderOrFollowerTasks = new ConcurrentHashMap<>();

-    // topic ids that are received on leadership changes, this map is cleared on stop partitions
-    private final ConcurrentMap topicPartitionIds = new ConcurrentHashMap<>();
+    // topic ids that are received on leadership changes & on stop partitions are stored in this cache
+    private final MetadataCache metaDataCache;

Review Comment:
Got it. Will revise accordingly

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1285865061

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set partitionsBecomeLeader,
     public void stopPartitions(TopicPartition topicPartition, boolean delete) {
         if (delete) {
             // Delete from internal datastructures only if it is to be deleted.
-            Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-            LOGGER.debug("Removed partition: {} from topicPartitionIds", topicIdPartition);
+            Map mapping = metaDataCache.topicNamesToIds();
+            Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
Got it. So for `stopPartitions` and `onLeadershipChange`, is there no need to update the metadata cache at all? Is there any extra logic that should be added to these two APIs?
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1292378582

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set partitionsBecomeLeader,
     public void stopPartitions(TopicPartition topicPartition, boolean delete) {
         if (delete) {
             // Delete from internal datastructures only if it is to be deleted.
-            Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-            LOGGER.debug("Removed partition: {} from topicPartitionIds", topicIdPartition);
+            Map mapping = metaDataCache.topicNamesToIds();
+            Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
@clolov Thanks a lot for your comment. I've incorporated your comments
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1292378925

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -280,13 +285,18 @@ public RemoteStorageManager storageManager() {
         return remoteLogStorageManager;
     }

+    public MetadataCache metadataCache() {
+        return metaDataCache;
+    }
+
     private Stream filterPartitions(Set partitions) {
         // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
         return partitions.stream().filter(partition -> partition.log().exists(UnifiedLog::remoteLogEnabled));
     }

     private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
-        Uuid previousTopicId = topicPartitionIds.put(topicIdPartition.topicPartition(), topicIdPartition.topicId());
+        Map mapping = metaDataCache.topicNamesToIds();

Review Comment:
Thanks. I've removed this logic
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1296606546

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -802,41 +817,25 @@ private void verifyInCache(TopicIdPartition... topicIdPartitions) {
         });
     }

-    private void verifyNotInCache(TopicIdPartition... topicIdPartitions) {
-        Arrays.stream(topicIdPartitions).forEach(topicIdPartition -> {
-            assertThrows(KafkaException.class, () ->
-                remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), 0, 0L));
-        });
-    }
-
     @Test
     void testTopicIdCacheUpdates() throws RemoteStorageException {

Review Comment:
Sure. Removed the test
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1296606986

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -3600,7 +3600,10 @@ class ReplicaManagerTest {
       "clusterId",
       time,
       _ => Optional.of(mockLog),
-      brokerTopicStats)
+      brokerTopicStats,
+      mock(classOf[MetadataCache])

Review Comment:
Sure. I've moved `mock(classOf[MetadataCache])` into a variable.
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303025695

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -378,7 +372,7 @@ public void stopPartitions(Set topicPartitions,
         remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
         if (delete) {
             // NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
-            topicPartitions.forEach(topicIdByPartitionMap::remove);
+            //topicPartitions.forEach(topicIdByPartitionMap::remove);

Review Comment:
Thanks. Removed.
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303026451

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -285,19 +288,15 @@ public RemoteStorageManager storageManager() {
         return remoteLogStorageManager;
     }

+    public MetadataCache metadataCache() {
+        return metadataCache;

Review Comment:
Yes, agreed that we don't really need this. I've removed it.
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes
Owen-CH-Leung commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1303027263

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -3624,14 +3624,18 @@ class ReplicaManagerTest {
     val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
     val mockLog = mock(classOf[UnifiedLog])
     val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+    val mockMetadataCache = mock(classOf[MetadataCache])
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,
       TestUtils.tempRelativeDir("data").getAbsolutePath,
       "clusterId",
       time,
       _ => Optional.of(mockLog),
-      brokerTopicStats)
+      brokerTopicStats,
+      mockMetadataCache
+    )
+    when(remoteLogManager.metadataCache().getTopicId(topic)).thenReturn(topicId)

Review Comment:
Thanks. I've changed it to use `mockMetadataCache` instead.