Repository: kafka Updated Branches: refs/heads/trunk 6e42ce8fe -> b62804a25
MINOR: Remove unused constructor param from ProcessorStateManager Remove applicationId parameter as it is no longer used. Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #2385 from dguy/minor-remove-unused-param Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b62804a2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b62804a2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b62804a2 Branch: refs/heads/trunk Commit: b62804a252b96932952006a483b3afd7da7363ab Parents: 6e42ce8 Author: Damian Guy <damian....@gmail.com> Authored: Mon Jan 16 11:40:47 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Mon Jan 16 11:40:47 2017 -0800 ---------------------------------------------------------------------- .../streams/processor/internals/AbstractTask.java | 2 +- .../processor/internals/ProcessorStateManager.java | 3 +-- .../internals/ProcessorStateManagerTest.java | 16 ++++++++-------- 3 files changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 0730c68..55418d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -70,7 +70,7 @@ public abstract class AbstractTask { // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); + this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index a21c3e8..ad16c77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -74,8 +74,7 @@ public class ProcessorStateManager implements StateManager { * (this might be recoverable by retrying) * @throws IOException if any severe error happens while creating or locking the state directory */ - public ProcessorStateManager(final String applicationId, - final TaskId taskId, + public ProcessorStateManager(final TaskId taskId, final Collection<TopicPartition> sources, final Consumer<byte[], byte[]> restoreConsumer, final boolean isStandby, http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index de54723..602601a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -212,7 +212,7 @@ public class ProcessorStateManagerTest { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() { + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() { { put(nonPersistentStoreName, nonPersistentStoreName); } @@ -244,7 +244,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreName); @@ -298,7 +298,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreTopicName); @@ -381,7 +381,7 @@ public class ProcessorStateManagerTest { // if there is an source partition, inherit the partition id Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby try { restoreConsumer.reset(); @@ -415,7 +415,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -453,7 +453,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreTopicName); @@ -491,7 +491,7 @@ public class ProcessorStateManagerTest { @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap()); stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -512,7 +512,7 @@ public class ProcessorStateManagerTest { final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);