Repository: samza
Updated Branches:
  refs/heads/master b8058af0a -> 66dcc4cb6


SAMZA-1904: Added test case in TestLocalTable for low level API

As per subject, adding a new test case that uses TaskApplication

Author: Wei Song <ws...@linkedin.com>

Reviewers: Aditya Toomula <atoom...@linkedin.com>

Closes #656 from weisong44/SAMZA-1904 and squashes the following commits:

ee3942ff [Wei Song] SAMZA-1904: Added test case in TestLocalTable for low level API
1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master'
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66dcc4cb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66dcc4cb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66dcc4cb

Branch: refs/heads/master
Commit: 66dcc4cb60bb104fa6795b5a1f7dd8442fdf165c
Parents: b8058af
Author: Wei Song <ws...@linkedin.com>
Authored: Mon Sep 24 17:19:23 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Mon Sep 24 17:19:23 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/test/table/TestLocalTable.java | 53 +++++++++++++++++++-
 1 file changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/66dcc4cb/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index e1386c8..419f6c8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.TaskApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -39,8 +41,8 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
@@ -52,9 +54,16 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.LocalStoreBackedReadWriteTable;
 import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
@@ -73,7 +82,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-
 /**
  * This test class tests sendTo() and join() for local tables
  */
@@ -388,4 +396,45 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
     verify(kvStore, times(1)).deleteAll(anyList());
   }
+
+  @Test
+  public void testWithLowLevelApi() throws Exception {
+
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), 
zkConnect());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", 
Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+    configs.put("streams.PageView.partitionCount", String.valueOf(4));
+    configs.put("task.inputs", "test.PageView");
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MyTaskApplication(), new MapConfig(configs));
+    runner.run();
+    runner.waitForFinish();
+  }
+
+  static public class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDesc) {
+      appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+      appDesc.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new 
IntegerSerde(), new PageViewJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> pageViewISD = 
ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.addInputStream(pageViewISD);
+    }
+  }
+
+  static public class MyStreamTask implements StreamTask, InitableTask {
+    private ReadWriteTable<Integer, PageView> pageViewTable;
+    @Override
+    public void init(Config config, TaskContext context) throws Exception {
+      pageViewTable = (ReadWriteTable<Integer, PageView>) 
context.getTable("t1");
+    }
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector 
collector, TaskCoordinator coordinator) {
+      PageView pv = (PageView) message.getMessage();
+      pageViewTable.put(pv.getMemberId(), pv);
+      PageView pv2 = pageViewTable.get(pv.getMemberId());
+      Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
+      Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
+    }
+  }
 }

Reply via email to