AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324590647


##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -60,12 +69,19 @@ public int partition() {
     }
 
     /**
-     * The node id of the node currently acting as a leader for this partition or null if there is no leader
+     * The node currently acting as a leader for this partition or null if there is no leader
      */
     public Node leader() {
         return leader;
     }
 
+    /**
+     * The epoch of the partition's leader.

Review Comment:
   I think it should be "The leader epoch of this partition, if known".



##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;
     private final String topic;
     private final int partition;
     private final Node leader;
+    private final int leaderEpoch;
     private final Node[] replicas;
     private final Node[] inSyncReplicas;
     private final Node[] offlineReplicas;
 
     public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
+        this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, new Node[0]);

Review Comment:
   And then this would pass `Optional.empty()` instead of `UNKNOWN_LEADER_EPOCH`, of course.



##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
             return info.leader();
     }
 
+    /**
+     * Get the current leader's epoch for the given topic-partition.
+     * @param topicPartition
+     * @return The epoch for partition's leader, or UNKNOWN_LEADER_EPOCH if epoch unkown.

Review Comment:
   Typo. "unkown" -> "unknown".



##########
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##########
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;
     private final String topic;
     private final int partition;
     private final Node leader;
+    private final int leaderEpoch;

Review Comment:
   I think it would be more consistent with existing Kafka interfaces to use `Optional<Integer> leaderEpoch`. See, for example, `org.apache.kafka.clients.consumer.OffsetAndMetadata`.
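
   For illustration, here is a minimal sketch of the shape this suggestion implies. `PartitionInfoSketch` is a hypothetical, trimmed-down stand-in rather than the real `PartitionInfo`, and its constructors are assumptions made only to keep the example self-contained.

```java
import java.util.Optional;

// Hypothetical stand-in for PartitionInfo; NOT the real Kafka class.
final class PartitionInfoSketch {
    private final String topic;
    private final int partition;
    // An absent value replaces the UNKNOWN_LEADER_EPOCH (-1) sentinel.
    private final Optional<Integer> leaderEpoch;

    // Older-style constructor: the caller has no epoch, so it is simply absent.
    PartitionInfoSketch(String topic, int partition) {
        this(topic, partition, Optional.empty());
    }

    PartitionInfoSketch(String topic, int partition, Optional<Integer> leaderEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.leaderEpoch = leaderEpoch;
    }

    String topic() {
        return topic;
    }

    int partition() {
        return partition;
    }

    /** The leader epoch of this partition, if known. */
    Optional<Integer> leaderEpoch() {
        return leaderEpoch;
    }

    public static void main(String[] args) {
        PartitionInfoSketch unknown = new PartitionInfoSketch("test", 0);
        PartitionInfoSketch known = new PartitionInfoSketch("test", 0, Optional.of(100));
        System.out.println("unknown epoch present: " + unknown.leaderEpoch().isPresent());
        System.out.println("known epoch present: " + known.leaderEpoch().isPresent());
    }
}
```

   Callers then have to handle the "unknown" case explicitly instead of comparing against a magic number.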



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   There are 4 separate calls to `attempts()` (or the equivalent, `attempts.get()`) in this method. Capture the value with a single call and use that instead.
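
   Roughly what I have in mind, as a sketch only. `LeaderChangeSketch` is a hypothetical stand-in for the relevant part of `ProducerBatch`, logging is omitted, and the unconditional epoch update at the end of the method is an assumption about code that is not visible in this hunk.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for part of ProducerBatch; NOT the real class.
final class LeaderChangeSketch {
    private final AtomicInteger attempts = new AtomicInteger(0);
    private int currentLeaderEpoch = -1;
    private int leaderChangedAttempts = -1;

    // Simulates the batch being re-enqueued for another attempt.
    void recordRetry() {
        attempts.incrementAndGet();
    }

    boolean hasLeaderChanged(int latestLeaderEpoch) {
        // Read the counter once so every check below sees the same value.
        final int currentAttempt = attempts.get();
        boolean leaderChanged = false;
        // Checking for a leader change only makes sense from the first retry onwards.
        if (currentAttempt >= 1) {
            if (currentLeaderEpoch != latestLeaderEpoch) {
                // The epoch moved, so this counts as a leader change.
                leaderChangedAttempts = currentAttempt;
                leaderChanged = true;
            } else {
                // Still a leader change until an attempt is made with this leader.
                leaderChanged = currentAttempt == leaderChangedAttempts;
            }
        }
        // Assumption: the epoch is recorded on every call.
        currentLeaderEpoch = latestLeaderEpoch;
        return leaderChanged;
    }

    public static void main(String[] args) {
        LeaderChangeSketch batch = new LeaderChangeSketch();
        System.out.println("first attempt: " + batch.hasLeaderChanged(100));
        batch.recordRetry();
        System.out.println("retry, new epoch: " + batch.hasLeaderChanged(101));
    }
}
```

   Besides being tidier, a single read means the trace line and the comparisons cannot observe different values if the counter is updated concurrently.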



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;

Review Comment:
   I don't think there's any need for a special value of -1. Initializing to 0 
would work fine.
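
   The reasoning, as a small sketch: the equality check against `leaderChangedAttempts` is only reached when `attempts() >= 1`, so an initial value of 0 can never match, just like -1. The helper below is hypothetical and isolates only that comparison.

```java
// Hypothetical helper isolating the comparison; NOT code from the PR.
final class InitialValueSketch {
    // Mirrors the guarded check: equality only matters from the first retry on.
    static boolean sameAttemptAsLastChange(int attempts, int leaderChangedAttempts) {
        return attempts >= 1 && attempts == leaderChangedAttempts;
    }

    // True if initializing to -1 or to 0 gives the same answer for every attempt count tried.
    static boolean initialValuesEquivalent(int maxAttempts) {
        for (int attempts = 0; attempts <= maxAttempts; attempts++) {
            boolean withMinusOne = sameAttemptAsLastChange(attempts, -1);
            boolean withZero = sameAttemptAsLastChange(attempts, 0);
            if (withMinusOne != withZero) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("equivalent: " + initialValuesEquivalent(10));
    }
}
```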



##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
             return info.leader();
     }
 
+    /**
+     * Get the current leader's epoch for the given topic-partition.
+     * @param topicPartition
+     * @return The epoch for partition's leader, or UNKNOWN_LEADER_EPOCH if epoch unkown.
+     */
+    public int leaderEpochFor(TopicPartition topicPartition) {

Review Comment:
   Again, I think `Optional<Integer>` is more usual.
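
   Something along these lines, as a sketch. `ClusterSketch` and its string-keyed map are hypothetical simplifications, not the real `Cluster` internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Cluster; partitions are keyed by a plain string here.
final class ClusterSketch {
    private final Map<String, Integer> leaderEpochs = new HashMap<>();

    void updateLeaderEpoch(String topicPartition, int epoch) {
        leaderEpochs.put(topicPartition, epoch);
    }

    /** The leader epoch for the given partition, or empty if it is not known. */
    Optional<Integer> leaderEpochFor(String topicPartition) {
        return Optional.ofNullable(leaderEpochs.get(topicPartition));
    }

    public static void main(String[] args) {
        ClusterSketch cluster = new ClusterSketch();
        cluster.updateLeaderEpoch("test-0", 100);
        System.out.println("test-0 known: " + cluster.leaderEpochFor("test-0").isPresent());
        System.out.println("test-1 known: " + cluster.leaderEpochFor("test-1").isPresent());
    }
}
```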



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -79,6 +80,11 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private boolean retry;
     private boolean reopened;
 
+    // Tracks the current-leader's epoch to which this batch would be sent, in the current to produce the batch.
+    private int currentLeaderEpoch;
+    // Tracks the attempt in which leader was changed to currentLeaderEpoch for the 1st time.
+    private int leaderChangedAttempts;

Review Comment:
   The name sounds like the number of attempts to change the leader. Perhaps `attemptsWhenLeaderLastChanged` would be more descriptive.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -156,6 +157,8 @@ public class SenderTest {
     private SenderMetricsRegistry senderMetricsRegistry = null;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   I'm surprised that you added a logger when this massive test hasn't used one 
so far. Seems like logging is not required for working out whether the test 
passed or how it failed. I suggest removing it.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -517,4 +554,14 @@ public boolean isTransactional() {
     public boolean sequenceHasBeenReset() {
         return reopened;
     }
+
+    // VisibleForTesting
+    int currentLeaderEpoch() {
+        return currentLeaderEpoch;
+    }
+
+    // VisibleForTesting
+    int leaderChangedAttempts() {

Review Comment:
   `attemptsWhenLeaderLastChanged()` perhaps.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
     private final long maxBlockTimeMs = 1000;
     private final LogContext logContext = new LogContext();
 
+    private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   Probably unnecessary.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
 
         txnManager.beginTransaction();
     }
+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;
+            // lingerMs is 0 to send batch as soon as any records are available on it.
+            this.accumulator = new RecordAccumulator(logContext, batchSize,
+                CompressionType.NONE, 0, 10L, retryBackoffMaxMs,
+                DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool);
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL,
+                10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null,
+                apiVersions);
+            // Update metadata with leader-epochs.
+            int tp0LeaderEpoch = 100;
+            int epoch = tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return epoch;
+                        }  else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+
+            // Produce batch, it returns with a retry-able error like NOT_LEADER_OR_FOLLOWER, scheduled for retry.
+            // This triggers a metadata-request, that discovers a new-leader for tp0.
+            Future<RecordMetadata> futureIsProduced = appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
+            assertEquals(1, client.inFlightRequestCount(),
+                "We should have a single produce request in flight.");
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0));
+            sender.runOnce(); // receive produce response, batch scheduled for retry
+            assertTrue(!futureIsProduced.isDone(), "Produce request is yet not done.");
+
+            // TEST that as new-leader(with epochA) is discovered, the batch is retried immediately i.e. skips any backoff period.
+            // Update leader epoch for tp0
+            log.info("Test that to a new-leader, batch is retried immediately.");
+            int newEpoch = ++tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return newEpoch;
+                        } else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+            sender.runOnce(); // send produce request, immediately.
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0));

Review Comment:
   Should there not be testing that uses the new `CurrentLeader` field in v9 of 
the ProduceResponse given that this is part of KIP-951 which this PR is 
implementing?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                 recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+            this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts());
+        if (attempts() >= 1) {
+            // If the leader's epoch has changed, this counts as a leader change
+            if (currentLeaderEpoch != latestLeaderEpoch) {
+                leaderChangedAttempts = attempts();
+                leaderChanged = true;
+            } else {
+                // Otherwise, it's only a leader change until the first attempt is made with this leader
+                leaderChanged = attempts.get() == leaderChangedAttempts;
+            }
+        }
+        if (leaderChanged) {
+            log.debug("For {}, leaderChanged, currentLeaderEpoch:{}, leaderChangedAttempts:{}",

Review Comment:
   I don't see why you're tracing only the previous leader epoch here. I think 
you're interested in the new one, or both new and old. Because the assignment 
follows the log line, the "current" is still previous.
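
   One way to get both values into the message, as a sketch: capture the previous epoch before assigning the new one. `LeaderChangeLogSketch` is hypothetical, and it returns the formatted message instead of calling `log.debug(...)` so that the example stays self-contained.

```java
// Hypothetical stand-in; returns the text that would be handed to log.debug(...).
final class LeaderChangeLogSketch {
    private int currentLeaderEpoch = -1;

    String onLeaderChanged(int latestLeaderEpoch) {
        // Capture the old value first so the message can report both epochs.
        final int previousLeaderEpoch = currentLeaderEpoch;
        currentLeaderEpoch = latestLeaderEpoch;
        return String.format("leaderChanged, previousLeaderEpoch: %d, currentLeaderEpoch: %d",
            previousLeaderEpoch, currentLeaderEpoch);
    }

    public static void main(String[] args) {
        LeaderChangeLogSketch batch = new LeaderChangeLogSketch();
        System.out.println(batch.onLeaderChanged(100));
        System.out.println(batch.onLeaderChanged(101));
    }
}
```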



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
