msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345789833
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp,
MemoryRecordsBuilder recordsBuilder, lon
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation =
CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
+ this.currentLeaderEpoch = Optional.empty();
+ this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
+ /*
+ * This is called to update the leader-epoch to which this batch is going
to be produced in the ongoing attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+ boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
Review Comment:
So the method does two things:
1. updates the leader epoch if a newer one is supplied;
2. returns whether the leader has changed for the ongoing retry of the batch.
Keeping both in one method is a bit confusing, as is evident from the naming
😄
So I have split it into two methods; to me the code is more readable now.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError()
throws Exception {
txnManager.beginTransaction();
}
+ @Test
+ public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws
Exception {
+ Metrics m = new Metrics();
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+ try {
+ // SETUP
+ String metricGrpName = "producer-metrics-test-stats-1";
+ long totalSize = 1024 * 1024;
+ BufferPool pool = new BufferPool(totalSize, batchSize, metrics,
time,
+ metricGrpName);
+ long retryBackoffMaxMs = 100L;
Review Comment:
Fair point. `SenderTest` already mocks `time` using `MockTime`, which is
advanced manually. Is that what you had in mind?
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches()
throws Exception {
time.sleep(10);
// We should have one batch ready.
- Set<Node> nodes = accum.ready(cluster,
time.milliseconds()).readyNodes;
+ Set<Node> nodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(1, nodes.size(), "Should have 1 leader ready");
- List<ProducerBatch> batches = accum.drain(cluster, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+ List<ProducerBatch> batches = accum.drain(metadataMock, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
assertEquals(1, batches.size(), "Should have 1 batch ready");
int actualBatchSize = batches.get(0).records().sizeInBytes();
assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater
than half batch.size");
assertTrue(actualBatchSize < batchSize, "Batch must be less than
batch.size");
}
}
+ /**
+ * For a batch being retried, this validates ready() and drain() whether a
batch should skip-backoff(retries-immediately), or backoff, based on -
+ * 1. how long it has waited between retry attempts.
+ * 2. change in leader hosting the partition.
+ */
+ @Test
+ public void testReadyAndDrainWhenABatchIsBeingRetried() throws
InterruptedException {
Review Comment:
So, if:
1. the batch should back off, then `ready` will not include node1 in its
output, and `drain` will not include the batch in its output.
```
assertFalse(result.readyNodes.contains(node1), "Node1 is not ready");
assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).isEmpty(),
"No batches ready to be drained on Node1");
```
2. the batch should not back off, then `ready` will include node1 in its
output, and `drain` will include the batch in its output.
```
assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size()
== 1, "Node1 has 1 batch ready & drained");
```
The other assertions sanity-check auxiliary conditions.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches()
throws Exception {
time.sleep(10);
// We should have one batch ready.
- Set<Node> nodes = accum.ready(cluster,
time.milliseconds()).readyNodes;
+ Set<Node> nodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(1, nodes.size(), "Should have 1 leader ready");
- List<ProducerBatch> batches = accum.drain(cluster, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+ List<ProducerBatch> batches = accum.drain(metadataMock, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
assertEquals(1, batches.size(), "Should have 1 batch ready");
int actualBatchSize = batches.get(0).records().sizeInBytes();
assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater
than half batch.size");
assertTrue(actualBatchSize < batchSize, "Batch must be less than
batch.size");
}
}
+ /**
+ * For a batch being retried, this validates ready() and drain() whether a
batch should skip-backoff(retries-immediately), or backoff, based on -
+ * 1. how long it has waited between retry attempts.
+ * 2. change in leader hosting the partition.
+ */
+ @Test
+ public void testReadyAndDrainWhenABatchIsBeingRetried() throws
InterruptedException {
+ int part1LeaderEpoch = 100;
+ // Create cluster metadata, partition1 being hosted by node1.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+ int batchSize = 10;
+ int lingerMs = 10;
+ int retryBackoffMs = 100;
+ int retryBackoffMaxMs = 1000;
+ int deliveryTimeoutMs = Integer.MAX_VALUE;
+ long totalSize = 10 * 1024;
+ String metricGrpName = "producer-metrics";
+ final RecordAccumulator accum = new RecordAccumulator(logContext,
batchSize,
+ CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+ deliveryTimeoutMs, metrics, metricGrpName, time, new
ApiVersions(), null,
+ new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName));
+
+ // Create 1 batch(batchA) to be produced to partition1.
+ long now = time.milliseconds();
+ accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, now, cluster);
+
+ // 1st attempt to produce batchA, it should be ready & drained to be
produced.
+ {
+ log.info("Running 1st attempt to produce batchA, it should be
ready & drained to be produced.");
+ now += lingerMs + 1;
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+ // Re-enqueue batch for subsequent retries & test-cases
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, wait-time between retries is less than
configured and no leader change, so should backoff.
+ {
+ log.info("In this retry of batchA, wait-time between retries is
less than configured and no leader change, so should backoff.");
+ now += 1;
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertFalse(result.readyNodes.contains(node1), "Node1 is not
ready");
+
+ // Try to drain from node1, it should return no batches.
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */,
now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).isEmpty(),
+ "No batches ready to be drained on Node1");
+ }
+
+ // In this retry of batchA, wait-time between retries is less than
configured and leader has changed, so should not backoff.
+ {
+ log.info("In this retry of batchA, wait-time between retries is
less than configured and leader has changed, so should not backoff.");
+ now += 1;
+ part1LeaderEpoch++;
+ // Create cluster metadata, with new leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, wait-time between retries is more than
configured and no leader change, so should not backoff.
+ {
+ log.info("In this retry of batchA, wait-time between retries is
more than configured and no leader change, so should not backoff");
+ now += 2 * retryBackoffMaxMs;
+ // Create cluster metadata, with new leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, wait-time between retries is more than
configured and leader has changed, so should not backoff.
+ {
+ log.info("In this retry of batchA, wait-time between retries is
more than configured and leader has changed, so should not backoff.");
+ now += 2 * retryBackoffMaxMs;
+ part1LeaderEpoch++;
+ // Create cluster metadata, with new leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(3, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+ }
+
+ @Test
+ public void testDrainWithANodeThatDoesntHostAnyPartitions() {
Review Comment:
`RecordAccumulator.drain` now uses `Metadata` instead of `Cluster`. `Metadata`
can change, whereas `Cluster` is immutable. Hence I added tests for metadata
changing underneath (which weren't possible earlier):
1. a node (broker) hosts partitions, but the partitions mapped to it change
underneath;
2. a node (broker) no longer hosts any partitions.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp,
MemoryRecordsBuilder recordsBuilder, lon
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation =
CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
+ this.currentLeaderEpoch = Optional.empty();
+ this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
+ /*
+ * This is called to update the leader-epoch to which this batch is going
to be produced in the ongoing attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+ boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
Review Comment:
So the method does two things:
1. updates the leader epoch if a newer one is supplied;
2. returns whether the leader has changed for the ongoing retry of the batch.
Keeping both in one method is a bit confusing, as is evident from the naming
😄
So I have split it into two methods; to me the code is more readable now.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp,
MemoryRecordsBuilder recordsBuilder, lon
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation =
CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
+ this.currentLeaderEpoch = Optional.empty();
+ this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
+ /*
+ * This is called to update the leader-epoch to which this batch is going
to be produced in the ongoing attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+ boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+ if (!latestLeaderEpoch.isPresent())
+ return false;
+
+ boolean leaderChanged = false;
+ int attempts = attempts();
+ log.trace("For {}, attempting to change leader, currentLeaderEpoch:
{}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt:
{}",
+ this, currentLeaderEpoch.isPresent() ? currentLeaderEpoch.get() :
"un-initialized", attemptsWhenLeaderLastChanged, latestLeaderEpoch.get(),
attempts);
+ boolean isRetry = attempts >= 1;
+ // Checking for leader change makes sense only from 1st retry
onwards(i.e attempt >=1).
+ if (isRetry) {
+ // If the leader's epoch has changed, this counts as a leader
change
+ if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+ attemptsWhenLeaderLastChanged = attempts;
+ leaderChanged = true;
+ } else {
+ // Otherwise, it's only a leader change until the first
attempt is made with this leader
+ leaderChanged = attempts == attemptsWhenLeaderLastChanged;
+ }
+ }
+ if (leaderChanged) {
+ log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]