Repository: kafka Updated Branches: refs/heads/trunk 4ad165c07 -> 841d2d1a2
MINOR: StreamThread performance optimization guozhangwang Author: Yasuhiro Matsuda <yasuh...@confluent.io> Reviewers: Guozhang Wang Closes #680 from ymatsuda/perf Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/841d2d1a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/841d2d1a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/841d2d1a Branch: refs/heads/trunk Commit: 841d2d1a26af94ec95c480dbf2453f9c7d28c2f7 Parents: 4ad165c Author: Yasuhiro Matsuda <yasuh...@confluent.io> Authored: Tue Dec 15 23:53:23 2015 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Dec 15 23:53:23 2015 -0800 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 63 ++++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/841d2d1a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 37d24bb..578357a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -84,6 +84,8 @@ public class StreamThread extends Thread { private final AtomicBoolean running; private final Map<TaskId, StreamTask> activeTasks; private final Map<TaskId, StandbyTask> standbyTasks; + private final Map<TopicPartition, StreamTask> activeTasksByPartition; + private final Map<TopicPartition, StandbyTask> standbyTasksByPartition; private final Set<TaskId> prevTasks; private final Time time; private final File stateDir; @@ -157,6 +159,8 @@ public class StreamThread extends Thread { // initialize the task list this.activeTasks = new HashMap<>(); this.standbyTasks = new HashMap<>(); + this.activeTasksByPartition = new HashMap<>(); + this.standbyTasksByPartition = new HashMap<>(); this.prevTasks = new HashSet<>(); // standby ktables @@ -261,7 +265,7 @@ public class StreamThread extends Thread { removeStreamTasks(); removeStandbyTasks(); } catch (Throwable e) { - // already logged in removePartition() + // already logged in removeStreamTasks() and removeStandbyTasks() } log.info("Stream thread shutdown complete [" + this.getName() + "]"); @@ -279,15 +283,16 @@ public class StreamThread extends Thread { while (stillRunning()) { // try to fetch some records if necessary if (requiresPoll) { + requiresPoll = false; + long startPoll = time.milliseconds(); ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); if (!records.isEmpty()) { - for (StreamTask task : activeTasks.values()) { - for (TopicPartition partition : task.partitions()) { - task.addRecords(partition, records.records(partition)); - } + for (TopicPartition partition : records.partitions()) { + StreamTask task = activeTasksByPartition.get(partition); + task.addRecords(partition, records.records(partition)); } } @@ -299,8 +304,6 @@ public class StreamThread extends Thread { if (!activeTasks.isEmpty()) { // try to process one record from each task - requiresPoll = false; - for (StreamTask task : activeTasks.values()) { long startProcess = time.milliseconds(); @@ -329,16 +332,15 @@ public class StreamThread extends Thread { if (!standbyTasks.isEmpty()) { if (processStandbyRecords) { if (!standbyRecords.isEmpty()) { - for (StandbyTask task : standbyTasks.values()) { - for (TopicPartition partition : task.changeLogPartitions()) { - List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition); + for (TopicPartition partition : standbyRecords.keySet()) { + StandbyTask task = standbyTasksByPartition.get(partition); + List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition); + if (remaining != null) { + remaining = task.update(partition, remaining); if (remaining != null) { - remaining = task.update(partition, remaining); - if (remaining != null) { - standbyRecords.put(partition, remaining); - } else { - restoreConsumer.resume(partition); - } + standbyRecords.put(partition, remaining); + } else { + restoreConsumer.resume(partition); } } } @@ -349,13 +351,12 @@ public class StreamThread extends Thread { ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0); if (!records.isEmpty()) { - for (StandbyTask task : standbyTasks.values()) { - for (TopicPartition partition : task.changeLogPartitions()) { - List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); - if (remaining != null) { - restoreConsumer.pause(partition); - standbyRecords.put(partition, remaining); - } + for (TopicPartition partition : records.partitions()) { + StandbyTask task = standbyTasksByPartition.get(partition); + List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); + if (remaining != null) { + restoreConsumer.pause(partition); + standbyRecords.put(partition, remaining); } } } @@ -547,9 +548,16 @@ public class StreamThread extends Thread { } // create the active tasks - for (TaskId taskId : partitionsForTask.keySet()) { + for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) { + TaskId taskId = entry.getKey(); + Set<TopicPartition> partitions = entry.getValue(); + try { - activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId))); + StreamTask task = createStreamTask(taskId, partitions); + activeTasks.put(taskId, task); + + for (TopicPartition partition : partitions) + activeTasksByPartition.put(partition, task); } catch (Exception e) { log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e); throw e; @@ -567,6 +575,7 @@ public class StreamThread extends Thread { prevTasks.addAll(activeTasks.keySet()); activeTasks.clear(); + activeTasksByPartition.clear(); } private void closeOne(AbstractTask task) { @@ -605,6 +614,9 @@ public class StreamThread extends Thread { StandbyTask task = createStandbyTask(taskId, partitions); if (task != null) { standbyTasks.put(taskId, task); + for (TopicPartition partition : partitions) { + standbyTasksByPartition.put(partition, task); + } // collect checked pointed offsets to position the restore consumer // this include all partitions from which we restore states checkpointedOffsets.putAll(task.checkpointedOffsets()); @@ -633,6 +645,7 @@ public class StreamThread extends Thread { restoreConsumer.assign(Collections.<TopicPartition>emptyList()); standbyTasks.clear(); + standbyTasksByPartition.clear(); } private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {