Repository: samza Updated Branches: refs/heads/master b0b292200 -> abf49eaaa
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 5b29fe7..fbf0539 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -94,19 +94,20 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { MyMapFunction mapFn = new MyMapFunction(); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { + final StreamApplication app = appDesc -> { - Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1") + Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1") .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - streamGraph.getInputStream(isd) + + appDesc.getInputStream(isd) .map(mapFn) .sendTo(table); }; - runner.run(app); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); + runner.run(); runner.waitForFinish(); for (int i = 0; i < partitionCount; i++) { @@ -130,19 +131,18 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { } void runTest() { - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { + final StreamApplication app = appDesc -> { - Table<KV<Integer, Profile>> table = streamGraph.getTable( + Table<KV<Integer, Profile>> table = appDesc.getTable( new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - streamGraph.getInputStream(profileISD) + appDesc.getInputStream(profileISD) .map(m -> new KV(m.getMemberId(), m)) .sendTo(table); GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - streamGraph.getInputStream(pageViewISD) + appDesc.getInputStream(pageViewISD) .map(pv -> { received.add(pv); return pv; @@ -152,7 +152,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { .sink((m, collector, coordinator) -> joined.add(m)); }; - runner.run(app); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); + runner.run(); runner.waitForFinish(); assertEquals(count * partitionCount, received.size()); @@ -206,17 +207,16 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { + final StreamApplication app = appDesc -> { - Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1") + Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1") .withSerde(profileKVSerde)); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); - MessageStream<Profile> profileStream1 = streamGraph.getInputStream(profileISD1); - MessageStream<Profile> profileStream2 = streamGraph.getInputStream(profileISD2); + MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1); + MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2); profileStream1 .map(m -> { @@ -233,8 +233,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); - MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream(pageViewISD1); - MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream(pageViewISD2); + MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1); + MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2); pageViewStream1 .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") @@ -247,7 +247,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); }; - runner.run(app); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); + runner.run(); runner.waitForFinish(); assertEquals(count * partitionCount, sentToProfileTable1.size()); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 575178c..0d9df8b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -27,13 +27,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -127,14 +126,14 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness static final String PROFILE_TABLE = "profile-table"; @Override - public void init(StreamGraph graph, Config config) { - Table<KV<Integer, TestTableData.Profile>> table = graph.getTable(getTableDescriptor()); + public void describe(StreamApplicationDescriptor appDesc) { + Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor()); KafkaSystemDescriptor sd = - new KafkaSystemDescriptor(config.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); - graph.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())) + new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); + appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") .join(table, new PageViewToProfileJoinFunction()) - .sendTo(graph.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()))); + .sendTo(appDesc.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()))); } protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index e6e73bb..d79683e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -20,7 +20,6 @@ package org.apache.samza.test.table; import com.google.common.cache.CacheBuilder; - import java.io.IOException; import java.io.ObjectInputStream; import java.time.Duration; @@ -35,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.MapConfig; import org.apache.samza.container.SamzaContainerContext; @@ -42,7 +42,6 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.runtime.LocalApplicationRunner; @@ -127,7 +126,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } } - private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamGraph streamGraph) { + private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id); if (defaultCache) { cachingDesc.withReadTtl(Duration.ofMinutes(5)); @@ -135,12 +134,12 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } else { GuavaCacheTableDescriptor<K, V> guavaDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id); guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build()); - Table<KV<K, V>> guavaTable = streamGraph.getTable(guavaDesc); + Table<KV<K, V>> guavaTable = appDesc.getTable(guavaDesc); cachingDesc.withCache(guavaTable); } cachingDesc.withTable(actualTable); - return streamGraph.getTable(cachingDesc); + return appDesc.getTable(cachingDesc); } private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception { @@ -161,9 +160,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { final RateLimiter readRateLimiter = mock(RateLimiter.class); final RateLimiter writeRateLimiter = mock(RateLimiter.class); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - final StreamApplication app = (streamGraph, cfg) -> { - RemoteTableDescriptor<Integer, Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); + final StreamApplication app = appDesc -> { + RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); inputTableDesc .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); @@ -174,28 +172,29 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); - Table<KV<Integer, EnrichedPageView>> outputTable = streamGraph.getTable(outputTableDesc); + Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc); if (withCache) { - outputTable = getCachingTable(outputTable, defaultCache, "output", streamGraph); + outputTable = getCachingTable(outputTable, defaultCache, "output", appDesc); } - Table<KV<Integer, Profile>> inputTable = streamGraph.getTable(inputTableDesc); + Table<KV<Integer, Profile>> inputTable = appDesc.getTable(inputTableDesc); if (withCache) { - inputTable = getCachingTable(inputTable, defaultCache, "input", streamGraph); + inputTable = getCachingTable(inputTable, defaultCache, "input", appDesc); } DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<TestTableData.PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - streamGraph.getInputStream(isd) + appDesc.getInputStream(isd) .map(pv -> new KV<>(pv.getMemberId(), pv)) .join(inputTable, new PageViewToProfileJoinFunction()) .map(m -> new KV(m.getMemberId(), m)) .sendTo(outputTable); }; - runner.run(app); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); + runner.run(); runner.waitForFinish(); int numExpected = count * partitionCount; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java index dc11c3f..f7805fe 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java @@ -20,7 +20,6 @@ package org.apache.samza.tools.benchmark; import com.google.common.base.Joiner; -import scala.Option; import java.io.IOException; import java.time.Duration; @@ -34,12 +33,15 @@ import org.apache.commons.cli.ParseException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.config.SystemConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; @@ -71,18 +73,18 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench { } public void start() throws IOException, InterruptedException { - LocalApplicationRunner runner = new LocalApplicationRunner(config); super.start(); MessageConsumer consumeFn = new MessageConsumer(); - StreamApplication app = (graph, config) -> { + StreamApplication app = appDesc -> { String systemFactoryName = new SystemConfig(config).getSystemFactory(systemName).get(); GenericSystemDescriptor sd = new GenericSystemDescriptor(systemName, systemFactoryName); GenericInputDescriptor<Object> isd = sd.getInputDescriptor(streamId, new NoOpSerde<>()); - MessageStream<Object> stream = graph.getInputStream(isd); + MessageStream<Object> stream = appDesc.getInputStream(isd); stream.map(consumeFn); }; + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, new MapConfig()); - runner.run(app); + runner.run(); while (consumeFn.getEventsConsumed() < totalEvents) { Thread.sleep(10); @@ -90,7 +92,7 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench { Instant endTime = Instant.now(); - runner.kill(app); + runner.kill(); System.out.println("\n*******************"); System.out.println(String.format("Started at %s Ending at %s ", consumeFn.startTime, endTime));