Repository: samza Updated Branches: refs/heads/master 4baaddbbb -> 531b35e9f
SAMZA-1926: Fix standalone configurations in configuration table. Changes: * Fix the default value of the debounce time configuration. * Remove the `job.coordination.utils.factory` configuration from the table (it is now inferred from the `job.coordinator.factory` configuration). Remove the definition of `job.coordination.utils.factory` from the configuration in unit tests. * Default the configuration `job.coordinator.factory` to `ZkJobCoordinatorFactory` if it is not defined by the user. Author: Shanthoosh Venkataraman <spven...@usc.edu> Reviewers: Jagadish <jagad...@apache.org>, Prateek M <pmahe...@linkedin.com> Closes #675 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/531b35e9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/531b35e9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/531b35e9 Branch: refs/heads/master Commit: 531b35e9f0f2a26933bb445f2b5a1301580093de Parents: 4baaddb Author: Shanthoosh Venkataraman <spven...@usc.edu> Authored: Fri Oct 5 14:30:42 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Oct 5 14:30:42 2018 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 22 ++------ .../samza/config/JobCoordinatorConfig.java | 34 ++++++++---- .../samza/config/TestJobCoordinatorConfig.java | 58 -------------------- .../samza/sql/testutil/SamzaSqlTestConfig.java | 1 - .../apache/samza/test/framework/TestRunner.java | 3 - .../EndOfStreamIntegrationTest.java | 2 - .../WatermarkIntegrationTest.java | 2 - .../samza/test/framework/SchedulingTest.java | 1 - .../operator/TestRepartitionJoinWindowApp.java | 3 - .../test/operator/TestRepartitionWindowApp.java | 1 - .../apache/samza/test/table/TestLocalTable.java | 2 - 11 files changed, 27 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 26b4661..35ddcab 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -448,9 +448,11 @@ </tr> <tr> <td class="property" id="job.coordinator.factory">job.coordinator.factory</td> - <td class="default"></td> + <td class="default">org.apache.samza.zk.ZkJobCoordinatorFactory</td> <td class="description"> - Class to use for job coordination. Currently available values are: + The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator. + The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors. <br> + Samza supports the following coordination modes out of the box. <dl> <dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt> <dd>Fixed partition mapping. No Zoookeeper. </dd> @@ -461,20 +463,6 @@ Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a> </td> </tr> - <tr> - <td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td> - <td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td> - <td class="description"> - Class to use to create CoordinationUtils. 
Currently available values are: - <dl> - <dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt> - <dd>ZooKeeper based coordination utils.</dd> - <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt> - <dd>Azure based coordination utils.</dd> - These coordination utils are currently used for intermediate stream creation. - </dl> - </td> - </tr> <tr> <td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td> @@ -539,7 +527,7 @@ </tr> <tr> <td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td> - <td class="default"> 2000 </td> + <td class="default"> 20000 </td> <td class="description"> How long the Leader processor will wait before recalculating the JobModel on change of registered processors. </td> http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index 2322727..60c43c3 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -22,33 +22,44 @@ package org.apache.samza.config; import com.google.common.base.Strings; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.CoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.util.Util; import org.apache.samza.zk.ZkCoordinationUtilsFactory; +import org.apache.samza.zk.ZkJobCoordinatorFactory; public class JobCoordinatorConfig extends MapConfig { public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; - public static final String 
JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory"; - public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName(); + public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName(); + private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory"; + private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory"; public JobCoordinatorConfig(Config config) { super(config); } public String getJobCoordinationUtilsFactoryClassName() { - String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY); + String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY); - if (Strings.isNullOrEmpty(className)) { - throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className); + String coordinationUtilsFactory; + if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) { + coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY; + } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { + coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName(); + } else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) { + coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName(); + } else { + throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY)); } try { - Class.forName(className); + Class.forName(coordinationUtilsFactory); } catch (ClassNotFoundException e) { throw new SamzaException( - "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e); + "Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e); } - return className; + 
return coordinationUtilsFactory; } public CoordinationUtilsFactory getCoordinationUtilsFactory() { @@ -61,10 +72,9 @@ public class JobCoordinatorConfig extends MapConfig { public String getJobCoordinatorFactoryClassName() { String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY); if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) { - throw new ConfigException( - String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY)); + return ZkJobCoordinatorFactory.class.getName(); + } else { + return jobCoordinatorFactoryClassName; } - - return jobCoordinatorFactoryClassName; } } http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java deleted file mode 100644 index 2ef92b5..0000000 --- a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.config; - -import java.util.HashMap; -import java.util.Map; -import junit.framework.Assert; -import org.apache.samza.SamzaException; -import org.apache.samza.zk.ZkCoordinationUtilsFactory; -import org.junit.Test; - - -public class TestJobCoordinatorConfig { - - private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory"; - private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name - - @Test - public void testJobCoordinationUtilsFactoryConfig() { - - Map<String, String> map = new HashMap<>(); - JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map)); - - // test default value - Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName()); - - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName()); - - // failure case - map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS); - jConfig = new JobCoordinatorConfig(new MapConfig(map)); - try { - jConfig.getJobCoordinationUtilsFactoryClassName(); - Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS); - } catch (SamzaException e) { - // expected - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index a96fd08..97168d8 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ 
b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -74,7 +74,6 @@ public class SamzaSqlTestConfig { staticConfigs.put(JobConfig.JOB_NAME(), "sql-job"); staticConfigs.put(JobConfig.PROCESSOR_ID(), "1"); staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); - staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config"); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index a1103dd..add3bf6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -44,7 +44,6 @@ import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.operators.KV; import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.system.IncomingMessageEnvelope; @@ -71,7 +70,6 @@ import org.junit.Assert; * * The following configs are set by default * <ol> - * <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li> * <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li> * <li>"task.name.grouper.factory" = {@link 
SingleContainerGrouperFactory}</li> * <li>"job.name" = "test-samza"</li> @@ -98,7 +96,6 @@ public class TestRunner { this.inMemoryScope = RandomStringUtils.random(10, true, true); configs.put(JobConfig.JOB_NAME(), JOB_NAME); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index abdba01..4c8884d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -38,7 +38,6 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.test.controlmessages.TestData.PageView; import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; @@ -80,7 +79,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { configs.put(JobConfig.JOB_NAME(), "test-eos-job"); 
configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 3c86a37..e0097bd 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -58,7 +58,6 @@ import org.apache.samza.serializers.IntegerSerdeFactory; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerdeFactory; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemAdmin; @@ -133,7 +132,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { configs.put(JobConfig.JOB_NAME(), "test-watermark-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName()); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java index 658492a..7e89fa9 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -52,7 +52,6 @@ public class SchedulingTest extends StreamApplicationIntegrationTestHarness { configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); runApplication(new TestSchedulingApp(), "SchedulingTest", configs); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 144f125..340f0e7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -82,7 +82,6 @@ public 
class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe String appName = "UserPageAdClickCounter"; Map<String, String> configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "false"); @@ -112,7 +111,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe final String appName = "UserPageAdClickCounter2"; Map<String, String> configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "true"); @@ -160,7 +158,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe String outputTopicName = "user-ad-click-counts"; Map<String, String> configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, 
inputTopicName1); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index 2e1de96..2f08fed 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -61,7 +61,6 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa Map<String, String> configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 4d0d83a..da8af9e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -48,7 +48,6 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import 
org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; @@ -308,7 +307,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put(JobConfig.JOB_NAME(), "test-table-job"); configs.put(JobConfig.PROCESSOR_ID(), "1"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());