kirktrue commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1344502781
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!latestLeaderEpoch.isPresent())
+            return false;
+
+        boolean leaderChanged = false;
+        int attempts = attempts();
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+            this, currentLeaderEpoch.isPresent() ? currentLeaderEpoch.get() : "un-initialized", attemptsWhenLeaderLastChanged, latestLeaderEpoch.get(), attempts);
+        boolean isRetry = attempts >= 1;
+        // Checking for leader change makes sense only from 1st retry onwards(i.e attempt >=1).
+        if (isRetry) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+                attemptsWhenLeaderLastChanged = attempts;
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts == attemptsWhenLeaderLastChanged;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",
Review Comment:
nit: per @wcarlson5's [comment above](https://github.com/apache/kafka/pull/14384/files#r1344314253), the log message could be read as saying that the internal state of the batch was _changed_, rather than that a leader change was simply _detected_. I don't have a suggestion for how to reword it, though, so it's fine if you want to leave it as is.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -842,22 +845,31 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
         return false;
     }
-    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
+    private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node, int maxSize, long now) {
         int size = 0;
-        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+        List<PartitionInfo> parts = metadata.fetch().partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
+        if (parts.isEmpty())
+            return ready;
         /* to make starvation less likely each node has it's own drainIndex */
         int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
+
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
             updateDrainIndex(node.idString(), drainIndex);
             drainIndex = (drainIndex + 1) % parts.size();
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;
-
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(new TopicPartition(part.topic(), part.partition()));
Review Comment:
nit: can we reuse `tp` instead of creating a new `TopicPartition`?
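For example, something along these lines (just a sketch against the diff above, untested):
```suggestion
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
```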
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
Review Comment:
I guess I was under the false impression that `maybeUpdateLeaderEpoch()` was
only called _once_ per attempt. I _think_ I can see how the flow of the code
would end up calling that multiple times.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
Review Comment:
It does assign the `currentLeaderEpoch` at the end, though, right?
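For reference, the shape I have in mind is something like the following; this is purely illustrative and not copied from the PR, so the actual code may differ:
```java
        // Hypothetical tail of maybeUpdateLeaderEpoch(), shown only to illustrate the question above.
        if (leaderChanged) {
            log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",
                this, currentLeaderEpoch, latestLeaderEpoch);
            // The assignment being asked about: remember the epoch used for this attempt.
            currentLeaderEpoch = latestLeaderEpoch;
        }
        return leaderChanged;
```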
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -682,8 +683,9 @@ private long partitionReady(Cluster cluster, long nowMs, String topic,
             TopicPartition part = new TopicPartition(topic, entry.getKey());
             // Advance queueSizesIndex so that we properly index available
             // partitions. Do it here so that it's done for all code paths.
-            Node leader = cluster.leaderFor(part);
-            if (leader != null && queueSizes != null) {
+
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
Review Comment:
```suggestion
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
            Node leader = leaderAndEpoch.leader.orElse(null);
```
Or something like that?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -682,8 +683,9 @@ private long partitionReady(Cluster cluster, long nowMs, String topic,
             TopicPartition part = new TopicPartition(topic, entry.getKey());
             // Advance queueSizesIndex so that we properly index available
             // partitions. Do it here so that it's done for all code paths.
-            Node leader = cluster.leaderFor(part);
-            if (leader != null && queueSizes != null) {
+
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
Review Comment:
Since the `leaderAndEpoch` variable isn't reassigned in this method, can we grab a reference to the leader `Node` as we had before? That way we avoid repeating the more verbose `leaderAndEpoch.leader.isPresent()` and `leaderAndEpoch.leader.get()` calls everywhere.
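Something like this is what I mean (a sketch only, reusing the names from the diff above; untested):
```java
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
            Node leader = leaderAndEpoch.leader.orElse(null);
            if (leader != null && queueSizes != null) {
                // existing queue-size bookkeeping, unchanged
            }
```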