Repository: kafka Updated Branches: refs/heads/trunk 45e7f7130 -> 4a3d244a2
MINOR: do not create a StandbyTask if there is no state store in the task @guozhangwang An optimization that may reduce unnecessary polling for standby tasks. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #535 from ymatsuda/remove_empty_standby_task Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a3d244a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a3d244a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a3d244a Branch: refs/heads/trunk Commit: 4a3d244a2cb95654d18368de6e5b67661d4f4f10 Parents: 45e7f71 Author: Yasuhiro Matsuda <[email protected]> Authored: Mon Nov 16 14:09:27 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Nov 16 14:09:27 2015 -0800 ---------------------------------------------------------------------- .../kafka/streams/processor/internals/StreamThread.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4a3d244a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index bbaeb14..796e53f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -542,7 +542,11 @@ public class StreamThread extends Thread { ProcessorTopology topology = builder.build(id.topicGroupId); - return new StandbyTask(id, restoreConsumer, topology, config, sensors); + if (!topology.stateStoreSuppliers().isEmpty()) { + return new StandbyTask(id, restoreConsumer, topology, config, sensors); 
+ } else { + return null; + } } private void addStandbyTasks() { @@ -550,8 +554,10 @@ public class StreamThread extends Thread { for (TaskId taskId : partitionGrouper.standbyTasks()) { StandbyTask task = createStandbyTask(taskId); - standbyTasks.put(taskId, task); - checkpointedOffsets.putAll(task.checkpointedOffsets()); + if (task != null) { + standbyTasks.put(taskId, task); + checkpointedOffsets.putAll(task.checkpointedOffsets()); + } } restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
