jsancio commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892961128


##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)

Review Comment:
   How about `metadataVersion.isTopidIdSupported`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when 
we
+    // receive the response. We cannot rely on the metadata cache for this 
because
+    // the metadata cache is updated after the partition state so it might not 
know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
           
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
         }
 
         topicData.partitions.add(partitionData)
       }
     }
-    message
+
+    (message, canUseTopicIds, if (canUseTopicIds) topicNamesByIds else 
mutable.Map.empty[Uuid, String])

Review Comment:
   I wonder if we can simplify the return values by just always returning 
`topicNamesByIds`. E.g. `(message, canUseTopicIds, topicNamesByIds)`



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +862,29 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && 
isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isBrokerIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && 
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isBrokerIsrEligible(brokerId: Int): Boolean = {

Review Comment:
   Minor. The replica manager refers to brokers as replicas. Why not 
`isReplicaIsrEligible` instead of `isBrokerIsrEligible`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when 
we
+    // receive the response. We cannot rely on the metadata cache for this 
because
+    // the metadata cache is updated after the partition state so it might not 
know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()

Review Comment:
   I think we can remove the need to compute this map if you agree with my 
comment regarding `unsentIsrUpdates`.



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -58,6 +58,10 @@ trait MetadataCache {
 
   def hasAliveBroker(brokerId: Int): Boolean
 
+  def isBrokerFenced(brokerId: Int): Boolean
+
+  def isBrokerInControlledShutdown(brokerId: Int): Boolean

Review Comment:
   I wonder if we should remove these methods from this trait and the 
`ZkMetadataCache` type. We can move this complexity to 
`Partition.isBrokerIsrEligible`. You already document this subtle semantic 
there. I think this change would make that semantic explicit.



##########
clients/src/main/resources/common/message/AlterPartitionRequest.json:
##########
@@ -18,16 +18,21 @@
   "type": "request",
   "listeners": ["zkBroker", "controller"],
   "name": "AlterPartitionRequest",
-  "validVersions": "0-1",
+  // Version 1 adds LeaderRecoveryState field (KIP-704).
+  //
+  // Version 2 adds TopicId field to replace TopicName field (KIP-841).
+  "validVersions": "0-2",

Review Comment:
   I think it would be good to document when version 1 vs version 2 is sent. I 
get the impression that a few things need to be true. For example, if topic id 
is enabled (IBP 2.8) is it guarantee that every topic will have an id? I ask 
this because it is possible for this RPC to send multi topics in the request.



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -262,29 +291,35 @@ class DefaultAlterPartitionManager(
         // Collect partition-level responses to pass to the callbacks
         val partitionResponses = new mutable.HashMap[TopicPartition, 
Either[Errors, LeaderAndIsr]]()

Review Comment:
   Similar to my other comment, but I think you can simplify this handling and 
remove the need to have `topicNamesByIds` if we make this a map of 
`Map[TopicIdPartition, Etiher[Errors, LeaderAndIsr]/`



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {

Review Comment:
   I know that this is a private method but the return type is important. Can 
we add a JavaDoc that explains each element of the tuple returned?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    // We build this mapping in order to map topic id back to their name when 
we
+    // receive the response. We cannot rely on the metadata cache for this 
because
+    // the metadata cache is updated after the partition state so it might not 
know
+    // yet about a topic id already used here.
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)
       message.topics.add(topicData)
+
       items.foreach { item =>
         val partitionData = new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(item.topicPartition.partition)
+          .setPartitionIndex(item.topicIdPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {

Review Comment:
   Do you mind changing this to `metadataVersion.isLeaderRecoverySupported`?



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -124,7 +126,9 @@ class DefaultAlterPartitionManager(
   val metadataVersionSupplier: () => MetadataVersion
 ) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
 
-  // Used to allow only one pending ISR update per partition (visible for 
testing)
+  // Used to allow only one pending ISR update per partition (visible for 
testing).
+  // Note that we key items by TopicPartition despite using TopicIdPartition 
while
+  // submitting it. We do this because we don't always have a topic id to rely 
on.

Review Comment:
   You should be able to use `TopidIdPartition` with the zero uuid if topic id 
is not supported. This is correct because equality for 
`kafka.common.TopidIdPartition` requires that all three parts are equal: topid 
id, topic name, topic partition.



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -38,18 +43,49 @@ public static OptionalLong requestTimeoutMsToDeadlineNs(
         return OptionalLong.of(time.nanoseconds() + 
NANOSECONDS.convert(millisecondsOffset, MILLISECONDS));
     }
 
-    private final KafkaPrincipal principal;
+    public static ControllerRequestContext anonymousContextFor(ApiKeys 
apiKeys) {

Review Comment:
   Similar comment for this method. I don't think this is safe to have in 
`src/main` I don't see us using this in the future. If you agree, lets move 
this to `src/test`.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -159,48 +159,63 @@ sealed trait PartitionState {
 }
 
 sealed trait PendingPartitionChange extends PartitionState {
+  def lastCommittedState: PartitionState

Review Comment:
   This is a great idea. I have been meaning to do this for a while.
   
   Is the type `PartitionState` too flexible? Should this always be 
`CommittedPartitionState`? Maybe the easiest way to do this with the current 
code structure is to move this to `PartitionState` with 
`CommittedPartitionState` implementing this method with `this`.
   
   Now that we have `lastCommittedState`, I think that we have redundant data 
in both `PendingPartitionChange` types. For example, `isr` is always set to 
`lastCommittedState.isr`.
   
   What do you think?



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -17,18 +17,23 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.OptionalLong;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
 public class ControllerRequestContext {
     public static final ControllerRequestContext ANONYMOUS_CONTEXT =

Review Comment:
   Is this true that this is the anonymous context now that we have the request 
header. Having this in `src/main` is not safe. It looks like we only use this 
in tests. Should we move this helper to `src/main`?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
+        short requestVersion = context.requestHeader().requestApiVersion();
         clusterControl.checkBrokerEpoch(request.brokerId(), 
request.brokerEpoch());
         AlterPartitionResponseData response = new AlterPartitionResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (AlterPartitionRequestData.TopicData topicData : request.topics()) 
{
             AlterPartitionResponseData.TopicData responseTopicData =
-                new 
AlterPartitionResponseData.TopicData().setName(topicData.name());
+                new AlterPartitionResponseData.TopicData().
+                    setTopicName(topicData.topicName()).
+                    setTopicId(topicData.topicId());
             response.topics().add(responseTopicData);
-            Uuid topicId = topicsByName.get(topicData.name());
-            if (topicId == null || !topics.containsKey(topicId)) {
+
+            Uuid topicId = requestVersion > 1 ? topicData.topicId() : 
topicsByName.get(topicData.topicName());

Review Comment:
   Btw, since the default value for topicId is `ZERO_UUID` and the default 
value topicName is `""` you maybe able to do this without needed the request 
version.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request

Review Comment:
   Outside the scope of this PR but I wonder if we need a type for this? E.g.
   
   ```java
   class ControllerRequest<T extends ApiMessage> {
       ControllerRequesContext context;
       T request;
   }



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)

Review Comment:
   Yeah, this is the reason why @dajac had to mark both `TopicName` and 
`TopicId` as `ignorable`. Ideally we wouldn't do that.
   
   It would be difficult and error prone to expose the ApiVersion to this layer 
as it would change when the active controller fails over.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to