Repository: samza
Updated Branches:
  refs/heads/master b0b292200 -> abf49eaaa


http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 5b29fe7..fbf0539 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -94,19 +94,20 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
     MyMapFunction mapFn = new MyMapFunction();
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    final StreamApplication app = (streamGraph, cfg) -> {
+    final StreamApplication app = appDesc -> {
 
-      Table<KV<Integer, Profile>> table = streamGraph.getTable(new 
InMemoryTableDescriptor("t1")
+      Table<KV<Integer, Profile>> table = appDesc.getTable(new 
InMemoryTableDescriptor("t1")
           .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
       DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
       GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", 
new NoOpSerde<>());
-      streamGraph.getInputStream(isd)
+
+      appDesc.getInputStream(isd)
           .map(mapFn)
           .sendTo(table);
     };
 
-    runner.run(app);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
+    runner.run();
     runner.waitForFinish();
 
     for (int i = 0; i < partitionCount; i++) {
@@ -130,19 +131,18 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     }
 
     void runTest() {
-      final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-      final StreamApplication app = (streamGraph, cfg) -> {
+      final StreamApplication app = appDesc -> {
 
-        Table<KV<Integer, Profile>> table = streamGraph.getTable(
+        Table<KV<Integer, Profile>> table = appDesc.getTable(
             new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new 
IntegerSerde(), new ProfileJsonSerde())));
         DelegatingSystemDescriptor ksd = new 
DelegatingSystemDescriptor("test");
         GenericInputDescriptor<Profile> profileISD = 
ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-        streamGraph.getInputStream(profileISD)
+        appDesc.getInputStream(profileISD)
             .map(m -> new KV(m.getMemberId(), m))
             .sendTo(table);
 
         GenericInputDescriptor<PageView> pageViewISD = 
ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-        streamGraph.getInputStream(pageViewISD)
+        appDesc.getInputStream(pageViewISD)
             .map(pv -> {
                 received.add(pv);
                 return pv;
@@ -152,7 +152,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
             .sink((m, collector, coordinator) -> joined.add(m));
       };
 
-      runner.run(app);
+      final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
+      runner.run();
       runner.waitForFinish();
 
       assertEquals(count * partitionCount, received.size());
@@ -206,17 +207,16 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
       PageViewToProfileJoinFunction joinFn1 = new 
PageViewToProfileJoinFunction();
       PageViewToProfileJoinFunction joinFn2 = new 
PageViewToProfileJoinFunction();
 
-      final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-      final StreamApplication app = (streamGraph, cfg) -> {
+      final StreamApplication app = appDesc -> {
 
-        Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new 
InMemoryTableDescriptor("t1")
+        Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new 
InMemoryTableDescriptor("t1")
             .withSerde(profileKVSerde));
 
         DelegatingSystemDescriptor ksd = new 
DelegatingSystemDescriptor("test");
         GenericInputDescriptor<Profile> profileISD1 = 
ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
         GenericInputDescriptor<Profile> profileISD2 = 
ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
-        MessageStream<Profile> profileStream1 = 
streamGraph.getInputStream(profileISD1);
-        MessageStream<Profile> profileStream2 = 
streamGraph.getInputStream(profileISD2);
+        MessageStream<Profile> profileStream1 = 
appDesc.getInputStream(profileISD1);
+        MessageStream<Profile> profileStream2 = 
appDesc.getInputStream(profileISD2);
 
         profileStream1
             .map(m -> {
@@ -233,8 +233,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
         GenericInputDescriptor<PageView> pageViewISD1 = 
ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
         GenericInputDescriptor<PageView> pageViewISD2 = 
ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
-        MessageStream<PageView> pageViewStream1 = 
streamGraph.getInputStream(pageViewISD1);
-        MessageStream<PageView> pageViewStream2 = 
streamGraph.getInputStream(pageViewISD2);
+        MessageStream<PageView> pageViewStream1 = 
appDesc.getInputStream(pageViewISD1);
+        MessageStream<PageView> pageViewStream2 = 
appDesc.getInputStream(pageViewISD2);
 
         pageViewStream1
             .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
@@ -247,7 +247,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
             .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
       };
 
-      runner.run(app);
+      final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
+      runner.run();
       runner.waitForFinish();
 
       assertEquals(count * partitionCount, sentToProfileTable1.size());

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 575178c..0d9df8b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -27,13 +27,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
@@ -127,14 +126,14 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     static final String PROFILE_TABLE = "profile-table";
 
     @Override
-    public void init(StreamGraph graph, Config config) {
-      Table<KV<Integer, TestTableData.Profile>> table = 
graph.getTable(getTableDescriptor());
+    public void describe(StreamApplicationDescriptor appDesc) {
+      Table<KV<Integer, TestTableData.Profile>> table = 
appDesc.getTable(getTableDescriptor());
       KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor(config.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
-      graph.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new 
NoOpSerde<TestTableData.PageView>()))
+          new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
+      appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new 
NoOpSerde<TestTableData.PageView>()))
           .partitionBy(TestTableData.PageView::getMemberId, v -> v, 
"partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
-          .sendTo(graph.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
+          .sendTo(appDesc.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index e6e73bb..d79683e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -20,7 +20,6 @@
 package org.apache.samza.test.table;
 
 import com.google.common.cache.CacheBuilder;
-
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -35,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.SamzaContainerContext;
@@ -42,7 +42,6 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.runtime.LocalApplicationRunner;
@@ -127,7 +126,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     }
   }
 
-  private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, 
boolean defaultCache, String id, StreamGraph streamGraph) {
+  private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, 
boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
     CachingTableDescriptor<K, V> cachingDesc = new 
CachingTableDescriptor<>("caching-table-" + id);
     if (defaultCache) {
       cachingDesc.withReadTtl(Duration.ofMinutes(5));
@@ -135,12 +134,12 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     } else {
       GuavaCacheTableDescriptor<K, V> guavaDesc = new 
GuavaCacheTableDescriptor<>("guava-table-" + id);
       guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, 
TimeUnit.MINUTES).build());
-      Table<KV<K, V>> guavaTable = streamGraph.getTable(guavaDesc);
+      Table<KV<K, V>> guavaTable = appDesc.getTable(guavaDesc);
       cachingDesc.withCache(guavaTable);
     }
 
     cachingDesc.withTable(actualTable);
-    return streamGraph.getTable(cachingDesc);
+    return appDesc.getTable(cachingDesc);
   }
 
   private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean 
defaultCache, String testName) throws Exception {
@@ -161,9 +160,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
 
     final RateLimiter readRateLimiter = mock(RateLimiter.class);
     final RateLimiter writeRateLimiter = mock(RateLimiter.class);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    final StreamApplication app = (streamGraph, cfg) -> {
-      RemoteTableDescriptor<Integer, Profile> inputTableDesc = new 
RemoteTableDescriptor<>("profile-table-1");
+    final StreamApplication app = appDesc -> {
+      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = 
new RemoteTableDescriptor<>("profile-table-1");
       inputTableDesc
           
.withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
           .withRateLimiter(readRateLimiter, null, null);
@@ -174,28 +172,29 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
           .withWriteFunction(writer)
           .withRateLimiter(writeRateLimiter, null, null);
 
-      Table<KV<Integer, EnrichedPageView>> outputTable = 
streamGraph.getTable(outputTableDesc);
+      Table<KV<Integer, EnrichedPageView>> outputTable = 
appDesc.getTable(outputTableDesc);
 
       if (withCache) {
-        outputTable = getCachingTable(outputTable, defaultCache, "output", 
streamGraph);
+        outputTable = getCachingTable(outputTable, defaultCache, "output", 
appDesc);
       }
 
-      Table<KV<Integer, Profile>> inputTable = 
streamGraph.getTable(inputTableDesc);
+      Table<KV<Integer, Profile>> inputTable = 
appDesc.getTable(inputTableDesc);
 
       if (withCache) {
-        inputTable = getCachingTable(inputTable, defaultCache, "input", 
streamGraph);
+        inputTable = getCachingTable(inputTable, defaultCache, "input", 
appDesc);
       }
 
       DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
       GenericInputDescriptor<TestTableData.PageView> isd = 
ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      streamGraph.getInputStream(isd)
+      appDesc.getInputStream(isd)
           .map(pv -> new KV<>(pv.getMemberId(), pv))
           .join(inputTable, new PageViewToProfileJoinFunction())
           .map(m -> new KV(m.getMemberId(), m))
           .sendTo(outputTable);
     };
 
-    runner.run(app);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
+    runner.run();
     runner.waitForFinish();
 
     int numExpected = count * partitionCount;

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index dc11c3f..f7805fe 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -20,7 +20,6 @@
 package org.apache.samza.tools.benchmark;
 
 import com.google.common.base.Joiner;
-import scala.Option;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -34,12 +33,15 @@ import org.apache.commons.cli.ParseException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
@@ -71,18 +73,18 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
   }
 
   public void start() throws IOException, InterruptedException {
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
     super.start();
     MessageConsumer consumeFn = new MessageConsumer();
-    StreamApplication app = (graph, config) -> {
+    StreamApplication app = appDesc -> {
       String systemFactoryName = new 
SystemConfig(config).getSystemFactory(systemName).get();
       GenericSystemDescriptor sd = new GenericSystemDescriptor(systemName, 
systemFactoryName);
       GenericInputDescriptor<Object> isd = sd.getInputDescriptor(streamId, new 
NoOpSerde<>());
-      MessageStream<Object> stream = graph.getInputStream(isd);
+      MessageStream<Object> stream = appDesc.getInputStream(isd);
       stream.map(consumeFn);
     };
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, new MapConfig());
 
-    runner.run(app);
+    runner.run();
 
     while (consumeFn.getEventsConsumed() < totalEvents) {
       Thread.sleep(10);
@@ -90,7 +92,7 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
 
     Instant endTime = Instant.now();
 
-    runner.kill(app);
+    runner.kill();
 
     System.out.println("\n*******************");
     System.out.println(String.format("Started at %s Ending at %s ", 
consumeFn.startTime, endTime));

Reply via email to