http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java index af20fd7..1954cc3 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java @@ -19,9 +19,8 @@ package org.apache.samza.test.integration; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.system.kafka.KafkaInputDescriptor; @@ -38,9 +37,9 @@ public class TestStandaloneIntegrationApplication implements StreamApplication { private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class); @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { String systemName = "testSystemName"; - String inputStreamName = config.get("input.stream.name"); + String inputStreamName = appDesc.getConfig().get("input.stream.name"); String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic"; LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName); KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName); @@ -50,6 +49,6 @@ public class TestStandaloneIntegrationApplication implements StreamApplication { kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde); KafkaOutputDescriptor<KV<Object, Object>> osd = kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde); - graph.getInputStream(isd).sendTo(graph.getOutputStream(osd)); + appDesc.getInputStream(isd).sendTo(appDesc.getOutputStream(osd)); } }
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index 189ae9b..66cf061 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -46,6 +46,7 @@ import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; @@ -135,9 +136,14 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); JobCoordinator jobCoordinator = Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); - StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() { + ProcessorLifecycleListener listener = new ProcessorLifecycleListener() { @Override - public void onStart() { + public void beforeStart() { + + } + + @Override + public void afterStart() { if (waitStart != null) { waitStart.countDown(); } @@ -145,16 +151,18 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness } @Override - public void onShutdown() { + public void afterStop() { + // stopped w/o failure if (waitStop != null) { waitStop.countDown(); } - LOG.info("onShutdown is called for pid=" + pId); + LOG.info("afterStop is called for pid=" + pId + " with successful shutdown"); } @Override - public void onFailure(Throwable t) { - LOG.info("onFailure is called for pid=" + pId); + public void afterFailure(Throwable t) { + // stopped w/ failure + LOG.info("afterStop is called for pid=" + pId + " with failure"); } }; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index b86c6af..d2aab11 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -34,7 +35,8 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; @@ -94,20 +96,25 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { - DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<KV<String, PageView>> isd = - sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); - streamGraph.getInputStream(isd) - .map(Values.create()) - .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") - .sink((m, collector, coordinator) -> { - received.add(m.getValue()); - }); - }; - - runner.run(app); + class PipelineApplication implements StreamApplication { + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<KV<String, PageView>> isd = + sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + appDesc.getInputStream(isd) + .map(Values.create()) + .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") + .sink((m, collector, coordinator) -> { + received.add(m.getValue()); + }); + } + } + + final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new PipelineApplication(), new MapConfig(configs)); + + runner.run(); runner.waitForFinish(); assertEquals(received.size(), count * partitionCount); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 5336595..05818e9 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.samza.test.controlmessages; +import org.apache.samza.application.SamzaApplication; import scala.collection.JavaConverters; import java.lang.reflect.Field; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -49,8 +51,9 @@ import org.apache.samza.operators.impl.TestOperatorImpl; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.processor.TestStreamProcessorUtil; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.runtime.TestLocalApplicationRunner; import org.apache.samza.serializers.IntegerSerdeFactory; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; @@ -123,7 +126,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { @Test public void testWatermark() throws Exception { Map<String, String> configs = new HashMap<>(); - configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); + configs.put("app.runner.class", MockLocalApplicationRunner.class.getName()); configs.put("systems.test.samza.factory", TestSystemFactory.class.getName()); configs.put("streams.PageView.samza.system", "test"); configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT)); @@ -147,22 +150,27 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); List<PageView> received = new ArrayList<>(); - final StreamApplication app = (streamGraph, cfg) -> { - DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<KV<String, PageView>> isd = - sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); - streamGraph.getInputStream(isd) - .map(EndOfStreamIntegrationTest.Values.create()) - .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") - .sink((m, collector, coordinator) -> { - received.add(m.getValue()); - }); - }; - - LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - runner.run(app); + class TestStreamApp implements StreamApplication { + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<KV<String, PageView>> isd = + sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + appDesc.getInputStream(isd) + .map(EndOfStreamIntegrationTest.Values.create()) + .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") + .sink((m, collector, coordinator) -> { + received.add(m.getValue()); + }); + } + } + + final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TestStreamApp(), new MapConfig(configs)); + runner.run(); + // processors are only available when the app is running - Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner); + Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs((MockLocalApplicationRunner) runner); runner.waitForFinish(); // wait for the completion to ensure that all tasks are actually initialized and the OperatorImplGraph is initialized @@ -185,8 +193,8 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3); } - Map<String, StreamOperatorTask> getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception { - StreamProcessor processor = TestLocalApplicationRunner.getProcessors(runner).iterator().next(); + Map<String, StreamOperatorTask> getTaskOperationGraphs(MockLocalApplicationRunner runner) throws Exception { + StreamProcessor processor = runner.getProcessors().iterator().next(); SamzaContainer container = TestStreamProcessorUtil.getContainer(processor); Map<TaskName, TaskInstance> taskInstances = JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava(); Map<String, StreamOperatorTask> tasks = new HashMap<>(); @@ -214,4 +222,20 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { } return null; } + + public static class MockLocalApplicationRunner extends LocalApplicationRunner { + + /** + * Default constructor that is required by any implementation of {@link ApplicationRunner} + * @param userApp user application + * @param config user configuration + */ + public MockLocalApplicationRunner(SamzaApplication userApp, Config config) { + super(userApp, config); + } + + protected Set<StreamProcessor> getProcessors() { + return super.getProcessors(); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java index aca6c40..4caf266 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java @@ -21,9 +21,9 @@ package org.apache.samza.test.framework; import java.util.Arrays; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; @@ -35,13 +35,14 @@ public class BroadcastAssertApp implements StreamApplication { @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { + Config config = appDesc.getConfig(); String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde); - final MessageStream<PageView> broadcastPageViews = graph + final MessageStream<PageView> broadcastPageViews = appDesc .getInputStream(isd) .broadcast(serde, "pv"); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 6fdf887..1000f22 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -42,22 +42,21 @@ import org.junit.Test; import static org.apache.samza.test.controlmessages.TestData.PageView; - public class StreamApplicationIntegrationTest { - final StreamApplication pageViewFilter = (streamGraph, cfg) -> { + final StreamApplication pageViewFilter = streamAppDesc -> { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); KafkaInputDescriptor<KV<String, PageView>> isd = ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); - MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd); + MessageStream<KV<String, TestData.PageView>> inputStream = streamAppDesc.getInputStream(isd); inputStream.map(StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox")); }; - final StreamApplication pageViewRepartition = (streamGraph, cfg) -> { + final StreamApplication pageViewRepartition = streamAppDesc -> { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test"); KafkaInputDescriptor<KV<String, PageView>> isd = ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); - MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd); + MessageStream<KV<String, TestData.PageView>> inputStream = streamAppDesc.getInputStream(isd); inputStream .map(Values.create()) .partitionBy(PageView::getMemberId, pv -> pv, "p1") http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java index 810d2c2..7f13282 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java @@ -18,6 +18,15 @@ */ package org.apache.samza.test.framework; +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; import kafka.utils.TestUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -31,23 +40,13 @@ import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.execution.TestStreamManager; -import org.apache.samza.runtime.AbstractApplicationRunner; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.system.kafka.KafkaSystemAdmin; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import scala.Option; import scala.Option$; -import java.io.File; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - /** * Harness for writing integration tests for {@link StreamApplication}s. * @@ -210,10 +209,9 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration } /** - * Executes the provided {@link StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The - * {@link StreamApplication} runs in its own separate thread. + * Executes the provided {@link org.apache.samza.application.StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The + * {@link org.apache.samza.application.StreamApplication} runs in its own separate thread. * - * @param streamApplication the application to run * @param appName the name of the application * @param overriddenConfigs configs to override * @return RunApplicationContext which contains objects created within runApplication, to be used for verification @@ -223,7 +221,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration String appName, Map<String, String> overriddenConfigs) { Map<String, String> configMap = new HashMap<>(); - configMap.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory"); + configMap.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); configMap.put("job.name", appName); configMap.put("app.class", streamApplication.getClass().getCanonicalName()); configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory"); @@ -256,17 +254,13 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration } Config config = new MapConfig(configMap); - AbstractApplicationRunner runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(config); - runner.run(streamApplication); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(streamApplication, config); + runner.run(); MessageStreamAssert.waitForComplete(); return new RunApplicationContext(runner, config); } - public void setNumEmptyPolls(int numEmptyPolls) { - this.numEmptyPolls = numEmptyPolls; - } - /** * Shutdown and clear Zookeeper and Kafka broker state. */ @@ -283,15 +277,15 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration * runApplication in order to do verification. */ protected static class RunApplicationContext { - private final AbstractApplicationRunner runner; + private final ApplicationRunner runner; private final Config config; - private RunApplicationContext(AbstractApplicationRunner runner, Config config) { + private RunApplicationContext(ApplicationRunner runner, Config config) { this.runner = runner; this.config = config; } - public AbstractApplicationRunner getRunner() { + public ApplicationRunner getRunner() { return this.runner; } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java index 91234b7..e72a965 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java @@ -25,9 +25,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.TimerFunction; @@ -40,11 +39,11 @@ public class TestTimerApp implements StreamApplication { public static final String PAGE_VIEWS = "page-views"; @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); - final MessageStream<PageView> pageViews = graph.getInputStream(isd); + final MessageStream<PageView> pageViews = appDesc.getInputStream(isd); final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn()); MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java index a48409c..d4e0e14 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java @@ -19,11 +19,14 @@ package org.apache.samza.test.framework; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; import org.junit.Before; import org.junit.Test; - -import static org.apache.samza.test.framework.TestTimerApp.PAGE_VIEWS; +import static org.apache.samza.test.framework.TestTimerApp.*; public class TimerTest extends StreamApplicationIntegrationTestHarness { @@ -44,7 +47,14 @@ public class TimerTest extends StreamApplicationIntegrationTestHarness { } @Test - public void testJob() { - runApplication(new TestTimerApp(), "TimerTest", null); + public void testJob() throws InterruptedException { + Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); + configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + + runApplication(new TestTimerApp(), "TimerTest", configs); } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index 8ee7e00..c63c11f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -19,17 +19,17 @@ package org.apache.samza.test.operator; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -42,9 +42,6 @@ import org.apache.samza.test.operator.data.AdClick; import org.apache.samza.test.operator.data.PageView; import org.apache.samza.test.operator.data.UserPageAdClick; -import java.time.Duration; -import org.apache.samza.util.CommandLine; - /** * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count. @@ -57,21 +54,11 @@ public class RepartitionJoinWindowApp implements StreamApplication { private final List<String> intermediateStreamIds = new ArrayList<>(); - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - - RepartitionJoinWindowApp application = new RepartitionJoinWindowApp(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - - runner.run(application); - runner.waitForFinish(); - } - @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { // offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published // before the application is run + Config config = appDesc.getConfig(); String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY); String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY); String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY); @@ -79,8 +66,8 @@ public class RepartitionJoinWindowApp implements StreamApplication { KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class)); KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class)); - MessageStream<PageView> pageViews = graph.getInputStream(id1); - MessageStream<AdClick> adClicks = graph.getInputStream(id2); + MessageStream<PageView> pageViews = appDesc.getInputStream(id1); + MessageStream<AdClick> adClicks = appDesc.getInputStream(id2); MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews .partitionBy(PageView::getViewId, pv -> pv, http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java index ae3669f..79a25e7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -21,11 +21,9 @@ package org.apache.samza.test.operator; import java.time.Duration; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -34,7 +32,6 @@ import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaOutputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; -import org.apache.samza.util.CommandLine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,30 +45,21 @@ public class RepartitionWindowApp implements StreamApplication { static final String INPUT_TOPIC = "page-views"; static final String OUTPUT_TOPIC = "Result"; - public static void main(String[] args) { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - RepartitionWindowApp reparApp = new RepartitionWindowApp(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - - runner.run(reparApp); - runner.waitForFinish(); - } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KVSerde<String, PageView> inputSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class)); KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new StringSerde()); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); KafkaInputDescriptor<KV<String, PageView>> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); KafkaOutputDescriptor<KV<String, String>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - graph.getInputStream(id) + appDesc.getInputStream(id) .map(KV::getValue) .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1") .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1") .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage()))) - .sendTo(graph.getOutputStream(od)); + .sendTo(appDesc.getOutputStream(od)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 3eb2662..f116f1d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -20,14 +20,15 @@ package org.apache.samza.test.operator; import java.time.Duration; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -50,23 +51,21 @@ public class SessionWindowApp implements StreamApplication { public static void main(String[] args) { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - SessionWindowApp app = new SessionWindowApp(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - - runner.run(app); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new SessionWindowApp(), config); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class); KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde()); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - MessageStream<PageView> pageViews = graph.getInputStream(id); - OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od); + MessageStream<PageView> pageViews = appDesc.getInputStream(id); + OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od); pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 5fee9cf..144f125 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -19,11 +19,16 @@ package org.apache.samza.test.operator; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.samza.Partition; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; import org.apache.samza.system.kafka.KafkaSystemAdmin; @@ -33,8 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; -import java.util.List; +import static org.junit.Assert.assertEquals; /** @@ -77,6 +81,10 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe RepartitionJoinWindowApp app = new RepartitionJoinWindowApp(); String appName = "UserPageAdClickCounter"; Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "false"); configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1); configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2); @@ -86,7 +94,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe // consume and validate result List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2); - Assert.assertEquals(2, messages.size()); + assertEquals(2, messages.size()); Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled()); } @@ -103,22 +111,26 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe RepartitionJoinWindowApp app = new RepartitionJoinWindowApp(); final String appName = "UserPageAdClickCounter2"; Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put("systems.kafka.samza.delete.committed.messages", "true"); configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1); configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2); configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, outputTopicName); - RunApplicationContext runApplicationContext = runApplication(app, appName, configs); + runApplication(app, appName, configs); // consume and validate result List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2); - Assert.assertEquals(2, messages.size()); + assertEquals(2, messages.size()); for (ConsumerRecord<String, String> message : messages) { String key = message.key(); String value = message.value(); Assert.assertTrue(key.equals("u1") || key.equals("u2")); - Assert.assertEquals("2", value); + assertEquals("2", value); } // Verify that messages in the intermediate stream will be deleted in 10 seconds @@ -137,7 +149,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset()); } } - Assert.assertEquals(0, remainingMessageNum); + assertEquals(0, remainingMessageNum); } } @@ -147,6 +159,10 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe String inputTopicName2 = "ad-clicks"; String outputTopicName = "user-ad-click-counts"; Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1); initializeTopics(inputTopicName1, inputTopicName2, outputTopicName); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index 6373292..2e1de96 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -18,11 +18,13 @@ */ package org.apache.samza.test.operator; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; -import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness; import org.apache.samza.test.operator.data.PageView; @@ -30,9 +32,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; -import java.util.List; - import static org.apache.samza.test.operator.RepartitionWindowApp.*; /** @@ -63,10 +62,11 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa Map<String, String> configs = new HashMap<>(); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); // run the application - runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs)); + runApplication(new RepartitionWindowApp(), APP_NAME, configs); // consume and validate result List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index d77a2a6..0184013 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -20,14 +20,15 @@ package org.apache.samza.test.operator; import java.time.Duration; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -51,23 +52,22 @@ public class TumblingWindowApp implements StreamApplication { public static void main(String[] args) { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - TumblingWindowApp app = new TumblingWindowApp(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TumblingWindowApp(), config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class); KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde()); KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - MessageStream<PageView> pageViews = graph.getInputStream(id); - OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od); + MessageStream<PageView> pageViews = appDesc.getInputStream(id); + OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od); pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java index 1b778e8..51f33b5 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java @@ -23,15 +23,13 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; import java.util.concurrent.CountDownLatch; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; -import org.apache.samza.operators.descriptors.GenericOutputDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerde; @@ -43,7 +41,7 @@ import org.apache.samza.system.kafka.KafkaSystemDescriptor; /** * Test class to create an {@link StreamApplication} instance */ -public class TestStreamApplication implements StreamApplication, Serializable { +public class TestStreamApplication implements StreamApplication { private final String systemName; private final String inputTopic; @@ -61,47 +59,57 @@ public class TestStreamApplication implements StreamApplication, Serializable { } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor streamAppDesc) { KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName); KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>()); KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, new StringSerde()); - MessageStream<String> inputStream = graph.getInputStream(isd); - OutputStream<String> outputStream = graph.getOutputStream(osd); - inputStream.map(new MapFunction<String, String>() { - transient CountDownLatch latch1; - transient CountDownLatch latch2; - transient StreamApplicationCallback callback; - - @Override - public String apply(String message) { - TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message); - if (callback != null) { - callback.onMessage(incomingMessage); - } - if (latch1 != null) { - latch1.countDown(); - } - if (latch2 != null) { - latch2.countDown(); - } - return incomingMessage.toString(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - SharedContextFactories.SharedContextFactory contextFactory = - SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName); - this.latch1 = (CountDownLatch) contextFactory.getSharedObject("processedMsgLatch"); - this.latch2 = (CountDownLatch) contextFactory.getSharedObject("kafkaMsgsConsumedLatch"); - this.callback = (StreamApplicationCallback) contextFactory.getSharedObject("callback"); - } - }).sendTo(outputStream); + MessageStream<String> inputStream = streamAppDesc.getInputStream(isd); + OutputStream<String> outputStream = streamAppDesc.getOutputStream(osd); + inputStream.map(new TestMapFunction(appName, processorName)).sendTo(outputStream); } public interface StreamApplicationCallback { void onMessage(TestKafkaEvent m); } + public static class TestMapFunction implements MapFunction<String, String> { + private final String appName; + private final String processorName; + + private transient CountDownLatch latch1; + private transient CountDownLatch latch2; + private transient StreamApplicationCallback callback; + + TestMapFunction(String appName, String processorName) { + this.appName = appName; + this.processorName = processorName; + } + + @Override + public String apply(String message) { + TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message); + if (callback != null) { + callback.onMessage(incomingMessage); + } + if (latch1 != null) { + latch1.countDown(); + } + if (latch2 != null) { + latch2.countDown(); + } + return incomingMessage.toString(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + SharedContextFactories.SharedContextFactory contextFactory = + SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName); + this.latch1 = (CountDownLatch) contextFactory.getSharedObject("processedMsgLatch"); + this.latch2 = (CountDownLatch) contextFactory.getSharedObject("kafkaMsgsConsumedLatch"); + this.callback = (StreamApplicationCallback) contextFactory.getSharedObject("callback"); + } + } + public static class TestKafkaEvent implements Serializable { // Actual content of the event. @@ -142,7 +150,7 @@ public class TestStreamApplication implements StreamApplication, Serializable { StreamApplicationCallback callback, CountDownLatch kafkaEventsConsumedLatch, Config config) { - String appName = String.format("%s-%s", config.get(ApplicationConfig.APP_NAME), config.get(ApplicationConfig.APP_ID)); + String appName = new ApplicationConfig(config).getGlobalAppId(); String processorName = config.get(JobConfig.PROCESSOR_ID()); registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index c37132f..fc62b0a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -42,7 +42,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.processor.StreamProcessor; -import org.apache.samza.processor.StreamProcessorLifecycleListener; +import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.task.AsyncStreamTaskAdapter; import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTaskFactory; @@ -52,10 +52,7 @@ import org.junit.Assert; import org.junit.Test; import scala.Option$; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; public class TestStreamProcessor extends StandaloneIntegrationTestHarness { @@ -232,7 +229,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { KafkaConsumer consumer; KafkaProducer producer; StreamProcessor processor; - StreamProcessorLifecycleListener listener; + ProcessorLifecycleListener listener; private TestStubs(String bootstrapServer) { shutdownLatch = new CountDownLatch(1); @@ -266,13 +263,14 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { } private void initProcessorListener() { - listener = mock(StreamProcessorLifecycleListener.class); - doNothing().when(listener).onStart(); - doNothing().when(listener).onFailure(anyObject()); + listener = mock(ProcessorLifecycleListener.class); + doNothing().when(listener).afterStart(); + doNothing().when(listener).afterFailure(any()); doAnswer(invocation -> { + // stopped successfully shutdownLatch.countDown(); return null; - }).when(listener).onShutdown(); + }).when(listener).afterStop(); } private void initProducer(String bootstrapServer) { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index e34ee4a..3b2d08a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -50,10 +49,10 @@ import org.apache.samza.container.TaskName; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; -import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback; import org.apache.samza.util.NoOpMetricsRegistry; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; @@ -70,10 +69,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** - * Integration tests for {@link LocalApplicationRunner}. + * Integration tests for {@link org.apache.samza.runtime.LocalApplicationRunner} with {@link ZkJobCoordinatorFactory}. * - * Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link StreamApplication} through - * {@link LocalApplicationRunner} to verify the guarantees made in stand alone execution environment. + * Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link org.apache.samza.application.StreamApplication} + * through {@link org.apache.samza.runtime.LocalApplicationRunner} to verify the guarantees made in stand alone execution + * environment. */ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarness { @@ -157,6 +157,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } public void tearDown() { + SharedContextFactories.clearAll(); for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) { LOGGER.info("Deleting kafka topic: {}.", kafkaTopic); AdminUtils.deleteTopic(zkUtils(), kafkaTopic); @@ -174,7 +175,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne for (int eventIndex = startIndex; eventIndex < endIndex; eventIndex++) { try { LOGGER.info("Publish kafka event with index : {} for stream processor: {}.", eventIndex, streamProcessorId); - producer.send(new ProducerRecord(topic, new TestKafkaEvent(streamProcessorId, String.valueOf(eventIndex)).toString().getBytes())); + producer.send(new ProducerRecord(topic, new TestStreamApplication.TestKafkaEvent(streamProcessorId, String.valueOf(eventIndex)).toString().getBytes())); } catch (Exception e) { LOGGER.error("Publishing to kafka topic: {} resulted in exception: {}.", new Object[]{topic, e}); throw new SamzaException(e); @@ -195,6 +196,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY) .put(ApplicationConfig.APP_NAME, appName) .put(ApplicationConfig.APP_ID, appId) + .put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner") .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY) .put(JobConfig.JOB_NAME(), appName) .put(JobConfig.JOB_ID(), appId) @@ -210,13 +212,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne /** * sspGrouper is set to GroupBySystemStreamPartitionFactory. - * Run a stream application(streamApp1) consuming messages from input topic(effectively one container). + * Run a stream application(appRunner1) consuming messages from input topic(effectively one container). * - * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2). + * In the callback triggered by appRunner1 after processing a message, bring up an another stream application(appRunner2). * * Assertions: - * A) JobModel generated before and after the addition of streamApp2 should be equal. - * B) Second stream application(streamApp2) should not join the group and process any message. + * A) JobModel generated before and after the addition of appRunner2 should be equal. + * B) Second stream application(appRunner2) should not join the group and process any message. */ @Test @@ -234,28 +236,27 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne final CountDownLatch secondProcessorRegistered = new CountDownLatch(1); zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> { - // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1. + // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run processing message in appRunner1. if (currentChilds.contains(PROCESSOR_IDS[1])) { secondProcessorRegistered.countDown(); } }); - // Set up stream app 2. + // Set up stream app appRunner2. CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS); - Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); - LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); - StreamApplication streamApp2 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, - processedMessagesLatch, null, null, testAppConfig2); + Config localTestConfig2 = new MapConfig(applicationConfig2, testConfig); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, processedMessagesLatch, + null, null, localTestConfig2), localTestConfig2); - // Callback handler for streamApp1. - StreamApplicationCallback streamApplicationCallback = message -> { + // Callback handler for appRunner1. + TestStreamApplication.StreamApplicationCallback callback = m -> { if (hasSecondProcessorJoined.compareAndSet(false, true)) { previousJobModelVersion[0] = zkUtils.getJobModelVersion(); previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); - localApplicationRunner2.run(streamApp2); + appRunner2.run(); try { - // Wait for streamApp2 to register with zookeeper. + // Wait for appRunner2 to register with zookeeper. secondProcessorRegistered.await(); } catch (InterruptedException e) { } @@ -264,13 +265,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); - // Set up stream app 1. - Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); - LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); - StreamApplication streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, - null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); - localApplicationRunner1.run(streamApp1); + // Set up stream app appRunner1. + Config localTestConfig1 = new MapConfig(applicationConfig1, testConfig); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, null, + callback, kafkaEventsConsumedLatch, localTestConfig1), localTestConfig1); + appRunner1.run(); kafkaEventsConsumedLatch.await(); @@ -281,12 +281,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(previousJobModel[0], updatedJobModel); assertEquals(new MapConfig(), updatedJobModel.getConfig()); assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); - localApplicationRunner1.kill(streamApp1); - localApplicationRunner1.waitForFinish(); - localApplicationRunner2.kill(streamApp2); - localApplicationRunner2.waitForFinish(); - assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); - assertEquals(localApplicationRunner2.status(streamApp2), ApplicationStatus.UnsuccessfulFinish); + appRunner1.kill(); + appRunner1.waitForFinish(); + appRunner2.kill(); + appRunner2.waitForFinish(); + assertEquals(appRunner1.status(), ApplicationStatus.SuccessfulFinish); + assertEquals(appRunner2.status(), ApplicationStatus.UnsuccessfulFinish); } /** @@ -309,7 +309,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]); // Configuration, verification variables - MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); + MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), + "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block). final JobModel[] previousJobModel = new JobModel[1]; final String[] previousJobModelVersion = new String[1]; @@ -317,44 +318,43 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne final CountDownLatch secondProcessorRegistered = new CountDownLatch(1); zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> { - // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1. + // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start processing message in appRunner1. if (currentChilds.contains(PROCESSOR_IDS[1])) { secondProcessorRegistered.countDown(); } }); - // Set up streamApp2. + // Set up appRunner2. CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); - LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); - StreamApplication streamApp2 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, + null, testAppConfig2), testAppConfig2); - // Callback handler for streamApp1. - StreamApplicationCallback streamApplicationCallback = message -> { + // Callback handler for appRunner1. + TestStreamApplication.StreamApplicationCallback streamApplicationCallback = message -> { if (hasSecondProcessorJoined.compareAndSet(false, true)) { previousJobModelVersion[0] = zkUtils.getJobModelVersion(); previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); - localApplicationRunner2.run(streamApp2); + appRunner2.run(); try { - // Wait for streamApp2 to register with zookeeper. + // Wait for appRunner2 to register with zookeeper. secondProcessorRegistered.await(); } catch (InterruptedException e) { } } }; - // This is the latch for the messages received by streamApp1. Since streamApp1 is run first, it gets one event - // redelivered due to re-balancing done by Zk after the streamApp2 joins (See the callback above). + // This is the latch for the messages received by appRunner1. Since appRunner1 is run first, it gets one event + // redelivered due to re-balancing done by Zk after the appRunner2 joins (See the callback above). CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1); - // Set up stream app 1. + // Set up stream app appRunner1. Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); - LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); - StreamApplication streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, - streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); - localApplicationRunner1.run(streamApp1); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, + kafkaEventsConsumedLatch, testAppConfig1), testAppConfig1); + appRunner1.run(); kafkaEventsConsumedLatch.await(); @@ -386,12 +386,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne processedMessagesLatch.await(); - assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2)); - localApplicationRunner1.kill(streamApp1); - localApplicationRunner1.waitForFinish(); - localApplicationRunner2.kill(streamApp2); - localApplicationRunner2.waitForFinish(); - assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + assertEquals(ApplicationStatus.Running, appRunner2.status()); + appRunner1.kill(); + appRunner1.waitForFinish(); + appRunner2.kill(); + appRunner2.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); } @Test @@ -405,23 +405,18 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); - StreamApplication streamApp3 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, + applicationConfig2), applicationConfig2); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, + applicationConfig3), applicationConfig3); - // Create LocalApplicationRunners - LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); - LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); - - // Run stream applications. - applicationRunner1.run(streamApp1); - applicationRunner2.run(streamApp2); + appRunner1.run(); + appRunner2.run(); // Wait until all processors have processed a message. processedMessagesLatch1.await(); @@ -439,21 +434,18 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(2, processorIdsFromZK.size()); assertEquals(PROCESSOR_IDS[0], processorIdsFromZK.get(0)); - // Kill the leader. Since streamApp1 is the first to join the cluster, it's the leader. - applicationRunner1.kill(streamApp1); - applicationRunner1.waitForFinish(); - - assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + // Kill the leader. Since appRunner1 is the first to join the cluster, it's the leader. + appRunner1.kill(); + appRunner1.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); kafkaEventsConsumedLatch.await(); publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3); - applicationRunner3.run(streamApp3); + appRunner3.run(); processedMessagesLatch3.await(); // Verifications after killing the leader. - assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp1)); processorIdsFromZK = zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2])); assertEquals(2, processorIdsFromZK.size()); assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0)); @@ -462,12 +454,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet()); assertEquals(2, jobModel.getContainers().size()); - applicationRunner2.kill(streamApp2); - applicationRunner2.waitForFinish(); - assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); - applicationRunner3.kill(streamApp3); - applicationRunner3.waitForFinish(); - assertEquals(applicationRunner3.status(streamApp2), ApplicationStatus.SuccessfulFinish); + appRunner2.kill(); + appRunner2.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); + appRunner3.kill(); + appRunner3.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status()); } @Test @@ -480,42 +472,37 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); - - // Create LocalApplicationRunners - LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); - LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, + applicationConfig2), applicationConfig2); // Run stream applications. - applicationRunner1.run(streamApp1); - applicationRunner2.run(streamApp2); + appRunner1.run(); + appRunner2.run(); - // Wait for message processing to start in both the processors. + // Wait for message processing to run in both the processors. processedMessagesLatch1.await(); processedMessagesLatch2.await(); - MapConfig appConfig = new ApplicationConfig(new MapConfig(applicationConfig2, ImmutableMap.of(ZkConfig.ZK_SESSION_TIMEOUT_MS, "10"))); - LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig); - // Create a stream app with same processor id as SP2 and run it. It should fail. - StreamApplication streamApp3 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - null, null, kafkaEventsConsumedLatch, applicationConfig2); + publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]); + kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, + applicationConfig2), applicationConfig2); // Fail when the duplicate processor joins. expectedException.expect(SamzaException.class); try { - applicationRunner3.run(streamApp3); + appRunner3.run(); } finally { - applicationRunner1.kill(streamApp1); - applicationRunner2.kill(streamApp2); + appRunner1.kill(); + appRunner2.kill(); - applicationRunner1.waitForFinish(); - applicationRunner2.waitForFinish(); + appRunner1.waitForFinish(); + appRunner2.waitForFinish(); } } @@ -533,24 +520,24 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); Config applicationConfig2 = new MapConfig(configMap); - LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); - LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); + List<TestStreamApplication.TestKafkaEvent> messagesProcessed = new ArrayList<>(); + TestStreamApplication.StreamApplicationCallback streamApplicationCallback = messagesProcessed::add; // Create StreamApplication from configuration. CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, + applicationConfig2), applicationConfig2); // Run stream application. - applicationRunner1.run(streamApp1); - applicationRunner2.run(streamApp2); + appRunner1.run(); + appRunner2.run(); processedMessagesLatch1.await(); processedMessagesLatch2.await(); @@ -559,18 +546,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne String jobModelVersion = zkUtils.getJobModelVersion(); JobModel jobModel = zkUtils.getJobModel(jobModelVersion); - applicationRunner1.kill(streamApp1); - applicationRunner1.waitForFinish(); + appRunner1.kill(); + appRunner1.waitForFinish(); - assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); + int lastProcessedMessageId = -1; + for (TestStreamApplication.TestKafkaEvent message : messagesProcessed) { + lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventData())); + } + messagesProcessed.clear(); + + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); - LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1); processedMessagesLatch1 = new CountDownLatch(1); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - streamApp1 = TestStreamApplication.getInstance( - TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - applicationRunner4.run(streamApp1); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, + applicationConfig1), applicationConfig1); + appRunner3.run(); processedMessagesLatch1.await(); @@ -581,12 +573,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion)); assertEquals(jobModel.getContainers(), newJobModel.getContainers()); - applicationRunner2.kill(streamApp2); - applicationRunner2.waitForFinish(); - assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); - applicationRunner4.kill(streamApp1); - applicationRunner4.waitForFinish(); - assertEquals(applicationRunner4.status(streamApp1), ApplicationStatus.SuccessfulFinish); + appRunner2.kill(); + appRunner2.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); + appRunner3.kill(); + appRunner3.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status()); } @Test @@ -602,21 +594,20 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); Config applicationConfig2 = new MapConfig(configMap); - LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); - LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); - // Create StreamApplication from configuration. CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, + applicationConfig2), applicationConfig2); - applicationRunner1.run(streamApp1); - applicationRunner2.run(streamApp2); + appRunner1.run(); + appRunner2.run(); processedMessagesLatch1.await(); processedMessagesLatch2.await(); @@ -628,12 +619,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); Config applicationConfig3 = new MapConfig(configMap); - LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - StreamApplication streamApp3 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, - processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); - applicationRunner3.run(streamApp3); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, + applicationConfig3), applicationConfig3); + appRunner3.run(); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -643,26 +634,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne * If the processing has started in the third stream processor, then other two stream processors should be stopped. */ // TODO: This is a bug! Status should be unsuccessful finish. - assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish); - assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish); - - applicationRunner3.kill(streamApp3); - applicationRunner3.waitForFinish(); - assertEquals(applicationRunner3.status(streamApp3), ApplicationStatus.SuccessfulFinish); - } + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); - private static class TestKafkaEvent implements Serializable { - - // Actual content of the event. - private String eventData; - - // Contains Integer value, which is greater than previous message id. - private String eventId; - - TestKafkaEvent(String eventId, String eventData) { - this.eventId = eventData; - this.eventData = eventData; - } + appRunner3.kill(); + appRunner3.waitForFinish(); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status()); } }