Repository: kafka
Updated Branches:
  refs/heads/trunk 6e42ce8fe -> b62804a25


MINOR: Remove unused constructor param from ProcessorStateManager

Remove applicationId parameter as it is no longer used.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>

Closes #2385 from dguy/minor-remove-unused-param


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b62804a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b62804a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b62804a2

Branch: refs/heads/trunk
Commit: b62804a252b96932952006a483b3afd7da7363ab
Parents: 6e42ce8
Author: Damian Guy <damian....@gmail.com>
Authored: Mon Jan 16 11:40:47 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Jan 16 11:40:47 2017 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/AbstractTask.java   |  2 +-
 .../processor/internals/ProcessorStateManager.java  |  3 +--
 .../internals/ProcessorStateManagerTest.java        | 16 ++++++++--------
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0730c68..55418d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -70,7 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
+            this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
 
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index a21c3e8..ad16c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -74,8 +74,7 @@ public class ProcessorStateManager implements StateManager {
      *                       (this might be recoverable by retrying)
      * @throws IOException if any severe error happens while creating or locking the state directory
      */
-    public ProcessorStateManager(final String applicationId,
-                                 final TaskId taskId,
+    public ProcessorStateManager(final TaskId taskId,
                                  final Collection<TopicPartition> sources,
                                  final Consumer<byte[], byte[]> restoreConsumer,
                                  final boolean isStandby,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index de54723..602601a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -212,7 +212,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
             {
                 put(nonPersistentStoreName, nonPersistentStoreName);
             }
@@ -244,7 +244,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreName);
@@ -298,7 +298,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreTopicName);
@@ -381,7 +381,7 @@ public class ProcessorStateManagerTest {
         // if there is an source partition, inherit the partition id
         Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
         try {
             restoreConsumer.reset();
 
@@ -415,7 +415,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -453,7 +453,7 @@ public class ProcessorStateManagerTest {
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreTopicName);
@@ -491,7 +491,7 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
         stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -512,7 +512,7 @@ public class ProcessorStateManagerTest {
 
 
         final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);

Reply via email to