Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-07-19 Thread via GitHub


junrao commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1678493157


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
 * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
 * @return true iff the future replica is created
 */
-  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {

Review Comment:
   Could we add the new param to the javadoc?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+// replica from source dir to destination dir
+logManager.abortAndPauseCleaning(topicPartition)
+  }
 
   futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
 partition.getLeaderEpoch, futureLog.highWatermark))
 }
   }
 }
 
-if (futureReplicasAndInitialOffset.nonEmpty)
+if (futureReplicasAndInitialOffset.nonEmpty) {
+  // Even though it's possible that there is another thread adding fetcher for this future log partition,
+  // but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op.

Review Comment:
   When the leader changes, we need to propagate the new leader epoch to 
ReplicaAlterLogDirsThread (see https://github.com/apache/kafka/pull/8223). So, 
the operation is not a no-op?
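To make this concern concrete, here is a toy model (all class names are hypothetical; this is not Kafka's fetcher-manager code) in which re-adding a partition under an identical fetcher key reuses the existing thread but still overwrites the partition's fetch state, including its leader epoch. Under that assumption, the call is not a no-op from the partition's point of view.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are NOT Kafka's
// BrokerIdAndFetcherId, InitialFetchState, or fetcher thread classes.
final case class ToyFetcherKey(brokerId: Int, fetcherId: Int)
final case class ToyFetchState(leaderEpoch: Int, initOffset: Long)

final class ToyFetcherThread(val key: ToyFetcherKey) {
  // Per-partition fetch state owned by this thread.
  val partitionStates: mutable.Map[String, ToyFetchState] = mutable.Map.empty

  // Re-adding a partition overwrites its state, so a newer leader epoch
  // replaces the old one even though no new thread is created.
  def addPartitions(states: Map[String, ToyFetchState]): Unit =
    partitionStates ++= states
}

final class ToyFetcherManager {
  val threads: mutable.Map[ToyFetcherKey, ToyFetcherThread] = mutable.Map.empty

  def addFetcherForPartitions(key: ToyFetcherKey, states: Map[String, ToyFetchState]): Unit = {
    // An identical key reuses the existing thread; otherwise a new one is made.
    val thread = threads.getOrElseUpdate(key, new ToyFetcherThread(key))
    thread.addPartitions(states)
  }
}

object EpochPropagationDemo {
  // Returns (number of fetcher threads, leader epoch recorded for "topic-0").
  def run(): (Int, Int) = {
    val manager = new ToyFetcherManager
    val key = ToyFetcherKey(brokerId = 0, fetcherId = 0)
    manager.addFetcherForPartitions(key, Map("topic-0" -> ToyFetchState(leaderEpoch = 5, initOffset = 0L)))
    // A leader change bumps the epoch; the partition maps to the same key.
    manager.addFetcherForPartitions(key, Map("topic-0" -> ToyFetchState(leaderEpoch = 6, initOffset = 0L)))
    (manager.threads.size, manager.threads(key).partitionStates("topic-0").leaderEpoch)
  }
}
```

In this model only one thread exists after both calls, yet the recorded epoch moves from 5 to 6.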



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
+// replica from source dir to destination dir
+logManager.abortAndPauseCleaning(topicPartition)
+  }
 
   futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
 partition.getLeaderEpoch, futureLog.highWatermark))
 }
   }
 }
 
-if (futureReplicasAndInitialOffset.nonEmpty)
+if (futureReplicasAndInitialOffset.nonEmpty) {
+  // Even though it's possible that there is another thread adding fetcher for this future log partition,

Review Comment:
   Hmm, `becomeLeaderOrFollower()`, `alterReplicaLogDirs()`, and `applyDelta()` are all done under the `replicaStateChangeLock`. Is it really possible for another thread to add a fetcher for the future log?
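The reasoning here is that a single lock serializes every check-then-act sequence that could create the future log. A minimal sketch, assuming a hypothetical class whose `stateChange()` stands in for `becomeLeaderOrFollower()`, `alterReplicaLogDirs()`, and `applyDelta()`, and whose lock field merely mirrors the name `replicaStateChangeLock`:

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch: when every entry point that may create the future log
// holds the same lock, their check-then-act sequences cannot interleave.
final class SerializedStateChanges {
  private val replicaStateChangeLock = new ReentrantLock()
  private var futureLogCreated = false
  private var creations = 0

  private def withLock[T](body: => T): T = {
    replicaStateChangeLock.lock()
    try body finally replicaStateChangeLock.unlock()
  }

  // Stands in for becomeLeaderOrFollower / alterReplicaLogDirs / applyDelta.
  def stateChange(): Unit = withLock {
    if (!futureLogCreated) {
      futureLogCreated = true
      creations += 1
    }
  }

  def creationCount: Int = withLock(creations)
}

object LockDemo {
  // Runs `threads` concurrent state changes and reports how many created the log.
  def run(threads: Int): Int = {
    val s = new SerializedStateChanges
    val ts = (1 to threads).map(_ => new Thread(() => s.stateChange()))
    ts.foreach(_.start())
    ts.foreach(_.join())
    s.creationCount
  }
}
```

However many threads race, exactly one of them observes the "not yet created" state in this model.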



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved

Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-27 Thread via GitHub


showuon merged PR #15951:
URL: https://github.com/apache/kafka/pull/15951


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-27 Thread via GitHub


showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2134320590

   Failed tests are unrelated.





Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-22 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1610854305


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   @chia7712, thanks for the comment.
   Regarding this:
   > In short, alterReplicaLogDirs adds an alter thread [0] only if it succeeds in creating the future log of the partition. Maybe maybeAddLogDirFetchers should follow the same rule? Or we can add a comment saying "that is fine as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case"?
   
   I chose the latter option because if we only add the fetcher when the future log does not yet exist, there is a potential side effect: the fetcher could be removed on a leadership change and never added back afterwards. I've added the comment in this commit:
https://github.com/apache/kafka/pull/15951/commits/0d78e493e484dd4f27ba6a127616d802999f22d0
   Thanks.






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-22 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609423094


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   Oh, my previous comment was incorrect. Both `alterReplicaLogDirs` and `maybeAddLogDirFetchers` run under `replicaStateChangeLock`, so the race condition I described should not happen.
   
   However, I'm wondering whether it is fine for `maybeAddLogDirFetchers` to add the alter thread even though the future log of the partition has already been created by another thread, although no new alter thread will actually be created since `BrokerIdAndFetcherId` is identical.
   
   In short, `alterReplicaLogDirs` adds an alter thread [0] only if it succeeds in creating the future log of the partition. Maybe `maybeAddLogDirFetchers` should follow the same rule? Or we can add a comment saying "that is fine as `replicaAlterLogDirsManager.addFetcherForPartitions` will be a no-op in this case"?
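Why would the key be identical? A minimal sketch, assuming the fetcher id is derived deterministically from the topic partition (the exact hash formula below is an illustrative assumption, and `FetcherSlot` is a hypothetical stand-in for `BrokerIdAndFetcherId`): adding the same partition twice yields equal keys, so at most one thread is needed.

```scala
// Hypothetical key type mirroring the idea of BrokerIdAndFetcherId.
final case class FetcherSlot(brokerId: Int, fetcherId: Int)

object FetcherSlotSketch {
  // Illustrative deterministic mapping from a partition to a fetcher id.
  // Taking abs after the modulo keeps the result in [0, numFetchers).
  def fetcherId(topic: String, partition: Int, numFetchers: Int): Int =
    math.abs((31 * topic.hashCode + partition) % numFetchers)

  def slotFor(brokerId: Int, topic: String, partition: Int, numFetchers: Int): FetcherSlot =
    FetcherSlot(brokerId, fetcherId(topic, partition, numFetchers))

  // Distinct slots (and therefore threads) needed when one partition is added twice.
  def threadsAfterTwoAdds(topic: String, partition: Int): Int =
    Set(slotFor(0, topic, partition, 4), slotFor(0, topic, partition, 4)).size
}
```

Because case-class equality compares fields, the two keys collapse into one set element, which is the "no new alter thread" behavior described above.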
   
   
   [0] 
https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/server/ReplicaManager.scala#L1198
 






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1609228927


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   > However, adding the alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not covered by this check. Is it possible that the alter thread started by another thread has just removed the future log, and then this thread adds the topic partition to replicaAlterLogDirsManager? It seems to me that the alter thread will fail, as the future log of the partition is gone.
   
   That's possible. But I think that's fine, because the future log could be removed for one of these reasons:
   1. The log dir alteration completes. In this case, a new LeaderAndIsr request or topic partition update will be processed, and this fetcher will then be removed in `ReplicaManager#makeLeader` or `makeFollower`.
   2. Another log failure happened. In this case, `createLogIfNotExists` will fail, too.
   
   






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608320628


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   If `maybeCreateFutureReplica` returns false, we assume another thread has already added the future log to the partition and started the alter thread. Hence, we don't need to abort cleaning, since that other thread has already done it.
   
   However, adding the alter thread (`replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)`) is not covered by this check. Is it possible that the alter thread started by another thread has just removed the future log, and then this thread adds the topic partition to `replicaAlterLogDirsManager`? It seems to me that the alter thread will fail, as the future log of the partition is gone.
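The gating described in the first paragraph can be sketched with hypothetical stand-ins (`PausingPartition` and `PausingLogManager` are illustrative, not Kafka classes; the pause call is only counted, not implemented): cleaning is paused only on the call that actually creates the future log, so a repeated `maybeAddLogDirFetchers` does not pause it a second time.

```scala
// Hypothetical stand-in for LogManager that only counts pause requests.
final class PausingLogManager {
  var pauseCalls = 0
  def abortAndPauseCleaning(tp: String): Unit = pauseCalls += 1
}

// Hypothetical stand-in for Partition holding at most one future log.
final class PausingPartition(val tp: String) {
  private var futureLogDir: Option[String] = None

  // Returns true only if this call created the future replica.
  def maybeCreateFutureReplica(logDir: String): Boolean =
    if (futureLogDir.isDefined) false
    else { futureLogDir = Some(logDir); true }
}

object PauseGatingDemo {
  def maybeAddLogDirFetchers(p: PausingPartition, lm: PausingLogManager, dir: String): Unit =
    if (p.maybeCreateFutureReplica(dir)) {
      // Pause cleaning only when the future log was newly created here.
      lm.abortAndPauseCleaning(p.tp)
    }

  def run(): Int = {
    val lm = new PausingLogManager
    val p = new PausingPartition("topic-0")
    maybeAddLogDirFetchers(p, lm, "/tmp/dir2")
    maybeAddLogDirFetchers(p, lm, "/tmp/dir2") // future log already exists
    lm.pauseCalls
  }
}
```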






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608164096


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   > That's fine because there is no chance that futureLog exists in partition#futureLog but not in the logManager#futureLogs map.
   
   The scenario I was thinking of is broker startup: `logManager` loads the future log, so the future log exists in the `logManager#futureLogs` map but not yet in `partition#futureLog`. Only later, when the broker catches up with metadata (`ReplicaManager#applyDelta`) or receives a LeaderAndIsr request (`becomeLeaderOrFollower`) and this method `maybeAddLogDirFetchers` is called, do we need to make sure `partition#futureLog` is populated too.
   
   But you're right. My confusion was about where `maybeCreateFutureReplica` checks whether the future log already exists: it checks in itself (`Partition`), not in `LogManager`, so this makes sense.
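The startup scenario can be modeled with two hypothetical registries (`StartupLogManager` and `StartupPartition` are illustrative, not Kafka's classes, and directories are plain strings): the log manager already holds the loaded future log, but because the check looks only at the partition's own slot, the first call still succeeds and populates it, while a second call reports that nothing was created.

```scala
import scala.collection.mutable

// Hypothetical model of the LogManager's future-log registry (tp -> dir).
final class StartupLogManager {
  val futureLogs: mutable.Map[String, String] = mutable.Map.empty
}

// Hypothetical model of a Partition with its own future-log slot.
final class StartupPartition(val tp: String, logManager: StartupLogManager) {
  var futureLog: Option[String] = None

  // Checks only the Partition's own state, as discussed above.
  def maybeCreateFutureReplica(dir: String): Boolean =
    if (futureLog.isDefined) false
    else {
      // Reuse a log the LogManager already loaded, or register a new one.
      futureLog = Some(logManager.futureLogs.getOrElseUpdate(tp, dir))
      true
    }
}

object StartupDemo {
  def run(): (Boolean, Boolean, Option[String]) = {
    val lm = new StartupLogManager
    // At startup the LogManager loads the future log from disk...
    lm.futureLogs("topic-0") = "/data/dir2"
    val p = new StartupPartition("topic-0", lm)
    // ...but the Partition has not seen it yet, so the first check succeeds.
    val first = p.maybeCreateFutureReplica("/data/dir2")
    val second = p.maybeCreateFutureReplica("/data/dir2")
    (first, second, p.futureLog)
  }
}
```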






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


soarez commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2122392557

   Sorry, I clicked in the wrong window and did not mean to approve. I'm still reviewing this.





Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608048789


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   We need a way to determine whether the future log already exists in the `partition#futureLog` map. If we call `partition.createLogIfNotExists()` directly, we can't tell whether it created the log or not. `partition.maybeCreateFutureReplica()` returns a boolean that tells us exactly that.
   
   That is, `partition.maybeCreateFutureReplica()` only calls `partition.createLogIfNotExists()` when the `partition#futureLog` map doesn't contain the partition. That's fine because there is no chance that the future log exists in `partition#futureLog` but not in the `logManager#futureLogs` map.
   
   Sorry, maybe I didn't understand your question. Could you explain again if I misunderstood it?






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-21 Thread via GitHub


soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1607884973


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   > That means we have to do the partition.createLogIfNotExists() here.
   
   Given that `partition.maybeCreateFutureReplica()` only calls `partition.createLogIfNotExists()` if the future replica doesn't yet exist, and that it's possible for the future log to exist only in the `logManager#futureLogs` map but not in the `partition#futureLog` map, do we need to call `partition.createLogIfNotExists()` directly instead of `partition.maybeCreateFutureReplica()`?






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-20 Thread via GitHub


showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2121598551

   @chia7712, I've updated the PR. Please take another look when you have time. Thanks.





Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-16 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604412458


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2116,14 +2116,13 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Add future replica log to partition's map if it's not existed
   if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
-val futureLogInPartition = futureLocalLogOrException(topicPartition)
 // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
 // replica from source dir to destination dir
 logManager.abortAndPauseCleaning(topicPartition)
-
-futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-  partition.getLeaderEpoch, futureLogInPartition.highWatermark))
   }
+
+  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
+partition.getLeaderEpoch, futureLog.highWatermark))

Review Comment:
   Thanks to the failing test, I found that I was wrong. We should always add the partition to the fetcher thread, whether or not we created the future log, because all the fetchers are removed before `maybeAddLogDirFetchers` is called.
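The failure mode can be reproduced in a tiny hypothetical model (`ReAddModel` is illustrative only, with partitions as strings): the first call creates the future log, a leadership change removes all fetchers, and a second call runs. Gating the fetcher on "future log was created" leaves the partition without a fetcher; adding it unconditionally restores it.

```scala
import scala.collection.mutable

// Hypothetical model: before maybeAddLogDirFetchers runs, the fetchers for the
// affected partitions have already been removed.
final class ReAddModel(gateOnCreation: Boolean) {
  val fetching: mutable.Set[String] = mutable.Set.empty
  private val futureLogs: mutable.Set[String] = mutable.Set.empty

  def removeFetchers(tps: Set[String]): Unit = fetching --= tps

  def maybeAddLogDirFetchers(tp: String): Unit = {
    // mutable.Set.add returns true only if the element was not already present.
    val created = futureLogs.add(tp)
    if (created || !gateOnCreation) fetching += tp
  }
}

object ReAddDemo {
  // Create the future log, drop all fetchers (leadership change), then re-run.
  def stillFetching(gateOnCreation: Boolean): Boolean = {
    val m = new ReAddModel(gateOnCreation)
    m.maybeAddLogDirFetchers("topic-0")
    m.removeFetchers(Set("topic-0"))
    m.maybeAddLogDirFetchers("topic-0")
    m.fetching.contains("topic-0")
  }
}
```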






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-16 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1604409235


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Ah, I see what you mean now. The reason it doesn't fail is that the topicId we pass into the partition object is None, so the topicId consistency check always passes because the original topicId is not set. I've updated the test and now also verify that the response has no errors in this commit:
https://github.com/apache/kafka/pull/15951/commits/4a1b76d38effa2233fb7bc062920a2457243fded
   Thanks.
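The explanation above can be illustrated with a deliberately simplified check. This is a hypothetical sketch, not Kafka's `Partition` code: the function name is made up and plain strings stand in for `Uuid`. If the existing topic ID is unset, any requested ID passes, so mismatched random IDs in a test go unnoticed.

```scala
// Hypothetical topic-ID consistency check in which an unset (None) existing
// ID is treated as compatible with any requested ID.
object TopicIdCheckSketch {
  def consistent(existing: Option[String], requested: Option[String]): Boolean =
    (existing, requested) match {
      case (Some(a), Some(b)) => a == b
      case _                  => true // nothing to compare against
    }
}
```

Once the partition is given a real topic ID, the same check starts rejecting a different random ID, which is why the updated test now exercises it.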






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-16 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602929057


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   My point was that this test case produces different topic IDs. For example, the following assertion fails:
   ```scala
 assertEquals(spyLogManager.getLog(tp0, isFuture = false).get.topicId,
   spyLogManager.getLog(tp0, isFuture = true).get.topicId)
   ```
   
   It seems `becomeLeaderOrFollower` sets the topic ID of the log, while the mock returns a different topic ID for the future log. IIRC (and as @showuon confirmed), they should have the same topic ID, so I'm a bit confused by this test case.
   
   Please feel free to correct me if I've misunderstood anything.






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-15 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602524342


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   No, the future log is just a follower of the original log, so it must have 
the same topic ID as the original one.






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-15 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601591079


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Pardon me, do you mean that the topic ID of the future log can be different
from the topic ID of the log?






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-15 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601526314


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Yes, it's possible, and it's been handled in ReplicaManager 
[here](https://github.com/apache/kafka/blob/3f8d11f047bf2f388fee7e8b5ddb359b47cee554/core/src/main/scala/kafka/server/ReplicaManager.scala#L1946-L1952).
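   As a rough illustration of the kind of check being referred to, here is a minimal sketch (not the code at the linked lines): a topic ID carried by a request is accepted only if it agrees with the ID already recorded for the local log, and a future log inherits the original log's ID, since the future log is a follower of the original log. `TopicIdCheckSketch`, `hasConsistentTopicId`, and `futureLogTopicId` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and treating a missing ID as consistent is an assumption.

```scala
import java.util.UUID

object TopicIdCheckSketch {
  // Hypothetical helper: if either side has no topic ID there is nothing to
  // compare, so the request is accepted; otherwise the IDs must match.
  def hasConsistentTopicId(requestTopicId: Option[UUID], logTopicId: Option[UUID]): Boolean =
    (requestTopicId, logTopicId) match {
      case (Some(requested), Some(existing)) => requested == existing
      case _                                 => true
    }

  // Hypothetical: a future log only follows the original log, so it reuses
  // the original log's topic ID rather than taking one from a request.
  def futureLogTopicId(originalLogTopicId: Option[UUID]): Option[UUID] =
    originalLogTopicId
}
```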






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-15 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601512176


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+// Add future replica log to partition's map

Review Comment:
   We need this as described in this comment: 
https://github.com/apache/kafka/pull/15951#discussion_r1601510877






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-15 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1601510877


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   > partition.maybeCreateFutureReplica() only calls 
partition.createLogIfNotExists() if the future replica doesn't yet exist. If 
that happens, partition.futureLog won't be set, so we need to call 
partition.setLog(futureLog, true)?
   
   `partition.setLog(futureLog, true)` is not necessary because
`partition.createLogIfNotExists()` also sets futureLog:
   ```
   if (isFutureReplica) {
     this.futureLog = Some(maybeCreate(this.futureLog))
   } else {
     this.log = Some(maybeCreate(this.log))
   }
   ```
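   To make this concrete, here is a simplified model of the `maybeCreate` pattern quoted above. It is a sketch under assumptions: `SimpleLog`, `LogRegistry`, and `SketchPartition` are hypothetical stand-ins (not Kafka classes) for the log, the log manager, and the partition. Whether the future log is freshly created or already present in the registry, the partition's `futureLog` field ends up set, which is why a separate `setLog` call is not needed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a log: which partition it belongs to and its directory.
final case class SimpleLog(partition: String, parentDir: String)

// Hypothetical stand-in for the log manager's map of future logs.
final class LogRegistry {
  val futureLogs: mutable.Map[String, SimpleLog] = mutable.Map.empty[String, SimpleLog]

  // Returns the existing future log if present, otherwise registers a new one.
  def getOrCreateFutureLog(partition: String, dir: String): SimpleLog =
    futureLogs.getOrElseUpdate(partition, SimpleLog(partition, dir))
}

// Hypothetical stand-in for a partition holding an optional future log reference.
final class SketchPartition(name: String, registry: LogRegistry) {
  var futureLog: Option[SimpleLog] = None

  // Simplified createLogIfNotExists for the future replica: keep the
  // partition's own reference if set, otherwise ask the registry, and
  // always assign the result back to futureLog.
  def createFutureLogIfNotExists(dir: String): Unit = {
    def maybeCreate(logOpt: Option[SimpleLog]): SimpleLog = logOpt match {
      case Some(log) => log
      case None      => registry.getOrCreateFutureLog(name, dir)
    }
    this.futureLog = Some(maybeCreate(this.futureLog))
  }
}
```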
   
   > Since this section is inside a block of logManager.getLog(topicPartition, 
isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if 
the future replica exists?
   
   Yes. Normally, the future log is created along this path:
   1. ReplicaManager#alterReplicaLogDirs
   2. partition.maybeCreateFutureReplica
   3. partition.createLogIfNotExists
   4. partition.createLog
   5. logManager.getOrCreateLog
   
   So in the end, the future log is added to both the `logManager#futureLogs`
map and the `partition#futureLog` map.
   
   However, the future log can exist only in the `logManager#futureLogs` map and
not in the `partition#futureLog` map. This happens when the future log has been
created and the broker restarts before the log-dir alteration completes. After
the restart, the future log is added to the logManager map during
[loadLog](https://github.com/apache/kafka/blob/3f8d11f047bf2f388fee7e8b5ddb359b47cee554/core/src/main/scala/kafka/log/LogManager.scala#L370),
but the `partition#futureLog` map is not updated until a leadership update
arrives and `ReplicaManager#maybeAddLogDirFetchers` is called. In this case,
`partition.maybeCreateFutureReplica` does not create a new future log, since one
already exists, but `partition#futureLog` is populated with the one held by
logManager.
   
   That means we have to call `partition.createLogIfNotExists()` here.
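   The restart scenario above can be modelled roughly as follows. This is a minimal sketch under simplifying assumptions: `FutureLog`, `FutureLogRegistry`, `PartitionModel`, and `GatedPauseSketch` are hypothetical stand-ins, the write lock and log-directory checks of the real `maybeCreateFutureReplica` are omitted, and pausing cleaning is reduced to incrementing a counter. It illustrates the idea of the change: cleaning is paused only when `maybeCreateFutureReplica` reports that a future log was newly attached to the partition.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a future log held by the log manager.
final case class FutureLog(partition: String, parentDir: String)

// Hypothetical stand-in for LogManager: remembers future logs (e.g. loaded
// from disk after a restart) and counts how often cleaning was paused.
final class FutureLogRegistry {
  val futureLogs: mutable.Map[String, FutureLog] = mutable.Map.empty[String, FutureLog]
  val pauseCount: mutable.Map[String, Int] = mutable.Map.empty[String, Int].withDefaultValue(0)

  def getOrCreateFutureLog(partition: String, dir: String): FutureLog =
    futureLogs.getOrElseUpdate(partition, FutureLog(partition, dir))

  def abortAndPauseCleaning(partition: String): Unit =
    pauseCount(partition) += 1
}

final class PartitionModel(val name: String, registry: FutureLogRegistry) {
  var futureLog: Option[FutureLog] = None

  // Simplified maybeCreateFutureReplica (no lock, no directory checks):
  // returns true only if this partition had no future log reference and one
  // has now been attached, either newly created or taken from the registry.
  def maybeCreateFutureReplica(dir: String): Boolean = futureLog match {
    case Some(_) => false
    case None =>
      futureLog = Some(registry.getOrCreateFutureLog(name, dir))
      true
  }
}

object GatedPauseSketch {
  // Pause cleaning only when the future replica was newly attached,
  // so repeated calls for the same partition pause it at most once.
  def attachFutureReplica(p: PartitionModel, registry: FutureLogRegistry, dir: String): Unit =
    if (p.maybeCreateFutureReplica(dir))
      registry.abortAndPauseCleaning(p.name)
}
```

   In the restart case, the first call finds no future log on the partition, picks up the one the registry loaded, and pauses cleaning once; any later call sees `futureLog` already set and skips the pause.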






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1600301660


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
 }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties()))
+val spyLogManager = spy(logManager)
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val tp0 = new TopicPartition(topic, 0)
+val uuid = Uuid.randomUuid()
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = spyLogManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+try {
+  val partition = rm.createPartition(tp0)
+  partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+  rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+Seq(new LeaderAndIsrPartitionState()
+  .setTopicName(topic)
+  .setPartitionIndex(0)
+  .setControllerEpoch(0)
+  .setLeader(0)
+  .setLeaderEpoch(0)
+  .setIsr(Seq[Integer](0).asJava)
+  .setPartitionEpoch(0)
+  .setReplicas(Seq[Integer](0).asJava)
+  .setIsNew(false)).asJava,
+Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   Is it possible that `LeaderAndIsrRequest` carries a different topic ID?






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-14 Thread via GitHub


soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1600179621


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   Since this section is inside a block of `logManager.getLog(topicPartition, 
isFuture = true).foreach { futureLog =>`, doesn't that mean this only runs if 
the future replica exists?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
+// Add future replica log to partition's map

Review Comment:
   I assume this comment refers to setting (populating) `partition.futureLog`, 
which is only written via a call to `partition.createLogIfNotExists()`.
   Since we're replacing the call to `partition.createLogIfNotExists()` with 
`partition.maybeCreateFutureReplica()`, should this comment be moved there?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
-
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-partition.getLeaderEpoch, futureLog.highWatermark))
+  if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   `partition.maybeCreateFutureReplica()` only calls 
`partition.createLogIfNotExists()` if the future replica doesn't yet exist. If 
it already exists, `partition.futureLog` won't be set, so do we need to call 
`partition.setLog(futureLog, true)`?






Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-14 Thread via GitHub


showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2109816035

   @soarez, could you review this? Thanks.

