kirktrue commented on code in PR #14384: URL: https://github.com/apache/kafka/pull/14384#discussion_r1326170737
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). + log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", Review Comment: Can I be extremely nit-picky about the capitalization of `Attempt` in the log line? Any reason it can't just be `attempt`? ########## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ########## @@ -20,26 +20,35 @@ * This is used to describe per-partition state in the MetadataResponse. 
*/ public class PartitionInfo { + public static final int UNKNOWN_LEADER_EPOCH = -1; private final String topic; private final int partition; private final Node leader; + private final int leaderEpoch; private final Node[] replicas; private final Node[] inSyncReplicas; private final Node[] offlineReplicas; public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) { - this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]); + this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, new Node[0]); + } + + public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, + Node[] offlineReplicas) { + this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, offlineReplicas); } public PartitionInfo(String topic, int partition, Node leader, + int leaderEpoch, Review Comment: This is where we might get into trouble since we're breaking a constructor by adding a parameter. ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ########## @@ -96,6 +98,8 @@ public class RecordAccumulatorTest { private final long maxBlockTimeMs = 1000; private final LogContext logContext = new LogContext(); + private final Logger log = logContext.logger(RecordAccumulatorTest.class); Review Comment: I am not personally against logging in tests, but they should be turned down so that they're not emitted by default. 
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). + log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", Review Comment: Also super nit-picky: spaces after colons in message. Sorry, I can't help myself 😛 ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. 
+ */ + boolean hasLeaderChanged(int latestLeaderEpoch) { Review Comment: ```suggestion boolean maybeUpdateLeaderEpoch(int latestLeaderEpoch) { ``` This will imply that we're not just determining that the leader has changed, we're updating internal state. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ########## @@ -859,8 +863,9 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i continue; Deque<ProducerBatch> deque = getDeque(tp); - if (deque == null) + if (deque == null) { Review Comment: Unless there's a valid reason to, we should omit formatting changes. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; Review Comment: Seems as though 0 would do, yes. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. 
+ */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). + log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", + this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts()); + if (attempts() >= 1) { + // If the leader's epoch has changed, this counts as a leader change + if (currentLeaderEpoch != latestLeaderEpoch) { + leaderChangedAttempts = attempts(); + leaderChanged = true; + } else { + // Otherwise, it's only a leader change until the first attempt is made with this leader Review Comment: I'm sorry, I don't quite get this logic. If there is at least one attempt and the above epoch comparison is false, why do we consider the leader as having changed? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+ log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", Review Comment: The value shouldn't change between those calls, right? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). 
+ log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", + this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts()); + if (attempts() >= 1) { + // If the leader's epoch has changed, this counts as a leader change + if (currentLeaderEpoch != latestLeaderEpoch) { + leaderChangedAttempts = attempts(); + leaderChanged = true; + } else { + // Otherwise, it's only a leader change until the first attempt is made with this leader + leaderChanged = attempts.get() == leaderChangedAttempts; + } + } + if (leaderChanged) { + log.debug("For {}, leaderChanged, currentLeaderEpoch:{}, leaderChangedAttempts:{}", Review Comment: ```suggestion log.debug("For {}, a leader change was detected, currentLeaderEpoch: {}, leaderChangedAttempts: {}", ``` ########## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ########## @@ -60,12 +69,19 @@ public int partition() { } /** - * The node id of the node currently acting as a leader for this partition or null if there is no leader + * The node currently acting as a leader for this partition or null if there is no leader */ public Node leader() { return leader; } + /** + * The epoch of the partition's leader. Review Comment: `Optional` here, right? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. 
+ * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). + log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", + this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts()); + if (attempts() >= 1) { + // If the leader's epoch has changed, this counts as a leader change + if (currentLeaderEpoch != latestLeaderEpoch) { + leaderChangedAttempts = attempts(); + leaderChanged = true; + } else { + // Otherwise, it's only a leader change until the first attempt is made with this leader + leaderChanged = attempts.get() == leaderChangedAttempts; + } + } + if (leaderChanged) { + log.debug("For {}, leaderChanged, currentLeaderEpoch:{}, leaderChangedAttempts:{}", Review Comment: Agreed. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ########## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); + this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; + this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } + /* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ + boolean hasLeaderChanged(int latestLeaderEpoch) { + boolean leaderChanged = false; + // Checking for leader change makes sense only from 1st retry onwards(attempt >=1). 
+ log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", Review Comment: Also super nit-picky: spaces after colons in message. Sorry, I can't help myself 😛 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org