Repository: samza Updated Branches: refs/heads/master 08cfad990 -> 93b397e84
SAMZA-1730: Adding state validations in StreamProcessor before any lifecycle operation and group coordination. Author: Shanthoosh Venkataraman <santhoshvenkat1...@gmail.com> Reviewers: Jagadish<jagad...@apache.org> Closes #535 from shanthoosh/abced Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93b397e8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93b397e8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93b397e8 Branch: refs/heads/master Commit: 93b397e84056f311d238be72181b2df60cccb6c0 Parents: 08cfad9 Author: Shanthoosh Venkataraman <santhoshvenkat1...@gmail.com> Authored: Tue Jul 10 16:34:58 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Jul 10 16:34:58 2018 -0700 ---------------------------------------------------------------------- .../samza/container/SamzaContainerListener.java | 9 +- .../apache/samza/processor/StreamProcessor.java | 281 ++++++++++++------- .../samza/runtime/LocalContainerRunner.java | 2 +- .../standalone/PassthroughJobCoordinator.java | 1 + .../apache/samza/container/SamzaContainer.scala | 16 +- .../samza/job/local/ThreadJobFactory.scala | 2 +- .../samza/processor/TestStreamProcessor.java | 162 ++++++++++- .../samza/container/TestSamzaContainer.scala | 10 +- .../samza/processor/TestZkStreamProcessor.java | 4 +- .../TestZkStreamProcessorFailures.java | 4 +- .../processor/TestZkStreamProcessorSession.java | 4 +- .../processor/TestZkLocalApplicationRunner.java | 145 +++++++--- 12 files changed, 456 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java index a9c3b2c..fe8bc66 100644 --- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java +++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java @@ -37,19 +37,16 @@ public interface SamzaContainerListener { * <br> * <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any * exceptions/errors. - * @param pausedByJm boolean indicating why the container was stopped. It should be {@literal true}, iff the container - * was stopped as a result of an expired {@link org.apache.samza.job.model.JobModel}. Otherwise, - * it should be {@literal false} */ - void onContainerStop(boolean pausedByJm); + void onContainerStop(); /** * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to * {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in * {@link org.apache.samza.SamzaContainerStatus} * <br> - * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop(boolean)}. - * @param t Throwable that caused the container failure. + * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}. + * @param t Throwable that caused the container failure. 
*/ void onContainerFailed(Throwable t); } http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 73f32e7..22550d5 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -19,6 +19,8 @@ package org.apache.samza.processor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -26,7 +28,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.samza.SamzaContainerStatus; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; @@ -49,10 +50,44 @@ import org.slf4j.LoggerFactory; /** * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an * independent process. + * * <p> * * <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in - * multiple threads. + * multiple threads. This class is thread safe. + * + * </p> + * + * <pre> + * A StreamProcessor could be in any one of the following states: + * NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED. + * + * Describes the valid state transitions of the {@link StreamProcessor}. 
+ * + * + * ââââââââââââââââââââââââââââââââ + * â â + * â â + * â â + * â â + * New StreamProcessor.start() Rebalance triggered V Receives JobModel â + * StreamProcessor ââââââââââⶠNEW âââââââââââââââââââââââââââⶠSTARTED ââââââââââââââââââⶠIN_REBALANCE âââââââââââââââââââââⶠRUNNING + * Creation â â by group leader â and starts Container â + * â â â â + * StreâamProcessor.stop() StreâamProcessor.stop() StreâamProcessor.stop() StreâamProcessor.stop() + * â â â â + * â â â â + * â â â â + * V V V V + * âââââââââââââââââââââââââââⶠSTOPPING Dââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ + * â + * â + * After JobCoordinator and SamzaContainer had shutdown. + * â + * V + * STOPPED + * + * </pre> */ @InterfaceStability.Evolving public class StreamProcessor { @@ -69,31 +104,59 @@ public class StreamProcessor { private final ExecutorService executorService; private final Object lock = new Object(); - private SamzaContainer container = null; private Throwable containerException = null; private boolean processorOnStartCalled = false; - // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is - // stopped due to re-balancing - volatile CountDownLatch jcContainerShutdownLatch; + volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1); + + /** + * Indicates the current status of a {@link StreamProcessor}. + */ + public enum State { + STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), IN_REBALANCE("IN_REBALANCE"); + + private String strVal; + + State(String strVal) { + this.strVal = strVal; + } + + @Override + public String toString() { + return strVal; + } + } + + /** + * @return the current state of StreamProcessor. 
+ */ + public State getState() { + return state; + } + + @VisibleForTesting + State state = State.NEW; + + @VisibleForTesting + SamzaContainer container = null; @VisibleForTesting JobCoordinatorListener jobCoordinatorListener = null; /** - * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container + * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}. + * * <p> - * JobCoordinator controls how the various StreamProcessor instances belonging to a job coordinate. It is also - * responsible generating and updating JobModel. - * When StreamProcessor starts, it starts the JobCoordinator and brings up a SamzaContainer based on the JobModel. - * SamzaContainer is executed using an ExecutorService. + * On startup, StreamProcessor starts the JobCoordinator. Schedules the SamzaContainer to run in a ExecutorService + * when it receives new {@link JobModel} from JobCoordinator. * <p> - * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user * - * @param config Instance of config object - contains all configuration required for processing - * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job + * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor. + * + * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}. + * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics. * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances. - * @param processorListener listener to the StreamProcessor life cycle + * @param processorListener listener to the StreamProcessor life cycle. 
*/ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) { @@ -101,7 +164,7 @@ public class StreamProcessor { } /** - *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task + * Same as {@link StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task * instances are created using the provided {@link StreamTaskFactory}. * @param config - config * @param customMetricsReporters metric Reporter @@ -114,7 +177,7 @@ public class StreamProcessor { } /* package private */ - JobCoordinator getJobCoordinator() { + private JobCoordinator getJobCoordinator() { String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); } @@ -126,6 +189,7 @@ public class StreamProcessor { StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory, StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { + Preconditions.checkNotNull(processorListener, "ProcessorListener cannot be null."); this.taskFactory = taskFactory; this.config = config; this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); @@ -148,7 +212,14 @@ public class StreamProcessor { * </p> */ public void start() { - jobCoordinator.start(); + synchronized (lock) { + if (state == State.NEW) { + state = State.STARTED; + jobCoordinator.start(); + } else { + LOGGER.info("Start is no-op, since the current state is {} and not {}.", state, State.NEW); + } + } } /** @@ -156,7 +227,7 @@ public class StreamProcessor { * Asynchronously stops the {@link StreamProcessor}'s running components - {@link SamzaContainer} * and {@link JobCoordinator} * </p> - * There are multiple 
ways in which the StreamProcessor stops: + * Here're the ways which can stop the StreamProcessor: * <ol> * <li>Caller of StreamProcessor invokes stop()</li> * <li>Samza Container completes processing (eg. bounded input) and shuts down</li> @@ -168,7 +239,7 @@ public class StreamProcessor { * <br> * If container is running, * <ol> - * <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop(boolean)} will trigger + * <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop()} will trigger * {@link JobCoordinator#stop()}</li> * <li>container fails to shutdown cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)} will * trigger {@link JobCoordinator#stop()}</li> @@ -178,20 +249,22 @@ public class StreamProcessor { */ public void stop() { synchronized (lock) { - boolean containerShutdownInvoked = false; - if (container != null) { + if (state != State.STOPPING && state != State.STOPPED) { + state = State.STOPPING; try { LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); - container.shutdown(); - containerShutdownInvoked = true; - } catch (Exception exception) { - LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception); + boolean hasContainerShutdown = stopSamzaContainer(); + if (!hasContainerShutdown) { + LOGGER.info("Interrupting the container: {} thread to die.", container); + executorService.shutdownNow(); + } + } catch (Throwable throwable) { + LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); } - } - - if (!containerShutdownInvoked) { - LOGGER.info("Shutting down JobCoordinator from StreamProcessor"); + LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId); jobCoordinator.stop(); + } else { + LOGGER.info("StreamProcessor state is: {}. 
Ignoring the stop.", state); } } } @@ -200,44 +273,51 @@ public class StreamProcessor { return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory); } - JobCoordinatorListener createJobCoordinatorListener() { + /** + * Stops the {@link SamzaContainer}. + * @return true if {@link SamzaContainer} had shutdown within task.shutdown.ms. false otherwise. + */ + private boolean stopSamzaContainer() { + boolean hasContainerShutdown = true; + if (container != null) { + if (!container.hasStopped()) { + try { + container.shutdown(); + LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); + hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (IllegalContainerStateException icse) { + LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); + } catch (Exception e) { + LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); + hasContainerShutdown = false; + } + LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); + } else { + LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); + } + } + return hasContainerShutdown; + } + + private JobCoordinatorListener createJobCoordinatorListener() { return new JobCoordinatorListener() { @Override public void onJobModelExpired() { synchronized (lock) { - if (container != null) { - SamzaContainerStatus status = container.getStatus(); - if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { - boolean shutdownComplete = false; - try { - LOGGER.info("Job model expired. 
Shutting down the container: {} of stream processor: {}.", container, - processorId); - container.pause(); - shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete)); - } catch (IllegalContainerStateException icse) { - // Ignored since container is not running - LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); - shutdownComplete = true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e); - } catch (Exception e) { - LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); - } - if (!shutdownComplete) { - LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId); - container = null; - stop(); - } else { - LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); - } + if (state == State.STARTED || state == State.RUNNING) { + state = State.IN_REBALANCE; + LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); + boolean hasContainerShutdown = stopSamzaContainer(); + if (!hasContainerShutdown) { + LOGGER.warn("Container: {} shutdown was unsuccessful. 
Stopping the stream processor: {}.", container, processorId); + state = State.STOPPING; + jobCoordinator.stop(); } else { - LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId); + LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); } } else { - LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); + LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED)); } } } @@ -245,35 +325,42 @@ public class StreamProcessor { @Override public void onNewJobModel(String processorId, JobModel jobModel) { synchronized (lock) { - jcContainerShutdownLatch = new CountDownLatch(1); - container = createSamzaContainer(processorId, jobModel); - container.setContainerListener(new ContainerListener()); - LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - executorService.submit(container::run); + if (state == State.IN_REBALANCE) { + containerShutdownLatch = new CountDownLatch(1); + container = createSamzaContainer(processorId, jobModel); + container.setContainerListener(new ContainerListener()); + LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); + executorService.submit(container); + } else { + LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); + } } } @Override public void onCoordinatorStop() { - if (executorService != null) { + synchronized (lock) { LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); + stopSamzaContainer(); executorService.shutdownNow(); + state = State.STOPPED; } - if (processorListener != null) { - if (containerException != null) - processorListener.onFailure(containerException); - else - processorListener.onShutdown(); - } + if (containerException != 
null) + processorListener.onFailure(containerException); + else + processorListener.onShutdown(); + } @Override public void onCoordinatorFailure(Throwable throwable) { - LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); - stop(); - if (processorListener != null) { - processorListener.onFailure(throwable); + synchronized (lock) { + LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); + stopSamzaContainer(); + executorService.shutdownNow(); + state = State.STOPPED; } + processorListener.onFailure(throwable); } }; } @@ -287,46 +374,36 @@ public class StreamProcessor { @Override public void onContainerStart() { + LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId); if (!processorOnStartCalled) { - // processorListener is called on start only the first time the container starts. - // It is not called after every re-balance of partitions among the processors + processorListener.onStart(); processorOnStartCalled = true; - if (processorListener != null) { - processorListener.onStart(); - } - } else { - LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId); } + state = State.RUNNING; } @Override - public void onContainerStop(boolean pauseByJm) { - if (pauseByJm) { - LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); - if (jcContainerShutdownLatch != null) { - jcContainerShutdownLatch.countDown(); - } - } else { // sp.stop was called or container stopped by itself - LOGGER.info("Container: {} stopped. 
Stopping the stream processor: {}.", container, processorId); - synchronized (lock) { - container = null; // this guarantees that stop() doesn't try to stop container again - stop(); + public void onContainerStop() { + containerShutdownLatch.countDown(); + synchronized (lock) { + if (state == State.IN_REBALANCE) { + LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); + } else { + LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); + state = State.STOPPING; + jobCoordinator.stop(); } } } @Override public void onContainerFailed(Throwable t) { - if (jcContainerShutdownLatch != null) { - jcContainerShutdownLatch.countDown(); - } else { - LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); - } + containerShutdownLatch.countDown(); synchronized (lock) { - containerException = t; LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. 
Original exception:", container, processorId), containerException); - container = null; - stop(); + state = State.STOPPING; + containerException = t; + jobCoordinator.stop(); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 66176d7..e6e622d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -87,7 +87,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { } @Override - public void onContainerStop(boolean invokedExternally) { + public void onContainerStop() { log.info("Container Stopped"); } http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 01ee84e..228617a 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -89,6 +89,7 @@ public class PassthroughJobCoordinator implements JobCoordinator { } if (jobModel != null && jobModel.getContainers().containsKey(processorId)) { if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); coordinatorListener.onNewJobModel(processorId, jobModel); } } else { 
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index be0fb26..89278ad 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -736,7 +736,6 @@ class SamzaContainer( @volatile private var status = SamzaContainerStatus.NOT_STARTED private var exceptionSeen: Throwable = null - private var paused: Boolean = false private var containerListener: SamzaContainerListener = null def getStatus(): SamzaContainerStatus = status @@ -747,6 +746,8 @@ class SamzaContainer( containerListener = listener } + def hasStopped(): Boolean = status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED + def run { try { info("Starting container.") @@ -824,7 +825,7 @@ class SamzaContainer( status match { case SamzaContainerStatus.STOPPED => if (containerListener != null) { - containerListener.onContainerStop(paused) + containerListener.onContainerStop() } case SamzaContainerStatus.FAILED => if (containerListener != null) { @@ -833,17 +834,6 @@ class SamzaContainer( } } - // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the future so that StreamProcessor can pause and - // unpause the container when the jobmodel changes. 
- /** - * Marks the [[SamzaContainer]] as being paused by the called due to a change in [[JobModel]] and then, asynchronously - * shuts down this [[SamzaContainer]] - */ - def pause(): Unit = { - paused = true - shutdown() - } - /** * <p> * Asynchronously shuts down this [[SamzaContainer]] http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 029b375..7b83874 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -94,7 +94,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { throw t } - override def onContainerStop(pausedOrNot: Boolean): Unit = { + override def onContainerStop(): Unit = { } override def onContainerStart(): Unit = { http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index fc1259c..052aa29 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.samza.processor; +import com.google.common.collect.ImmutableMap; import org.apache.samza.SamzaContainerStatus; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -28,13 +29,13 @@ import org.apache.samza.coordinator.JobCoordinator; import 
org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.processor.StreamProcessor.State; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -42,7 +43,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -57,6 +59,7 @@ public class TestStreamProcessor { @Before public void before() { + Mockito.reset(); processorListenerState = new ConcurrentHashMap<ListenerCallback, Boolean>() { { put(ListenerCallback.ON_START, false); @@ -103,12 +106,12 @@ public class TestStreamProcessor { return null; }).when(mockRunLoop).run(); - doAnswer(invocation -> + Mockito.doAnswer(invocation -> { containerStop.countDown(); return null; }).when(mockRunLoop).shutdown(); - container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, mock(StreamTask.class)); + container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, Mockito.mock(StreamTask.class)); } return container; } @@ -166,6 +169,7 @@ public class TestStreamProcessor { final Thread jcThread = new Thread(() -> { try { + processor.jobCoordinatorListener.onJobModelExpired(); processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel()); coordinatorStop.await(); processor.jobCoordinatorListener.onCoordinatorStop(); @@ -215,7 +219,7 @@ public class TestStreamProcessor { */ @Test public void testContainerFailureCorrectlyStopsProcessor() throws 
InterruptedException { - JobCoordinator mockJobCoordinator = mock(JobCoordinator.class); + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); Throwable expectedThrowable = new SamzaException("Failure in Container!"); AtomicReference<Throwable> actualThrowable = new AtomicReference<>(); final CountDownLatch runLoopStartedLatch = new CountDownLatch(1); @@ -271,6 +275,7 @@ public class TestStreamProcessor { new Thread(() -> { try { + processor.jobCoordinatorListener.onJobModelExpired(); processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel()); coordinatorStop.await(); processor.jobCoordinatorListener.onCoordinatorStop(); @@ -296,9 +301,146 @@ public class TestStreamProcessor { Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_FAILURE)); } - // TODO: - // Test multiple start / stop and its ordering - // test onNewJobModel - // test onJobModelExpiry - // test Coordinator failure - correctly shutsdown the streamprocessor + @Test + public void testStartOperationShouldBeIdempotent() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + Mockito.doNothing().when(mockJobCoordinator).start(); + StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + Assert.assertEquals(State.NEW, streamProcessor.getState()); + streamProcessor.start(); + + Assert.assertEquals(State.STARTED, streamProcessor.getState()); + + streamProcessor.start(); + + Assert.assertEquals(State.STARTED, streamProcessor.getState()); + + Mockito.verify(mockJobCoordinator, Mockito.times(1)).start(); + } + + @Test + public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class); + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + + /** + * Without a SamzaContainer running in StreamProcessor and current StreamProcessor state is STARTED, + * onJobModelExpired should move the state to IN_REBALANCE. + */ + + streamProcessor.start(); + + Assert.assertEquals(State.STARTED, streamProcessor.getState()); + + streamProcessor.jobCoordinatorListener.onJobModelExpired(); + + Assert.assertEquals(State.IN_REBALANCE, streamProcessor.getState()); + + /** + * When there's initialized SamzaContainer in StreamProcessor and the container shutdown + * fails in onJobModelExpired. onJobModelExpired should move StreamProcessor to STOPPING + * state and should shutdown JobCoordinator. + */ + Mockito.doNothing().when(mockJobCoordinator).start(); + Mockito.doNothing().when(mockJobCoordinator).stop(); + Mockito.doNothing().when(mockSamzaContainer).shutdown(); + Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); + Mockito.when(mockSamzaContainer.getStatus()) + .thenReturn(SamzaContainerStatus.STARTED) + .thenReturn(SamzaContainerStatus.STOPPED); + streamProcessor.container = mockSamzaContainer; + streamProcessor.state = State.STARTED; + + streamProcessor.jobCoordinatorListener.onJobModelExpired(); + + Assert.assertEquals(State.STOPPING, streamProcessor.getState()); + Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown(); + Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); + + // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should be a NO_OP. 
+ streamProcessor.state = State.IN_REBALANCE; + + streamProcessor.jobCoordinatorListener.onJobModelExpired(); + + Assert.assertEquals(State.IN_REBALANCE, streamProcessor.state); + } + + @Test + public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + + streamProcessor.container = mockSamzaContainer; + streamProcessor.state = State.IN_REBALANCE; + Mockito.doNothing().when(mockSamzaContainer).run(); + + streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); + + Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run(); + } + + @Test + public void testStopShouldBeIdempotent() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + + Mockito.doNothing().when(mockJobCoordinator).stop(); + Mockito.doNothing().when(mockSamzaContainer).shutdown(); + Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); + Mockito.when(mockSamzaContainer.getStatus()) + .thenReturn(SamzaContainerStatus.STARTED) + .thenReturn(SamzaContainerStatus.STOPPED); + + streamProcessor.state = 
State.RUNNING; + + streamProcessor.stop(); + + Assert.assertEquals(State.STOPPING, streamProcessor.state); + } + + @Test + public void testCoordinatorFailureShouldStopTheStreamProcessor() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + + Exception failureException = new Exception("dummy exception"); + + // Stub the container before invoking the failure callback; stubbing placed after the call cannot influence it. + Mockito.doNothing().when(mockSamzaContainer).shutdown(); + Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); + streamProcessor.container = mockSamzaContainer; + streamProcessor.state = State.RUNNING; + streamProcessor.jobCoordinatorListener.onCoordinatorFailure(failureException); + + Assert.assertEquals(State.STOPPED, streamProcessor.state); + Mockito.verify(lifecycleListener).onFailure(failureException); + Mockito.verify(mockSamzaContainer).shutdown(); + } + + @Test + public void testCoordinatorStopShouldStopTheStreamProcessor() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); + + streamProcessor.state = State.RUNNING; + streamProcessor.jobCoordinatorListener.onCoordinatorStop(); + + Assert.assertEquals(State.STOPPED, streamProcessor.state); + Mockito.verify(lifecycleListener).onShutdown(); + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index b27b151..9aca45e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -203,7 +203,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { onContainerFailedThrowable = t } - override def onContainerStop(invokedExternally: Boolean): Unit = { + override def onContainerStop(): Unit = { onContainerStopCalled = true } @@ -284,7 +284,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { onContainerFailedThrowable = t } - override def onContainerStop(invokedExternally: Boolean): Unit = { + override def onContainerStop(): Unit = { onContainerStopCalled = true } @@ -367,7 +367,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { onContainerFailedThrowable = t } - override def onContainerStop(invokedExternally: Boolean): Unit = { + override def onContainerStop(): Unit = { onContainerStopCalled = true } @@ -451,7 +451,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { onContainerFailedThrowable = t } - override def onContainerStop(invokedExternally: Boolean): Unit = { + override def onContainerStop(): Unit = { onContainerStopCalled = true } @@ -530,7 +530,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { onContainerFailedThrowable = t } - override def onContainerStop(invokedExternally: Boolean): Unit = { + override def onContainerStop(): Unit = { onContainerStopCalled = true } 
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java index 7253b29..5c28553 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java @@ -128,7 +128,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { // make sure it consumes all the messages from the first batch waitUntilMessagesLeftN(totalEventsToGenerate - messageCount); - CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch; + CountDownLatch containerStopped1 = sp1.containerShutdownLatch; // start the second processor CountDownLatch startWait2 = new CountDownLatch(1); @@ -211,7 +211,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { // make sure they consume all the messages from the first batch waitUntilMessagesLeftN(totalEventsToGenerate - messageCount); - CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch; + CountDownLatch containerStopped2 = sp2.containerShutdownLatch; // stop the first processor stopProcessor(stopLatch1); http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java index 374e77c..fb9c66b 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java +++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java @@ -108,8 +108,8 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase { // make sure they consume all the messages waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount); - CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch; - CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch; + CountDownLatch containerStopped1 = sp1.containerShutdownLatch; + CountDownLatch containerStopped2 = sp2.containerShutdownLatch; // produce the bad messages produceMessages(BAD_MESSAGE_KEY, inputTopic, 4); http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java index 40eeaf0..f518c0a 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java @@ -98,10 +98,10 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase { waitUntilMessagesLeftN(totalEventsToGenerate - messageCount); // Get the container stop latch to be able to check when a container is stopped. - // New jcContainerShutdownLatch is created after each onNewJobModel, + // New containerShutdownLatch is created after each onNewJobModel, // so we need to get the current one, before it changed.. 
for (int i = 0; i < processorIds.length; i++) { - containerStopLatches[i] = streamProcessors[i].jcContainerShutdownLatch; + containerStopLatches[i] = streamProcessors[i].containerShutdownLatch; } // expire zk session of one of the processors http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index ea44052..bfa78a0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.ArrayList; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback; -import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent; import org.apache.samza.util.NoOpMetricsRegistry; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; @@ -279,16 +278,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Job model before and after the addition of second stream processor should be the same. 
assertEquals(previousJobModel[0], updatedJobModel); assertEquals(new MapConfig(), updatedJobModel.getConfig()); - // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) - // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value. assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); - - // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 - // localApplicationRunner1.kill(streamApp1); - // localApplicationRunner2.kill(streamApp2); - - // localApplicationRunner1.waitForFinish(); - // localApplicationRunner2.waitForFinish(); + localApplicationRunner1.kill(streamApp1); + localApplicationRunner1.waitForFinish(); + localApplicationRunner2.kill(streamApp2); + localApplicationRunner2.waitForFinish(); + assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + assertEquals(localApplicationRunner2.status(streamApp2), ApplicationStatus.UnsuccessfulFinish); } /** @@ -387,13 +383,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne processedMessagesLatch.await(); assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2)); - - // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 - // localApplicationRunner1.kill(streamApp1); - // localApplicationRunner2.kill(streamApp2); - - // localApplicationRunner1.waitForFinish(); - // localApplicationRunner2.waitForFinish(); + localApplicationRunner1.kill(streamApp1); + localApplicationRunner1.waitForFinish(); + localApplicationRunner2.kill(streamApp2); + localApplicationRunner2.waitForFinish(); + assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); } @Test @@ -439,7 +433,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne applicationRunner1.kill(streamApp1); applicationRunner1.waitForFinish(); - // How do you know here that leader has 
been reelected. + assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); kafkaEventsConsumedLatch.await(); publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -458,12 +452,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet()); assertEquals(2, jobModel.getContainers().size()); - // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 - // applicationRunner2.kill(streamApp2); - // applicationRunner3.kill(streamApp3); - - // applicationRunner2.waitForFinish(); - // applicationRunner3.waitForFinish(); + applicationRunner2.kill(streamApp2); + applicationRunner2.waitForFinish(); + assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); + applicationRunner3.kill(streamApp3); + applicationRunner3.waitForFinish(); + assertEquals(applicationRunner3.status(streamApp3), ApplicationStatus.SuccessfulFinish); } @Test @@ -501,12 +495,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne try { applicationRunner3.run(streamApp3); } finally { - // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 - // applicationRunner1.kill(streamApp1); - // applicationRunner2.kill(streamApp2); + applicationRunner1.kill(streamApp1); + applicationRunner2.kill(streamApp2); - // applicationRunner1.waitForFinish(); - // applicationRunner2.waitForFinish(); + applicationRunner1.waitForFinish(); + applicationRunner2.waitForFinish(); } } @@ -526,15 +519,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); - List<TestKafkaEvent> messagesProcessed = new ArrayList<>(); -
StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m); - // Create StreamApplication from configuration. CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); // Run stream application. @@ -551,10 +541,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne applicationRunner1.kill(streamApp1); applicationRunner1.waitForFinish(); + assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1); processedMessagesLatch1 = new CountDownLatch(1); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1); + streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); applicationRunner4.run(streamApp1); processedMessagesLatch1.await(); @@ -566,12 +558,85 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(Integer.parseInt(jobModelVersion) + 
1, Integer.parseInt(newJobModelVersion)); assertEquals(jobModel.getContainers(), newJobModel.getContainers()); - // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 - // applicationRunner2.kill(streamApp2); - // applicationRunner4.kill(streamApp1); + applicationRunner2.kill(streamApp2); + applicationRunner2.waitForFinish(); + assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); + applicationRunner4.kill(streamApp1); + applicationRunner4.waitForFinish(); + assertEquals(applicationRunner4.status(streamApp1), ApplicationStatus.SuccessfulFinish); + } + + @Test + public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContainerShutdownTime() throws Exception { + publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); + configMap.put(TaskConfig.SHUTDOWN_MS(), "0"); + + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + Config applicationConfig1 = new MapConfig(configMap); + + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + Config applicationConfig2 = new MapConfig(configMap); + + LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); + LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); + + // Create StreamApplication from configuration. 
+ CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); + CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); + CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); + + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + + applicationRunner1.run(streamApp1); + applicationRunner2.run(streamApp2); + + processedMessagesLatch1.await(); + processedMessagesLatch2.await(); + kafkaEventsConsumedLatch.await(); + + // At this stage, both the processors are running and have drained the Kafka source. + // Trigger the re-balancing phase by manually adding a new processor. + + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + Config applicationConfig3 = new MapConfig(configMap); + + LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3); + CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - // applicationRunner2.waitForFinish(); - // applicationRunner4.waitForFinish(); + StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); + applicationRunner3.run(streamApp3); + + publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + processedMessagesLatch3.await(); + + /** + * If the processing has started in the third stream processor, then the other two stream processors should be stopped. + */ + // TODO: This is a bug! Status should be unsuccessful finish.
+ assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); + + applicationRunner3.kill(streamApp3); + applicationRunner3.waitForFinish(); + assertEquals(applicationRunner3.status(streamApp3), ApplicationStatus.SuccessfulFinish); + } + + private static class TestKafkaEvent implements Serializable { + + // Actual content of the event. + private String eventData; + + // Contains Integer value, which is greater than previous message id. + private String eventId; + + TestKafkaEvent(String eventId, String eventData) { + this.eventId = eventId; + this.eventData = eventData; + } } }