Repository: samza Updated Branches: refs/heads/master 2d6b19953 -> 2a71baf7c
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java index 09da838..91234b7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java @@ -19,6 +19,11 @@ package org.apache.samza.test.framework; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; @@ -27,21 +32,19 @@ import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - public class TestTimerApp implements StreamApplication { public static final String PAGE_VIEWS = "page-views"; @Override public void init(StreamGraph graph, Config config) { final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); - final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, serde); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); + KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); + final MessageStream<PageView> pageViews = graph.getInputStream(isd); final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn()); MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index ec9c05d..8ee7e00 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -35,6 +35,8 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.operator.data.AdClick; import org.apache.samza.test.operator.data.PageView; @@ -48,10 +50,10 @@ import org.apache.samza.util.CommandLine; * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count. */ public class RepartitionJoinWindowApp implements StreamApplication { - - public static final String INPUT_TOPIC_NAME_1_PROP = "inputTopicName1"; - public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2"; - public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName"; + public static final String SYSTEM = "kafka"; + public static final String INPUT_TOPIC_1_CONFIG_KEY = "inputTopic1"; + public static final String INPUT_TOPIC_2_CONFIG_KEY = "inputTopic2"; + public static final String OUTPUT_TOPIC_CONFIG_KEY = "outputTopic"; private final List<String> intermediateStreamIds = new ArrayList<>(); @@ -68,12 +70,17 @@ public class RepartitionJoinWindowApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { - String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP); - String inputTopicName2 = config.get(INPUT_TOPIC_NAME_2_PROP); - String outputTopic = config.get(OUTPUT_TOPIC_NAME_PROP); - - MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class)); - MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class)); + // offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published + // before the application is run + String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY); + String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY); + String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class)); + KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class)); + + MessageStream<PageView> pageViews = graph.getInputStream(id1); + MessageStream<AdClick> adClicks = graph.getInputStream(id2); MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews .partitionBy(PageView::getViewId, pv -> pv, @@ -101,14 +108,15 @@ public class RepartitionJoinWindowApp implements StreamApplication { .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) .sink((message, messageCollector, taskCoordinator) -> { taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER); - messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue())); + messageCollector.send( + new OutgoingMessageEnvelope( + new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue())); }); intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId()); intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId()); intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId()); - } List<String> getIntermediateStreamIds() { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java index e233793..ae3669f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -30,6 +30,9 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; import org.apache.samza.util.CommandLine; import org.slf4j.Logger; @@ -41,7 +44,7 @@ import org.slf4j.LoggerFactory; public class RepartitionWindowApp implements StreamApplication { private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class); - + static final String SYSTEM = "kafka"; static final String INPUT_TOPIC = "page-views"; static final String OUTPUT_TOPIC = "Result"; @@ -57,16 +60,18 @@ public class RepartitionWindowApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { + KVSerde<String, PageView> inputSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class)); + KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new StringSerde()); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor<KV<String, PageView>> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); + KafkaOutputDescriptor<KV<String, String>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - KVSerde<String, PageView> - pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class)); - - graph.getInputStream(INPUT_TOPIC, pgeMsgSerde) + graph.getInputStream(id) .map(KV::getValue) - .partitionBy(PageView::getUserId, m -> m, pgeMsgSerde, "p1") + .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1") .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1") .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage()))) - .sendTo(graph.getOutputStream(OUTPUT_TOPIC)); + .sendTo(graph.getOutputStream(od)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 3224d24..3eb2662 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -32,19 +32,19 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; import org.apache.samza.util.CommandLine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A {@link StreamApplication} that demonstrates a filter followed by a session window. */ public class SessionWindowApp implements StreamApplication { + private static final String SYSTEM = "kafka"; private static final String INPUT_TOPIC = "page-views"; private static final String OUTPUT_TOPIC = "page-view-counts"; - - private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class); private static final String FILTER_KEY = "badKey"; public static void main(String[] args) { @@ -59,10 +59,14 @@ public class SessionWindowApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { + JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class); + KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde()); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); + KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class)); - OutputStream<KV<String, Integer>> outputStream = - graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde())); + MessageStream<PageView> pageViews = graph.getInputStream(id); + OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od); pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) @@ -70,6 +74,5 @@ public class SessionWindowApp implements StreamApplication { new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow") .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 2f75103..5fee9cf 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -78,9 +78,9 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe String appName = "UserPageAdClickCounter"; Map<String, String> configs = new HashMap<>(); configs.put("systems.kafka.samza.delete.committed.messages", "false"); - configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1); - configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2); - configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName); + configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1); + configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2); + configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, outputTopicName); runApplication(app, appName, configs); @@ -104,9 +104,9 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe final String appName = "UserPageAdClickCounter2"; Map<String, String> configs = new HashMap<>(); configs.put("systems.kafka.samza.delete.committed.messages", "true"); - configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_1_PROP, inputTopicName1); - configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2); - configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName); + configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, inputTopicName1); + configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, inputTopicName2); + configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, outputTopicName); RunApplicationContext runApplicationContext = runApplication(app, appName, configs); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java index 058a690..6373292 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -39,8 +39,7 @@ import static org.apache.samza.test.operator.RepartitionWindowApp.*; * Test driver for {@link RepartitionWindowApp}. */ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness { - - static final String APP_NAME = "PageViewCounterApp"; + private static final String APP_NAME = "PageViewCounterApp"; @Test public void testRepartitionedSessionWindowCounter() throws Exception { @@ -65,8 +64,6 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); - configs.put(String.format("streams.%s.samza.msg.serde", INPUT_TOPIC), "string"); - configs.put(String.format("streams.%s.samza.key.serde", INPUT_TOPIC), "string"); // run the application runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs)); @@ -86,6 +83,5 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa Assert.assertEquals(value, "1"); } } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index 40a3f91..d77a2a6 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -32,20 +32,20 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; import org.apache.samza.util.CommandLine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window. */ public class TumblingWindowApp implements StreamApplication { + private static final String SYSTEM = "kafka"; private static final String INPUT_TOPIC = "page-views"; private static final String OUTPUT_TOPIC = "page-view-counts"; - - private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class); private static final String FILTER_KEY = "badKey"; public static void main(String[] args) { @@ -60,11 +60,14 @@ public class TumblingWindowApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { + JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class); + KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde()); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde); + KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde); - MessageStream<PageView> pageViews = - graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class)); - OutputStream<KV<String, Integer>> outputStream = - graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde())); + MessageStream<PageView> pageViews = graph.getInputStream(id); + OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od); pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java index db12351..1b778e8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java @@ -30,9 +30,14 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; /** @@ -40,12 +45,15 @@ import org.apache.samza.serializers.StringSerde; */ public class TestStreamApplication implements StreamApplication, Serializable { + private final String systemName; private final String inputTopic; private final String outputTopic; private final String appName; private final String processorName; - private TestStreamApplication(String inputTopic, String outputTopic, String appName, String processorName) { + private TestStreamApplication(String systemName, String inputTopic, String outputTopic, + String appName, String processorName) { + this.systemName = systemName; this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.appName = appName; @@ -54,8 +62,11 @@ public class TestStreamApplication implements StreamApplication, Serializable { @Override public void init(StreamGraph graph, Config config) { - MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>()); - OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde()); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName); + KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>()); + KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, new StringSerde()); + MessageStream<String> inputStream = graph.getInputStream(isd); + OutputStream<String> outputStream = graph.getOutputStream(osd); inputStream.map(new MapFunction<String, String>() { transient CountDownLatch latch1; transient CountDownLatch latch2; @@ -124,6 +135,7 @@ public class TestStreamApplication implements StreamApplication, Serializable { } public static StreamApplication getInstance( + String systemName, String inputTopic, String outputTopic, CountDownLatch processedMessageLatch, @@ -134,7 +146,7 @@ public class TestStreamApplication implements StreamApplication, Serializable { String processorName = config.get(JobConfig.PROCESSOR_ID()); registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName); - StreamApplication app = new TestStreamApplication(inputTopic, outputTopic, appName, processorName); + StreamApplication app = new TestStreamApplication(systemName, inputTopic, outputTopic, appName, processorName); return app; } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index bfa78a0..e34ee4a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -244,7 +244,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS); Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + StreamApplication streamApp2 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, processedMessagesLatch, null, null, testAppConfig2); // Callback handler for streamApp1. @@ -266,7 +267,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up stream app 1. Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + StreamApplication streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); localApplicationRunner1.run(streamApp1); @@ -325,7 +327,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2); + StreamApplication streamApp2 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2); // Callback handler for streamApp1. StreamApplicationCallback streamApplicationCallback = message -> { @@ -348,7 +351,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up stream app 1. Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, + StreamApplication streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); localApplicationRunner1.run(streamApp1); @@ -401,9 +405,15 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); - StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); + StreamApplication streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp3 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); // Create LocalApplicationRunners LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); @@ -470,8 +480,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); // Create LocalApplicationRunners LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); @@ -489,7 +503,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig); // Create a stream app with same processor id as SP2 and run it. It should fail. - StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp3 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + null, null, kafkaEventsConsumedLatch, applicationConfig2); // Fail when the duplicate processor joins. expectedException.expect(SamzaException.class); try { @@ -508,7 +524,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); + Map<String, String> configMap = buildStreamApplicationConfigMap( + TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); Config applicationConfig1 = new MapConfig(configMap); @@ -524,8 +541,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); // Run stream application. applicationRunner1.run(streamApp1); @@ -546,7 +567,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1); processedMessagesLatch1 = new CountDownLatch(1); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + streamApp1 = TestStreamApplication.getInstance( + TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); applicationRunner4.run(streamApp1); processedMessagesLatch1.await(); @@ -587,8 +610,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); - StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp1 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); applicationRunner1.run(streamApp1); applicationRunner2.run(streamApp2); @@ -606,7 +631,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); + StreamApplication streamApp3 = TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, + processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); applicationRunner3.run(streamApp3); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index e5775f0..5b29fe7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -39,6 +39,8 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; @@ -97,8 +99,9 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1") .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - - streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + streamGraph.getInputStream(isd) .map(mapFn) .sendTo(table); }; @@ -132,12 +135,14 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { Table<KV<Integer, Profile>> table = streamGraph.getTable( new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - - streamGraph.getInputStream("Profile", new NoOpSerde<Profile>()) + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + streamGraph.getInputStream(profileISD) .map(m -> new KV(m.getMemberId(), m)) .sendTo(table); - streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) + GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + streamGraph.getInputStream(pageViewISD) .map(pv -> { received.add(pv); return pv; @@ -207,8 +212,11 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1") .withSerde(profileKVSerde)); - MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>()); - MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>()); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); + GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); + MessageStream<Profile> profileStream1 = streamGraph.getInputStream(profileISD1); + MessageStream<Profile> profileStream2 = streamGraph.getInputStream(profileISD2); profileStream1 .map(m -> { @@ -223,8 +231,10 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { }) .sendTo(profileTable); - MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>()); - MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>()); + GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); + GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); + MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream(pageViewISD1); + MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream(pageViewISD2); pageViewStream1 .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 1e45f5e..cc969be 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -20,6 +20,7 @@ package org.apache.samza.test.table; import com.google.common.collect.ImmutableList; + import java.time.Duration; import java.util.Arrays; import java.util.HashMap; @@ -40,6 +41,7 @@ import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.RocksDbTableDescriptor; import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.table.Table; import org.apache.samza.test.framework.TestRunner; import org.apache.samza.test.framework.stream.CollectionStream; @@ -47,10 +49,10 @@ import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Test; import static org.apache.samza.test.table.TestTableData.*; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; + public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness { private static final String PAGEVIEW_STREAM = "pageview"; private static final String PROFILE_STREAM = "profile"; @@ -126,11 +128,12 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness @Override public void init(StreamGraph graph, Config config) { Table<KV<Integer, TestTableData.Profile>> table = graph.getTable(getTableDescriptor()); - - graph.getInputStream(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()) + KafkaSystemDescriptor sd = + new KafkaSystemDescriptor(config.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); + graph.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") .join(table, new PageViewToProfileJoinFunction()) - .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())); + .sendTo(graph.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()))); } protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index eb9fbe9..e6e73bb 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -19,6 +19,8 @@ package org.apache.samza.test.table; +import com.google.common.cache.CacheBuilder; + import java.io.IOException; import java.io.ObjectInputStream; import java.time.Duration; @@ -32,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; - import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.MapConfig; @@ -42,27 +43,27 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.table.Table; import org.apache.samza.table.caching.CachingTableDescriptor; import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor; +import org.apache.samza.table.remote.RemoteReadWriteTable; +import org.apache.samza.table.remote.RemoteReadableTable; +import org.apache.samza.table.remote.RemoteTableDescriptor; import org.apache.samza.table.remote.TableRateLimiter; import org.apache.samza.table.remote.TableReadFunction; import org.apache.samza.table.remote.TableWriteFunction; -import org.apache.samza.table.remote.RemoteReadableTable; -import org.apache.samza.table.remote.RemoteTableDescriptor; -import org.apache.samza.table.remote.RemoteReadWriteTable; import org.apache.samza.task.TaskContext; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.Base64Serializer; import org.apache.samza.util.RateLimiter; -import com.google.common.cache.CacheBuilder; import org.junit.Assert; import org.junit.Test; import static org.apache.samza.test.table.TestTableData.*; - import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -185,7 +186,9 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { inputTable = getCachingTable(inputTable, defaultCache, "input", streamGraph); } - streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<TestTableData.PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + streamGraph.getInputStream(isd) .map(pv -> new KV<>(pv.getMemberId(), pv)) .join(inputTable, new PageViewToProfileJoinFunction()) .map(m -> new KV(m.getMemberId(), m)) http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java index 5456db6..dc11c3f 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java @@ -20,6 +20,8 @@ package org.apache.samza.tools.benchmark; import com.google.common.base.Joiner; +import scala.Option; + import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -32,10 +34,14 @@ import org.apache.commons.cli.ParseException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; @@ -69,7 +75,10 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench { super.start(); MessageConsumer consumeFn = new MessageConsumer(); StreamApplication app = (graph, config) -> { - MessageStream<Object> stream = graph.getInputStream(streamId); + String systemFactoryName = new SystemConfig(config).getSystemFactory(systemName).get(); + GenericSystemDescriptor sd = new GenericSystemDescriptor(systemName, systemFactoryName); + GenericInputDescriptor<Object> isd = sd.getInputDescriptor(streamId, new NoOpSerde<>()); + MessageStream<Object> stream = graph.getInputStream(isd); stream.map(consumeFn); };