Repository: kafka Updated Branches: refs/heads/trunk 1fbe445dd -> 4c0660bf3
MINOR: Fix typo and tweak wording in `RecordAccumulator` comments This was recently introduced in: https://github.com/apache/kafka/commit/1fbe445dde71df0023a978c5e54dd229d3d23e1b Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1152 from ijuma/fix-typos-in-record-accumulator Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c0660bf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c0660bf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c0660bf Branch: refs/heads/trunk Commit: 4c0660bf3da9879cb405a0f85cf1524511e091e8 Parents: 1fbe445 Author: Ismael Juma <[email protected]> Authored: Mon Mar 28 09:00:03 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Mar 28 09:00:03 2016 -0700 ---------------------------------------------------------------------- .../clients/producer/internals/RecordAccumulator.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4c0660bf/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 915c4d3..7f5b16f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -218,14 +218,14 @@ public final class RecordAccumulator { int count = 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { Deque<RecordBatch> dq = entry.getValue(); - // We only check if the batch should be expired if the partition does not have a batch in flight. - // This is to avoid the later batches get expired when an earlier batch is still in progress. - // This protection only takes effect when user sets max.in.flight.request.per.connection=1. - // Otherwise the expiration order is not guranteed. TopicPartition tp = entry.getKey(); + // We only check if the batch should be expired if the partition does not have a batch in flight. + // This is to prevent later batches from being expired while an earlier batch is still in progress. + // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection + // is only active in this case. Otherwise the expiration order is not guaranteed. if (!muted.contains(tp)) { synchronized (dq) { - // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut + // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); Iterator<RecordBatch> batchIterator = dq.iterator(); while (batchIterator.hasNext()) {
