Repository: kafka
Updated Branches:
  refs/heads/trunk 4ad165c07 -> 841d2d1a2


MINOR: StreamThread performance optimization

guozhangwang

Author: Yasuhiro Matsuda <yasuh...@confluent.io>

Reviewers: Guozhang Wang

Closes #680 from ymatsuda/perf


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/841d2d1a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/841d2d1a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/841d2d1a

Branch: refs/heads/trunk
Commit: 841d2d1a26af94ec95c480dbf2453f9c7d28c2f7
Parents: 4ad165c
Author: Yasuhiro Matsuda <yasuh...@confluent.io>
Authored: Tue Dec 15 23:53:23 2015 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Dec 15 23:53:23 2015 -0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 63 ++++++++++++--------
 1 file changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/841d2d1a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 37d24bb..578357a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -84,6 +84,8 @@ public class StreamThread extends Thread {
     private final AtomicBoolean running;
     private final Map<TaskId, StreamTask> activeTasks;
     private final Map<TaskId, StandbyTask> standbyTasks;
+    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
+    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
     private final Set<TaskId> prevTasks;
     private final Time time;
     private final File stateDir;
@@ -157,6 +159,8 @@ public class StreamThread extends Thread {
         // initialize the task list
         this.activeTasks = new HashMap<>();
         this.standbyTasks = new HashMap<>();
+        this.activeTasksByPartition = new HashMap<>();
+        this.standbyTasksByPartition = new HashMap<>();
         this.prevTasks = new HashSet<>();
 
         // standby ktables
@@ -261,7 +265,7 @@ public class StreamThread extends Thread {
             removeStreamTasks();
             removeStandbyTasks();
         } catch (Throwable e) {
-            // already logged in removePartition()
+            // already logged in removeStreamTasks() and removeStandbyTasks()
         }
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
@@ -279,15 +283,16 @@ public class StreamThread extends Thread {
             while (stillRunning()) {
                 // try to fetch some records if necessary
                 if (requiresPoll) {
+                    requiresPoll = false;
+
                     long startPoll = time.milliseconds();
 
                     ConsumerRecords<byte[], byte[]> records = 
consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
 
                     if (!records.isEmpty()) {
-                        for (StreamTask task : activeTasks.values()) {
-                            for (TopicPartition partition : task.partitions()) 
{
-                                task.addRecords(partition, 
records.records(partition));
-                            }
+                        for (TopicPartition partition : records.partitions()) {
+                            StreamTask task = 
activeTasksByPartition.get(partition);
+                            task.addRecords(partition, 
records.records(partition));
                         }
                     }
 
@@ -299,8 +304,6 @@ public class StreamThread extends Thread {
 
                 if (!activeTasks.isEmpty()) {
                     // try to process one record from each task
-                    requiresPoll = false;
-
                     for (StreamTask task : activeTasks.values()) {
                         long startProcess = time.milliseconds();
 
@@ -329,16 +332,15 @@ public class StreamThread extends Thread {
         if (!standbyTasks.isEmpty()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
-                    for (StandbyTask task : standbyTasks.values()) {
-                        for (TopicPartition partition : 
task.changeLogPartitions()) {
-                            List<ConsumerRecord<byte[], byte[]>> remaining = 
standbyRecords.remove(partition);
+                    for (TopicPartition partition : standbyRecords.keySet()) {
+                        StandbyTask task = 
standbyTasksByPartition.get(partition);
+                        List<ConsumerRecord<byte[], byte[]>> remaining = 
standbyRecords.remove(partition);
+                        if (remaining != null) {
+                            remaining = task.update(partition, remaining);
                             if (remaining != null) {
-                                remaining = task.update(partition, remaining);
-                                if (remaining != null) {
-                                    standbyRecords.put(partition, remaining);
-                                } else {
-                                    restoreConsumer.resume(partition);
-                                }
+                                standbyRecords.put(partition, remaining);
+                            } else {
+                                restoreConsumer.resume(partition);
                             }
                         }
                     }
@@ -349,13 +351,12 @@ public class StreamThread extends Thread {
             ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
 
             if (!records.isEmpty()) {
-                for (StandbyTask task : standbyTasks.values()) {
-                    for (TopicPartition partition : 
task.changeLogPartitions()) {
-                        List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(partition, records.records(partition));
-                        if (remaining != null) {
-                            restoreConsumer.pause(partition);
-                            standbyRecords.put(partition, remaining);
-                        }
+                for (TopicPartition partition : records.partitions()) {
+                    StandbyTask task = standbyTasksByPartition.get(partition);
+                    List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(partition, records.records(partition));
+                    if (remaining != null) {
+                        restoreConsumer.pause(partition);
+                        standbyRecords.put(partition, remaining);
                     }
                 }
             }
@@ -547,9 +548,16 @@ public class StreamThread extends Thread {
         }
 
         // create the active tasks
-        for (TaskId taskId : partitionsForTask.keySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
partitionsForTask.entrySet()) {
+            TaskId taskId = entry.getKey();
+            Set<TopicPartition> partitions = entry.getValue();
+
             try {
-                activeTasks.put(taskId, createStreamTask(taskId, 
partitionsForTask.get(taskId)));
+                StreamTask task = createStreamTask(taskId, partitions);
+                activeTasks.put(taskId, task);
+
+                for (TopicPartition partition : partitions)
+                    activeTasksByPartition.put(partition, task);
             } catch (Exception e) {
                 log.error("Failed to create an active task #" + taskId + " in 
thread [" + this.getName() + "]: ", e);
                 throw e;
@@ -567,6 +575,7 @@ public class StreamThread extends Thread {
         prevTasks.addAll(activeTasks.keySet());
 
         activeTasks.clear();
+        activeTasksByPartition.clear();
     }
 
     private void closeOne(AbstractTask task) {
@@ -605,6 +614,9 @@ public class StreamThread extends Thread {
             StandbyTask task = createStandbyTask(taskId, partitions);
             if (task != null) {
                 standbyTasks.put(taskId, task);
+                for (TopicPartition partition : partitions) {
+                    standbyTasksByPartition.put(partition, task);
+                }
                 // collect checked pointed offsets to position the restore 
consumer
                 // this include all partitions from which we restore states
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
@@ -633,6 +645,7 @@ public class StreamThread extends Thread {
         restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
         standbyTasks.clear();
+        standbyTasksByPartition.clear();
     }
 
     private void ensureCopartitioning(Collection<Set<String>> 
copartitionGroups) {

Reply via email to