[PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#… [kafka]

2023-10-10 Thread via GitHub


msn-tldr opened a new pull request, #14522:
URL: https://github.com/apache/kafka/pull/14522

   …14384)
   
   This PR backports https://github.com/apache/kafka/pull/14384
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-02 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1340370371


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -156,6 +157,8 @@ public class SenderTest {
 private SenderMetricsRegistry senderMetricsRegistry = null;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Inclined to keep it, since it helps improve the readability of test logs when a test fails. By default, logging is off; see the similar comment below for details.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
 private final long maxBlockTimeMs = 1000;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   I am inclined to keep it, as it helps improve the readability of test logs if/when a test fails.
   By default, the logging is turned off in `clients/src/resources/log4j.properties`.



##
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##
@@ -20,6 +20,7 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+public static final int UNKNOWN_LEADER_EPOCH = -1;

Review Comment:
   @dajac this is removed now, thanks!



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards (attempt >= 1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   > least one attempt
   
   implies a retry.
   
   > the above epoch comparison is false
   
   So the leader-epoch is still the same. 
   
   Consider the case where the leader was changed at attempt=5, to epoch=100. maybeUpdateLeaderEpoch() should detect the leader change even when called again at attempt=5 with the same epoch=100, as this is the same attempt in which the leader change was detected.
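
   To make that invariant concrete, here is a minimal, self-contained sketch (a hypothetical `LeaderChangeTracker`, not the PR's actual `ProducerBatch`; field names just mirror the discussion) of the bookkeeping described above: a change recorded at attempt 5 must still be reported whenever the check runs again within that same attempt.
   ```java
   import java.util.Optional;

   // Hypothetical stand-in for the batch's leader-change bookkeeping.
   class LeaderChangeTracker {
       private Optional<Integer> currentLeaderEpoch = Optional.empty();
       private int attemptsWhenLeaderLastChanged = 0;
       private int attempts = 0;

       void retry() { attempts++; }

       // Record the latest known leader epoch; remember the attempt at which it changed.
       void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
           if (latestLeaderEpoch.isPresent() && !currentLeaderEpoch.equals(latestLeaderEpoch)) {
               currentLeaderEpoch = latestLeaderEpoch;
               attemptsWhenLeaderLastChanged = attempts;
           }
       }

       // Only meaningful from the first retry onwards; stays true for the whole attempt
       // in which the change was recorded, no matter how often it is queried.
       boolean hasLeaderChangedForTheOngoingRetry() {
           return attempts >= 1 && attempts == attemptsWhenLeaderLastChanged;
       }

       public static void main(String[] args) {
           LeaderChangeTracker t = new LeaderChangeTracker();
           t.maybeUpdateLeaderEpoch(Optional.of(99));   // initial leader, attempt 0
           for (int i = 0; i < 5; i++) t.retry();       // batch has failed 5 times
           t.maybeUpdateLeaderEpoch(Optional.of(100));  // new leader observed at attempt 5
           System.out.println(t.hasLeaderChangedForTheOngoingRetry()); // true
           t.maybeUpdateLeaderEpoch(Optional.of(100));  // queried again in the same attempt
           System.out.println(t.hasLeaderChangedForTheOngoingRetry()); // still true
       }
   }
   ```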






Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-02 Thread via GitHub


msn-tldr commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1743383185

   @kirktrue 
   > @jsancio - can you weigh in about changes to org.apache.kafka.common 
needing a KIP?
   
   I have removed the changes to public classes, so this shouldn't require a KIP; please take a look.





Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-03 Thread via GitHub


wcarlson5 commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1344314253


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {

Review Comment:
   It seems that this doesn't actually update the leader epoch, but rather checks whether the leader has changed since the last retry. Can we give it a clearer name?



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -156,6 +157,8 @@ public class SenderTest {
 private SenderMetricsRegistry senderMetricsRegistry = null;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Generally, adding logging in tests isn't great; I would second removing it. I don't think the two added lines are worth the change.
   
   If you want information about whether (and why) a batch is or isn't being processed, you should add logs to the actual code. They can be at trace level if necessary.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   How difficult would it be to mock the backoff using a ticker? Too often, relying on timing in tests makes them flaky. That seems low-probability here, but we should keep it in mind.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a batch should skip-backoff (retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {

Review Comment:
   I think this is just my unfamiliarity talking, but how are you checking if 
it is backing off? Is it just relying on these lines? 
   
   ```   
   assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
   assertEquals(0, batch.attemptsWhenLeaderLastChanged());
   ```



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -712,13 +714,14 @@ pr

Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-03 Thread via GitHub


kirktrue commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1344502781


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!latestLeaderEpoch.isPresent())
+            return false;
+
+        boolean leaderChanged = false;
+        int attempts = attempts();
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+            this, currentLeaderEpoch.isPresent() ? currentLeaderEpoch.get() : "un-initialized", attemptsWhenLeaderLastChanged, latestLeaderEpoch.get(), attempts);
+        boolean isRetry = attempts >= 1;
+        // Checking for leader change makes sense only from 1st retry onwards (i.e. attempt >= 1).
+        if (isRetry) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+                attemptsWhenLeaderLastChanged = attempts;
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts == attemptsWhenLeaderLastChanged;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leader has changed, oldEpoch: {}, newEpoch: {}",

Review Comment:
   nit: per @wcarlson5's [comment above](https://github.com/apache/kafka/pull/14384/files#r1344314253), the log message might be read as saying that the internal state of the batch was _changed_, rather than a change simply being _detected_. I don't have suggestions on how to reword it, though. It's fine if you want to leave it as is.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -842,22 +845,31 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
         return false;
     }
 
-    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
+    private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node, int maxSize, long now) {
         int size = 0;
-        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+        List<PartitionInfo> parts = metadata.fetch().partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
+        if (parts.isEmpty())
+            return ready;
         /* to make starvation less likely each node has it's own drainIndex */
         int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
+
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
             updateDrainIndex(node.idString(), drainIndex);
             drainIndex = (drainIndex + 1) % parts.size();
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;
-
+            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(new TopicPartition(part.topic(), part.partition()));

Review Comment:
   nit: can we reuse `tp` instead of creating a new `TopicPartition`?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The la

Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-04 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345590803


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards (attempt >= 1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader

Review Comment:
   > I think I can see how the flow of the code would end up calling that multiple times.
   
   It gets called multiple times: in sendProducerData -> ready -> partitionReady, and then again in sendProducerData -> drain -> drainBatchesForOneNode.






Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-04 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345592537


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -156,6 +157,8 @@ public class SenderTest {
 private SenderMetricsRegistry senderMetricsRegistry = null;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Ok, fair enough, I will remove the logging from tests.
   
   I have already added trace logs in the actual code, so that should be fine for now.






Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-04 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1345789833


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,45 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * This is called to update the leader-epoch to which this batch is going to be produced in the ongoing attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {

Review Comment:
   So the method does 2 things:
   1. it updates the leader if a newer leader is supplied,
   2. it also returns whether the leader has changed for the ongoing retry of the batch.
   
   Keeping both in 1 method is a bit confusing, as evident in the naming 😄 
   
   So I have split it into 2 methods; the code is more readable to me now.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   Fair point. The `SenderTest` already mocks `time` using `MockTime`, which is manually advanced. Is that what you had in mind?
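
   For context, a minimal sketch of that approach, assuming the standard `org.apache.kafka.common.utils.MockTime` test utility: `sleep()` only advances a virtual clock, so the test never has to wait on (or race against) wall-clock time.
   ```java
   import org.apache.kafka.common.utils.MockTime;

   public class MockTimeSketch {
       public static void main(String[] args) {
           long retryBackoffMaxMs = 100L;
           MockTime time = new MockTime();
           long before = time.milliseconds();
           time.sleep(2 * retryBackoffMaxMs);  // advances the virtual clock instantly, no real waiting
           System.out.println(time.milliseconds() - before);  // 200
       }
   }
   ```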



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a batch should skip-backoff (retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {

Review Comment:
   So if:
   1. the batch should back off - then neither `ready` nor `drain` would have node1 or the batch, respectively, in their output.
   ```
   assertFalse(result.readyNodes.contains(node1), "Node1 is not ready");
   assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).isEmpty(),
       "No batches ready to be drained on Node1");
   ```
   2. the batch should not be backing off - then both `ready` & `drain` would have node1 and the batch, respectively, in their output.
   ```
   assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
   assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
   ```
   
   The other assertions are sanity-checking auxiliary conditions.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.r

Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-04 Thread via GitHub


wcarlson5 commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1346128343


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,42 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = Optional.empty();
+        this.attemptsWhenLeaderLastChanged = 0;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /**
+     * It will update the leader to which this batch will be produced for the ongoing attempt, if a newer leader is known.
+     * @param latestLeaderEpoch latest leader's epoch.
+     */
+    void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+        if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+            log.trace("For {}, leader will be updated, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+                this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, latestLeaderEpoch, attempts);
+            attemptsWhenLeaderLastChanged = attempts();
+            currentLeaderEpoch = latestLeaderEpoch;
+        } else {
+            log.trace("For {}, leader wasn't updated, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+                this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, latestLeaderEpoch, attempts);
+        }
+    }
+
+    /**
+     * It will return true, for a when batch is being retried, it will be retried to a newer leader.
+     */
+    boolean hasLeaderChangedForTheOngoingRetry() {

Review Comment:
   love this, thanks!



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1398,16 +1416,253 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception {
             time.sleep(10);
 
             // We should have one batch ready.
-            Set<Node> nodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+            Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
             assertEquals(1, nodes.size(), "Should have 1 leader ready");
-            List<ProducerBatch> batches = accum.drain(cluster, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+            List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
             assertEquals(1, batches.size(), "Should have 1 batch ready");
             int actualBatchSize = batches.get(0).records().sizeInBytes();
             assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size");
             assertTrue(actualBatchSize < batchSize, "Batch must be less than batch.size");
         }
     }
 
+    /**
+     * For a batch being retried, this validates ready() and drain() whether a batch should skip-backoff (retries-immediately), or backoff, based on -
+     * 1. how long it has waited between retry attempts.
+     * 2. change in leader hosting the partition.
+     */
+    @Test
+    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {
+        int part1LeaderEpoch = 100;
+        // Create cluster metadata, partition1 being hosted by node1.
+        part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+        cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+            Collections.emptySet(), Collections.emptySet());
+        final int finalEpoch = part1LeaderEpoch;
+        metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+        int batchSize = 10;
+        int lingerMs = 10;
+        int retryBackoffMs = 100;
+        int retryBackoffMaxMs = 1000;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
+        long totalSize = 10 * 1024;
+        String metricGrpName = "producer-metrics";
+        final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
+            CompressionType.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
+            deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
+            new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
+
+        // Create 1 batch(batchA) to be produced to partition1.
+        long now = time.milliseconds();
+        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, now, cluster);
+
+        // 1st attempt to produce batchA, it should be ready & drained to be produced.
+        {
+            log.info("Running 1st attempt to produce batchA, it 

Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-04 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1346188807


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+            long retryBackoffMaxMs = 100L;

Review Comment:
   `time.sleep(2 * retryBackoffMaxMs);` is how I ensure that the test waits for more than the backoff period before retrying.
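
   As a side note, here is a self-contained sketch (a hypothetical helper, ignoring the jitter Kafka applies) of why advancing the mock clock by `2 * retryBackoffMaxMs` is always enough: the exponential backoff is capped at `retryBackoffMaxMs`, so any wait longer than that cap clears the backoff regardless of the attempt count.
   ```java
   public class BackoffSketch {
       // Exponential backoff capped at retryBackoffMaxMs (jitter omitted for simplicity).
       static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int attempts) {
           double exp = retryBackoffMs * Math.pow(2, Math.max(0, attempts - 1));
           return (long) Math.min(exp, retryBackoffMaxMs);
       }

       public static void main(String[] args) {
           long retryBackoffMs = 20L, retryBackoffMaxMs = 100L;
           for (int attempt = 1; attempt <= 6; attempt++) {
               // 2 * retryBackoffMaxMs (200ms here) always exceeds the capped wait printed below.
               System.out.println("attempt " + attempt + ": backoff "
                   + backoffMs(retryBackoffMs, retryBackoffMaxMs, attempt) + "ms");
           }
       }
   }
   ```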






Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-05 Thread via GitHub


msn-tldr commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1748637611

   Looking through the CI failures - 
   https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14384/8/pipeline/10/
   
   For JDK 11 - the 2 failures are flaky runs and are unrelated to the Java clients.
   https://ge.apache.org/s/q63xej2ew4w6u
   
   For JDK 17 - again flaky tests, unrelated to the Java client.
   https://ge.apache.org/s/dayodofvlspty/tests/overview?outcome=FAILED,FLAKY
   





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-05 Thread via GitHub


wcarlson5 merged PR #14384:
URL: https://github.com/apache/kafka/pull/14384





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-05 Thread via GitHub


msn-tldr commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1749004750

   @wcarlson5 thanks!





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-05 Thread via GitHub


ijuma commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1749230972

   Would it make sense to backport this to 3.6 as well?





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#… [kafka]

2023-10-11 Thread via GitHub


msn-tldr commented on PR #14522:
URL: https://github.com/apache/kafka/pull/14522#issuecomment-1757644848

   CI failed with unrelated flaky tests, see below. JDK17 had a 100% green run.
   JDK11 - 
https://ge.apache.org/s/q2va7lqt4kk3g/tests/overview?outcome=FLAKY,FAILED
   JDK20 - 
https://ge.apache.org/s/izb5us4rpisya/tests/overview?outcome=FLAKY,FAILED
   JDK8 - 
https://ge.apache.org/s/sjigcnlsvrstg/tests/overview?outcome=FLAKY,FAILED





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#… [kafka]

2023-10-11 Thread via GitHub


wcarlson5 merged PR #14522:
URL: https://github.com/apache/kafka/pull/14522





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-11 Thread via GitHub


msn-tldr commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1757962406

   @ijuma this is merged into 3.6 now, so it should be available in 3.6.1.
   https://github.com/apache/kafka/pull/14522





Re: [PR] KAFKA-15415: On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-11 Thread via GitHub


ijuma commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1758021595

   Thanks!

