Repository: samza Updated Branches: refs/heads/master bd9387b7f -> df56f2dc1
SAMZA-1835: Consolidate all processorId generation code. Currently, the processorId creation function createProcessorId() is repeated in three different implementation of `JobCoordinator` viz `ZkJobCoordinator`, `PassthroughJobCoordinator`, and `AzureJobCoordinator`. Here're the few problems that stems from this duplication. 1. `ProcessorId` is passed into the `MetricsReporterFactory` through the factory create method: `MetricsReporter getMetricsReporter(String name, String processorId, Config config);`. Custom `MetricsReporter` implementations currently use the processorId as a component in the generated metric names. Metrics reporters are instantiated from `LocalApplicationRunner` and`processorId` is currently passed in as null to `MetricsReporterFactory.getMetricsReporter`. This corrupts the generated metrics names. 2. `ZkJobCoordinator`, `ZkUtils`, `ZkLeaderElector` and different downstream components of `LocalApplicationRunner` currently instantiate and manage their private reporters, rather than the sharing common `MetricsRegistry` managed by `LocalApplicationRunner`. Since there is no common namespace and reporter shared by downstream components of `LocalApplicationRunner`, generating metrics dashboards for standalone is kind of a hassle. This PR is comprised of the following changes: 1. Moved the processorId generation to `LocalApplicationRunner` and injects the generated `processorId` to all the downstream layers. 2. Deprecated the getProcessorId API in `JobCoordinator` interface. 3. Add the `processorId` and `metricsRegistry` arguments to the `getJobCoordinator` method of `JobCoordinatorFactory` interface. 4. Fixed the unit tests and added unit tests for `LocalApplicationRunner.createProcessorId`. Author: Shanthoosh Venkataraman <svenk...@linkedin.com> Author: Shanthoosh Venkataraman <spven...@usc.edu> Author: svenkata <svenkatara...@linkedin.com> Reviewers: Jagadish<jagad...@apache.org> Closes #844 from shanthoosh/SAMZA-1835 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/df56f2dc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/df56f2dc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/df56f2dc Branch: refs/heads/master Commit: df56f2dc1ee1be9b2a3aa9894465e3f870451125 Parents: bd9387b Author: Shanthoosh Venkataraman <svenk...@linkedin.com> Authored: Thu Dec 6 18:11:38 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Dec 6 18:11:38 2018 -0800 ---------------------------------------------------------------------- .../samza/coordinator/AzureJobCoordinator.java | 25 ++--------- .../coordinator/AzureJobCoordinatorFactory.java | 5 ++- .../apache/samza/config/ApplicationConfig.java | 1 - .../samza/coordinator/JobCoordinator.java | 1 + .../coordinator/JobCoordinatorFactory.java | 11 +++-- .../apache/samza/processor/StreamProcessor.java | 45 ++++++++++++------- .../samza/runtime/LocalApplicationRunner.java | 32 +++++++++++-- .../standalone/PassthroughJobCoordinator.java | 25 ++--------- .../PassthroughJobCoordinatorFactory.java | 5 ++- .../org/apache/samza/zk/ZkJobCoordinator.java | 47 +------------------- .../samza/zk/ZkJobCoordinatorFactory.java | 6 +-- .../samza/processor/TestStreamProcessor.java | 14 +++--- .../samza/runtime/MockProcessorIdGenerator.java | 29 ++++++++++++ .../runtime/TestLocalApplicationRunner.java | 26 +++++++++++ .../apache/samza/zk/TestZkJobCoordinator.java | 10 ++--- .../processor/TestZkStreamProcessorBase.java | 5 ++- .../test/processor/TestStreamProcessor.java | 9 ++-- 17 files changed, 158 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java index 076ab54..ae4aba3 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -19,12 +19,9 @@ package org.apache.samza.coordinator; -import org.apache.commons.lang3.StringUtils; import org.apache.samza.AzureClient; -import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.AzureConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.TaskName; @@ -42,7 +39,7 @@ import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler; import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler; import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener; import org.apache.samza.job.model.JobModel; -import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; @@ -101,10 +98,10 @@ public class AzureJobCoordinator implements JobCoordinator { * Creates an instance of Azure job coordinator, along with references to Azure leader elector, Azure Blob and Azure Table. * @param config User defined config */ - public AzureJobCoordinator(Config config) { + public AzureJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { //TODO: Cleanup previous values in the table when barrier times out. + this.processorId = processorId; this.config = config; - processorId = createProcessorId(config); currentJMVersion = new AtomicReference<>(INITIAL_STATE); AzureConfig azureConfig = new AzureConfig(config); AzureClient client = new AzureClient(azureConfig.getAzureConnectionString()); @@ -473,22 +470,6 @@ public class AzureJobCoordinator implements JobCoordinator { } } - private String createProcessorId(Config config) { - // TODO: This check to be removed after 0.13+ - ApplicationConfig appConfig = new ApplicationConfig(config); - if (appConfig.getProcessorId() != null) { - return appConfig.getProcessorId(); - } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) { - ProcessorIdGenerator idGenerator = - Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); - return idGenerator.generateProcessorId(config); - } else { - throw new ConfigException(String - .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, - ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS)); - } - } - public class AzureLeaderElectorListener implements LeaderElectorListener { /** * Keep renewing the lease and do the required tasks as a leader. http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java index 8b3d357..ff8925a 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java @@ -20,10 +20,11 @@ package org.apache.samza.coordinator; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; public class AzureJobCoordinatorFactory implements JobCoordinatorFactory { @Override - public JobCoordinator getJobCoordinator(Config config) { - return new AzureJobCoordinator(config); + public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { + return new AzureJobCoordinator(processorId, config, metricsRegistry); } } http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java index 39facb6..804035e 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java @@ -85,7 +85,6 @@ public class ApplicationConfig extends MapConfig { return String.format("app-%s-%s", getAppName(), getAppId()); } - @Deprecated public String getProcessorId() { return get(PROCESSOR_ID, null); } http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java index bd06039..cd10acb 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java @@ -70,6 +70,7 @@ public interface JobCoordinator { * * @return String representing a unique logical processor ID */ + @Deprecated String getProcessorId(); /** http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java index 83ebf52..8f3d96e 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java @@ -20,13 +20,16 @@ package org.apache.samza.coordinator; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; @InterfaceStability.Evolving public interface JobCoordinatorFactory { /** - * Return a new instance of {@link JobCoordinator} - * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" - * @return {@link JobCoordinator} instance + * Returns a new instance of {@link JobCoordinator}. + * @param processorId a unique logical identifier assigned to the {@link org.apache.samza.processor.StreamProcessor}. + * @param config the configuration of the samza application. + * @param metricsRegistry used to publish the coordination specific metrics. + * @return the {@link JobCoordinator} instance. */ - JobCoordinator getJobCoordinator(Config config); + JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 389dafd..2c0c0b7 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; @@ -46,6 +47,7 @@ import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.task.TaskFactory; @@ -123,6 +125,7 @@ public class StreamProcessor { private final String processorId; private final ExecutorService containerExcecutorService; private final Object lock = new Object(); + private final MetricsRegistryMap metricsRegistry; private volatile Throwable containerException = null; @@ -163,25 +166,26 @@ public class StreamProcessor { JobCoordinatorListener jobCoordinatorListener = null; /** - * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except + * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except * it creates a {@link JobCoordinator} instead of accepting it as an argument. * - * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} - * @param customMetricsReporters registered with the metrics system to report metrics - * @param taskFactory task factory to instantiate the Task - * @param processorListener listener to the StreamProcessor life cycle + * @param processorId a unique logical identifier assigned to the stream processor. + * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}. + * @param customMetricsReporters registered with the metrics system to report metrics. + * @param taskFactory the task factory to instantiate the Task. + * @param processorListener listener to the StreamProcessor life cycle. * - * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional, + * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional, * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead. */ @Deprecated - public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, ProcessorLifecycleListener processorListener) { - this(config, customMetricsReporters, taskFactory, processorListener, null); + this(processorId, config, customMetricsReporters, taskFactory, processorListener, null); } /** - * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional, + * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional, * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences: * <ol> * <li>Passes null for application-defined context factories</li> @@ -189,25 +193,27 @@ public class StreamProcessor { * {@link StreamProcessorLifecycleListenerFactory}</li> * </ol> * + * @param processorId a unique logical identifier assigned to the stream processor. * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} * @param customMetricsReporters registered with the metrics system to report metrics * @param taskFactory task factory to instantiate the Task * @param processorListener listener to the StreamProcessor life cycle * @param jobCoordinator the instance of {@link JobCoordinator} * - * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional, + * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional, * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead. */ @Deprecated - public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { - this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(), - sp -> processorListener, jobCoordinator); + this(processorId, config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener, + jobCoordinator); } /** * Builds a {@link StreamProcessor} with full specification of processing components. * + * @param processorId a unique logical identifier assigned to the stream processor. * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} * @param customMetricsReporters registered with the metrics system to report metrics * @param taskFactory task factory to instantiate the Task @@ -217,14 +223,20 @@ public class StreamProcessor { * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle * @param jobCoordinator the instance of {@link JobCoordinator} */ - public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional, Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional, Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) { Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null."); + Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "ProcessorId cannot be null."); this.config = config; + this.processorId = processorId; + this.metricsRegistry = new MetricsRegistryMap(); this.customMetricsReporter = customMetricsReporters; + for (MetricsReporter metricsReporter : customMetricsReporter.values()) { + metricsReporter.register("StreamProcessor", metricsRegistry); + } this.taskFactory = taskFactory; this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional; this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional; @@ -235,8 +247,6 @@ public class StreamProcessor { this.jobCoordinator.setListener(jobCoordinatorListener); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build(); this.containerExcecutorService = Executors.newSingleThreadExecutor(threadFactory); - // TODO: remove the dependency on jobCoordinator for processorId after fixing SAMZA-1835 - this.processorId = this.jobCoordinator.getProcessorId(); this.processorListener = listenerFactory.createInstance(this); } @@ -287,6 +297,7 @@ public class StreamProcessor { */ public void stop() { synchronized (lock) { + LOGGER.info("Stopping the stream processor: {}.", processorId); if (state != State.STOPPING && state != State.STOPPED) { state = State.STOPPING; try { @@ -328,7 +339,7 @@ public class StreamProcessor { private JobCoordinator createJobCoordinator() { String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); - return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); + return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(processorId, config, metricsRegistry); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 189fc1f..7da3369 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.application.descriptors.ApplicationDescriptor; import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; @@ -39,6 +40,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorUtil; import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.context.ExternalContext; import org.apache.samza.execution.LocalJobPlanner; @@ -47,6 +49,7 @@ import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.task.TaskFactory; import org.apache.samza.task.TaskFactoryUtil; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,14 +170,37 @@ public class LocalApplicationRunner implements ApplicationRunner { Optional<ExternalContext> externalContextOptional) { TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc); Map<String, MetricsReporter> reporters = new HashMap<>(); - // TODO: the null processorId has to be fixed after SAMZA-1835 + String processorId = createProcessorId(new ApplicationConfig(config)); appDesc.getMetricsReporterFactories().forEach((name, factory) -> - reporters.put(name, factory.getMetricsReporter(name, null, config))); - return new StreamProcessor(config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), + reporters.put(name, factory.getMetricsReporter(name, processorId, config))); + return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(), appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null); } /** + * Generates a unique logical identifier for the stream processor using the provided {@param appConfig}. + * 1. If the processorId is defined in the configuration, then returns the value defined in the configuration. + * 2. Else if the {@linkplain ProcessorIdGenerator} class is defined the configuration, then uses the {@linkplain ProcessorIdGenerator} + * to generate the unique processorId. + * 3. Else throws the {@see ConfigException} back to the caller. + * @param appConfig the configuration of the samza application. + * @throws ConfigException if neither processor.id nor app.processor-id-generator.class is defined in the configuration. + * @return the generated processor identifier. + */ + @VisibleForTesting + static String createProcessorId(ApplicationConfig appConfig) { + if (StringUtils.isNotBlank(appConfig.getProcessorId())) { + return appConfig.getProcessorId(); + } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) { + ProcessorIdGenerator idGenerator = Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); + return idGenerator.generateProcessorId(appConfig); + } else { + throw new ConfigException(String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, + ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS)); + } + } + + /** * Defines a specific implementation of {@link ProcessorLifecycleListener} for local {@link StreamProcessor}s. */ private final class LocalStreamProcessorLifecycleListener implements ProcessorLifecycleListener { http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 44fd811..15a9205 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -20,9 +20,7 @@ package org.apache.samza.standalone; import com.google.common.collect.ImmutableMap; import org.apache.samza.checkpoint.CheckpointManager; -import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.container.grouper.task.GrouperMetadata; @@ -34,7 +32,7 @@ import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.runtime.LocationId; import org.apache.samza.runtime.LocationIdProvider; import org.apache.samza.runtime.LocationIdProviderFactory; -import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; @@ -73,8 +71,8 @@ public class PassthroughJobCoordinator implements JobCoordinator { private final LocationId locationId; private JobCoordinatorListener coordinatorListener = null; - public PassthroughJobCoordinator(Config config) { - this.processorId = createProcessorId(config); + public PassthroughJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { + this.processorId = processorId; this.config = config; LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class); LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config); @@ -105,6 +103,7 @@ public class PassthroughJobCoordinator implements JobCoordinator { coordinatorListener.onNewJobModel(processorId, jobModel); } } else { + LOGGER.info("JobModel: {} does not contain processorId: {}. Stopping the JobCoordinator", jobModel, processorId); stop(); } } @@ -141,20 +140,4 @@ public class PassthroughJobCoordinator implements JobCoordinator { public String getProcessorId() { return this.processorId; } - - private String createProcessorId(Config config) { - // TODO: This check to be removed after 0.13+ - ApplicationConfig appConfig = new ApplicationConfig(config); - if (appConfig.getProcessorId() != null) { - return appConfig.getProcessorId(); - } else if (appConfig.getAppProcessorIdGeneratorClass() != null) { - ProcessorIdGenerator idGenerator = - Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); - return idGenerator.generateProcessorId(config); - } else { - throw new ConfigException(String - .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, - ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS)); - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java index 2ba56a6..5d5fecf 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java @@ -21,10 +21,11 @@ package org.apache.samza.standalone; import org.apache.samza.config.Config; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsRegistry; public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory { @Override - public JobCoordinator getJobCoordinator(Config config) { - return new PassthroughJobCoordinator(config); + public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { + return new PassthroughJobCoordinator(processorId, config, metricsRegistry); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 8c5a3ba..8371070 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -29,14 +29,10 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.I0Itec.zkclient.IZkStateListener; -import org.apache.commons.lang3.StringUtils; import org.apache.samza.checkpoint.CheckpointManager; -import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.StorageConfig; import org.apache.samza.config.ZkConfig; @@ -52,18 +48,14 @@ import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.MetricsReporter; -import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.runtime.LocationId; import org.apache.samza.runtime.LocationIdProvider; import org.apache.samza.runtime.LocationIdProviderFactory; -import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.MetricsReporterLoader; import org.apache.samza.util.SystemClock; import org.apache.samza.util.Util; import org.apache.samza.zk.ZkUtils.ProcessorNode; @@ -100,7 +92,6 @@ public class ZkJobCoordinator implements JobCoordinator { private final Config config; private final ZkBarrierForVersionUpgrade barrier; private final ZkJobCoordinatorMetrics metrics; - private final Map<String, MetricsReporter> reporters; private final ZkLeaderElector leaderElector; private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false); private final StreamMetadataCache streamMetadataCache; @@ -120,11 +111,11 @@ public class ZkJobCoordinator implements JobCoordinator { @VisibleForTesting StreamPartitionCountMonitor streamPartitionCountMonitor = null; - ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) { + ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) { this.config = config; this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry); - this.processorId = createProcessorId(config); + this.processorId = processorId; this.zkUtils = zkUtils; // setup a listener for a session state change // we are mostly interested in "session closed" and "new session created" events @@ -132,7 +123,6 @@ public class ZkJobCoordinator implements JobCoordinator { leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); - this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); debounceTimer = new ScheduleAfterDebounceTime(processorId); debounceTimer.setScheduledTaskCallback(throwable -> { LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable); @@ -152,7 +142,6 @@ public class ZkJobCoordinator implements JobCoordinator { zkUtils.validateZkVersion(); zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()}); - startMetrics(); systemAdmins.start(); leaderElector.tryBecomeLeader(); zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils)); @@ -192,9 +181,6 @@ public class ZkJobCoordinator implements JobCoordinator { LOG.debug("Shutting down system admins."); systemAdmins.stop(); - LOG.debug("Shutting down metrics."); - shutdownMetrics(); - if (streamPartitionCountMonitor != null) { streamPartitionCountMonitor.stop(); } @@ -217,19 +203,6 @@ public class ZkJobCoordinator implements JobCoordinator { } } - private void startMetrics() { - for (MetricsReporter reporter: reporters.values()) { - reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry()); - reporter.start(); - } - } - - private void shutdownMetrics() { - for (MetricsReporter reporter: reporters.values()) { - reporter.stop(); - } - } - @Override public void setListener(JobCoordinatorListener listener) { this.coordinatorListener = listener; @@ -307,22 +280,6 @@ public class ZkJobCoordinator implements JobCoordinator { debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE)); } - private String createProcessorId(Config config) { - // TODO: This check to be removed after 0.13+ - ApplicationConfig appConfig = new ApplicationConfig(config); - if (appConfig.getProcessorId() != null) { - return appConfig.getProcessorId(); - } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) { - ProcessorIdGenerator idGenerator = - Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); - return idGenerator.generateProcessorId(config); - } else { - throw new ConfigException(String - .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, - ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS)); - } - } - /** * Generate new JobModel when becoming a leader or the list of processor changed. */ http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 41294a3..fdc2b0d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -27,7 +27,6 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,13 +43,12 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { * @return An instance of {@link ZkJobCoordinator} */ @Override - public JobCoordinator getJobCoordinator(Config config) { + public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) { // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig" - MetricsRegistry metricsRegistry = new MetricsRegistryMap(); String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config); ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath); LOG.debug("Creating ZkJobCoordinator with config: {}.", config); - return new ZkJobCoordinator(config, metricsRegistry, zkUtils); + return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils); } private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) { http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 1c33b96..d977504 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -93,7 +93,7 @@ public class TestStreamProcessor { ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator, SamzaContainer container) { - super(config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator); + super("TEST_PROCESSOR_ID", config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator); this.container = container; } @@ -325,7 +325,7 @@ public class TestStreamProcessor { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); Mockito.doNothing().when(mockJobCoordinator).start(); ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); - StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator); assertEquals(State.NEW, streamProcessor.getState()); streamProcessor.start(); @@ -344,7 +344,7 @@ public class TestStreamProcessor { ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); /** * Without a SamzaContainer running in StreamProcessor and current StreamProcessor state is STARTED, @@ -412,7 +412,7 @@ public class TestStreamProcessor { ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); Mockito.doNothing().when(mockJobCoordinator).stop(); Mockito.doNothing().when(mockSamzaContainer).shutdown(); @@ -434,7 +434,7 @@ public class TestStreamProcessor { ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); Exception failureException = new Exception("dummy exception"); @@ -455,7 +455,7 @@ public class TestStreamProcessor { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); streamProcessor.state = State.RUNNING; streamProcessor.jobCoordinatorListener.onCoordinatorStop(); @@ -468,7 +468,7 @@ public class TestStreamProcessor { public void testStreamProcessorWithStreamProcessorListenerFactory() { AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>(); StreamProcessor streamProcessor = - new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), Optional.empty(), + new StreamProcessor("TestProcessorId", mock(Config.class), new HashMap<>(), mock(TaskFactory.class), Optional.empty(), Optional.empty(), Optional.empty(), sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class)); http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java b/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java new file mode 100644 index 0000000..a55a2ab --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.runtime; + +import org.apache.samza.config.Config; + +public class MockProcessorIdGenerator implements ProcessorIdGenerator { + @Override + public String generateProcessorId(Config config) { + return "testProcessorId"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 5bd7893..c691500 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -19,6 +19,7 @@ package org.apache.samza.runtime; +import com.google.common.collect.ImmutableMap; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorUtil; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.context.ExternalContext; @@ -42,6 +44,7 @@ import org.apache.samza.task.IdentityStreamTask; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -221,6 +224,29 @@ public class TestLocalApplicationRunner { assertFalse("Application finished before the timeout.", finished); } + @Test + public void testCreateProcessorIdShouldReturnProcessorIdDefinedInConfiguration() { + String processorId = "testProcessorId"; + MapConfig configMap = new MapConfig(ImmutableMap.of(ApplicationConfig.PROCESSOR_ID, processorId)); + String actualProcessorId = LocalApplicationRunner.createProcessorId(new ApplicationConfig(configMap)); + assertEquals(processorId, actualProcessorId); + } + + @Test + public void testCreateProcessorIdShouldInvokeProcessorIdGeneratorDefinedInConfiguration() { + String processorId = "testProcessorId"; + MapConfig configMap = new MapConfig(ImmutableMap.of(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, MockProcessorIdGenerator.class.getCanonicalName())); + String actualProcessorId = LocalApplicationRunner.createProcessorId(new ApplicationConfig(configMap)); + assertEquals(processorId, actualProcessorId); + } + + @Test(expected = ConfigException.class) + public void testCreateProcessorIdShouldThrowExceptionWhenProcessorIdAndGeneratorAreNotDefined() { + ApplicationConfig mockConfig = Mockito.mock(ApplicationConfig.class); + Mockito.when(mockConfig.getProcessorId()).thenReturn(null); + LocalApplicationRunner.createProcessorId(mockConfig); + } + private void prepareTest() { ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java index 083caad..e0a0941 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java @@ -53,7 +53,7 @@ public class TestZkJobCoordinator { when(zkUtils.getZkClient()).thenReturn(mockZkClient); when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>())); - ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); doAnswer(new Answer<Void>() { public Void answer(InvocationOnMock invocation) { jcShutdownLatch.countDown(); @@ -80,7 +80,7 @@ public class TestZkJobCoordinator { ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class); - ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); zkJobCoordinator.debounceTimer = mockDebounceTimer; final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener(); @@ -104,7 +104,7 @@ public class TestZkJobCoordinator { ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class); - ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class); zkJobCoordinator.debounceTimer = mockDebounceTimer; zkJobCoordinator.streamPartitionCountMonitor = monitor; @@ -127,7 +127,7 @@ public class TestZkJobCoordinator { ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class); - ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class); zkJobCoordinator.debounceTimer = mockDebounceTimer; @@ -154,7 +154,7 @@ public class TestZkJobCoordinator { ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class); - ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class); zkJobCoordinator.debounceTimer = mockDebounceTimer; http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index 7bd99bb..3760edb 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -47,6 +47,7 @@ import org.apache.samza.config.ZkConfig; import org.apache.samza.context.Context; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -132,7 +133,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness map.put(ApplicationConfig.PROCESSOR_ID, pId); Config config = new MapConfig(map); String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); - JobCoordinator jobCoordinator = Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); + JobCoordinator jobCoordinator = Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(pId, config, new MetricsRegistryMap()); ProcessorLifecycleListener listener = new ProcessorLifecycleListener() { @Override @@ -165,7 +166,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness }; StreamProcessor processor = - new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) TestStreamTask::new, listener, jobCoordinator); + new StreamProcessor(pId, config, new HashMap<>(), (StreamTaskFactory) TestStreamTask::new, listener, jobCoordinator); return processor; } http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index e7040ca..cf41148 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -59,6 +59,9 @@ import static org.mockito.Mockito.mock; public class TestStreamProcessor extends StandaloneIntegrationTestHarness { + + public static final String PROCESSOR_ID = "1"; + /** * Testing a basic identity stream task - reads data from a topic and writes it to another topic * (without any modifications) @@ -136,7 +139,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { final String outputTopic = "output4"; final int messageCount = 20; - final Map<String, String> configMap = createConfigs("1", testSystem, inputTopic, outputTopic, messageCount); + final Map<String, String> configMap = createConfigs(PROCESSOR_ID, testSystem, inputTopic, outputTopic, messageCount); configMap.remove("task.class"); final Config configs = new MapConfig(configMap); final TestStubs stubs = new TestStubs(configs, (StreamTaskFactory) null, bootstrapServers()); @@ -243,12 +246,12 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { TestStubs(Config config, StreamTaskFactory taskFactory, String bootstrapServer) { this(bootstrapServer); - processor = new StreamProcessor(config, new HashMap<>(), taskFactory, listener); + processor = new StreamProcessor("1", config, new HashMap<>(), taskFactory, listener); } TestStubs(Config config, AsyncStreamTaskFactory taskFactory, String bootstrapServer) { this(bootstrapServer); - processor = new StreamProcessor(config, new HashMap<>(), taskFactory, listener); + processor = new StreamProcessor("1", config, new HashMap<>(), taskFactory, listener); } private void initConsumer(String bootstrapServer) {