Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2024-04-01 Thread via GitHub


mcmmining commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-2030156545

   > - Extended `LogManager.getOrCreateLog` to accept the target directory's ID. At this level, we assume that the directory ID is online, to keep `getOrCreateLog` simple.
   > - Fixed `LogManager.handleLogDirFailure` to remove the failed directory from `directoryIds` alongside `_liveLogDirs`.
   > - Updated `Partition.makeLeader` and `Partition.makeFollower` to decide whether or not to send the directory's ID down to `LogManager.getOrCreateLog`.
   > - `ReplicaManager.applyLocalLeadersDelta` and `ReplicaManager.applyLocalFollowersDelta` correct the assignment if the directory of `Partition.log` doesn't match the one assigned in the received `TopicDelta.partitionChanges`. (A toy sketch of this flow follows the quoted checklist below.)
   > 
   > **_Update_**
   > - Deleted `ReplicaManager.maybeNotifyPartitionAssignedToDirectory` as this 
one is redundant now.
   > 
   > 
   > ### Committer Checklist (excluded from commit message)
   > - [ ] Verify design and implementation 
   > - [ ] Verify test coverage and CI build status
   > - [ ] Verify documentation (including upgrade notes)
   > 
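   The bullets above describe the flow in prose; the following is a minimal, self-contained sketch with toy types standing in for the real `LogManager`/`ReplicaManager` classes (all names and values are illustrative only, not the merged code):

   ```scala
   // Toy model: DirId, Log and ToyLogManager stand in for Uuid, UnifiedLog and LogManager.
   object ReplicaDirAssignmentSketch {
     type DirId = String
     final case class Log(dirId: DirId)

     // Stand-in for DirectoryId.UNASSIGNED in the real code.
     val Unassigned: DirId = "UNASSIGNED"

     final class ToyLogManager(online: Set[DirId]) {
       private var logs = Map.empty[String, Log]

       // Mirrors the idea of LogManager.getOrCreateLog(..., targetLogDirectoryId):
       // create the log in the target directory when one is given and online,
       // otherwise fall back to the next selected online directory.
       def getOrCreateLog(tp: String, targetLogDirectoryId: Option[DirId]): Log =
         logs.getOrElse(tp, {
           val dir = targetLogDirectoryId
             .filter(d => d != Unassigned && online(d))
             .getOrElse(online.head)
           val log = Log(dir)
           logs += tp -> log
           log
         })
     }

     // Mirrors the idea of ReplicaManager correcting the assignment when the log's
     // actual directory differs from the one assigned in TopicDelta.partitionChanges.
     def maybeCorrectAssignment(tp: String, assigned: DirId, actual: DirId): Option[(String, DirId)] =
       if (assigned != actual) Some(tp -> actual) else None

     def main(args: Array[String]): Unit = {
       val logManager = new ToyLogManager(online = Set("dirA"))
       val log = logManager.getOrCreateLog("topic-0", targetLogDirectoryId = Some("dirC")) // dirC is not online
       println(log)                                                                        // Log(dirA)
       println(maybeCorrectAssignment("topic-0", assigned = "dirC", actual = log.dirId))   // Some((topic-0,dirA))
     }
   }
   ```

   Running the sketch shows the log being created in an online directory and the assignment then being corrected to the directory that actually hosts it.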
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-11 Thread via GitHub


rondagostino merged PR #14881:
URL: https://github.com/apache/kafka/pull/14881


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-11 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1422522894


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // select random directory if the directory Id is LOST or unknown 
and no offline directories or unassigned

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-09 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1421567480


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -123,7 +123,9 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = 
loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>

Review Comment:
   Rebased and fixed a few tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-09 Thread via GitHub


pprovenzano commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1421489097


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -123,7 +123,9 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = 
loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>

Review Comment:
   You need to rebase your changes on top of the latest apache/trunk.
   This change needs to be applied to 
'core/src/main/scala/kafka/server/KafkaServer.scala', which was updated to pass 
this set to `BrokerLifecycleManager` when the ZK broker has migration enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1421190145


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -147,7 +149,7 @@ public LocalReplicaChanges localChanges(int brokerId) {
 );
 topicIds.putIfAbsent(name(), id());
 }
-} else if (
+ } else if (

Review Comment:
   solved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1421189818


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -158,11 +160,21 @@ public LocalReplicaChanges localChanges(int brokerId) {
 new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())
 );
 topicIds.putIfAbsent(name(), id());
+updateDirectoryIds(brokerId, entry, prevPartition, 
directoryIds);

Review Comment:
   updated this now as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-1848035026

   > Checkstyle failed: Here is the diff to fix
   > 
   > ```
   > diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
   > index ffce9ea550..2686af151c 100644
   > --- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
   > +++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
   > @@ -135,7 +135,7 @@ public final class TopicDelta {
   >  Map directoryIds = new HashMap<>();
   >  
   >  for (Entry entry : 
partitionChanges.entrySet()) {
   > - if (!Replicas.contains(entry.getValue().replicas, brokerId)) 
{
   > +if (!Replicas.contains(entry.getValue().replicas, brokerId)) {
   >  PartitionRegistration prevPartition = 
image.partitions().get(entry.getKey());
   >  if (prevPartition != null && 
Replicas.contains(prevPartition.replicas, brokerId)) {
   >  deletes.add(new TopicPartition(name(), 
entry.getKey()));
   > ```
   
   Fixed now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420887859


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -147,7 +149,7 @@ public LocalReplicaChanges localChanges(int brokerId) {
 );
 topicIds.putIfAbsent(name(), id());
 }
-} else if (
+ } else if (

Review Comment:
   Looks like some formatting changes sneaked through; this might affect checkstyle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420879420


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))

Review Comment:
   I don't see the value of defaulting to `MIGRATING` instead of an Option. I 
would leave it as an Option for now, and if we want to change this later because it 
causes an issue, we can do that then.
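   As a toy contrast of the two shapes being discussed (placeholder values only, not the real `ReplicaManager` code):

   ```scala
   // Illustration only: MIGRATING below is a stand-in for DirectoryId.MIGRATING.
   object DirectoryDefaultSketch {
     val Migrating = "MIGRATING"

     // One shape: keep the Option, so absence of the partition in the image stays
     // visible to the caller.
     def asOption(directoryInImage: Option[String]): Option[String] =
       directoryInImage

     // The other shape raised in review: collapse a missing entry to the MIGRATING sentinel.
     def orMigrating(directoryInImage: Option[String]): String =
       directoryInImage.getOrElse(Migrating)

     def main(args: Array[String]): Unit = {
       println(asOption(None))    // None
       println(orMigrating(None)) // MIGRATING
     }
   }
   ```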



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420878757


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -158,11 +160,21 @@ public LocalReplicaChanges localChanges(int brokerId) {
 new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())
 );
 topicIds.putIfAbsent(name(), id());
+updateDirectoryIds(brokerId, entry, prevPartition, 
directoryIds);
 }
 }
 }
 
-return new LocalReplicaChanges(deletes, leaders, followers, topicIds);
+return new LocalReplicaChanges(deletes, leaders, followers, topicIds, 
directoryIds);
+}
+
+private void updateDirectoryIds(int brokerId, Entry entry, PartitionRegistration prevPartition, 
Map directoryIds) {
+if (prevPartition == null || prevPartition.directory(brokerId) != 
entry.getValue().directory(brokerId)) {
+directoryIds.put(
+new TopicPartition(name(), entry.getKey()),
+new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())

Review Comment:
   updated this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420878519


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))
+
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
+  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), 
partitionDirectoryId)
+
+  if (!delta.localChanges(localBrokerId).directoryIds().isEmpty) {
+maybeUpdateTopicAssignment(partition, partitionDirectoryId)
+  }

Review Comment:
   I updated this now. Also updated `directoryIds` to return `TopicIdPartition` 
instead of `TopicPartition`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420873183


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -158,11 +160,21 @@ public LocalReplicaChanges localChanges(int brokerId) {
 new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())
 );
 topicIds.putIfAbsent(name(), id());
+updateDirectoryIds(brokerId, entry, prevPartition, 
directoryIds);

Review Comment:
   We should only add the directoryId as a local change if `prevPartition == 
null || prevPartition.directory(brokerId) != 
entry.getValue().directory(brokerId)`. So this change should be outside of the 
leader/follower conditions.
   
   Also I think the info we want/need is a map of 
`org.apache.kafka.common.TopicIdPartition` (which already includes the topicId 
– it's always present in KRaft mode) to `Uuid` (the directoryId in the 
metadata). We can get these as:
   
   ```java
   if (Replicas.contains(entry.getValue().replicas, brokerId)) {
       PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
       if (prevPartition == null || prevPartition.directory(brokerId) != entry.getValue().directory(brokerId)) {
           directoryIds.put(
               new TopicIdPartition(id(), entry.getKey(), name()),
               entry.getValue().directory(brokerId)
           );
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420866994


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))
+
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
+  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), 
partitionDirectoryId)
+
+  if (!delta.localChanges(localBrokerId).directoryIds().isEmpty) {
+maybeUpdateTopicAssignment(partition, partitionDirectoryId)
+  }

Review Comment:
   `applyLocalLeadersDelta` and `applyLocalFollowersDelta` are only called if 
`localChanges.leaders`/`localChanges.partitions` are not empty.
   I think `localChanges.directoryIds` should be considered separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420746154


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))
+
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
+  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), 
partitionDirectoryId)
+
+  if (!delta.localChanges(localBrokerId).directoryIds().isEmpty) {
+maybeUpdateTopicAssignment(partition, partitionDirectoryId)
+  }

Review Comment:
   We need to update the assignment with the actual directory ID of the partition 
created at the end, not with `directoryIds`. This info is available in `Partition` 
(not `TopicPartition`), which we don't have at this point. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420734792


##
metadata/src/main/java/org/apache/kafka/image/TopicDelta.java:
##
@@ -158,11 +160,21 @@ public LocalReplicaChanges localChanges(int brokerId) {
 new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())
 );
 topicIds.putIfAbsent(name(), id());
+updateDirectoryIds(brokerId, entry, prevPartition, 
directoryIds);
 }
 }
 }
 
-return new LocalReplicaChanges(deletes, leaders, followers, topicIds);
+return new LocalReplicaChanges(deletes, leaders, followers, topicIds, 
directoryIds);
+}
+
+private void updateDirectoryIds(int brokerId, Entry entry, PartitionRegistration prevPartition, 
Map directoryIds) {
+if (prevPartition == null || prevPartition.directory(brokerId) != 
entry.getValue().directory(brokerId)) {
+directoryIds.put(
+new TopicPartition(name(), entry.getKey()),
+new LocalReplicaChanges.PartitionInfo(id(), 
entry.getValue())

Review Comment:
   I think this can just be `entry.getValue().directory(brokerId)`; we don't 
need the rest.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))

Review Comment:
   Maybe we can default this to MIGRATING instead of an Option?
   
   



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,16 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))
+
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
+  partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), 
partitionDirectoryId)
+
+  if (!delta.localChanges(localBrokerId).directoryIds().isEmpty) {
+maybeUpdateTopicAssignment(partition, partitionDirectoryId)
+  }

Review Comment:
   This can be pulled up to `def applyDelta(delta: TopicsDelta, newImage: 
MetadataImage):`.
   Maybe we can do 
`localChanges.directoryIds().forEach(maybeUpdateTopicAssignment)` after 
   ```
   if (!localChanges.leaders.isEmpty) {
     applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala)
   }
   if (!localChanges.followers.isEmpty) {
     applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala)
   }
   localChanges.directoryIds().forEach(maybeUpdateTopicAssignment)
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420691621


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,12 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))

Review Comment:
   Maybe we can default this to `MIGRATING` instead of an `Option`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420732241


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,12 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))

Review Comment:
   Do we need to account for `MIGRATING` now? What would this mean for creating the 
directory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420729137


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   I updated this now. Please have a look 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420691621


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2626,8 +2601,12 @@ class ReplicaManager(val config: KafkaConfig,
 localLeaders.forKeyValue { (tp, info) =>
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
 try {
+  val partitionDirectoryId: Option[Uuid] =
+Option(newImage.topics().getPartition(info.topicId(), 
partition.partitionId)).map(_.directory(localBrokerId))

Review Comment:
   Maybe we can default this to `MIGRATING` instead of an `Option`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420675158


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   Apologies, you're right. I still think this can create duplicates.
   
   I think maybe we should extend `TopicDelta.localChanges` to also consider changes 
to the directory assigned to replicas hosted on this broker. Then we can gate calling 
`maybeUpdateTopicAssignment` on `localChanges.directoryIds` not being empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420552612


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   This is already happening, as `applyLocalFollowersDelta` and 
`applyLocalLeadersDelta` are only invoked from `ReplicaManager.applyDelta` when 
`localChanges.followers` or `localChanges.leaders` aren't empty.  
https://github.com/apache/kafka/blob/4325f9c6a3c3458485f0f7f3e718a63d63e72087/core/src/main/scala/kafka/server/ReplicaManager.scala#L2573



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420552612


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   This is already happening, as `applyLocalFollowersDelta` and 
`applyLocalLeadersDelta` are only invoked from `ReplicaManager.applyDelta` when 
`localChanges.followers` or `localChanges.leaders` aren't empty.  
https://github.com/apache/kafka/blob/4325f9c6a3c3458485f0f7f3e718a63d63e72087/core/src/main/scala/kafka/server/ReplicaManager.scala#L2574



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   This is already happening, as `applyLocalFollowersDelta` and 
`applyLocalLeadersDelta` are only invoked from `ReplicaManager.applyDelta` when 
`localChanges.followers` or `localChanges.leaders` aren't empty.  
https://github.com/apache/kafka/blob/4325f9c6a3c3458485f0f7f3e718a63d63e72087/core/src/main/scala/kafka/server/ReplicaManager.scala#L2573



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1420503147


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2666,8 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
   //   is unavailable. This is required to ensure that we include the 
partition's
   //   high watermark in the checkpoint file (see KAFKA-1647).
   val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
-
+  val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionDirectoryId)
+  maybeUpdateTopicAssignment(partition, partitionDirectoryId)

Review Comment:
   I think we should only call this if `localChanges.leaders` or 
`localChanges.followers` aren't empty, otherwise we can get duplicate 
assignment requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-08 Thread via GitHub


OmniaGM commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-1846975734

   > @OmniaGM can you resolve the conflicts here?
   
   resolved now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-07 Thread via GitHub


rondagostino commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-1846430171

   @OmniaGM can you resolve the conflicts here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-07 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1418966437


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,26 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  } else {
+logger.info(s"Topic partition 
${topicPartition.topic()}-${topicPartition.partition()} already exists.")
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // create on next selected directory if the directory Id is LOST or 
unknown and no offline directories or unassigned
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)

Review Comment:
   Good call, updated the PR to reflect this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-1841561078

   @rondagostino @cmccabe @pprovenzano PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416239117


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -93,6 +93,54 @@ class LogManagerTest {
 log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
   }
 
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in given 
logDirectory using directory id and that we can append to the new log.
+   */
+  @Test
+  def testCreateLogOnTargetedLogDirectory(): Unit = {
+val targetedLogDirectoryId = DirectoryId.random()
+
+val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
+writeMetaProperties(dirs(0))
+writeMetaProperties(dirs(1), Optional.of(targetedLogDirectoryId))
+writeMetaProperties(dirs(3), Optional.of(DirectoryId.random()))
+writeMetaProperties(dirs(4))
+
+logManager = createLogManager(dirs)
+
+val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = 
None, targetLogDirectoryId = Some(targetedLogDirectoryId))
+assertEquals(5, logManager.liveLogDirs.size)
+
+val logFile = new File(dirs(1), name + "-0")
+assertTrue(logFile.exists)
+assertEquals(dirs(1).getAbsolutePath, logFile.getParent)
+log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in 
random logDirectory if the given directory id is DirectoryId.UNASSIGNED.

Review Comment:
   Not random



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
* @param isNew Whether the replica should have existed on the broker or not
* @param isFuture True if the future log of the specified partition should 
be returned or created
* @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory ID that should host the partition's topic.
+   * The next selected directory will be picked if it is None or equals {@link DirectoryId.UNASSIGNED}.
+   * The method assumes the provided ID belongs to an online directory.
* @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
* @throws InconsistentTopicIdException if the topic ID in the log does not 
match the topic ID provided
*/
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false,
+ topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] 
= Option.empty): UnifiedLog = {
 logCreationOrDeletionLock synchronized {
   val log = getLog(topicPartition, isFuture).getOrElse {
 // create the log if it has not already been created in another thread
 if (!isNew && offlineLogDirs.nonEmpty)
   throw new KafkaStorageException(s"Can not create log for 
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are 
offline")
 
 val logDirs: List[File] = {
-  val preferredLogDir = preferredLogDirs.get(topicPartition)
+  val preferredLogDir = targetLogDirectoryId.filterNot(_ == 
DirectoryId.UNASSIGNED) match {
+case Some(targetId) if 
!preferredLogDirs.containsKey(topicPartition) =>
+  // If partition is configured with both targetLogDirectoryId and 
preferredLogDirs, then
+  // preferredLogDirs will be respected, otherwise 
targetLogDirectoryId will be respected
+  directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)

Review Comment:
   This makes sense. The preferredLogDir is set by an admin, and if that exists 
it should take precedence.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416235526


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,26 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  } else {
+logger.info(s"Topic partition 
${topicPartition.topic()}-${topicPartition.partition()} already exists.")
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // create on next selected directory if the directory Id is LOST or 
unknown and no offline directories or unassigned
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)

Review Comment:
   I think if we have a directory ID defined, we can safely set `isNew` to 
`directoryId == DirectoryId.UNASSIGNED`. And then I think we don't need the 
check above for offline directories.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415898460


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,26 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  } else {
+logger.info(s"Topic partition 
${topicPartition.topic()}-${topicPartition.partition()} already exists.")
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // create on next selected directory if the directory Id is LOST or 
unknown and no offline directories or unassigned
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)

Review Comment:
   `createLogIfNotExists`, which invokes `logManager.getOrCreateLog` down the line, 
will do nothing if the topic partition already exists somewhere else. I removed the 
previous check in the other if statement to simplify this, as it was unnecessary. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415898460


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,26 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  } else {
+logger.info(s"Topic partition 
${topicPartition.topic()}-${topicPartition.partition()} already exists.")
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // create on next selected directory if the directory Id is LOST or 
unknown and no offline directories or unassigned
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)

Review Comment:
   `createLogIfNotExists` will do nothing if the topic partition already exists 
somewhere else. I removed the previous check in the other if statement to 
simplify this, as it was unnecessary. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415748253


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,26 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  } else {
+logger.info(s"Topic partition 
${topicPartition.topic()}-${topicPartition.partition()} already exists.")
+  }
+} else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+  // create on next selected directory if the directory Id is LOST or 
unknown and no offline directories or unassigned
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)

Review Comment:
   The partition may exist in some other log dir



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415655849


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+targetLogDirectoryId match {
+  case Some(directoryId) =>
+if (logManager.onlineLogDirId(directoryId)) {
+  if (logManager.getLog(topicPartition).isEmpty) {
+createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+  }

Review Comment:
   Makes sense. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415618371


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2643,6 +2645,17 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def getAssignedDirectoryId(delta: TopicsDelta, partition: Partition): Option[Uuid] = {
+    for {
+      (topicId, topicDelta) <- delta.changedTopics().asScala.find(_._2.name() == partition.topic)
+      if topicDelta != null
+      partitionRegistration = topicDelta.partitionChanges().get(partition.partitionId)
+      if partitionRegistration != null
+      localBrokerIndexInReplicas = partitionRegistration.replicas.indexOf(localBrokerId)
+      directory = partitionRegistration.directories(localBrokerIndexInReplicas)
+    } yield directory
+  }
+

Review Comment:
   I updated this to `Option(newImage.topics().getPartition(topicId, 
partitionId)).map(_.directory(localBrokerId))`, as `TopicsImage.getPartition` 
can return `null`, and wrapping only `.getPartition(topicId, 
partitionId).directory` in `Option` would not protect us from a 
`NullPointerException`. One other thing to keep in mind here: 
`PartitionRegistration.directory` may throw `IllegalArgumentException`, which 
shouldn't be a concern in real use cases but can cause misconfigured tests to 
fail with `IllegalArgumentException`.
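   A small, self-contained illustration of the `null` point above, using toy types rather than the real `TopicsImage`/`PartitionRegistration` classes:

   ```scala
   // Toy types: getPartition stands in for TopicsImage.getPartition (which may return
   // null) and Registration.directory stands in for PartitionRegistration.directory.
   object NullSafeLookupSketch {
     final case class Registration(dirByBroker: Map[Int, String]) {
       // Like PartitionRegistration.directory(brokerId), this throws when the broker
       // is not a replica of the partition.
       def directory(brokerId: Int): String =
         dirByBroker.getOrElse(brokerId,
           throw new IllegalArgumentException(s"broker $brokerId is not a replica"))
     }

     def getPartition(partitions: Map[Int, Registration], partitionId: Int): Registration =
       partitions.get(partitionId).orNull // may be null, like TopicsImage.getPartition

     def main(args: Array[String]): Unit = {
       val image = Map(0 -> Registration(Map(1 -> "dirA")))

       // Wrapping the whole lookup keeps a missing partition from becoming an NPE:
       val dir: Option[String] = Option(getPartition(image, 5)).map(_.directory(1))
       println(dir) // None

       // Wrapping only the final value would not help: getPartition(image, 5).directory(1)
       // dereferences null before Option(...) is ever evaluated.
     }
   }
   ```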



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415612624


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, partitionDirectoryId: Option[Uuid]) = {
+    if (partitionDirectoryId.isDefined) {
+      val topicPartitionActualDirectory = partition.log.flatMap(log => logManager.directoryId(log.dir.getParent))
+      if (!topicPartitionActualDirectory.exists(partitionDirectoryId.contains)) {
+        topicPartitionActualDirectory
+          .flatMap(uuid => partition.topicId.map(topicId =>
+            directoryEventHandler.handleAssignment(new common.TopicIdPartition(topicId, partition.partitionId), uuid)
+          ))

Review Comment:
   Replaced it with `foreach`; I don't know why I jumped to `map` here. 
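   For illustration, a self-contained toy sketch of the `map` vs `foreach` shapes (placeholder names and values, not the actual ReplicaManager types):

   ```scala
   object ForeachVsMapSketch {
     def handleAssignment(topicId: String, dir: String): Unit =
       println(s"assign $topicId -> $dir")

     def main(args: Array[String]): Unit = {
       val actualDirectory: Option[String] = Some("dirA")
       val topicId: Option[String] = Some("topic-uuid")

       // flatMap/map builds an Option[Unit] that is immediately discarded:
       actualDirectory.flatMap(dir => topicId.map(id => handleAssignment(id, dir)))

       // foreach states the intent directly: run the side effect only when both are present.
       actualDirectory.foreach(dir => topicId.foreach(id => handleAssignment(id, dir)))
     }
   }
   ```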



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415612007


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, 
partitionDirectoryId: Option[Uuid]) = {

Review Comment:
   yup, fixed the name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415611267


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -123,14 +123,19 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
+  def hasOfflineLogDirs(): Boolean = offlineLogDirs.nonEmpty

Review Comment:
   Good call, fixed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415609251


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }

Review Comment:
   The `else` means the log already exists somewhere, so we don't need to 
create it. It doesn't matter whether it sits in this log dir or another one at 
this point; ReplicaManager will decide later to send the reassignment if 
`partition.log` != assigned log. I added an `else` branch with a `log.info` 
noting that the log already existed.
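
   For reference, a rough sketch of that division of responsibility (helper 
names are hypothetical, not the actual `Partition`/`ReplicaManager` methods):

```scala
// Partition side: only create the log when none exists yet, regardless of
// which directory an existing log happens to live in.
def ensureLog(existingLogDir: Option[String], targetDir: String, create: String => Unit): Unit =
  if (existingLogDir.isEmpty) create(targetDir)
  // else: a log already exists somewhere; nothing is created here.

// ReplicaManager side: if the log's actual directory differs from the one
// assigned in the metadata, report the actual directory so the assignment
// can be corrected.
def maybeCorrectAssignment(actualDir: Option[String], assignedDir: String, notify: String => Unit): Unit =
  actualDir.filter(_ != assignedDir).foreach(notify)
```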



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


OmniaGM commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1415604987


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }
+        } else if ((!logManager.onlineLogDirId(directoryId) && !logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+          // select random directory if the directory Id is LOST or unknown and no offline directories or unassigned
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, None)
+        } else {
+          logger.warn(s"can't create $topicPartition on directory id ${directoryId}")
+        }
+      case None =>
+        // original flow of creating in random log directory.

Review Comment:
   fixed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-03 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1413201855


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }
+        } else if ((!logManager.onlineLogDirId(directoryId) && !logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+          // select random directory if the directory Id is LOST or unknown and no offline directories or unassigned

Review Comment:
   Here too, directory selection isn't random; this comment is misleading.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2643,6 +2645,17 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def getAssignedDirectoryId(delta: TopicsDelta, partition: Partition): Option[Uuid] = {
+    for {
+      (topicId, topicDelta) <- delta.changedTopics().asScala.find(_._2.name() == partition.topic)
+      if topicDelta != null
+      partitionRegistration = topicDelta.partitionChanges().get(partition.partitionId)
+      if partitionRegistration != null
+      localBrokerIndexInReplicas = partitionRegistration.replicas.indexOf(localBrokerId)
+      directory = partitionRegistration.directories(localBrokerIndexInReplicas)
+    } yield directory
+  }
+

Review Comment:
   You can avoid all of this and just do  `val directoryId = 
Option(newImage.topics().getPartition(topicId, 
partitionId).directory(brokerId))`



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -991,18 +997,27 @@ class LogManager(logDirs: Seq[File],
* @param isNew Whether the replica should have existed on the broker or not
* @param isFuture True if the future log of the specified partition should be returned or created
* @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory Id that should host the the partition's topic.
+   * A random directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}.

Review Comment:
   Not random. Following `LogManager.nextLogDirs()`.
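
   A toy illustration of the distinction (the per-directory counts are made up, 
and the assumption that `nextLogDirs` prefers less-loaded directories is only a 
simplification of its actual policy):

```scala
// Hypothetical number of logs already hosted by each directory.
val logCountPerDir = Map("/data/d1" -> 12, "/data/d2" -> 3, "/data/d3" -> 7)

// A deterministic choice (least-loaded directory), not a random pick.
val nextDir: String = logCountPerDir.minBy(_._2)._1  // "/data/d2"
```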



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, partitionDirectoryId: Option[Uuid]) = {
+    if (partitionDirectoryId.isDefined) {
+      val topicPartitionActualDirectory = partition.log.flatMap(log => logManager.directoryId(log.dir.getParent))
+      if (!topicPartitionActualDirectory.exists(partitionDirectoryId.contains)) {
+        topicPartitionActualDirectory
+          .flatMap(uuid => partition.topicId.map(topicId =>
+            directoryEventHandler.handleAssignment(new common.TopicIdPartition(topicId, partition.partitionId), uuid)
+          ))

Review Comment:
   It is odd to map and flatMap but then throw away the result. Should this be 
a foreach instead?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -123,14 +123,19 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
+  def hasOfflineLogDirs(): Boolean = offlineLogDirs.nonEmpty

Review Comment:
   Calling the `offlineLogDirs` function builds a list by creating a set with 
`logDirs` and then removing all of `_liveLogDirs`. Instead of doing that we can 
just compare the number of entries in `logDirs` with `_liveLogDirs`.
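
   A small sketch of the suggested check (field names assumed, not the actual 
`LogManager` members):

```scala
val logDirs: Seq[String] = Seq("/data/d1", "/data/d2", "/data/d3") // configured dirs
val liveLogDirs: Seq[String] = Seq("/data/d1", "/data/d3")          // dirs still online

// Original shape: materialise the offline set just to test emptiness.
val hasOfflineViaSet = (logDirs.toSet -- liveLogDirs).nonEmpty

// Suggested shape: a size comparison answers the same question without
// building the intermediate set.
val hasOfflineViaCount = logDirs.size > liveLogDirs.size
```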



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, partitionDirectoryId: Option[Uuid]) = {

[PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-11-30 Thread via GitHub


OmniaGM opened a new pull request, #14881:
URL: https://github.com/apache/kafka/pull/14881

   - Extended `LogManager.getOrCreateLog` to accept the target directory's ID. 
At this level, we assume that the directory ID is online, to keep 
`getOrCreateLog` simple. 
   - Fixed `LogManager.handleLogDirFailure` to remove the failed directory from 
`directoryIds` alongside `_liveLogDirs`. 
   - Updated `Partition.makeLeader` and `Partition.makeFollower` to decide 
whether or not to send the directory's ID down to `LogManager.getOrCreateLog`.
   - `ReplicaManager.applyLocalLeadersDelta` and 
`ReplicaManager.applyLocalFollowersDelta` will correct the assignment if the 
directory of `Partition.log` doesn't match the one assigned in the received 
`TopicDelta.partitionChanges`. 
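
   A toy sketch of the create path described in the first point (simplified 
types; the real `LogManager.getOrCreateLog` also takes the topic partition, 
isNew/isFuture flags and the topic id):

```scala
import java.util.UUID

final case class Log(dir: String)

class ToyLogManager(onlineDirsById: Map[UUID, String], defaultDir: String) {
  // When a target directory id is passed it is assumed to be online
  // (the caller has already checked); otherwise the manager falls back
  // to its usual placement policy, simplified here to a default directory.
  def getOrCreateLog(existing: Option[Log], targetLogDirectoryId: Option[UUID]): Log =
    existing.getOrElse(Log(targetLogDirectoryId.flatMap(onlineDirsById.get).getOrElse(defaultDir)))
}
```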
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-11-30 Thread via GitHub


OmniaGM closed pull request #14880: KAFKA-15365: Broker-side replica management 
changes
URL: https://github.com/apache/kafka/pull/14880


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-11-30 Thread via GitHub


OmniaGM opened a new pull request, #14880:
URL: https://github.com/apache/kafka/pull/14880

   - Extended `LogManager.getOrCreateLog` to accept the target directory's ID. 
At this level, we assume that the directory ID is online, to keep 
`getOrCreateLog` simple. 
   - Fixed `LogManager.handleLogDirFailure` to remove the failed directory from 
`directoryIds` alongside `_liveLogDirs`. 
   - Updated `Partition.makeLeader` and `Partition.makeFollower` to decide 
whether or not to send the directory's ID down to `LogManager.getOrCreateLog`.
   - `ReplicaManager.applyLocalLeadersDelta` and 
`ReplicaManager.applyLocalFollowersDelta` will correct the assignment if the 
directory of `Partition.log` doesn't match the one assigned in the received 
`TopicDelta.partitionChanges`. 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org