Repository: kafka Updated Branches: refs/heads/trunk bb629f224 -> e62dd4cb7
MINOR: Remove unnecessary synchronized block in org.apache.kafka.streams.processor.internals.StreamTask The StreamTask is owned by a specific thread, so it doesn't seem necessary to synchronized the processing of the records as discussed with guozhangwang on the dev mailing list Author: PierreCoquentin <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1688 from PierreCoquentin/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e62dd4cb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e62dd4cb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e62dd4cb Branch: refs/heads/trunk Commit: e62dd4cb770e3662524d4f2f50547549e3ab2eaf Parents: bb629f2 Author: Pierre Coquentin <[email protected]> Authored: Tue Aug 2 14:30:12 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Aug 2 14:30:12 2016 -0700 ---------------------------------------------------------------------- .../streams/processor/internals/StreamTask.java | 66 ++++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e62dd4cb/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3126dd4..18b7646 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -152,51 +152,49 @@ public class StreamTask extends AbstractTask implements Punctuator { */ @SuppressWarnings("unchecked") public int process() { - synchronized (this) { - // get the next record to process - StampedRecord record = partitionGroup.nextRecord(recordInfo); + // get the next record to process + StampedRecord record = partitionGroup.nextRecord(recordInfo); - // if there is no record to process, return immediately - if (record == null) { - requiresPoll = true; - return 0; - } - - requiresPoll = false; + // if there is no record to process, return immediately + if (record == null) { + requiresPoll = true; + return 0; + } - try { - // process the record by passing to the source node of the topology - this.currRecord = record; - this.currNode = recordInfo.node(); - TopicPartition partition = recordInfo.partition(); + requiresPoll = false; - log.debug("Start processing one record [{}]", currRecord); + try { + // process the record by passing to the source node of the topology + this.currRecord = record; + this.currNode = recordInfo.node(); + TopicPartition partition = recordInfo.partition(); - this.currNode.process(currRecord.key(), currRecord.value()); + log.debug("Start processing one record [{}]", currRecord); - log.debug("Completed processing one record [{}]", currRecord); + this.currNode.process(currRecord.key(), currRecord.value()); - // update the consumed offset map after processing is done - consumedOffsets.put(partition, currRecord.offset()); - commitOffsetNeeded = true; + log.debug("Completed processing one record [{}]", currRecord); - // after processing this record, if its partition queue's buffered size has been - // decreased to the threshold, we can then resume the consumption on this partition - if (recordInfo.queue().size() == this.maxBufferedSize) { - consumer.resume(singleton(partition)); - requiresPoll = true; - } + // update the consumed offset map after processing is done + consumedOffsets.put(partition, currRecord.offset()); + commitOffsetNeeded = true; - if (partitionGroup.topQueueSize() <= this.maxBufferedSize) { - requiresPoll = true; - } - } finally { - this.currRecord = null; - this.currNode = null; + // after processing this record, if its partition queue's buffered size has been + // decreased to the threshold, we can then resume the consumption on this partition + if (recordInfo.queue().size() == this.maxBufferedSize) { + consumer.resume(singleton(partition)); + requiresPoll = true; } - return partitionGroup.numBuffered(); + if (partitionGroup.topQueueSize() <= this.maxBufferedSize) { + requiresPoll = true; + } + } finally { + this.currRecord = null; + this.currNode = null; } + + return partitionGroup.numBuffered(); } public boolean requiresPoll() {
