MINOR: small code optimizations in streams

guozhangwang
Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1176 from ymatsuda/optimize


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/09f4a7fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/09f4a7fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/09f4a7fd

Branch: refs/heads/0.10.0
Commit: 09f4a7fdc923b03a2f2ea29ecb0659ca450e8149
Parents: c216f8a
Author: Yasuhiro Matsuda <[email protected]>
Authored: Fri Apr 1 17:14:29 2016 -0700
Committer: Gwen Shapira <[email protected]>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/PartitionGroup.java     |  6 +++-
 .../streams/processor/internals/StreamTask.java | 12 +++----
 .../processor/internals/StreamThread.java       | 35 ++++++++------------
 3 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index b487ff5..3d8f792 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -49,6 +49,10 @@ public class PartitionGroup {
         public TopicPartition partition() {
             return queue.partition();
         }
+
+        public RecordQueue queue() {
+            return queue;
+        }
     }
 
     // since task is thread-safe, we do not need to synchronize on local variables
@@ -88,7 +92,7 @@ public class PartitionGroup {
             // get the first record from this queue.
             record = queue.poll();
 
-            if (queue.size() > 0) {
+            if (!queue.isEmpty()) {
                 queuesByTime.offer(queue);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index afa303c..61aeced 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -179,7 +179,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
                 // after processing this record, if its partition queue's buffered size has been
                 // decreased to the threshold, we can then resume the consumption on this partition
-                if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
+                if (recordInfo.queue().size() == this.maxBufferedSize) {
                     consumer.resume(singleton(partition));
                     requiresPoll = true;
                 }
@@ -320,13 +320,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
         ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currNode = childNode;
-            try {
+        try {
+            for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                currNode = childNode;
                 childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
             }
+        } finally {
+            currNode = thisNode;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d6b98f..c2a8e06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -350,9 +350,12 @@ public class StreamThread extends Thread {
                         requiresPoll = requiresPoll || task.requiresPoll();
 
                         sensors.processTimeSensor.record(time.milliseconds() - startProcess);
-                    }
 
-                    maybePunctuate();
+                        maybePunctuate(task);
+
+                        if (task.commitNeeded())
+                            commitOne(task, time.milliseconds());
+                    }
 
                     // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
                     // even when we paused all partitions.
@@ -424,18 +427,16 @@ public class StreamThread extends Thread {
         return true;
     }
 
-    private void maybePunctuate() {
-        for (StreamTask task : activeTasks.values()) {
-            try {
-                long now = time.milliseconds();
+    private void maybePunctuate(StreamTask task) {
+        try {
+            long now = time.milliseconds();
 
-                if (task.maybePunctuate(now))
-                    sensors.punctuateTimeSensor.record(time.milliseconds() - now);
+            if (task.maybePunctuate(now))
+                sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
-            } catch (KafkaException e) {
-                log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
+        } catch (KafkaException e) {
+            log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            throw e;
         }
     }
 
@@ -449,16 +450,6 @@ public class StreamThread extends Thread {
             lastCommit = now;
 
             processStandbyRecords = true;
-        } else {
-            for (StreamTask task : activeTasks.values()) {
-                try {
-                    if (task.commitNeeded())
-                        commitOne(task, time.milliseconds());
-                } catch (KafkaException e) {
-                    log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ",
e);
-                    throw e;
-                }
-            }
         }
     }
 
