jsancio commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892961128
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
Review Comment:
How about `metadataVersion.isTopicIdSupported`?
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+ // We build this mapping in order to map topic id back to their name when we
+ // receive the response. We cannot rely on the metadata cache for this because
+ // the metadata cache is updated after the partition state so it might not know
+ // yet about a topic id already used here.
+ val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
val message = new AlterPartitionRequestData()
.setBrokerId(brokerId)
- .setBrokerEpoch(brokerEpochSupplier.apply())
+ .setBrokerEpoch(brokerEpochSupplier())
+
+ inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+ val topicId = items.head.topicIdPartition.topicId
+ // We use topic ids only if all the topics have one defined.
+ canUseTopicIds &= topicId != Uuid.ZERO_UUID
+ topicNamesByIds(topicId) = topicName
- inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
val topicData = new AlterPartitionRequestData.TopicData()
- .setName(topic)
+ .setTopicName(topicName)
+ .setTopicId(topicId)
message.topics.add(topicData)
+
items.foreach { item =>
val partitionData = new AlterPartitionRequestData.PartitionData()
- .setPartitionIndex(item.topicPartition.partition)
+ .setPartitionIndex(item.topicIdPartition.partition)
.setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
.setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
.setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
- if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
}
topicData.partitions.add(partitionData)
}
}
- message
+
+ (message, canUseTopicIds, if (canUseTopicIds) topicNamesByIds else mutable.Map.empty[Uuid, String])
Review Comment:
I wonder if we can simplify the return values by just always returning
`topicNamesByIds`. E.g. `(message, canUseTopicIds, topicNamesByIds)`
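Roughly what I have in mind, as a standalone sketch (the `Item` type and string ids below are hypothetical stand-ins, not the real `AlterPartitionItem`/`AlterPartitionRequestData` classes):
```scala
import scala.collection.mutable

object BuildRequestReturnSketch extends App {
  // Hypothetical stand-in for an inflight AlterPartition item.
  final case class Item(topicId: String, topicName: String, partition: Int)
  val ZeroUuid = "00000000-0000-0000-0000-000000000000"

  def buildRequest(items: Seq[Item]): (Seq[String], Boolean, mutable.Map[String, String]) = {
    var canUseTopicIds = true
    val topicNamesByIds = mutable.HashMap[String, String]()
    items.groupBy(_.topicName).foreach { case (topicName, group) =>
      val topicId = group.head.topicId
      // Topic ids are only usable if every topic in the batch has one.
      canUseTopicIds &= topicId != ZeroUuid
      topicNamesByIds(topicId) = topicName
    }
    // Always return the same map; the response handler only consults it when
    // canUseTopicIds is true, so no conditional is needed at the return site.
    // The first tuple element stands in for the built request message.
    (items.map(_.topicName).distinct, canUseTopicIds, topicNamesByIds)
  }

  println(buildRequest(Seq(Item("id-1", "foo", 0), Item(ZeroUuid, "bar", 0))))
}
```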
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +862,29 @@ class Partition(val topicPartition: TopicPartition,
}
private def needsExpandIsr(followerReplica: Replica): Boolean = {
- canAddReplicaToIsr(followerReplica.brokerId) && isFollowerAtHighwatermark(followerReplica)
+ canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
}
private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
val current = partitionState
- !current.isInflight && !current.isr.contains(followerReplicaId)
+ !current.isInflight &&
+ !current.isr.contains(followerReplicaId) &&
+ isBrokerIsrEligible(followerReplicaId)
}
- private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+ private def isFollowerInSync(followerReplica: Replica): Boolean = {
leaderLogIfLocal.exists { leaderLog =>
val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
followerEndOffset >= leaderLog.highWatermark &&
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
}
}
+ private def isBrokerIsrEligible(brokerId: Int): Boolean = {
Review Comment:
Minor. The replica manager refers to brokers as replicas. Why not
`isReplicaIsrEligible` instead of `isBrokerIsrEligible`?
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+ // We build this mapping in order to map topic id back to their name when we
+ // receive the response. We cannot rely on the metadata cache for this because
+ // the metadata cache is updated after the partition state so it might not know
+ // yet about a topic id already used here.
+ val topicNamesByIds = mutable.HashMap[Uuid, String]()
Review Comment:
I think we can remove the need to compute this map if you agree with my
comment regarding `unsentIsrUpdates`.
##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -58,6 +58,10 @@ trait MetadataCache {
def hasAliveBroker(brokerId: Int): Boolean
+ def isBrokerFenced(brokerId: Int): Boolean
+
+ def isBrokerInControlledShutdown(brokerId: Int): Boolean
Review Comment:
I wonder if we should remove these methods from this trait and the
`ZkMetadataCache` type. We can move this complexity to
`Partition.isBrokerIsrEligible`. You already document this subtle semantic
there. I think this change would make that semantic explicit.
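To illustrate, a rough standalone sketch of keeping the trait untouched and making the ZK vs KRaft difference explicit at the call site (the cache types here are hypothetical stand-ins, not the real `MetadataCache`/`KRaftMetadataCache` APIs):
```scala
object IsrEligibilitySketch extends App {
  sealed trait CacheSketch
  final case class KRaftCacheSketch(fenced: Set[Int], inControlledShutdown: Set[Int]) extends CacheSketch
  case object ZkCacheSketch extends CacheSketch

  def isReplicaIsrEligible(cache: CacheSketch, replicaId: Int): Boolean = cache match {
    // KRaft: a fenced replica, or one in controlled shutdown, is not eligible.
    case KRaftCacheSketch(fenced, inControlledShutdown) =>
      !fenced.contains(replicaId) && !inControlledShutdown.contains(replicaId)
    // ZK: the cache does not track these states, so every replica stays eligible
    // and the controller keeps validating the ISR as before.
    case ZkCacheSketch => true
  }

  println(isReplicaIsrEligible(KRaftCacheSketch(fenced = Set(2), inControlledShutdown = Set.empty), replicaId = 2)) // false
  println(isReplicaIsrEligible(ZkCacheSketch, replicaId = 2)) // true
}
```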
##########
clients/src/main/resources/common/message/AlterPartitionRequest.json:
##########
@@ -18,16 +18,21 @@
"type": "request",
"listeners": ["zkBroker", "controller"],
"name": "AlterPartitionRequest",
- "validVersions": "0-1",
+ // Version 1 adds LeaderRecoveryState field (KIP-704).
+ //
+ // Version 2 adds TopicId field to replace TopicName field (KIP-841).
+ "validVersions": "0-2",
Review Comment:
I think it would be good to document when version 1 vs version 2 is sent. I get the impression that a few things need to be true. For example, if topic ids are enabled (IBP 2.8), is it guaranteed that every topic will have an id? I ask because this RPC can include multiple topics in a single request.
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -262,29 +291,35 @@ class DefaultAlterPartitionManager(
// Collect partition-level responses to pass to the callbacks
val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
Review Comment:
Similar to my other comment, but I think you can simplify this handling and remove the need for `topicNamesByIds` if we make this a map of `Map[TopicIdPartition, Either[Errors, LeaderAndIsr]]`.
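A small sketch of that shape, with hypothetical stand-ins for the real classes (`Errors`/`LeaderAndIsr` replaced by plain strings):
```scala
import scala.collection.mutable

object PartitionResponsesSketch extends App {
  final case class TopicPartition(topic: String, partition: Int)
  final case class TopicIdPartition(topicId: String, topicPartition: TopicPartition)

  // Keyed by TopicIdPartition instead of TopicPartition.
  val partitionResponses = mutable.HashMap[TopicIdPartition, Either[String, String]]()

  val tp = TopicIdPartition("id-1", TopicPartition("foo", 0))
  partitionResponses(tp) = Right("new-leader-and-isr")

  // The inflight item already carries its TopicIdPartition, so the callback
  // can look up its result directly without resolving the id back to a name.
  println(partitionResponses.get(tp))
}
```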
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
Review Comment:
I know that this is a private method but the return type is important. Can
we add a JavaDoc that explains each element of the tuple returned?
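Something along these lines, based on my reading of the diff (exact wording up to you):
```scala
/**
 * Builds an AlterPartition request from the inflight items.
 *
 * The result is a tuple of:
 *  1. the AlterPartitionRequestData to send;
 *  2. whether topic ids are used, i.e. the metadata version supports them
 *     and every topic in the request has a non-zero topic id;
 *  3. the topic id to topic name mapping used to resolve names when the
 *     response is handled (only meaningful when topic ids are used).
 */
```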
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +226,56 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+ // We build this mapping in order to map topic id back to their name when we
+ // receive the response. We cannot rely on the metadata cache for this because
+ // the metadata cache is updated after the partition state so it might not know
+ // yet about a topic id already used here.
+ val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
val message = new AlterPartitionRequestData()
.setBrokerId(brokerId)
- .setBrokerEpoch(brokerEpochSupplier.apply())
+ .setBrokerEpoch(brokerEpochSupplier())
+
+ inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+ val topicId = items.head.topicIdPartition.topicId
+ // We use topic ids only if all the topics have one defined.
+ canUseTopicIds &= topicId != Uuid.ZERO_UUID
+ topicNamesByIds(topicId) = topicName
- inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
val topicData = new AlterPartitionRequestData.TopicData()
- .setName(topic)
+ .setTopicName(topicName)
+ .setTopicId(topicId)
message.topics.add(topicData)
+
items.foreach { item =>
val partitionData = new AlterPartitionRequestData.PartitionData()
- .setPartitionIndex(item.topicPartition.partition)
+ .setPartitionIndex(item.topicIdPartition.partition)
.setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
.setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
.setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
- if (metadataVersionSupplier().isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_2_IV0)) {
Review Comment:
Do you mind changing this to `metadataVersion.isLeaderRecoverySupported`?
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -124,7 +126,9 @@ class DefaultAlterPartitionManager(
val metadataVersionSupplier: () => MetadataVersion
) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
- // Used to allow only one pending ISR update per partition (visible for testing)
+ // Used to allow only one pending ISR update per partition (visible for testing).
+ // Note that we key items by TopicPartition despite using TopicIdPartition while
+ // submitting it. We do this because we don't always have a topic id to rely on.
Review Comment:
You should be able to use `TopidIdPartition` with the zero uuid if topic id
is not supported. This is correct because equality for
`kafka.common.TopidIdPartition` requires that all three parts are equal: topid
id, topic name, topic partition.
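For example, with trimmed-down stand-ins for `kafka.common.TopicIdPartition` (not the real classes), case-class equality keeps two id-less topics apart as long as their names differ:
```scala
import scala.collection.mutable

object TopicIdPartitionKeySketch extends App {
  final case class TopicPartition(topic: String, partition: Int)
  final case class TopicIdPartition(topicId: String, topicPartition: TopicPartition)

  val ZeroUuid = "00000000-0000-0000-0000-000000000000"

  val foo0 = TopicIdPartition(ZeroUuid, TopicPartition("foo", 0))
  val bar0 = TopicIdPartition(ZeroUuid, TopicPartition("bar", 0))

  val unsentIsrUpdates = mutable.HashMap[TopicIdPartition, String]()
  unsentIsrUpdates(foo0) = "pending-foo"
  unsentIsrUpdates(bar0) = "pending-bar"

  // Both entries survive: a zero uuid alone does not make the keys equal.
  assert(unsentIsrUpdates.size == 2)
  println(unsentIsrUpdates)
}
```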
##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -38,18 +43,49 @@ public static OptionalLong requestTimeoutMsToDeadlineNs(
return OptionalLong.of(time.nanoseconds() + NANOSECONDS.convert(millisecondsOffset, MILLISECONDS));
}
- private final KafkaPrincipal principal;
+ public static ControllerRequestContext anonymousContextFor(ApiKeys apiKeys) {
Review Comment:
Similar comment for this method. I don't think this is safe to have in
`src/main` I don't see us using this in the future. If you agree, lets move
this to `src/test`.
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -159,48 +159,63 @@ sealed trait PartitionState {
}
sealed trait PendingPartitionChange extends PartitionState {
+ def lastCommittedState: PartitionState
Review Comment:
This is a great idea. I have been meaning to do this for a while.
Is the type `PartitionState` too flexible? Should this always be
`CommittedPartitionState`? Maybe the easiest way to do this with the current
code structure is to move this to `PartitionState` with
`CommittedPartitionState` implementing this method with `this`.
Now that we have `lastCommittedState`, I think that we have redundant data
in both `PendingPartitionChange` types. For example, `isr` is always set to
`lastCommittedState.isr`.
What do you think?
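A trimmed-down sketch of the shape I'm suggesting (only an `isr` field, so not the real `Partition` state types):
```scala
object PartitionStateSketch extends App {
  sealed trait PartitionState {
    def isr: Set[Int]
    def lastCommittedState: CommittedPartitionState
  }

  final case class CommittedPartitionState(isr: Set[Int]) extends PartitionState {
    // A committed state is its own last committed state.
    override def lastCommittedState: CommittedPartitionState = this
  }

  sealed trait PendingPartitionChange extends PartitionState {
    // The redundant copy goes away: pending changes derive isr from the
    // committed state they were built from.
    override def isr: Set[Int] = lastCommittedState.isr
  }

  final case class PendingExpandIsr(
    newInSyncReplicaId: Int,
    lastCommittedState: CommittedPartitionState
  ) extends PendingPartitionChange

  val committed = CommittedPartitionState(Set(1, 2))
  val pending = PendingExpandIsr(newInSyncReplicaId = 3, lastCommittedState = committed)
  println(pending.isr) // Set(1, 2), taken from the committed state
}
```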
##########
metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java:
##########
@@ -17,18 +17,23 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import java.util.OptionalLong;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class ControllerRequestContext {
public static final ControllerRequestContext ANONYMOUS_CONTEXT =
Review Comment:
Is this true that this is the anonymous context now that we have the request
header. Having this in `src/main` is not safe. It looks like we only use this
in tests. Should we move this helper to `src/main`?
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
return new HashSet<>(imbalancedPartitions);
}
- ControllerResult<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+ ControllerResult<AlterPartitionResponseData> alterPartition(
+ ControllerRequestContext context,
+ AlterPartitionRequestData request
+ ) {
+ short requestVersion = context.requestHeader().requestApiVersion();
clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
AlterPartitionResponseData response = new AlterPartitionResponseData();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
AlterPartitionResponseData.TopicData responseTopicData =
- new AlterPartitionResponseData.TopicData().setName(topicData.name());
+ new AlterPartitionResponseData.TopicData().
+ setTopicName(topicData.topicName()).
+ setTopicId(topicData.topicId());
response.topics().add(responseTopicData);
- Uuid topicId = topicsByName.get(topicData.name());
- if (topicId == null || !topics.containsKey(topicId)) {
+
+ Uuid topicId = requestVersion > 1 ? topicData.topicId() : topicsByName.get(topicData.topicName());
Review Comment:
Btw, since the default value for topicId is `ZERO_UUID` and the default
value topicName is `""` you maybe able to do this without needed the request
version.
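In Scala for brevity (the real code is Java), a rough sketch of resolving the topic from the field defaults alone, without consulting the request version; the names below are hypothetical stand-ins:
```scala
object ResolveTopicSketch extends App {
  val ZeroUuid = "00000000-0000-0000-0000-000000000000"

  // If the id field was set (v2+), use it; otherwise fall back to the name
  // lookup, as the v0/v1 path does today.
  def resolveTopicId(requestTopicId: String,
                     requestTopicName: String,
                     topicsByName: Map[String, String]): Option[String] = {
    if (requestTopicId != ZeroUuid) Some(requestTopicId)
    else topicsByName.get(requestTopicName)
  }

  println(resolveTopicId(ZeroUuid, "foo", Map("foo" -> "some-id"))) // Some(some-id)
  println(resolveTopicId("id-42", "", Map.empty))                   // Some(id-42)
}
```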
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,22 +918,31 @@ Set<TopicIdPartition> imbalancedPartitions() {
return new HashSet<>(imbalancedPartitions);
}
- ControllerResult<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+ ControllerResult<AlterPartitionResponseData> alterPartition(
+ ControllerRequestContext context,
+ AlterPartitionRequestData request
Review Comment:
Outside the scope of this PR but I wonder if we need a type for this? E.g.
```java
class ControllerRequest<T extends ApiMessage> {
ControllerRequesContext context;
T request;
}
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+ val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
val message = new AlterPartitionRequestData()
.setBrokerId(brokerId)
- .setBrokerEpoch(brokerEpochSupplier.apply())
+ .setBrokerEpoch(brokerEpochSupplier())
+
+ inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
+ val topicId = items.head.topicIdPartition.topicId
+ // We use topic ids only if all the topics have one defined.
+ canUseTopicIds &= topicId != Uuid.ZERO_UUID
+ topicNamesByIds(topicId) = topicName
- inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
val topicData = new AlterPartitionRequestData.TopicData()
- .setName(topic)
+ .setTopicName(topicName)
+ .setTopicId(topicId)
Review Comment:
Yeah, this is the reason why @dajac had to mark both `TopicName` and `TopicId` as `ignorable`. Ideally we wouldn't do that.
It would be difficult and error-prone to expose the ApiVersion to this layer as it would change when the active controller fails over.