http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index e1e1c55..1e021f5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -103,7 +103,7 @@ public abstract class OperatorSpec<M, OM> implements Serializable { } /** - * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}. + * Get the unique ID of this operator in the {@link org.apache.samza.application.StreamApplicationDescriptorImpl}. * @return the unique operator ID */ public final String getOpId() {
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java index 17c6903..4db8e60 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.stream; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; @@ -44,9 +44,9 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple private final OutputStreamImpl<M> outputStream; private final boolean isKeyed; - public IntermediateMessageStreamImpl(StreamGraphSpec graph, InputOperatorSpec inputOperatorSpec, + public IntermediateMessageStreamImpl(StreamApplicationDescriptorImpl appDesc, InputOperatorSpec inputOperatorSpec, OutputStreamImpl<M> outputStream) { - super(graph, (OperatorSpec<?, M>) inputOperatorSpec); + super(appDesc, (OperatorSpec<?, M>) inputOperatorSpec); this.outputStream = outputStream; if (inputOperatorSpec.isKeyed() != outputStream.isKeyed()) { LOGGER.error("Input and output streams for intermediate stream {} aren't keyed consistently. 
Input: {}, Output: {}", http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index ed67d80..4ef9f9c 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -40,8 +40,8 @@ import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsReporter; -import org.apache.samza.task.AsyncStreamTaskFactory; -import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.runtime.ProcessorLifecycleListener; +import org.apache.samza.task.TaskFactory; import org.apache.samza.util.ScalaJavaUtil; import org.apache.samza.util.Util; import org.slf4j.Logger; @@ -95,8 +95,8 @@ public class StreamProcessor { private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d"; private final JobCoordinator jobCoordinator; - private final StreamProcessorLifecycleListener processorListener; - private final Object taskFactory; + private final ProcessorLifecycleListener processorListener; + private final TaskFactory taskFactory; private final Map<String, MetricsReporter> customMetricsReporter; private final Config config; private final long taskShutdownMs; @@ -105,7 +105,6 @@ public class StreamProcessor { private final Object lock = new Object(); private Throwable containerException = null; - private boolean processorOnStartCalled = false; volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1); @@ -153,54 +152,56 @@ public class StreamProcessor { * * <b>Note:</b> Lifecycle of the 
ExecutorService is fully managed by the StreamProcessor. * - * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}. + * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}. * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics. - * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances. - * @param processorListener listener to the StreamProcessor life cycle. + * @param taskFactory the {@link TaskFactory} to be used for creating task instances. + * @param processorListener listener to the StreamProcessor life cycle. */ - public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, - AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) { - this(config, customMetricsReporters, asyncStreamTaskFactory, processorListener, null); + public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + ProcessorLifecycleListener processorListener) { + this(config, customMetricsReporters, taskFactory, processorListener, null); } /** - * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task - * instances are created using the provided {@link StreamTaskFactory}. - * @param config - config + * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener)}, except the + * {@link JobCoordinator} is given for this {@link StreamProcessor}. 
+ * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} * @param customMetricsReporters metric Reporter - * @param streamTaskFactory task factory to instantiate the Task - * @param processorListener listener to the StreamProcessor life cycle + * @param taskFactory task factory to instantiate the Task + * @param processorListener listener to the StreamProcessor life cycle + * @param jobCoordinator the instance of {@link JobCoordinator} */ - public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, - StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) { - this(config, customMetricsReporters, streamTaskFactory, processorListener, null); + public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { + this(config, customMetricsReporters, taskFactory, sp -> processorListener, jobCoordinator); } - /* package private */ - private JobCoordinator getJobCoordinator() { - String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); - return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); - } - - @VisibleForTesting - JobCoordinator getCurrentJobCoordinator() { - return jobCoordinator; - } - - StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory, - StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) { - Preconditions.checkNotNull(processorListener, "ProcessorListener cannot be null."); + /** + * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except + * there is a {@link StreamProcessorLifecycleListenerFactory} as input instead of {@link ProcessorLifecycleListener}. 
+ * This is useful to create a {@link ProcessorLifecycleListener} with a reference to this {@link StreamProcessor} + * + * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer} + * @param customMetricsReporters metric Reporter + * @param taskFactory task factory to instantiate the Task + * @param listenerFactory listener to the StreamProcessor life cycle + * @param jobCoordinator the instance of {@link JobCoordinator} + */ + public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory, + StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) { + Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null."); this.taskFactory = taskFactory; this.config = config; this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs(); this.customMetricsReporter = customMetricsReporters; - this.processorListener = processorListener; - this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator(); + this.jobCoordinator = (jobCoordinator != null) ? 
jobCoordinator : createJobCoordinator(); this.jobCoordinatorListener = createJobCoordinatorListener(); this.jobCoordinator.setListener(jobCoordinatorListener); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build(); this.executorService = Executors.newSingleThreadExecutor(threadFactory); + // TODO: remove the dependency on jobCoordinator for processorId after fixing SAMZA-1835 this.processorId = this.jobCoordinator.getProcessorId(); + this.processorListener = listenerFactory.createInstance(this); } /** @@ -214,6 +215,7 @@ public class StreamProcessor { public void start() { synchronized (lock) { if (state == State.NEW) { + processorListener.beforeStart(); state = State.STARTED; jobCoordinator.start(); } else { @@ -239,9 +241,9 @@ public class StreamProcessor { * <br> * If container is running, * <ol> - * <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop()} will trigger + * <li>container is shutdown cleanly and {@link SamzaContainerListener#afterStop()} will trigger * {@link JobCoordinator#stop()}</li> - * <li>container fails to shutdown cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)} will + * <li>container fails to shutdown cleanly and {@link SamzaContainerListener#afterFailure(Throwable)} will * trigger {@link JobCoordinator#stop()}</li> * </ol> * If container is not running, then this method will simply shutdown the {@link JobCoordinator}. 
@@ -269,10 +271,26 @@ public class StreamProcessor { } } + @VisibleForTesting + JobCoordinator getCurrentJobCoordinator() { + return jobCoordinator; + } + + @VisibleForTesting + SamzaContainer getContainer() { + return container; + } + + @VisibleForTesting SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory); } + private JobCoordinator createJobCoordinator() { + String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); + return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); + } + /** * Stops the {@link SamzaContainer}. * @return true if {@link SamzaContainer} had shutdown within task.shutdown.ms. false otherwise. @@ -346,9 +364,9 @@ public class StreamProcessor { state = State.STOPPED; } if (containerException != null) - processorListener.onFailure(containerException); + processorListener.afterFailure(containerException); else - processorListener.onShutdown(); + processorListener.afterStop(); } @@ -360,30 +378,40 @@ public class StreamProcessor { executorService.shutdownNow(); state = State.STOPPED; } - processorListener.onFailure(throwable); + processorListener.afterFailure(throwable); } }; } - /* package private for testing */ - SamzaContainer getContainer() { - return container; + /** + * Interface to create a {@link ProcessorLifecycleListener} + */ + @FunctionalInterface + public interface StreamProcessorLifecycleListenerFactory { + ProcessorLifecycleListener createInstance(StreamProcessor processor); } class ContainerListener implements SamzaContainerListener { + private boolean processorOnStartCalled = false; + + @Override + public void beforeStart() { + // processorListener.beforeStart() is invoked in StreamProcessor.start() + } + @Override - public void onContainerStart() { + public void afterStart() { 
LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId); if (!processorOnStartCalled) { - processorListener.onStart(); + processorListener.afterStart(); processorOnStartCalled = true; } state = State.RUNNING; } @Override - public void onContainerStop() { + public void afterStop() { containerShutdownLatch.countDown(); synchronized (lock) { if (state == State.IN_REBALANCE) { @@ -397,7 +425,7 @@ public class StreamProcessor { } @Override - public void onContainerFailed(Throwable t) { + public void afterFailure(Throwable t) { containerShutdownLatch.countDown(); synchronized (lock) { LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java deleted file mode 100644 index 7a4da7d..0000000 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.processor; - -import org.apache.samza.annotation.InterfaceStability; - - -/** - * This class listens to the life cycle events in a {@link StreamProcessor}, - */ -@InterfaceStability.Evolving -public interface StreamProcessorLifecycleListener { - /** - * Callback when the {@link StreamProcessor} is started - * This callback is invoked only once when {@link org.apache.samza.container.SamzaContainer} starts for the first time - * in the {@link StreamProcessor}. When there is a re-balance of tasks/partitions among the processors, the container - * may temporarily be "paused" and re-started again. For such re-starts, this callback is NOT invoked. - */ - void onStart(); - - /** - * Callback when the {@link StreamProcessor} is shut down. 
- */ - void onShutdown(); - - /** - * Callback when the {@link StreamProcessor} fails - * @param t Cause of the failure - */ - void onFailure(Throwable t); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java deleted file mode 100644 index dfcfba4..0000000 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.runtime; - -import com.google.common.annotations.VisibleForTesting; -import java.io.File; -import java.io.PrintWriter; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.ApplicationConfig; -import org.apache.samza.config.ApplicationConfig.ApplicationMode; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.ShellCommandConfig; -import org.apache.samza.config.StreamConfig; -import org.apache.samza.execution.ExecutionPlan; -import org.apache.samza.execution.ExecutionPlanner; -import org.apache.samza.execution.JobNode; -import org.apache.samza.execution.StreamManager; -import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.StreamGraphSpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Defines common, core behavior for implementations of the {@link ApplicationRunner} API. 
- */ -public abstract class AbstractApplicationRunner extends ApplicationRunner { - private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class); - - /** - * The {@link ApplicationRunner} is supposed to run a single {@link StreamApplication} instance in the full life-cycle - */ - protected final StreamGraphSpec graphSpec; - - public AbstractApplicationRunner(Config config) { - super(config); - this.graphSpec = new StreamGraphSpec(config); - } - - public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception { - return getExecutionPlan(app, null); - } - - /* package private */ - ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception { - // build stream graph - app.init(graphSpec, config); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); - - // generated application configs are stored in cfg - Map<String, String> cfg = new HashMap<>(); - if (StringUtils.isNoneEmpty(runId)) { - cfg.put(ApplicationConfig.APP_RUN_ID, runId); - } - - StreamConfig streamConfig = new StreamConfig(config); - Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet()); - inputStreams.removeAll(specGraph.getOutputStreams().keySet()); - ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded) - ? 
ApplicationMode.BATCH : ApplicationMode.STREAM; - cfg.put(ApplicationConfig.APP_MODE, mode.name()); - - // merge user-provided configuration with input/output descriptor generated configuration - // descriptor generated configuration has higher priority - Map<String, String> systemStreamConfigs = new HashMap<>(); - graphSpec.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); - graphSpec.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); - graphSpec.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); - graphSpec.getDefaultSystemDescriptor().ifPresent(dsd -> - systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName())); - cfg.putAll(systemStreamConfigs); - - // create the physical execution plan and merge with overrides. This works for a single-stage job now - // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811 - Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg)); - StreamManager streamManager = buildAndStartStreamManager(mergedConfig); - try { - ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager); - return planner.plan(specGraph); - } finally { - streamManager.stop(); - } - } - - /** - * Write the execution plan JSON to a file - * @param planJson JSON representation of the plan - */ - final void writePlanJsonFile(String planJson) { - try { - String content = "plan='" + planJson + "'"; - String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR()); - if (planPath != null && !planPath.isEmpty()) { - // Write the plan json to plan path - File file = new File(planPath + "/plan.json"); - file.setReadable(true, false); - PrintWriter writer = new PrintWriter(file, "UTF-8"); - writer.println(content); - writer.close(); - } - } catch (Exception e) { - log.warn("Failed to write execution plan json to file", e); - } - } - - @VisibleForTesting - 
StreamManager buildAndStartStreamManager(Config config) { - StreamManager streamManager = new StreamManager(config); - streamManager.start(); - return streamManager; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java index 13e6d38..17a9dc1 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java @@ -21,21 +21,17 @@ package org.apache.samza.runtime; import joptsimple.OptionSet; import joptsimple.OptionSpec; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; -import org.apache.samza.job.JobRunner$; import org.apache.samza.util.CommandLine; import org.apache.samza.util.Util; /** * This class contains the main() method used by run-app.sh. - * For a StreamApplication, it creates the {@link ApplicationRunner} based on the config, and then run the application. - * For a Samza job using low level task API, it will create the JobRunner to run it. + * It creates the {@link ApplicationRunner} based on the config, and then runs the application. 
*/ public class ApplicationRunnerMain { - // TODO: have the app configs consolidated in one place - public static final String STREAM_APPLICATION_CLASS_CONFIG = "app.class"; public static class ApplicationRunnerCommandLine extends CommandLine { public OptionSpec operationOpt = @@ -58,25 +54,21 @@ public class ApplicationRunnerMain { Config config = Util.rewriteConfig(orgConfig); ApplicationRunnerOperation op = cmdLine.getOperation(options); - if (config.containsKey(STREAM_APPLICATION_CLASS_CONFIG)) { - ApplicationRunner runner = ApplicationRunner.fromConfig(config); - StreamApplication app = - (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance(); - switch (op) { - case RUN: - runner.run(app); - break; - case KILL: - runner.kill(app); - break; - case STATUS: - System.out.println(runner.status(app)); - break; - default: - throw new IllegalArgumentException("Unrecognized operation: " + op); - } - } else { - JobRunner$.MODULE$.main(args); + ApplicationRunner appRunner = + ApplicationRunners.getApplicationRunner(ApplicationUtil.fromConfig(config), config); + + switch (op) { + case RUN: + appRunner.run(); + break; + case KILL: + appRunner.kill(); + break; + case STATUS: + System.out.println(appRunner.status()); + break; + default: + throw new IllegalArgumentException("Unrecognized operation: " + op); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 8a9c151..7100482 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -21,36 +21,29 @@ package org.apache.samza.runtime; import 
com.google.common.annotations.VisibleForTesting; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.DistributedLockWithState; -import org.apache.samza.execution.ExecutionPlan; -import org.apache.samza.execution.StreamManager; +import org.apache.samza.execution.LocalJobPlanner; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.metrics.MetricsReporter; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.processor.StreamProcessor; -import org.apache.samza.processor.StreamProcessorLifecycleListener; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.task.AsyncStreamTaskFactory; -import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskFactory; import org.apache.samza.task.TaskFactoryUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,176 +51,81 @@ import org.slf4j.LoggerFactory; /** * This class implements the {@link 
ApplicationRunner} that runs the applications in standalone environment */ -public class LocalApplicationRunner extends AbstractApplicationRunner { +public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); - private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; - private final String uid; + private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc; + private final LocalJobPlanner planner; private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); private final AtomicReference<Throwable> failure = new AtomicReference<>(); - private final Map<String, MetricsReporter> customMetricsReporters; private ApplicationStatus appStatus = ApplicationStatus.New; - final class LocalStreamProcessorLifeCycleListener implements StreamProcessorLifecycleListener { - StreamProcessor processor; - - void setProcessor(StreamProcessor processor) { - this.processor = processor; - } - - @Override - public void onStart() { - if (numProcessorsToStart.decrementAndGet() == 0) { - appStatus = ApplicationStatus.Running; - } - } - - @Override - public void onShutdown() { - processors.remove(processor); - processor = null; - - if (processors.isEmpty()) { - shutdownAndNotify(); - } - } - - @Override - public void onFailure(Throwable t) { - processors.remove(processor); - processor = null; - - if (failure.compareAndSet(null, t)) { - // shutdown the other processors - processors.forEach(StreamProcessor::stop); - } - - if (processors.isEmpty()) { - shutdownAndNotify(); - } - } - - private void shutdownAndNotify() { - if (failure.get() != null) { - appStatus = ApplicationStatus.unsuccessfulFinish(failure.get()); - } else { - if (appStatus == ApplicationStatus.Running) { - appStatus = 
ApplicationStatus.SuccessfulFinish; - } else if (appStatus == ApplicationStatus.New) { - // the processor is shutdown before started - appStatus = ApplicationStatus.UnsuccessfulFinish; - } - } - - shutdownLatch.countDown(); - } - } - - public LocalApplicationRunner(Config config) { - this(config, new HashMap<>()); - } - - public LocalApplicationRunner(Config config, Map<String, MetricsReporter> customMetricsReporters) { - super(config); - this.uid = UUID.randomUUID().toString(); - this.customMetricsReporters = customMetricsReporters; + /** + * Constructs a {@link LocalApplicationRunner} to run the {@code app} with the {@code config}. + * + * @param app application to run + * @param config configuration for the application + */ + public LocalApplicationRunner(SamzaApplication app, Config config) { + this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); + this.planner = new LocalJobPlanner(appDesc); } - @Override - public void runTask() { - JobConfig jobConfig = new JobConfig(this.config); - - // validation - String taskName = new TaskConfig(config).getTaskClass().getOrElse(null); - if (taskName == null) { - throw new SamzaException("Neither APP nor task.class are defined defined"); - } - LOG.info("LocalApplicationRunner will run " + taskName); - LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); - - StreamProcessor processor = createStreamProcessor(jobConfig, listener); - - numProcessorsToStart.set(1); - listener.setProcessor(processor); - processor.start(); + /** + * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} + */ + @VisibleForTesting + LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, LocalJobPlanner planner) { + this.appDesc = appDesc; + this.planner = planner; } @Override - public void run(StreamApplication app) { - + public void run() { try { - // 1. 
initialize and plan - ExecutionPlan plan = getExecutionPlan(app); + List<JobConfig> jobConfigs = planner.prepareJobs(); - String executionPlanJson = plan.getPlanAsJson(); - writePlanJsonFile(executionPlanJson); - LOG.info("Execution Plan: \n" + executionPlanJson); - String planId = String.valueOf(executionPlanJson.hashCode()); - - if (plan.getJobConfigs().isEmpty()) { + // create the StreamProcessors + if (jobConfigs.isEmpty()) { throw new SamzaException("No jobs to run."); } - - plan.getJobConfigs().forEach(jobConfig -> { - StreamManager streamManager = null; - try { - // 2. create the necessary streams - streamManager = buildAndStartStreamManager(jobConfig); - createStreams(planId, plan.getIntermediateStreams(), streamManager); - - // 3. create the StreamProcessors - LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); - LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); - StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener); - listener.setProcessor(processor); - processors.add(processor); - } finally { - if (streamManager != null) { - streamManager.stop(); - } - } + jobConfigs.forEach(jobConfig -> { + LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); + StreamProcessor processor = createStreamProcessor(jobConfig, appDesc, + sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig)); + processors.add(processor); }); numProcessorsToStart.set(processors.size()); - // 4. 
start the StreamProcessors + // start the StreamProcessors processors.forEach(StreamProcessor::start); } catch (Throwable throwable) { appStatus = ApplicationStatus.unsuccessfulFinish(throwable); shutdownLatch.countDown(); - throw new SamzaException(String.format("Failed to start application: %s.", app), throwable); + throw new SamzaException(String.format("Failed to start application: %s", + new ApplicationConfig(appDesc.getConfig()).getGlobalAppId()), throwable); } } @Override - public void kill(StreamApplication streamApp) { + public void kill() { processors.forEach(StreamProcessor::stop); } @Override - public ApplicationStatus status(StreamApplication streamApp) { + public ApplicationStatus status() { return appStatus; } - /** - * Waits until the application finishes. - */ @Override public void waitForFinish() { - waitForFinish(Duration.ofMillis(0)); + this.waitForFinish(Duration.ofSeconds(0)); } - /** - * Waits for {@code timeout} duration for the application to finish. - * If timeout < 1, blocks the caller indefinitely. - * - * @param timeout time to wait for the application to finish - * @return true - application finished before timeout - * false - otherwise - */ @Override public boolean waitForFinish(Duration timeout) { long timeoutInMs = timeout.toMillis(); @@ -251,102 +149,105 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { return finished; } + @VisibleForTesting + protected Set<StreamProcessor> getProcessors() { + return Collections.unmodifiableSet(processors); + } + + @VisibleForTesting + CountDownLatch getShutdownLatch() { + return shutdownLatch; + } + + @VisibleForTesting + StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc, + StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory) { + TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc); + Map<String, MetricsReporter> reporters = new HashMap<>(); + // TODO: the null processorId has to be fixed after SAMZA-1835 + appDesc.getMetricsReporterFactories().forEach((name, factory) -> + reporters.put(name, factory.getMetricsReporter(name, null, config))); + return new StreamProcessor(config, reporters, taskFactory, listenerFactory, null); + } + /** - * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}. - * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader - * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes - * stream creation. - * @param planId a unique identifier representing the plan used for coordination purpose - * @param intStreams list of intermediate {@link StreamSpec}s + * Defines a specific implementation of {@link ProcessorLifecycleListener} for local {@link StreamProcessor}s. */ - private void createStreams(String planId, - List<StreamSpec> intStreams, - StreamManager streamManager) { - if (intStreams.isEmpty()) { - LOG.info("Set of intermediate streams is empty. Nothing to create."); - return; + private final class LocalStreamProcessorLifecycleListener implements ProcessorLifecycleListener { + private final StreamProcessor processor; + private final ProcessorLifecycleListener userDefinedProcessorLifecycleListener; + + LocalStreamProcessorLifecycleListener(StreamProcessor processor, Config jobConfig) { + this.userDefinedProcessorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory() + .createInstance(new ProcessorContext() { }, jobConfig); + this.processor = processor; } - LOG.info("A single processor must create the intermediate streams. 
Processor {} will attempt to acquire the lock.", uid); - // Move the scope of coordination utils within stream creation to address long idle connection problem. - // Refer SAMZA-1385 for more details - JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); - String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX; - CoordinationUtils coordinationUtils = - jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config); - if (coordinationUtils == null) { - LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid); - // each application process will try creating the streams, which - // requires stream creation to be idempotent - streamManager.createStreams(intStreams); - return; + + @Override + public void beforeStart() { + userDefinedProcessorLifecycleListener.beforeStart(); } - DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId); - try { - // check if the processor needs to go through leader election and stream creation - if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) { - LOG.info("lock acquired for streams creation by " + uid); - streamManager.createStreams(intStreams); - lockWithState.unlockAndSet(); - } else { - LOG.info("Processor {} did not obtain the lock for streams creation. 
They must've been created by another processor.", uid); + @Override + public void afterStart() { + if (numProcessorsToStart.decrementAndGet() == 0) { + appStatus = ApplicationStatus.Running; } - } catch (TimeoutException e) { - String msg = String.format("Processor {} failed to get the lock for stream initialization", uid); - throw new SamzaException(msg, e); - } finally { - coordinationUtils.close(); + userDefinedProcessorLifecycleListener.afterStart(); } - } - /** - * Create {@link StreamProcessor} based on {@link StreamApplication} and the config - * @param config config - * @return {@link StreamProcessor]} - */ - /* package private */ - StreamProcessor createStreamProcessor( - Config config, - StreamProcessorLifecycleListener listener) { - Object taskFactory = TaskFactoryUtil.createTaskFactory(config); - return getStreamProcessorInstance(config, taskFactory, listener); - } + @Override + public void afterStop() { + processors.remove(processor); - /** - * Create {@link StreamProcessor} based on {@link StreamApplication} and the config - * @param config config - * @param graphBuilder {@link StreamGraphSpec} - * @return {@link StreamProcessor]} - */ - /* package private */ - StreamProcessor createStreamProcessor( - Config config, - StreamGraphSpec graphBuilder, - StreamProcessorLifecycleListener listener) { - Object taskFactory = TaskFactoryUtil.createTaskFactory(graphBuilder.getOperatorSpecGraph(), graphBuilder.getContextManager()); - return getStreamProcessorInstance(config, taskFactory, listener); - } + // successful shutdown + handleProcessorShutdown(null); + } - private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) { - if (taskFactory instanceof StreamTaskFactory) { - return new StreamProcessor( - config, customMetricsReporters, (StreamTaskFactory) taskFactory, listener); - } else if (taskFactory instanceof AsyncStreamTaskFactory) { - return new StreamProcessor( - config, 
customMetricsReporters, (AsyncStreamTaskFactory) taskFactory, listener); - } else { - throw new SamzaException(String.format("%s is not a valid task factory", - taskFactory.getClass().getCanonicalName())); + @Override + public void afterFailure(Throwable t) { + processors.remove(processor); + + // the processor stopped with failure, this is logging the first processor's failure as the cause of + // the whole application failure + if (failure.compareAndSet(null, t)) { + // shutdown the other processors + processors.forEach(StreamProcessor::stop); + } + + // handle the current processor's shutdown failure. + handleProcessorShutdown(t); } - } - /* package private for testing */ - Set<StreamProcessor> getProcessors() { - return processors; - } + private void handleProcessorShutdown(Throwable error) { + if (processors.isEmpty()) { + // all processors are shutdown, setting the application final status + setApplicationFinalStatus(); + } + if (error != null) { + // current processor shutdown with a failure + userDefinedProcessorLifecycleListener.afterFailure(error); + } else { + // current processor shutdown successfully + userDefinedProcessorLifecycleListener.afterStop(); + } + if (processors.isEmpty()) { + // no processor is still running. 
Notify callers waiting on waitForFinish() + shutdownLatch.countDown(); + } + } - @VisibleForTesting - CountDownLatch getShutdownLatch() { - return shutdownLatch; + private void setApplicationFinalStatus() { + if (failure.get() != null) { + appStatus = ApplicationStatus.unsuccessfulFinish(failure.get()); + } else { + if (appStatus == ApplicationStatus.Running) { + appStatus = ApplicationStatus.SuccessfulFinish; + } else if (appStatus == ApplicationStatus.New) { + // the processor is shutdown before started + appStatus = ApplicationStatus.UnsuccessfulFinish; + } + } + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index fe75883..98864d2 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -20,10 +20,14 @@ package org.apache.samza.runtime; import java.util.HashMap; +import java.util.Map; import java.util.Random; import org.apache.log4j.MDC; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorUtil; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.ShellCommandConfig; @@ -31,11 +35,12 @@ import org.apache.samza.container.ContainerHeartbeatClient; import org.apache.samza.container.ContainerHeartbeatMonitor; import org.apache.samza.container.SamzaContainer; import 
org.apache.samza.container.SamzaContainer$; -import org.apache.samza.operators.StreamGraphSpec; -import org.apache.samza.util.SamzaUncaughtExceptionHandler; import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.task.TaskFactory; import org.apache.samza.task.TaskFactoryUtil; +import org.apache.samza.util.SamzaUncaughtExceptionHandler; import org.apache.samza.util.ScalaJavaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,37 +80,51 @@ public class LocalContainerRunner { MDC.put("jobName", jobName); MDC.put("jobId", jobId); - StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); - Object taskFactory = getTaskFactory(streamApp, config); - run(taskFactory, containerId, jobModel, config); + ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = + ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config); + run(appDesc, containerId, jobModel, config); System.exit(0); } - private static void run(Object taskFactory, String containerId, JobModel jobModel, Config config) { + private static void run(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc, String containerId, + JobModel jobModel, Config config) { + TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc); SamzaContainer container = SamzaContainer$.MODULE$.apply( containerId, jobModel, config, - ScalaJavaUtil.toScalaMap(new HashMap<>()), + ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)), taskFactory); + ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory() + .createInstance(new ProcessorContext() { }, config); + container.setContainerListener( new SamzaContainerListener() { @Override - public void onContainerStart() { + public void beforeStart() { + log.info("Before starting the container."); + listener.beforeStart(); + } + + @Override + public void afterStart() { log.info("Container Started"); + listener.afterStart(); } @Override - public void onContainerStop() { + public void afterStop() { log.info("Container Stopped"); + listener.afterStop(); } @Override - public void onContainerFailed(Throwable t) { + public void afterFailure(Throwable t) { log.info("Container Failed"); containerRunnerException = t; + listener.afterFailure(t); } }); @@ -126,13 +145,14 @@ public class LocalContainerRunner { } } - private static Object getTaskFactory(StreamApplication streamApp, Config config) { - if (streamApp != null) { - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - streamApp.init(graphSpec, config); - return TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager()); - } - return TaskFactoryUtil.createTaskFactory(config); + // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via + // LocalApplicationRunner#createStreamProcessor() + private static Map<String, MetricsReporter> loadMetricsReporters( + ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc, String containerId, Config config) { + Map<String, MetricsReporter> reporters = new HashMap<>(); + appDesc.getMetricsReporterFactories().forEach((name, factory) -> + reporters.put(name, factory.getMetricsReporter(name, containerId, config))); + return reporters; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 6229abc..69eb5fe 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -20,18 +20,17 @@ package org.apache.samza.runtime; import java.time.Duration; -import java.util.UUID; +import java.util.List; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import org.apache.samza.execution.ExecutionPlan; -import org.apache.samza.execution.StreamManager; +import org.apache.samza.execution.RemoteJobPlanner; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.JobRunner; -import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,55 +40,38 @@ import static org.apache.samza.job.ApplicationStatus.*; /** * 
This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster */ -public class RemoteApplicationRunner extends AbstractApplicationRunner { +public class RemoteApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class); private static final long DEFAULT_SLEEP_DURATION_MS = 2000; - public RemoteApplicationRunner(Config config) { - super(config); - } - - @Override - public void runTask() { - throw new UnsupportedOperationException("Running StreamTask is not implemented for RemoteReplicationRunner"); - } + private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc; + private final RemoteJobPlanner planner; /** - * Run the {@link StreamApplication} on the remote cluster - * @param app a StreamApplication + * Constructs a {@link RemoteApplicationRunner} to run the {@code app} with the {@code config}. + * + * @param app application to run + * @param config configuration for the application */ + public RemoteApplicationRunner(SamzaApplication app, Config config) { + this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); + this.planner = new RemoteJobPlanner(appDesc); + } + @Override - public void run(StreamApplication app) { + public void run() { try { - // TODO: run.id needs to be set for standalone: SAMZA-1531 - // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision - String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); - LOG.info("The run id for this run is {}", runId); - - // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(app, runId); - writePlanJsonFile(plan.getPlanAsJson()); - - plan.getJobConfigs().forEach(jobConfig -> { - StreamManager streamManager = null; - try { - // 2.
create the necessary streams - streamManager = buildAndStartStreamManager(jobConfig); - if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) { - streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun()); - } - streamManager.createStreams(plan.getIntermediateStreams()); - - // 3. submit jobs for remote execution - LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); - JobRunner runner = new JobRunner(jobConfig); - runner.run(true); - } finally { - if (streamManager != null) { - streamManager.stop(); - } - } + List<JobConfig> jobConfigs = planner.prepareJobs(); + if (jobConfigs.isEmpty()) { + throw new SamzaException("No jobs to run."); + } + + // 3. submit jobs for remote execution + jobConfigs.forEach(jobConfig -> { + LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig); + JobRunner runner = new JobRunner(jobConfig); + runner.run(true); }); } catch (Throwable t) { throw new SamzaException("Failed to run application", t); @@ -97,12 +79,11 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { } @Override - public void kill(StreamApplication app) { - + public void kill() { // since currently we only support single actual remote job, we can get its status without // building the execution plan. 
try { - JobConfig jc = new JobConfig(config); + JobConfig jc = new JobConfig(appDesc.getConfig()); LOG.info("Killing job {}", jc.getName()); JobRunner runner = new JobRunner(jc); runner.kill(); @@ -112,42 +93,25 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { } @Override - public ApplicationStatus status(StreamApplication app) { - + public ApplicationStatus status() { // since currently we only support single actual remote job, we can get its status without // building the execution plan try { - JobConfig jc = new JobConfig(config); + JobConfig jc = new JobConfig(appDesc.getConfig()); return getApplicationStatus(jc); } catch (Throwable t) { throw new SamzaException("Failed to get status for application", t); } } - /* package private */ ApplicationStatus getApplicationStatus(JobConfig jobConfig) { - JobRunner runner = new JobRunner(jobConfig); - ApplicationStatus status = runner.status(); - LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); - return status; - } - - /** - * Waits until the application finishes. - */ + @Override public void waitForFinish() { waitForFinish(Duration.ofMillis(0)); } - /** - * Waits for {@code timeout} duration for the application to finish. - * If timeout < 1, blocks the caller indefinitely. 
- * - * @param timeout time to wait for the application to finish - * @return true - application finished before timeout - * false - otherwise - */ + @Override public boolean waitForFinish(Duration timeout) { - JobConfig jobConfig = new JobConfig(config); + JobConfig jobConfig = new JobConfig(appDesc.getConfig()); boolean finished = true; long timeoutInMs = timeout.toMillis(); long startTimeInMs = System.currentTimeMillis(); @@ -181,15 +145,10 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { return finished; } - private Config getConfigFromPrevRun() { - CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); - consumer.register(); - consumer.start(); - consumer.bootstrap(); - consumer.stop(); - - Config cfg = consumer.getConfig(); - LOG.info("Previous config is: " + cfg.toString()); - return cfg; + /* package private */ ApplicationStatus getApplicationStatus(JobConfig jobConfig) { + JobRunner runner = new JobRunner(jobConfig); + ApplicationStatus status = runner.status(); + LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()}); + return status; } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index dd8d6c3..2ca4e81 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -69,10 +69,9 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT * Initializes this task during startup. * <p> * Implementation: Initializes the runtime {@link OperatorImplGraph} according to user-defined {@link OperatorSpecGraph}. 
- * The {@link org.apache.samza.operators.StreamGraphSpec} sets the input and output streams and the task-wide - * context manager using the {@link org.apache.samza.operators.StreamGraph} APIs, + * Users set the input and output streams and the task-wide context manager using {@link org.apache.samza.application.StreamApplicationDescriptor} APIs, * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. After the - * {@link org.apache.samza.operators.StreamGraphSpec} is initialized once by the application, it then creates + * {@link org.apache.samza.application.StreamApplicationDescriptorImpl} is initialized once by the application, it then creates * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph} * corresponding to the logical DAG. * http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index 38ae854..834777b 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -18,66 +18,53 @@ */ package org.apache.samza.task; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; -import org.apache.samza.config.ApplicationConfig; -import org.apache.samza.config.Config; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplicationDescriptorImpl; import org.apache.samza.config.ConfigException; -import 
org.apache.samza.application.StreamApplication; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.operators.ContextManager; -import org.apache.samza.operators.OperatorSpecGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; -import static org.apache.samza.util.ScalaJavaUtil.toScalaFunction; -import static org.apache.samza.util.ScalaJavaUtil.defaultValue; - /** - * This class provides utility functions to load task factory classes based on config, and to wrap {@link StreamTaskFactory} in {@link AsyncStreamTaskFactory} - * when running {@link StreamTask}s in multi-thread mode + * This class provides utility functions to load task factory classes based on config, and to wrap {@link StreamTaskFactory} + * in {@link AsyncStreamTaskFactory} when running {@link StreamTask}s in multi-thread mode */ public class TaskFactoryUtil { private static final Logger log = LoggerFactory.getLogger(TaskFactoryUtil.class); /** - * This method creates a task factory class based on the {@link StreamApplication} + * Creates a {@link TaskFactory} based on {@link ApplicationDescriptorImpl} * - * @param specGraph the {@link OperatorSpecGraph} - * @param contextManager the {@link ContextManager} to set up initial context for {@code specGraph} - * @return a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} + * @param appDesc {@link ApplicationDescriptorImpl} for this application + * @return {@link TaskFactory} object defined by {@code appDesc} */ - public static Object createTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) { - return createStreamOperatorTaskFactory(specGraph, contextManager); + public static TaskFactory getTaskFactory(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc) { + if (appDesc instanceof TaskApplicationDescriptorImpl) { + return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory(); + } else if (appDesc instanceof StreamApplicationDescriptorImpl) { + return (StreamTaskFactory) () -> new StreamOperatorTask(((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph(), + ((StreamApplicationDescriptorImpl) appDesc).getContextManager()); + } + throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or " + + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName())); } /** - * This method creates a task factory class based on the configuration + * Creates a {@link TaskFactory} based on the configuration. + * <p> + * This should only be used to create {@link TaskFactory} defined in task.class * - * @param config the {@link Config} for this job - * @return a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} - */ - public static Object createTaskFactory(Config config) { - return fromTaskClassConfig(config); - } - - private static StreamTaskFactory createStreamOperatorTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) { - return () -> new StreamOperatorTask(specGraph, contextManager); - } - - /** - * Create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on the configured task.class. 
- * @param config the {@link Config} - * @return task factory instance + * @param taskClassName the task class name for this job + * @return a {@link TaskFactory} object, either an instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} */ - private static Object fromTaskClassConfig(Config config) { - // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory - String taskClassName = new TaskConfig(config).getTaskClass().getOrElse(toScalaFunction( - () -> { - throw new ConfigException("No task class defined in the configuration."); - })); - + public static TaskFactory getTaskFactory(String taskClassName) { + Preconditions.checkArgument(StringUtils.isNotBlank(taskClassName), "task.class cannot be empty"); log.info("Got task class name: {}", taskClassName); boolean isAsyncTaskClass; @@ -88,28 +75,22 @@ public class TaskFactoryUtil { } if (isAsyncTaskClass) { - return new AsyncStreamTaskFactory() { - @Override - public AsyncStreamTask createInstance() { - try { - return (AsyncStreamTask) Class.forName(taskClassName).newInstance(); - } catch (Throwable t) { - log.error("Error loading AsyncStreamTask class: {}. error: {}", taskClassName, t); - throw new SamzaException(String.format("Error loading AsyncStreamTask class: %s", taskClassName), t); - } + return (AsyncStreamTaskFactory) () -> { + try { + return (AsyncStreamTask) Class.forName(taskClassName).newInstance(); + } catch (Throwable t) { + log.error("Error loading AsyncStreamTask class: {}. error: {}", taskClassName, t); + throw new SamzaException(String.format("Error loading AsyncStreamTask class: %s", taskClassName), t); } }; } - return new StreamTaskFactory() { - @Override - public StreamTask createInstance() { - try { - return (StreamTask) Class.forName(taskClassName).newInstance(); - } catch (Throwable t) { - log.error("Error loading StreamTask class: {}.
error: {}", taskClassName, t); - throw new SamzaException(String.format("Error loading StreamTask class: %s", taskClassName), t); - } + return (StreamTaskFactory) () -> { + try { + return (StreamTask) Class.forName(taskClassName).newInstance(); + } catch (Throwable t) { + log.error("Error loading StreamTask class: {}. error: {}", taskClassName, t); + throw new SamzaException(String.format("Error loading StreamTask class: %s", taskClassName), t); } }; } @@ -123,7 +104,7 @@ public class TaskFactoryUtil { * @param taskThreadPool the thread pool to run the {@link AsyncStreamTaskAdapter} tasks * @return the finalized task factory object */ - public static Object finalizeTaskFactory(Object factory, boolean singleThreadMode, ExecutorService taskThreadPool) { + public static TaskFactory finalizeTaskFactory(TaskFactory factory, boolean singleThreadMode, ExecutorService taskThreadPool) { validateFactory(factory); @@ -138,18 +119,13 @@ public class TaskFactoryUtil { if (!singleThreadMode && !isAsyncTaskClass) { log.info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask with multiple threads"); - return new AsyncStreamTaskFactory() { - @Override - public AsyncStreamTask createInstance() { - return new AsyncStreamTaskAdapter(((StreamTaskFactory) factory).createInstance(), taskThreadPool); - } - }; + return (AsyncStreamTaskFactory) () -> new AsyncStreamTaskAdapter(((StreamTaskFactory) factory).createInstance(), taskThreadPool); } return factory; } - private static void validateFactory(Object factory) { + private static void validateFactory(TaskFactory factory) { if (factory == null) { throw new SamzaException("Either the task class name or the task factory instance is required."); } @@ -160,33 +136,4 @@ public class TaskFactoryUtil { } } - /** - * Returns {@link StreamApplication} if it's configured, otherwise null. - * @param config Config - * throws {@link ConfigException} if there is misconfiguration of StreamApp. 
- * @return {@link StreamApplication} instance - */ - public static StreamApplication createStreamApplication(Config config) { - ApplicationConfig appConfig = new ApplicationConfig(config); - if (appConfig.getAppClass() != null && !appConfig.getAppClass().isEmpty()) { - TaskConfig taskConfig = new TaskConfig(config); - String taskClassName = taskConfig.getTaskClass().getOrElse(defaultValue(null)); - if (taskClassName != null && !taskClassName.isEmpty()) { - throw new ConfigException("High level StreamApplication API cannot be used together with low-level API using task.class."); - } - - String appClassName = appConfig.getAppClass(); - try { - Class<?> builderClass = Class.forName(appClassName); - return (StreamApplication) builderClass.newInstance(); - } catch (Throwable t) { - String errorMsg = String.format("Failed to create StreamApplication class from the config. %s = %s", - ApplicationConfig.APP_CLASS, appConfig.getAppClass()); - log.error(errorMsg, t); - throw new ConfigException(errorMsg, t); - } - } else { - return null; - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 0c889d2..68de4a6 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -124,7 +124,7 @@ object SamzaContainer extends Logging { jobModel: JobModel, config: Config, customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), - taskFactory: Object) = { + taskFactory: TaskFactory[_]) = { val containerModel = jobModel.getContainers.get(containerId) val containerName = "samza-container-%s" format containerId val 
maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions @@ -786,6 +786,10 @@ class SamzaContainer( try { info("Starting container.") + if (containerListener != null) { + containerListener.beforeStart() + } + val startTime = System.nanoTime() status = SamzaContainerStatus.STARTING @@ -809,7 +813,7 @@ class SamzaContainer( info("Entering run loop.") status = SamzaContainerStatus.STARTED if (containerListener != null) { - containerListener.onContainerStart() + containerListener.afterStart() } metrics.containerStartupTime.update(System.nanoTime() - startTime) runLoop.run @@ -860,11 +864,11 @@ class SamzaContainer( status match { case SamzaContainerStatus.STOPPED => if (containerListener != null) { - containerListener.onContainerStop() + containerListener.afterStop() } case SamzaContainerStatus.FAILED => if (containerListener != null) { - containerListener.onContainerFailed(exceptionSeen) + containerListener.afterFailure(exceptionSeen) } } } @@ -876,8 +880,8 @@ class SamzaContainer( * <br> * <b>Implementation</b>: Stops the [[RunLoop]], which will eventually transition the container from * [[SamzaContainerStatus.STARTED]] to either [[SamzaContainerStatus.STOPPED]] or [[SamzaContainerStatus.FAILED]]]. - * Based on the final `status`, [[SamzaContainerListener#onContainerStop(boolean)]] or - * [[SamzaContainerListener#onContainerFailed(Throwable)]] will be invoked respectively. + * Based on the final `status`, [[SamzaContainerListener#afterStop()]] or + * [[SamzaContainerListener#afterFailure(Throwable]] will be invoked respectively. 
* * @throws SamzaException, Thrown when the container has already been stopped or failed */ http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index bf4f252..5f4338c 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -20,8 +20,6 @@ package org.apache.samza.job -import java.util.concurrent.TimeUnit - import org.apache.samza.SamzaException import org.apache.samza.config._ import org.apache.samza.config.JobConfig.Config2Job http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 0b472aa..abd7f65 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -19,6 +19,7 @@ package org.apache.samza.job.local +import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil} import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ @@ -27,8 +28,9 @@ import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.CoordinatorStreamManager import org.apache.samza.job.{StreamJob, StreamJobFactory} import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter} -import org.apache.samza.operators.StreamGraphSpec +import 
org.apache.samza.runtime.ProcessorContext import org.apache.samza.storage.ChangelogStreamManager +import org.apache.samza.task.TaskFactory import org.apache.samza.task.TaskFactoryUtil import org.apache.samza.util.Logging @@ -70,32 +72,36 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val containerId = "0" val jmxServer = new JmxServer - val streamApp = TaskFactoryUtil.createStreamApplication(config) - - val taskFactory = if (streamApp != null) { - val graphSpec = new StreamGraphSpec(config) - streamApp.init(graphSpec, config) - TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager) - } else { - TaskFactoryUtil.createTaskFactory(config) - } + + val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config) + val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. config.getTaskOpts match { - case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) + case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " + + "You probably want to run %s=%s." 
format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) case _ => None } - val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { - error("Container failed.", t) - throw t - } - - override def onContainerStop(): Unit = { - } - - override def onContainerStart(): Unit = { + val containerListener = { + val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { }, config) + new SamzaContainerListener { + override def afterFailure(t: Throwable): Unit = { + processorLifecycleListener.afterFailure(t) + throw t + } + + override def afterStart(): Unit = { + processorLifecycleListener.afterStart() + } + + override def afterStop(): Unit = { + processorLifecycleListener.afterStop() + } + + override def beforeStart(): Unit = { + processorLifecycleListener.beforeStart() + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java new file mode 100644 index 0000000..ccd88b8 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +/** + * Test class of {@link StreamApplication} for unit tests + */ +public class MockStreamApplication implements StreamApplication { + @Override + public void describe(StreamApplicationDescriptor appSpec) { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java new file mode 100644 index 0000000..9b590c4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.task.MockStreamTask; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +/** + * Unit tests for {@link ApplicationUtil} + */ +public class TestApplicationUtil { + + @Test + public void testStreamAppClass() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()); + SamzaApplication app = ApplicationUtil.fromConfig(new MapConfig(configMap)); + assertTrue(app instanceof MockStreamApplication); + + configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName()); + app = ApplicationUtil.fromConfig(new MapConfig(configMap)); + assertTrue(app instanceof MockStreamApplication); + } + + @Test + public void testTaskAppClass() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(ApplicationConfig.APP_CLASS, MockTaskApplication.class.getName()); + SamzaApplication app = ApplicationUtil.fromConfig(new MapConfig(configMap)); + assertTrue(app instanceof MockTaskApplication); + + configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName()); + app = ApplicationUtil.fromConfig(new MapConfig(configMap)); + assertTrue(app instanceof MockTaskApplication); + } + + @Test + public void testTaskClassOnly() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName()); + Config config = new MapConfig(configMap); + SamzaApplication app = ApplicationUtil.fromConfig(config); + assertTrue(app instanceof TaskApplication); + 
TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl((TaskApplication) app, config); + assertTrue(appSpec.getTaskFactory().createInstance() instanceof MockStreamTask); + } + + @Test(expected = ConfigException.class) + public void testEmptyTaskClassOnly() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(TaskConfig.TASK_CLASS(), ""); + ApplicationUtil.fromConfig(new MapConfig(configMap)); + } + + @Test(expected = ConfigException.class) + public void testNoAppClassNoTaskClass() { + Map<String, String> configMap = new HashMap<>(); + ApplicationUtil.fromConfig(new MapConfig(configMap)); + } + + /** + * Test class of {@link TaskApplication} for unit tests + */ + public static class MockTaskApplication implements TaskApplication { + @Override + public void describe(TaskApplicationDescriptor appSpec) { + + } + } +} \ No newline at end of file