Repository: samza Updated Branches: refs/heads/master 3750f5e24 -> 6fcf7f3f4
SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes in input topic partitions Some highlights of the changes: - always instantiating StreamPartitionCountMonitor on all input system streams now -- it is debatable whether we want to include systems that do not implement the optimized ExtendedSystemAdmin interface. We may need to configure a long partition monitor interval for this case and the case where there are tons of input topics. (Pending perf test) - moved the instantiation of StreamPartitionCountMonitor out of JobModelManager and allow ClusterBasedJobCoordinator to associate a callback method directly with the monitor - allow callbacks to set a different application status code before throwing an exception to shut down the job Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Reviewers: Jacob Maes <jm...@linkedin.com>, Jagadish <jagad...@apache.org> Closes #351 from nickpan47/restart-on-partition-change and squashes the following commits: 8d04cd6 [Yi Pan (Data Infrastructure)] SAMZA-1482: restart or fail the job when input topic partition count changes ee3fa65 [Yi Pan (Data Infrastructure)] SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes in input topic partitions Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6fcf7f3f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6fcf7f3f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6fcf7f3f Branch: refs/heads/master Commit: 6fcf7f3f4cbf8b9c6f69b292e3c1aaa239ab18d3 Parents: 3750f5e Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Tue Nov 14 12:45:23 2017 -0800 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Tue Nov 14 12:45:23 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/PartitionChangeException.java | 31 ++++ .../ClusterBasedJobCoordinator.java | 108 +++++++++-- 
.../clustermanager/ContainerProcessManager.java | 26 ++- .../clustermanager/SamzaApplicationState.java | 7 +- .../StreamPartitionCountMonitor.java | 116 +++++++----- .../org/apache/samza/config/JobConfig.scala | 2 - .../org/apache/samza/config/StorageConfig.scala | 8 + .../samza/coordinator/JobModelManager.scala | 49 ++--- .../main/scala/org/apache/samza/util/Util.scala | 1 - .../MockClusterResourceManagerFactory.java | 32 ++++ .../clustermanager/MockContainerListener.java | 1 + .../TestClusterBasedJobCoordinator.java | 108 +++++++++++ .../clustermanager/TestContainerAllocator.java | 12 +- .../TestContainerProcessManager.java | 22 +-- .../TestHostAwareContainerAllocator.java | 16 +- .../coordinator/JobModelManagerTestUtil.java | 4 +- .../samza/coordinator/TestJobModelManager.java | 24 +-- .../samza/storage/MockSystemConsumer.java | 59 ------ .../apache/samza/storage/MockSystemFactory.java | 45 ----- .../samza/storage/TestStorageRecovery.java | 37 +--- .../apache/samza/system/MockSystemFactory.java | 181 +++++++++++++++++++ .../samza/coordinator/TestJobCoordinator.scala | 65 +------ .../TestStreamPartitionCountMonitor.scala | 82 ++++++++- .../TestRangeSystemStreamPartitionMatcher.scala | 1 - .../TestRegexSystemStreamPartitionMatcher.scala | 1 - 25 files changed, 693 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java new file mode 100644 index 0000000..4619dfa --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + + +/** + * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed + */ +public class PartitionChangeException extends SamzaException { + + public PartitionChangeException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index d0d4e34..3d67cae 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -18,21 +18,36 @@ */ package org.apache.samza.clustermanager; +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import java.util.Set; import org.apache.samza.SamzaException; +import org.apache.samza.PartitionChangeException; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import 
org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.config.TaskConfigJava; import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.StreamPartitionCountMonitor; import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.SystemClock; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; + /** * Implements a JobCoordinator that is completely independent of the underlying cluster * manager system. This {@link ClusterBasedJobCoordinator} handles functionality common @@ -69,10 +84,6 @@ public class ClusterBasedJobCoordinator { */ private final SamzaApplicationState state; - /** - * Metrics to track stats around container failures, needed containers etc. - */ - //even though some of these can be converted to local variables, it will not be the case //as we add more methods to the JobCoordinator and completely implement SAMZA-881. @@ -101,8 +112,30 @@ public class ClusterBasedJobCoordinator { */ private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * A boolean variable indicating whether the job has durable state stores in the configuration + */ + private final boolean hasDurableStores; + + /** + * The input topic partition count monitor + */ + private final StreamPartitionCountMonitor partitionMonitor; + + /** + * Metrics to track stats around container failures, needed containers etc. 
+ */ + private final MetricsRegistryMap metrics; + + /** + * Internal variable for the instance of {@link JmxServer} + */ private JmxServer jmxServer; + /** + * Variable to keep the callback exception + */ + volatile private Exception coordinatorException = null; /** * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually @@ -113,22 +146,23 @@ public class ClusterBasedJobCoordinator { */ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { - MetricsRegistryMap registry = new MetricsRegistryMap(); + metrics = new MetricsRegistryMap(); //build a JobModelReader and perform partition assignments. - jobModelManager = buildJobModelManager(coordinatorSystemConfig, registry); + jobModelManager = buildJobModelManager(coordinatorSystemConfig, metrics); config = jobModelManager.jobModel().getConfig(); + hasDurableStores = new StorageConfig(config).hasDurableStores(); state = new SamzaApplicationState(jobModelManager); + partitionMonitor = getPartitionCountMonitor(config); clusterManagerConfig = new ClusterManagerConfig(config); isJmxEnabled = clusterManagerConfig.getJmxEnabled(); jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval(); // build a container process Manager - containerProcessManager = new ContainerProcessManager(config, state, registry); + containerProcessManager = new ContainerProcessManager(config, state, metrics); } - /** * Starts the JobCoordinator. 
* @@ -152,10 +186,11 @@ public class ClusterBasedJobCoordinator { log.info("Starting Cluster Based Job Coordinator"); containerProcessManager.start(); + partitionMonitor.start(); boolean isInterrupted = false; - while (!containerProcessManager.shouldShutdown() && !isInterrupted) { + while (!containerProcessManager.shouldShutdown() && !checkAndThrowException() && !isInterrupted) { try { Thread.sleep(jobCoordinatorSleepInterval); } catch (InterruptedException e) { @@ -172,19 +207,25 @@ public class ClusterBasedJobCoordinator { } } + private boolean checkAndThrowException() throws Exception { + if (coordinatorException != null) { + throw coordinatorException; + } + return false; + } + /** * Stops all components of the JobCoordinator. */ private void onShutDown() { - if (containerProcessManager != null) { - try { - containerProcessManager.stop(); - } catch (Throwable e) { - log.error("Exception while stopping task manager {}", e); - } - log.info("Stopped task manager"); + try { + partitionMonitor.stop(); + containerProcessManager.stop(); + } catch (Throwable e) { + log.error("Exception while stopping task manager {}", e); } + log.info("Stopped task manager"); if (jmxServer != null) { try { @@ -201,6 +242,41 @@ public class ClusterBasedJobCoordinator { return jobModelManager; } + private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) { + Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins(); + StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance()); + Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams(); + if (inputStreamsToMonitor.isEmpty()) { + throw new SamzaException("Input streams to a job can not be empty."); + } + + return new StreamPartitionCountMonitor( + inputStreamsToMonitor, + streamMetadata, + metrics, + new JobConfig(config).getMonitorPartitionChangeFrequency(), + streamsChanged -> { + // Fail the 
jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted + if (hasDurableStores) { + log.error("Input topic partition count changed in a job with durable state. Failing the job."); + state.status = SamzaApplicationState.SamzaAppStatus.FAILED; + } + coordinatorException = new PartitionChangeException("Input topic partition count changes detected."); + }); + } + + // The following two methods are package-private and for testing only + @VisibleForTesting + SamzaApplicationState.SamzaAppStatus getAppStatus() { + // make sure to only return a unmodifiable copy of the status variable + final SamzaApplicationState.SamzaAppStatus copy = state.status; + return copy; + } + + @VisibleForTesting + StreamPartitionCountMonitor getPartitionMonitor() { + return partitionMonitor; + } /** http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 2861e9e..6a18b84 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -33,6 +33,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.google.common.base.Preconditions.checkNotNull; + + /** * ContainerProcessManager is responsible for requesting containers, handling failures, and notifying the application master that the * job is done. 
@@ -85,9 +88,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback */ private volatile boolean tooManyFailedContainers = false; + /** + * Exception thrown in callbacks, such as {@code containerAllocator} + */ private volatile Throwable exceptionOccurred = null; - /** * A map that keeps track of how many times each container failed. The key is the container ID, and the * value is the {@link ResourceFailure} object that has a count of failures. @@ -95,9 +100,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback */ private final Map<String, ResourceFailure> containerFailures = new HashMap<>(); + /** + * Metrics for {@link ContainerProcessManager} + */ private final ContainerProcessManagerMetrics metrics; - public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry) { @@ -108,7 +115,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled(); ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig); - this.clusterResourceManager = factory.getClusterResourceManager(this, state); + this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state)); this.metrics = new ContainerProcessManagerMetrics(config, state, registry); if (this.hostAffinityEnabled) { @@ -189,6 +196,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback containerAllocator.stop(); try { allocatorThread.join(); + log.info("Stopped container allocator"); } catch (InterruptedException ie) { log.error("Allocator Thread join() threw an interrupted exception", ie); Thread.currentThread().interrupt(); @@ -197,19 +205,17 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback if (metrics != null) { try { metrics.stop(); + log.info("Stopped metrics reporters"); } catch (Throwable e) { 
log.error("Exception while stopping metrics {}", e); } - log.info("Stopped metrics reporters"); } - if (clusterResourceManager != null) { - try { - clusterResourceManager.stop(state.status); - } catch (Throwable e) { - log.error("Exception while stopping cluster resource manager {}", e); - } + try { + clusterResourceManager.stop(state.status); log.info("Stopped cluster resource manager"); + } catch (Throwable e) { + log.error("Exception while stopping cluster resource manager {}", e); } log.info("Finished stop of Container process manager"); http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index 653fb4e..adc6e51 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -42,6 +42,9 @@ public class SamzaApplicationState { public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED } + /** + * {@link JobModelManager} object associated with this {@link SamzaApplicationState} + */ public final JobModelManager jobModelManager; /** @@ -102,9 +105,9 @@ public class SamzaApplicationState { public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0); /** - * Final status of the application + * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads. 
*/ - public SamzaAppStatus status = SamzaAppStatus.UNDEFINED; + public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED; /** * State indicating whether the job is healthy or not http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java index b35cbff..16e8221 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java @@ -18,8 +18,10 @@ */ package org.apache.samza.coordinator; +import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; @@ -28,7 +30,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.metrics.Gauge; -import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; @@ -48,10 +50,10 @@ public class StreamPartitionCountMonitor { private final Set<SystemStream> streamsToMonitor; private final StreamMetadataCache metadataCache; - private final MetricsRegistryMap metrics; private final int monitorPeriodMs; private final Map<SystemStream, Gauge<Integer>> gauges; private final Map<SystemStream, SystemStreamMetadata> initialMetadata; + private final Callback callbackMethod; // Used to guard write access to state. 
private final Object lock = new Object(); @@ -61,6 +63,19 @@ public class StreamPartitionCountMonitor { private volatile State state = State.INIT; + /** + * A callback that is invoked when the {@link StreamPartitionCountMonitor} detects a change in the partition count of + * any of its {@link SystemStream}s. + */ + public interface Callback { + /** + * Method to be called when SSP changes detected in the input + * + * @param streamsChanged the set of {@link SystemStream}s that have partition count changes + */ + void onSystemStreamPartitionChange(Set<SystemStream> streamsChanged); + } + /** * Gets the metadata for all the specified system streams from the provided metadata cache. @@ -88,14 +103,15 @@ public class StreamPartitionCountMonitor { * @param metadataCache the metadata cache which will be used to fetch metadata for partition counts. * @param metrics the metrics registry to which the metrics should be added. * @param monitorPeriodMs the period at which the monitor will run in milliseconds. + * @param monitorCallback the callback method to be invoked when partition count changes are detected */ public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache, - MetricsRegistryMap metrics, int monitorPeriodMs) { + MetricsRegistry metrics, int monitorPeriodMs, Callback monitorCallback) { this.streamsToMonitor = streamsToMonitor; this.metadataCache = metadataCache; - this.metrics = metrics; this.monitorPeriodMs = monitorPeriodMs; this.initialMetadata = getMetadata(streamsToMonitor, metadataCache); + this.callbackMethod = monitorCallback; // Pre-populate the gauges Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>(); @@ -109,48 +125,20 @@ public class StreamPartitionCountMonitor { } /** - * Fetches the current partition count for each system stream from the cache, compares the current count to the - * original count and updates the metric for that system stream with the delta. 
- */ - void updatePartitionCountMetric() { - try { - Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache); - - for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) { - SystemStream systemStream = metadataEntry.getKey(); - SystemStreamMetadata metadata = metadataEntry.getValue(); - - int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().keySet().size(); - int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().keySet().size(); - - Gauge gauge = gauges.get(systemStream); - gauge.set(currentPartitionCount - prevPartitionCount); - } - } catch (Exception e) { - log.error("Exception while updating partition count metric.", e); - } - } - - /** - * For testing. Returns the metrics. - */ - Map<SystemStream, Gauge<Integer>> getGauges() { - return gauges; - } - - /** * Starts the monitor. */ public void start() { synchronized (lock) { switch (state) { case INIT: - schedulerService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - updatePartitionCountMetric(); - } - }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS); + if (monitorPeriodMs > 0) { + schedulerService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + updatePartitionCountMetric(); + } + }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS); + } state = State.RUNNING; break; @@ -179,8 +167,53 @@ public class StreamPartitionCountMonitor { } /** - * For testing. + * Fetches the current partition count for each system stream from the cache, compares the current count to the + * original count and updates the metric for that system stream with the delta. 
+ */ + @VisibleForTesting + public void updatePartitionCountMetric() { + try { + Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache); + Set<SystemStream> streamsChanged = new HashSet<>(); + + for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) { + try { + SystemStream systemStream = metadataEntry.getKey(); + SystemStreamMetadata metadata = metadataEntry.getValue(); + + int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().size(); + int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().size(); + + Gauge gauge = gauges.get(systemStream); + gauge.set(currentPartitionCount - prevPartitionCount); + if (currentPartitionCount != prevPartitionCount) { + log.warn(String.format("Change of partition count detected in stream %s. old partition count: %d, current partition count: %d", + systemStream.toString(), prevPartitionCount, currentPartitionCount)); + streamsChanged.add(systemStream); + } + } catch (Exception e) { + log.error(String.format("Error comparing partition count differences for stream: %s", metadataEntry.getKey().toString())); + } + } + + if (!streamsChanged.isEmpty() && this.callbackMethod != null) { + this.callbackMethod.onSystemStreamPartitionChange(streamsChanged); + } + + } catch (Exception e) { + log.error("Exception while updating partition count metric.", e); + } + } + + /** + * For testing. Returns the metrics. */ + @VisibleForTesting + Map<SystemStream, Gauge<Integer>> getGauges() { + return gauges; + } + + @VisibleForTesting boolean isRunning() { return state == State.RUNNING; } @@ -191,6 +224,7 @@ public class StreamPartitionCountMonitor { * <p> * This is currently exposed at the package private level for tests only. 
*/ + @VisibleForTesting boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return schedulerService.awaitTermination(timeout, unit); } http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 1b3b893..083dbaf 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -137,8 +137,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { } } - def getMonitorPartitionChange = getBoolean(JobConfig.MONITOR_PARTITION_CHANGE, false) - def getMonitorPartitionChangeFrequency = getInt( JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS, JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS) http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index 0e3d568..ad03e59 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -98,4 +98,12 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem) .contains(systemName) } + + /** + * Helper method to check if there is any stores configured w/ a changelog + */ + def hasDurableStores : Boolean = { + val conf = config.subset("stores.", true) + conf.asScala.keys.exists(k => 
k.endsWith(".changelog")) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 92c3663..e915a8a 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -43,17 +43,10 @@ import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.system.ExtendedSystemAdmin -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.SystemStreamPartitionMatcher -import org.apache.samza.system.SystemAdmin -import org.apache.samza.system.StreamSpec +import org.apache.samza.system._ import org.apache.samza.util.Logging import org.apache.samza.util.Util -import org.apache.samza.Partition -import org.apache.samza.SamzaException +import org.apache.samza.{Partition, PartitionChangeException, SamzaException} import scala.collection.JavaConverters._ @@ -110,22 +103,8 @@ object JobModelManager extends Logging { val systemAdmins = getSystemAdmins(config) val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0) - var streamPartitionCountMonitor: StreamPartitionCountMonitor = null - if (config.getMonitorPartitionChange) { - val extendedSystemAdmins = systemAdmins.filter{ - case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin] - } - val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => 
extendedSystemAdmins.contains(systemStream.getSystem)) - if (inputStreamsToMonitor.nonEmpty) { - streamPartitionCountMonitor = new StreamPartitionCountMonitor( - inputStreamsToMonitor.asJava, - streamMetadataCache, - metricsRegistryMap, - config.getMonitorPartitionChangeFrequency) - } - } val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping() - val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null) + val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, null) val jobModel = jobModelManager.jobModel // Save the changelog mapping back to the ChangelogPartitionmanager // newChangelogPartitionMapping is the merging of all current task:changelog @@ -145,6 +124,13 @@ object JobModelManager extends Logging { jobModelManager } + + /** + * This method creates a {@link JobModelManager} object w/o {@link StreamPartitionCountMonitor} + * + * @param coordinatorSystemConfig configuration for coordinator system + * @return a JobModelManager object + */ def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap()) /** @@ -154,14 +140,13 @@ object JobModelManager extends Logging { changeLogMapping: util.Map[TaskName, Integer], localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache, - streamPartitionCountMonitor: StreamPartitionCountMonitor, containerIds: java.util.List[String]) = { val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds) jobModelRef.set(jobModel) val server = new HttpServer server.addServlet("/", new JobServlet(jobModelRef)) - currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor) + currentJobModelManager = new JobModelManager(jobModel, server) currentJobModelManager } @@ -337,7 +322,6 @@ object 
JobModelManager extends Logging { } private def getSystemNames(config: Config) = config.getSystemNames.toSet - } /** @@ -361,8 +345,7 @@ class JobModelManager( /** * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up. */ - val server: HttpServer = null, - val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging { + val server: HttpServer = null) extends Logging { debug("Got job model: %s." format jobModel) @@ -370,10 +353,6 @@ class JobModelManager( if (server != null) { debug("Starting HTTP server.") server.start - if (streamPartitionCountMonitor != null) { - debug("Starting Stream Partition Count Monitor..") - streamPartitionCountMonitor.start() - } info("Started HTTP server: %s" format server.getUrl) } } @@ -381,10 +360,6 @@ class JobModelManager( def stop { if (server != null) { debug("Stopping HTTP server.") - if (streamPartitionCountMonitor != null) { - debug("Stopping Stream Partition Count Monitor..") - streamPartitionCountMonitor.stop() - } server.stop info("Stopped HTTP server.") } http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 46bdb75..cc2a097 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -218,7 +218,6 @@ object Util extends Logging { JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName, - JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange), JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency)) new MapConfig(map.asJava) } 
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java new file mode 100644 index 0000000..3a464c2 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.clustermanager; + +/** + * Mock {@link ResourceManagerFactory} used in unit tests + */ +public class MockClusterResourceManagerFactory implements ResourceManagerFactory { + + @Override + public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, + SamzaApplicationState state) { + return new MockClusterResourceManager(callback); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java index db70c38..3987288 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java @@ -18,6 +18,7 @@ */ package org.apache.samza.clustermanager; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java new file mode 100644 index 0000000..264966d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.clustermanager; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.coordinator.StreamPartitionCountMonitor; +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory; +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.MockSystemFactory; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + + +/** + * Tests for {@link ClusterBasedJobCoordinator} + */ +public class TestClusterBasedJobCoordinator { + + Map<String, String> configMap; + + @Before + public void setUp() throws NoSuchFieldException, NoSuchMethodException { + configMap = new HashMap<>(); + configMap.put("job.name", "test-job"); + configMap.put("job.coordinator.system", "kafka"); + configMap.put("task.inputs", "kafka.topic1"); + configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory"); + configMap.put("samza.cluster-manager.factory", 
"org.apache.samza.clustermanager.MockClusterResourceManagerFactory"); + configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1"); + + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>()); + } + + @After + public void tearDown() { + MockSystemFactory.MSG_QUEUES.clear(); + } + + @Test + public void testPartitionCountMonitorWithDurableStates() + throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { + configMap.put("stores.mystore.changelog", "mychangelog"); + Config config = new MapConfig(configMap); + + // mimic job runner code to write the config to coordinator stream + CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory(); + CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); + producer.writeConfig("test-job", config); + + ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config); + + // change the input system stream metadata + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>()); + + StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor(); + monitor.updatePartitionCountMetric(); + assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.FAILED); + } + + @Test + public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException { + Config config = new MapConfig(configMap); + + // mimic job runner code to write the config to coordinator stream + CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory(); + CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); + producer.writeConfig("test-job", config); + + ClusterBasedJobCoordinator 
clusterCoordinator = new ClusterBasedJobCoordinator(config); + + // change the input system stream metadata + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>()); + + StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor(); + monitor.updatePartitionCountMetric(); + assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.UNDEFINED); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java index 1e9d372..734043a 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java @@ -73,16 +73,16 @@ public class TestContainerAllocator { private static Config getConfig() { Config config = new MapConfig(new HashMap<String, String>() { { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); + put("cluster-manager.container.count", "1"); + put("cluster-manager.container.retry.count", "1"); + put("cluster-manager.container.retry.window.ms", "1999999999"); + put("cluster-manager.allocator.sleep.ms", "10"); + put("cluster-manager.container.memory.mb", "512"); put("yarn.package.path", "/foo"); put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.allocator.sleep.ms", "10"); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 4288aea..e252b7d 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -54,17 +54,17 @@ public class TestContainerProcessManager { private Map<String, String> configVals = new HashMap<String, String>() { { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); + put("cluster-manager.container.count", "1"); + put("cluster-manager.container.retry.count", "1"); + put("cluster-manager.container.retry.window.ms", "1999999999"); + put("cluster-manager.allocator.sleep.ms", "1"); + put("cluster-manager.container.request.timeout.ms", "2"); + put("cluster-manager.container.memory.mb", "512"); put("yarn.package.path", "/foo"); put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.allocator.sleep.ms", "1"); - put("yarn.container.request.timeout.ms", "2"); } }; private 
Config config = new MapConfig(configVals); @@ -117,8 +117,8 @@ public class TestContainerProcessManager { public void testContainerProcessManager() throws Exception { Map<String, String> conf = new HashMap<>(); conf.putAll(getConfig()); - conf.put("yarn.container.memory.mb", "500"); - conf.put("yarn.container.cpu.cores", "5"); + conf.put("cluster-manager.container.memory.mb", "500"); + conf.put("cluster-manager.container.cpu.cores", "5"); state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); ContainerProcessManager taskManager = new ContainerProcessManager( @@ -137,8 +137,8 @@ public class TestContainerProcessManager { conf.clear(); conf.putAll(getConfigWithHostAffinity()); - conf.put("yarn.container.memory.mb", "500"); - conf.put("yarn.container.cpu.cores", "5"); + conf.put("cluster-manager.container.memory.mb", "500"); + conf.put("cluster-manager.container.cpu.cores", "5"); state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1)); taskManager = new ContainerProcessManager( http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java index 32ec2d2..00198e9 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -328,18 +328,18 @@ public class TestHostAwareContainerAllocator { private static Config getConfig() { Config config = new MapConfig(new HashMap<String, String>() { { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - 
put("yarn.container.memory.mb", "512"); + put("cluster-manager.container.count", "1"); + put("cluster-manager.container.retry.count", "1"); + put("cluster-manager.container.retry.window.ms", "1999999999"); + put("cluster-manager.container.request.timeout.ms", "3"); + put("cluster-manager.allocator.sleep.ms", "1"); + put("cluster-manager.container.memory.mb", "512"); put("yarn.package.path", "/foo"); put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.samza.host-affinity.enabled", "true"); - put("yarn.container.request.timeout.ms", "3"); - put("yarn.allocator.sleep.ms", "1"); + put("job.host-affinity.enabled", "true"); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java index b7514c4..6a69889 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java @@ -49,7 +49,7 @@ public class JobModelManagerTestUtil { containers.put(String.valueOf(i), container); } JobModel jobModel = new JobModel(config, containers, localityManager); - return new JobModelManager(jobModel, server, null); + return new JobModelManager(jobModel, server); } public static JobModelManager getJobModelManagerUsingReadModel(Config config, int containerCount, StreamMetadataCache 
streamMetadataCache, @@ -59,7 +59,7 @@ public class JobModelManagerTestUtil { containerIds.add(String.valueOf(i)); } JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), locManager, streamMetadataCache, containerIds); - return new JobModelManager(jobModel, server, null); + return new JobModelManager(jobModel, server); } http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java index 1d6fc65..3130ed6 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java @@ -85,16 +85,16 @@ public class TestJobModelManager { public void testLocalityMapWithHostAffinity() { Config config = new MapConfig(new HashMap<String, String>() { { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); + put("cluster-manager.container.count", "1"); + put("cluster-manager.container.memory.mb", "512"); + put("cluster-manager.container.retry.count", "1"); + put("cluster-manager.container.retry.window.ms", "1999999999"); + put("cluster-manager.allocator.sleep.ms", "10"); put("yarn.package.path", "/foo"); put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - 
put("yarn.allocator.sleep.ms", "10"); put("job.host-affinity.enabled", "true"); } }); @@ -111,16 +111,16 @@ public class TestJobModelManager { public void testLocalityMapWithoutHostAffinity() { Config config = new MapConfig(new HashMap<String, String>() { { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); + put("cluster-manager.container.count", "1"); + put("cluster-manager.container.memory.mb", "512"); + put("cluster-manager.container.retry.count", "1"); + put("cluster-manager.container.retry.window.ms", "1999999999"); + put("cluster-manager.allocator.sleep.ms", "10"); put("yarn.package.path", "/foo"); put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.allocator.sleep.ms", "10"); put("job.host-affinity.enabled", "false"); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java deleted file mode 100644 index 07c4a24..0000000 --- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.storage; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemStreamPartition; - -public class MockSystemConsumer implements SystemConsumer { - public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>(); - private boolean flag = true; // flag to make sure the messages only are - // returned once - - @Override - public void start() {} - - @Override - public void stop() {} - - @Override - public void register(SystemStreamPartition systemStreamPartition, String offset) {} - - @Override - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { - if (flag) { - ArrayList<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>(); - list.add(TestStorageRecovery.msg); - messages.put(TestStorageRecovery.ssp, list); - flag = false; - return messages; - } else { - messages.clear(); - return messages; - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java deleted file mode 100644 index 7abf82b..0000000 --- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.storage; - -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemFactory; -import org.apache.samza.system.SystemProducer; - -public class MockSystemFactory implements SystemFactory { - - @Override - public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - return new MockSystemConsumer(); - } - - @Override - public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - return null; - } - - @Override - public SystemAdmin getAdmin(String systemName, Config config) { - return TestStorageRecovery.systemAdmin; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java index 21d0150..7c1647e 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java +++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java @@ -19,34 +19,25 @@ package org.apache.samza.storage; -import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemStreamMetadata; -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; 
+import org.apache.samza.system.MockSystemFactory; import org.apache.samza.system.SystemStreamPartition; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestStorageRecovery { - public static SystemAdmin systemAdmin = null; public Config config = null; - public SystemStreamMetadata systemStreamMetadata = null; - public SystemStreamMetadata inputSystemStreamMetadata = null; private static final String SYSTEM_STREAM_NAME = "changelog"; private static final String INPUT_STREAM = "input"; private static final String STORE_NAME = "testStore"; @@ -57,16 +48,6 @@ public class TestStorageRecovery { public void setup() throws InterruptedException { putConfig(); putMetadata(); - - systemAdmin = mock(SystemAdmin.class); - - Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME)); - Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM)); - HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<>(); - ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata); - ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata); - when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap); - when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap); } @After @@ -106,15 +87,9 @@ public class TestStorageRecovery { } private void putMetadata() { - SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2"); - HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>(); - map.put(new Partition(0), sspm); - map.put(new Partition(1), sspm); - systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map); - - HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>(); - map1.put(new Partition(0), sspm); - 
map1.put(new Partition(1), sspm); - inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } }); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(1)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } }); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(0)), new ArrayList<>()); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(1)), new ArrayList<>()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java new file mode 100644 index 0000000..a9c57da --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; + + +/** + * A mock system backed by a set of in-memory queues. Used for testing w/o actual external messaging systems. + */ +public class MockSystemFactory implements SystemFactory { + + public static final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> MSG_QUEUES = new ConcurrentHashMap<>(); + + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + return new SystemConsumer() { + public void start() { + } + + public void stop() { + } + + public void register(SystemStreamPartition systemStreamPartition, String offset) { + MSG_QUEUES.putIfAbsent(systemStreamPartition, new ArrayList<>()); + } + + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) { + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> retQueues = new HashMap<>(); + systemStreamPartitions.forEach(ssp -> { + List<IncomingMessageEnvelope> msgs = MSG_QUEUES.get(ssp); + if (msgs == null) { + retQueues.put(ssp, new ArrayList<>()); + } else { + retQueues.put(ssp, MSG_QUEUES.remove(ssp)); + } + }); + return retQueues; + } + }; + } + + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + return new SystemProducer() { + private final Random seed = new Random(System.currentTimeMillis()); + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public void register(String source) { + } 
+ + @Override + public void send(String source, OutgoingMessageEnvelope envelope) { + SystemStream systemStream = envelope.getSystemStream(); + List<SystemStreamPartition> sspForSystem = MSG_QUEUES.keySet().stream() + .filter(ssp -> ssp.getSystemStream().equals(systemStream)) + .collect(ArrayList::new, (l, ssp) -> l.add(ssp), (l1, l2) -> l1.addAll(l2)); + if (sspForSystem.isEmpty()) { + MSG_QUEUES.putIfAbsent(new SystemStreamPartition(systemStream, new Partition(0)), new ArrayList<>()); + sspForSystem.add(new SystemStreamPartition(systemStream, new Partition(0))); + } + int partitionCount = sspForSystem.size(); + int partitionId = envelope.getPartitionKey() == null ? + envelope.getKey() == null ? this.seed.nextInt(partitionCount) : envelope.getKey().hashCode() % partitionCount : + envelope.getPartitionKey().hashCode() % partitionCount; + SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partitionId)); + List<IncomingMessageEnvelope> msgQueue = MSG_QUEUES.get(ssp); + msgQueue.add(new IncomingMessageEnvelope(ssp, null, envelope.getKey(), envelope.getMessage())); + } + + @Override + public void flush(String source) { + + } + }; + } + + public SystemAdmin getAdmin(String systemName, Config config) { + return new ExtendedSystemAdmin() { + + @Override + public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { + return null; + } + + @Override + public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { + Map<String, SystemStreamMetadata> metadataMap = new HashMap<>(); + Map<String, Set<Partition>> partitionMap = MSG_QUEUES.entrySet() + .stream() + .filter(entry -> streamNames.contains(entry.getKey().getSystemStream().getStream())) + .map(e -> e.getKey()).<Map<String, Set<Partition>>>collect(HashMap::new, (m, ssp) -> { + if (m.get(ssp.getStream()) == null) { + m.put(ssp.getStream(), new HashSet<>()); + } + 
m.get(ssp.getStream()).add(ssp.getPartition()); + }, (m1, m2) -> { + m2.forEach((k, v) -> { + if (m1.get(k) == null) { + m1.put(k, v); + } else { + m1.get(k).addAll(v); + } + }); + }); + + partitionMap.forEach((k, v) -> { + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaMap = + v.stream().<Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>>collect(HashMap::new, + (m, p) -> { + m.put(p, new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", "")); + }, (m1, m2) -> m1.putAll(m2)); + + metadataMap.put(k, new SystemStreamMetadata(k, partitionMetaMap)); + }); + + return metadataMap; + } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + return null; + } + + @Override + public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) { + return getSystemStreamMetadata(streamNames); + } + + @Override + public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) { + return null; + } + + @Override + public boolean createStream(StreamSpec streamSpec) { + return true; + } + + @Override + public void validateStream(StreamSpec streamSpec) throws StreamValidationException { + + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index e6b148b..f092d75 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -26,8 +26,7 @@ import org.apache.samza.job.local.ProcessJobFactory import org.apache.samza.job.local.ThreadJobFactory import 
org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.util.Util -import org.junit.After -import org.junit.Test +import org.junit.{After, Before, Test} import org.junit.Assert._ import scala.collection.JavaConverters._ @@ -36,10 +35,7 @@ import org.apache.samza.config.TaskConfig import org.apache.samza.config.SystemConfig import org.apache.samza.container.SamzaContainer import org.apache.samza.container.TaskName -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.config.Config import org.apache.samza.system._ -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.job.model.JobModel @@ -126,12 +122,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel]) assertEquals(expectedJobModel, jobModelFromCoordinatorUrl) - // Check the status of Stream Partition Count Monitor - assertNotNull(coordinator.streamPartitionCountMonitor) - assertTrue(coordinator.streamPartitionCountMonitor.isRunning()) - coordinator.stop - assertFalse(coordinator.streamPartitionCountMonitor.isRunning()) } @Test @@ -288,54 +279,16 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { }}.toMap } + @Before + def setUp() { + // setup the test stream metadata + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]()); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]()); + MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]()); + } @After def tearDown() = { 
MockCoordinatorStreamSystemFactory.disableMockConsumerCache() } -} - -class MockSystemFactory extends SystemFactory { - def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = new SystemConsumer { - def start() {} - def stop() {} - def register(systemStreamPartition: SystemStreamPartition, offset: String) {} - def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = new java.util.HashMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]() - } - def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null - def getAdmin(systemName: String, config: Config) = new MockSystemAdmin -} - -class MockSystemAdmin extends ExtendedSystemAdmin { - def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null - def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = { - assertEquals(1, streamNames.size) - val partitionMetadata = Map( - new Partition(0) -> new SystemStreamPartitionMetadata(null, null, null), - new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null), - // Create a new Partition(2), which wasn't in the prior changelog mapping. 
- new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null)) - Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava - } - - override def offsetComparator(offset1: String, offset2: String) = null - - override def getSystemStreamPartitionCounts(streamNames: util.Set[String], - cacheTTL: Long): util.Map[String, SystemStreamMetadata] = { - assertEquals(1, streamNames.size()) - val result = streamNames.asScala.map { - stream => - val partitionMetadata = Map( - new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""), - new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""), - new Partition(2) -> new SystemStreamPartitionMetadata("", "", "") - ) - stream -> new SystemStreamMetadata(stream, partitionMetadata.asJava) - }.toMap - result.asJava - } - - override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala index 7d15fd8..c7eab3b 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala @@ -39,7 +39,7 @@ import scala.collection.JavaConverters._ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar { @Test - def testStreamPartitionCountMonitor(): Unit = { + def testStreamPartitionCountChange(): Unit = { val mockMetadataCache = mock[StreamMetadataCache] val inputSystemStream = new SystemStream("test-system", "test-stream") val 
inputSystemStreamSet = Set[SystemStream](inputSystemStream) @@ -70,11 +70,79 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug val metrics = new MetricsRegistryMap() + val mockCallback = mock[StreamPartitionCountMonitor.Callback] + + val partitionCountMonitor = new StreamPartitionCountMonitor( + inputSystemStreamSet.asJava, + mockMetadataCache, + metrics, + 5, + mockCallback + ) + + partitionCountMonitor.updatePartitionCountMetric() + + assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream)) + assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue) + + assertNotNull(metrics.getGroup("job-coordinator")) + + val metricGroup = metrics.getGroup("job-coordinator") + assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]]) + assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) + + verify(mockCallback, times(1)).onSystemStreamPartitionChange(any()) + + } + + @Test + def testStreamPartitionCountException(): Unit = { + val mockMetadataCache = mock[StreamMetadataCache] + val inputSystemStream = new SystemStream("test-system", "test-stream") + val inputExceptionStream = new SystemStream("test-system", "test-exception-stream") + val inputSystemStreamSet = Set[SystemStream](inputSystemStream, inputExceptionStream) + + val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(new Partition(0), new SystemStreamPartitionMetadata("", "", "")) + put(new Partition(1), new SystemStreamPartitionMetadata("", "", "")) + } + } + + val finalPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + putAll(initialPartitionMetadata) + put(new Partition(2), new SystemStreamPartitionMetadata("", "", "")) + } + } + val streamMockMetadata = mock[java.util.HashMap[Partition, SystemStreamPartitionMetadata]] + + val initialMetadata = Map( + 
inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, initialPartitionMetadata), + inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata) + ) + val finalMetadata = Map( + inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, streamMockMetadata), + inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, finalPartitionMetadata) + ) + + when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true))) + .thenReturn(initialMetadata) // Called during StreamPartitionCountMonitor instantiation + .thenReturn(finalMetadata) // Called from monitor thread the second time + + // make the call to get stream metadata for {@code inputExceptionStream} fail w/ a runtime exception + when(streamMockMetadata.keySet()).thenThrow(new RuntimeException) + + val metrics = new MetricsRegistryMap() + + val mockCallback = mock[StreamPartitionCountMonitor.Callback] + val partitionCountMonitor = new StreamPartitionCountMonitor( inputSystemStreamSet.asJava, mockMetadataCache, metrics, - 5 + 5, + mockCallback ) partitionCountMonitor.updatePartitionCountMetric() @@ -87,6 +155,10 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug val metricGroup = metrics.getGroup("job-coordinator") assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]]) assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) + + // Make sure as long as one of the input stream topic partition change is detected, the callback is invoked + verify(mockCallback, times(1)).onSystemStreamPartitionChange(any()) + } @Test @@ -98,7 +170,8 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug inputSystemStreamSet.asJava, mockMetadataCache, new MetricsRegistryMap(), - 50 + 50, + null ) assertFalse(monitor.isRunning()) @@ -143,7 +216,8 @@ class 
TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug inputSystemStreamSet.asJava, mockMetadataCache, new MetricsRegistryMap(), - 50 + 50, + null ) { override def updatePartitionCountMetric(): Unit = { sampleCount.countDown() http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala index a6d82e1..3d385d6 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala @@ -20,7 +20,6 @@ package org.apache.samza.system import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory import org.apache.samza.config._ -import org.apache.samza.coordinator.MockSystemFactory import org.apache.samza.job.local.ThreadJobFactory import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala index b7f2119..255c85e 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala @@ -20,7 +20,6 @@ package org.apache.samza.system import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory import org.apache.samza.config._ -import org.apache.samza.coordinator.MockSystemFactory import org.apache.samza.job.local.ThreadJobFactory import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._