Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
junrao commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1678493157

## core/src/main/scala/kafka/cluster/Partition.scala:
```diff
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
    * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
    * @return true iff the future replica is created
    */
-  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {
```

Review Comment:
Could we add the new param to the javadoc?

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Even though it's possible that there is another thread adding fetcher for this future log partition,
+      // but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op.
```

Review Comment:
When the leader changes, we need to propagate the new leader epoch to ReplicaAlterLogDirsThread (see https://github.com/apache/kafka/pull/8223). So, the operation is not a no-op?
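A possible shape for the Scaladoc update requested above. This is only a sketch of the `Partition.scala` fragment; the parameter description is suggested wording, not text from the PR:

```scala
/**
 * Create the future replica log for this partition in the given log directory,
 * unless the future replica already exists.
 *
 * @param logDir                   Log directory in which to create the future log
 * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
 * @param topicId                  Topic ID to associate with the created future log;
 *                                 defaults to this partition's current topic ID
 * @return true iff the future replica is created
 */
def maybeCreateFutureReplica(logDir: String,
                             highWatermarkCheckpoints: OffsetCheckpoints,
                             topicId: Option[Uuid] = topicId): Boolean = ???
```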
## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Even though it's possible that there is another thread adding fetcher for this future log partition,
```

Review Comment:
Hmm, `becomeLeaderOrFollower()`, `alterReplicaLogDirs()`, and `applyDelta()` are done under the `replicaStateChangeLock`. Is it really possible for another thread to add a fetcher for the future log?

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved
```
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon merged PR #15951:
URL: https://github.com/apache/kafka/pull/15951
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2134320590

Failed tests are unrelated.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1610854305

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
```

Review Comment:
@chia7712 , thanks for the comment. For this:

> In short, alterReplicaLogDirs adds alter thread [0] only if it succeeds to create future log of partition. Maybe maybeAddLogDirFetchers should follow same rule? Or we can add comments to say "that is fine as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case?

I chose the latter option, because if we only created the fetcher when the future log didn't exist, it could have the side effect that the fetcher is removed on a leadership change but never added back later. I've added the comment in this commit: https://github.com/apache/kafka/pull/15951/commits/0d78e493e484dd4f27ba6a127616d802999f22d0 . Thanks.
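To make the tradeoff concrete, here is a compilable sketch of the two shapes being weighed, with stubs standing in for the real `ReplicaManager` collaborators (the stub names are illustrative, not Kafka APIs):

```scala
object MaybeAddLogDirFetchersSketch {
  // Stubs for the real collaborators; only the control flow matters here.
  def maybeCreateFutureReplica(): Boolean = true
  def abortAndPauseCleaning(): Unit = println("cleaning paused")
  def addFetcherForPartition(): Unit = println("fetcher registered")

  def main(args: Array[String]): Unit = {
    // Rejected shape: register the fetcher only when this call created the
    // future log. Risk: a leadership change removes the fetcher and nothing
    // ever adds it back.
    //
    //   if (maybeCreateFutureReplica()) {
    //     abortAndPauseCleaning()
    //     addFetcherForPartition()
    //   }

    // Chosen shape: gate only the cleaner pause; always (re-)register the
    // fetcher and rely on the add being harmless when an identical fetcher
    // already exists.
    if (maybeCreateFutureReplica())
      abortAndPauseCleaning()
    addFetcherForPartition()
  }
}
```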
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609423094

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
```

Review Comment:
Oh, my previous comment is incorrect. Both `alterReplicaLogDirs` and `maybeAddLogDirFetchers` are in `replicaStateChangeLock`, so the race condition I described should not happen.

However, I'm wondering whether it is fine for `maybeAddLogDirFetchers` to add the alter thread even though the future log of the partition was already created by another thread. Although no new alter thread will be created, as `BrokerIdAndFetcherId` is identical.

In short, `alterReplicaLogDirs` adds the alter thread [0] only if it succeeds in creating the future log of the partition. Maybe `maybeAddLogDirFetchers` should follow the same rule? Or we can add a comment to say that this is fine, as `replicaAlterLogDirsManager.addFetcherForPartitions` will be a no-op in this case?

[0] https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/server/ReplicaManager.scala#L1198
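A minimal model of the keying behaviour referenced here, assuming (as the thread states) that fetcher threads live in a map keyed by `BrokerIdAndFetcherId`, so a second add with the same key reuses the existing thread rather than spawning a new one. The map and method below are simplifications, not the real `AbstractFetcherManager`:

```scala
import scala.collection.mutable

case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

class FetcherManagerModel {
  private val fetcherThreadMap = mutable.Map.empty[BrokerIdAndFetcherId, String]

  def addFetcher(key: BrokerIdAndFetcherId, mkThread: () => String): String =
    // Reuse an existing thread with the same key; create only when absent.
    // Note junrao's caveat elsewhere in the thread: even when the thread is
    // reused, the add still refreshes the fetch state (e.g. the leader
    // epoch), so it is not a strict no-op.
    fetcherThreadMap.getOrElseUpdate(key, mkThread())
}

object FetcherManagerModelDemo extends App {
  val fm = new FetcherManagerModel
  val key = BrokerIdAndFetcherId(brokerId = 0, fetcherId = 0)
  val t1 = fm.addFetcher(key, () => "alter-thread-1")
  val t2 = fm.addFetcher(key, () => "alter-thread-2") // identical key: reused
  assert(t1 == t2)
}
```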
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609228927

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
```

Review Comment:
> However, adding alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not in this check. Is it possible that alter thread, which is invoked by another thread, just remove the future log and then this thread add the topic partition to replicaAlterLogDirsManager? It seems to me that alter thread will get fail as future log of partition is gone.

That's possible. But I think that's fine, because the future log could be removed for one of two reasons:
1. The alter logDir completed. In this case, a new LeaderAndIsr request or topic partition update will arrive, and this fetcher will then be removed in `ReplicaManager#makeLeader` or `makeFollower`.
2. Another log failure happened. In this case, `createLogIfNotExists` will fail, too.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608320628

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
```

Review Comment:
If `maybeCreateFutureReplica` returns false, we assume another thread has already added the future log to the partition and started the alter thread. Hence, we don't need to abort cleaning, since that other thread does it.

However, adding the alter thread (`replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)`) is not inside this check. Is it possible that the alter thread started by another thread removes the future log just before this thread adds the topic partition to `replicaAlterLogDirsManager`? It seems to me that the alter thread would then fail, as the future log of the partition is gone.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608164096

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
> That's fine because there is no chance that the future log exists in partition#futureLog but not in the logManager#futureLogs map.

The scenario I was thinking of is when the broker starts up: `logManager` loads the future log, so the future log exists in the `logManager#futureLogs` map but not yet in `partition#futureLog`. Only later, when the broker catches up with metadata (`ReplicaManager#applyDelta`) or receives a LeaderAndIsr request (`becomeLeaderOrFollower`) and this method `maybeAddLogDirFetchers` is called, do we need to make sure `partition#futureLog` is populated too.

But you're right; my confusion was about where `maybeCreateFutureReplica` checks whether the future log already exists: it checks in the `Partition` itself, not in the `LogManager`. So this makes sense.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
soarez commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2122392557

Sorry, I clicked the wrong window, did not mean to approve. I'm still reviewing this.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608048789

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
We need a way to determine whether the future log already exists in the `partition#futureLog` map; if we just call `partition.createLogIfNotExists()`, we can't know whether it was created or not. `partition.maybeCreateFutureReplica()` returns a boolean value that tells us exactly that. That is, `partition.maybeCreateFutureReplica()` only calls `partition.createLogIfNotExists()` when the `partition#futureLog` map doesn't contain the partition. That's fine because there is no chance that the future log exists in `partition#futureLog` but not in the `logManager#futureLogs` map. Sorry, maybe I didn't get your question here. Could you explain again if I misunderstood it?
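A small sketch of the contract being described, i.e. a guarded create whose boolean return says whether this particular call did the creation. This is a simplification of `kafka.cluster.Partition`, with a `String` standing in for the log:

```scala
class PartitionModel {
  @volatile private var futureLog: Option[String] = None

  /** Returns true iff this call actually created the future replica. */
  def maybeCreateFutureReplica(logDir: String): Boolean = synchronized {
    if (futureLog.isDefined) {
      false // already present: caller must not pause cleaning again
    } else {
      futureLog = Some(s"future log in $logDir")
      true  // created here: caller owns the abortAndPauseCleaning call
    }
  }
}

object PartitionModelDemo extends App {
  val p = new PartitionModel
  assert(p.maybeCreateFutureReplica("/data/dir2"))  // first caller creates
  assert(!p.maybeCreateFutureReplica("/data/dir2")) // second caller sees false
}
```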
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1607884973

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
> That means we have to do the partition.createLogIfNotExists() here.

Given that `partition.maybeCreateFutureReplica()` only calls `partition.createLogIfNotExists()` if the future replica doesn't yet exist, and that it's possible for the future log to exist only in the `logManager#futureLogs` map but not in the `partition#futureLog` map, do we need to call `partition.createLogIfNotExists()` directly instead of `partition.maybeCreateFutureReplica()`?
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2121598551

@chia7712 , I've updated the PR. Please take a look again when available. Thanks.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604412458

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2116,14 +2116,13 @@ class ReplicaManager(val config: KafkaConfig,
           // Add future replica log to partition's map if it's not existed
           if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
-            val futureLogInPartition = futureLocalLogOrException(topicPartition)
             // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
             // replica from source dir to destination dir
             logManager.abortAndPauseCleaning(topicPartition)
-
-            futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-              partition.getLeaderEpoch, futureLogInPartition.highWatermark))
           }
+
+          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
+            partition.getLeaderEpoch, futureLog.highWatermark))
```

Review Comment:
Thanks to the failing test, I found I was wrong. We should always add the partition to the fetcher thread, whether or not we created the future log, since all fetchers are removed before `maybeAddLogDirFetchers` is called.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604409235

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
Ah, I know what you're talking about now. The reason it won't fail is that the topicId we feed into the partition object is None. So the topicId consistency check will always pass, because the original topicId is not set. I've updated the test and also verified that the response has no errors in this commit: https://github.com/apache/kafka/pull/15951/commits/4a1b76d38effa2233fb7bc062920a2457243fded . Thanks.
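A toy illustration of the consistency rule being invoked here, under the assumption described above that the check only fires when the partition already has a topic ID recorded (a `String` is used instead of `Uuid` to keep the sketch self-contained):

```scala
object TopicIdCheckDemo extends App {
  // The check passes vacuously when the existing ID is unset (None).
  def topicIdConsistent(existing: Option[String], incoming: String): Boolean =
    existing.forall(_ == incoming)

  assert(topicIdConsistent(None, "uuid-from-request"))            // original unset: passes
  assert(topicIdConsistent(Some("uuid-a"), "uuid-a"))             // same ID: passes
  assert(!topicIdConsistent(Some("uuid-a"), "uuid-from-request")) // mismatch: would fail
}
```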
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602929057

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
My point was that this test case will produce different topic IDs. For example, the following assert will fail:
```scala
assertEquals(spyLogManager.getLog(tp0, isFuture = false).get.topicId,
  spyLogManager.getLog(tp0, isFuture = true).get.topicId)
```
It seems `becomeLeaderOrFollower` sets the topic ID of the "log", and the mock returns a different topic ID for the future log. IIRC (and @showuon, please confirm) they should have the same topic ID, so I'm a bit confused by this test case. Please feel free to correct me if I misunderstand anything.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602524342

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
No, the future log is just a follower of the original log, so it must have the same topic ID as the original one.
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601591079

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
Pardon me, do you mean that the topic ID of the future log can be different from the topic ID of the log?
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601526314

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
Yes, it's possible, and it's been handled in ReplicaManager [here](https://github.com/apache/kafka/blob/3f8d11f047bf2f388fee7e8b5ddb359b47cee554/core/src/main/scala/kafka/server/ReplicaManager.scala#L1946-L1952).
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601512176

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // Add future replica log to partition's map
```

Review Comment:
We need this as described in this comment: https://github.com/apache/kafka/pull/15951#discussion_r1601510877
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601510877

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
> partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist. If that happens, partition.futureLog won't be set, so we need to call partition.setLog(futureLog, true)?

`partition.setLog(futureLog, true)` is not necessary, because `partition.createLogIfNotExists()` also sets the future log:
```scala
if (isFutureReplica) {
  this.futureLog = Some(maybeCreate(this.futureLog))
```

> Since this section is inside a block of logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if the future replica exists?

Yes. Normally the future log is created via this path:
1. `ReplicaManager#alterReplicaLogDirs`
2. `partition.maybeCreateFutureReplica`
3. `partition.createLogIfNotExists`
4. `partition.createLog`
5. `logManager.getOrCreateLog`

So in the end, we'll have the future log added to both the `logManager#futureLogs` map and the `partition#futureLog` map.

But it's possible for the future log to exist only in the `logManager#futureLogs` map and not in the `partition#futureLog` map: if the broker restarts after the future log was created but before the alter logDir completed. After the restart, we'll add the future log to the logManager map during [loadLog](https://github.com/apache/kafka/blob/3f8d11f047bf2f388fee7e8b5ddb359b47cee554/core/src/main/scala/kafka/log/LogManager.scala#L370), but the `partition#futureLog` map won't be updated until we get a leadership update and `ReplicaManager#maybeAddLogDirFetchers` is called. In this case, `partition.maybeCreateFutureReplica` won't create a new future log, since it already exists in the logManager, but `partition#futureLog` will be populated with the one from the logManager. That means we have to do the `partition.createLogIfNotExists()` here.
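The restart gap described above can be modelled in a few lines. This is an illustrative toy, with `String`s standing in for logs and the names merely echoing `LogManager#futureLogs` and `Partition#futureLog`:

```scala
import scala.collection.mutable

object RestartGapDemo extends App {
  // LogManager side: repopulated from disk by loadLog() at broker startup.
  val logManagerFutureLogs = mutable.Map.empty[String, String]
  // Partition side: only populated via createLogIfNotExists().
  var partitionFutureLog: Option[String] = None

  // Broker restarts mid-move: loadLog() finds the on-disk future log...
  logManagerFutureLogs += "topic-0" -> "future-log-from-disk"
  // ...but the Partition object hasn't been wired up yet.
  assert(partitionFutureLog.isEmpty)

  // Later, maybeAddLogDirFetchers runs maybeCreateFutureReplica ->
  // createLogIfNotExists -> getOrCreateLog, which returns the existing log
  // and finally populates Partition#futureLog.
  partitionFutureLog = Some(logManagerFutureLogs("topic-0"))
  assert(partitionFutureLog.contains("future-log-from-disk"))
}
```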
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1600301660

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
```diff
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),
```

Review Comment:
Is it possible that the `LeaderAndIsrRequest` carries a different topic ID?
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1600179621

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
Since this section is inside a block of `logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>`, doesn't that mean this only runs if the future replica exists?

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+            // Add future replica log to partition's map
```

Review Comment:
I assume this comment refers to setting (populating) `partition.futureLog`, which is only written to via a call to `partition.createLogIfNotExists()`. Since we're replacing the call to `partition.createLogIfNotExists()` with `partition.maybeCreateFutureReplica()`, this comment should be moved there?

## core/src/main/scala/kafka/server/ReplicaManager.scala:
```diff
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
```

Review Comment:
`partition.maybeCreateFutureReplica()` only calls `partition.createLogIfNotExists()` if the future replica doesn't yet exist. If it already exists, `partition.futureLog` won't be set, so we need to call `partition.setLog(futureLog, true)`?
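The question in the last comment is resolved by the `Partition` snippet showuon quotes earlier in the thread: `createLogIfNotExists` performs the assignment itself. A minimal model of that behaviour (the `maybeCreate` body here is a stand-in, not Kafka's implementation):

```scala
class PartitionFutureLogModel {
  private var futureLog: Option[String] = None

  // Stand-in for Partition's real maybeCreate: reuse the current log if set,
  // otherwise obtain one (in Kafka this goes through LogManager.getOrCreateLog).
  private def maybeCreate(current: Option[String]): String =
    current.getOrElse("log from getOrCreateLog")

  def createLogIfNotExists(isFutureReplica: Boolean): Unit =
    if (isFutureReplica)
      this.futureLog = Some(maybeCreate(this.futureLog)) // no separate setLog needed

  def hasFutureLog: Boolean = futureLog.isDefined
}

object PartitionFutureLogModelDemo extends App {
  val p = new PartitionFutureLogModel
  p.createLogIfNotExists(isFutureReplica = true)
  assert(p.hasFutureLog) // futureLog was set by createLogIfNotExists itself
}
```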
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2109816035

@soarez , call for review. Thanks.