Repository: samza Updated Branches: refs/heads/master b8058af0a -> 66dcc4cb6
SAMZA-1904: Added test case in TestLocalTable for low level API As per subject, adding a new test case that uses TaskApplication Author: Wei Song <ws...@linkedin.com> Reviewers: Aditya Toomula <atoom...@linkedin.com> Closes #656 from weisong44/SAMZA-1904 and squashes the following commits: ee3942ff [Wei Song] SAMZA-1904: Added test case in TestLocalTest for low level API 1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master' a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master' 41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master' 239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master' eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master' 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66dcc4cb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66dcc4cb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66dcc4cb Branch: refs/heads/master Commit: 66dcc4cb60bb104fa6795b5a1f7dd8442fdf165c Parents: b8058af Author: Wei Song <ws...@linkedin.com> Authored: Mon Sep 24 17:19:23 2018 -0700 Committer: Wei Song <ws...@linkedin.com> Committed: Mon Sep 24 17:19:23 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/table/TestLocalTable.java | 53 +++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/66dcc4cb/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index e1386c8..419f6c8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -26,6 +26,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.TaskApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -39,8 +41,8 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; @@ -52,9 +54,16 @@ import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.storage.kv.LocalStoreBackedReadWriteTable; import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.ArraySystemFactory; import org.apache.samza.test.util.Base64Serializer; @@ -73,7 +82,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - /** * This test class tests sendTo() and join() for local tables */ @@ -388,4 +396,45 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get(); verify(kvStore, times(1)).deleteAll(anyList()); } + + @Test + public void testWithLowLevelApi() throws Exception { + + Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10))); + configs.put("streams.PageView.partitionCount", String.valueOf(4)); + configs.put("task.inputs", "test.PageView"); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), new MapConfig(configs)); + runner.run(); + runner.waitForFinish(); + } + + static public class MyTaskApplication implements TaskApplication { + @Override + public void describe(TaskApplicationDescriptor appDesc) { + appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask()); + appDesc.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()))); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + appDesc.addInputStream(pageViewISD); + } + } + + static public class MyStreamTask implements StreamTask, InitableTask { + private ReadWriteTable<Integer, PageView> pageViewTable; + @Override + public void init(Config config, TaskContext context) throws Exception { + pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTable("t1"); + } + @Override + public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { + PageView pv = (PageView) message.getMessage(); + pageViewTable.put(pv.getMemberId(), pv); + PageView pv2 = pageViewTable.get(pv.getMemberId()); + Assert.assertEquals(pv.getMemberId(), pv2.getMemberId()); + Assert.assertEquals(pv.getPageKey(), pv2.getPageKey()); + } + } }