http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 6df3421..5aaad26 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -283,7 +283,7 @@ class JoinTranslator { // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational // message as the value. Send the messages from the input stream denoted as 'table' to the created table store. Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table = - context.getStreamGraph().getTable(sourceConfig.getTableDescriptor().get()); + context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get()); relOutputStream .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index c422130..fe4d8da 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -21,7 +21,6 @@ package org.apache.samza.sql.translator; import java.util.Map; import java.util.Optional; - import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.RelShuttleImpl; @@ -30,37 +29,37 @@ import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; -import org.apache.samza.SamzaException; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.descriptors.GenericOutputDescriptor;; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import 
org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.planner.QueryPlanner; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.task.TaskContext; import org.apache.samza.table.Table; +import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class is used to populate the StreamGraph using the SQL queries. + * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries. * This class contains the core of the SamzaSQL control code that converts the SQL statements to calcite relational graph. - * It then walks the relational graph and then populates the Samza's {@link StreamGraph} accordingly. + * It then walks the relational graph and then populates the Samza's {@link StreamApplicationDescriptor} accordingly. 
*/ public class QueryTranslator { private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class); @@ -96,13 +95,13 @@ public class QueryTranslator { this.converters = sqlConfig.getSamzaRelConverters(); } - public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) { + public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) { QueryPlanner planner = new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), sqlConfig.getUdfMetadata()); final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig); final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery()); - final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext, this.converters); + final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters); final RelNode node = relRoot.project(); final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver(); @@ -159,9 +158,9 @@ public class QueryTranslator { String systemName = sinkConfig.getSystemName(); DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde); - outputStream.sendTo(streamGraph.getOutputStream(osd)); + outputStream.sendTo(appDesc.getOutputStream(osd)); } else { - Table outputTable = streamGraph.getTable(tableDescriptor.get()); + Table outputTable = appDesc.getTable(tableDescriptor.get()); if (outputTable == null) { String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource(); LOG.error(msg); @@ -170,7 +169,7 @@ public class QueryTranslator { outputStream.sendTo(outputTable); } - streamGraph.withContextManager(new ContextManager() { + appDesc.withContextManager(new 
ContextManager() { @Override public void init(Config config, TaskContext taskContext) { taskContext.setUserContext(context.clone()); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 46e0840..2dc28be 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -23,10 +23,10 @@ import java.util.List; import java.util.Map; import org.apache.calcite.rel.core.TableScan; import org.apache.commons.lang.Validate; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; @@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.task.TaskContext; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.task.TaskContext; /** @@ -73,7 +73,7 @@ class ScanTranslator { } void translate(final TableScan tableScan, final TranslatorContext context) { - StreamGraph streamGraph = context.getStreamGraph(); + StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor(); List<String> tableNameParts = tableScan.getTable().getQualifiedName(); String sourceName 
= SqlIOConfig.getSourceFromSourceParts(tableNameParts); @@ -85,7 +85,7 @@ class ScanTranslator { KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde); - MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(isd); + MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd); MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName)); context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java index e622d55..a7ab663 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java @@ -21,7 +21,6 @@ package org.apache.samza.sql.translator; import java.util.HashMap; import java.util.Map; - import java.util.TimeZone; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -33,8 +32,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.SchemaPlus; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import 
org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.sql.data.RexToJavaCompiler; import org.apache.samza.sql.data.SamzaSqlExecutionContext; @@ -42,13 +41,13 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter; /** - * State that is maintained while translating the Calcite relational graph to Samza {@link StreamGraph}. + * State that is maintained while translating the Calcite relational graph to Samza {@link StreamApplicationDescriptor}. */ public class TranslatorContext implements Cloneable { /** * The internal variables that are shared among all cloned {@link TranslatorContext} */ - private final StreamGraph streamGraph; + private final StreamApplicationDescriptor streamAppDesc; private final RexToJavaCompiler compiler; private final Map<String, SamzaRelConverter> relSamzaConverters; private final Map<Integer, MessageStream> messageStreams; @@ -122,7 +121,7 @@ public class TranslatorContext implements Cloneable { * @param other the original object to copy from */ private TranslatorContext(TranslatorContext other) { - this.streamGraph = other.streamGraph; + this.streamAppDesc = other.streamAppDesc; this.compiler = other.compiler; this.relSamzaConverters = other.relSamzaConverters; this.messageStreams = other.messageStreams; @@ -134,13 +133,13 @@ public class TranslatorContext implements Cloneable { /** * Create the instance of TranslatorContext - * @param streamGraph Samza's streamGraph that is populated during the translation. + * @param stramAppDesc Samza's streamAppDesc that is populated during the translation. * @param relRoot Root of the relational graph from calcite. 
* @param executionContext the execution context * @param converters the map of schema to RelData converters */ - TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) { - this.streamGraph = streamGraph; + TranslatorContext(StreamApplicationDescriptor stramAppDesc, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) { + this.streamAppDesc = stramAppDesc; this.compiler = createExpressionCompiler(relRoot); this.executionContext = executionContext; this.dataContext = new DataContextImpl(); @@ -155,8 +154,8 @@ public class TranslatorContext implements Cloneable { * * @return the stream graph */ - public StreamGraph getStreamGraph() { - return streamGraph; + public StreamApplicationDescriptor getStreamAppDescriptor() { + return streamAppDesc; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java index c4cacbd..cc339f1 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java @@ -45,8 +45,8 @@ public class TestSamzaSqlTable { String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + 
appRunnable.runAndWaitForFinish(); Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size()); } @@ -61,8 +61,8 @@ public class TestSamzaSqlTable { String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + appRunnable.runAndWaitForFinish(); Assert.assertEquals(numMessages, TestIOResolverFactory.TestTable.records.size()); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java index b6dcac5..9fab5d5 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java @@ -25,11 +25,9 @@ import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java index 88ce443..e7c2195 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java @@ -25,12 +25,12 @@ import java.util.HashSet; import org.apache.calcite.DataContext; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; @@ -73,7 +73,7 @@ public class TestFilterTranslator extends TranslatorTestBase { when(mockFilter.getInput()).thenReturn(mockInput); when(mockInput.getId()).thenReturn(1); when(mockFilter.getId()).thenReturn(2); - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class); MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp); when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream); @@ -95,7 +95,7 @@ public class TestFilterTranslator extends TranslatorTestBase { assertNotNull(filterSpec); assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER); - // Verify that the init() method will establish the 
context for the filter function + // Verify that the init() method will establish the context for the filter function Config mockConfig = mock(Config.class); TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, new HashSet<>(), null, null, null, null, null, null); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java index 7395a3d..f0a8a89 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java @@ -33,9 +33,9 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.spec.InputOperatorSpec; @@ -132,22 +132,22 @@ public class TestJoinTranslator extends TranslatorTestBase { when(mockRightInput.getRowType()).thenReturn(mockRightRowType); when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames); - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class); OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class); - MessageStream<SamzaSqlRelMessage> mockLeftInputStream = 
new MessageStreamImpl<>(mockGraph, mockLeftInputOp); + MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockAppDesc, mockLeftInputOp); when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream); OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class); - MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp); + MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockAppDesc, mockRightInputOp); when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream); - when(mockContext.getStreamGraph()).thenReturn(mockGraph); + when(mockContext.getStreamAppDescriptor()).thenReturn(mockAppDesc); InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class); OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class); when(mockInputOp.isKeyed()).thenReturn(true); when(mockOutputStream.isKeyed()).thenReturn(true); IntermediateMessageStreamImpl - mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream); - when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream); + mockPartitionedStream = new IntermediateMessageStreamImpl(mockAppDesc, mockInputOp, mockOutputStream); + when(mockAppDesc.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream); doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class)); RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class); @@ -155,7 +155,7 @@ public class TestJoinTranslator extends TranslatorTestBase { Expression mockExpr = mock(Expression.class); when(mockCompiler.compile(any(), any())).thenReturn(mockExpr); - doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class)); + 
doAnswer(this.getRegisteredTableAnswer()).when(mockAppDesc).getTable(any(RocksDbTableDescriptor.class)); when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER); SqlIOResolver mockResolver = mock(SqlIOResolver.class); SqlIOConfig mockIOConfig = mock(SqlIOConfig.class); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java index f84dd3f..1acfc47 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -32,12 +32,12 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.calcite.util.Pair; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -91,9 +91,9 @@ public class TestProjectTranslator extends TranslatorTestBase { List<Pair<RexNode, String>> namedProjects = new ArrayList<>(); namedProjects.add(Pair.of(mockRexField, "test_field")); when(mockProject.getNamedProjects()).thenReturn(namedProjects); - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockAppDesc = 
mock(StreamApplicationDescriptorImpl.class); OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class); - MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp); + MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp); when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream); doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class)); RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class); @@ -113,7 +113,7 @@ public class TestProjectTranslator extends TranslatorTestBase { assertNotNull(projectSpec); assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP); - // Verify that the init() method will establish the context for the map function + // Verify that the init() method will establish the context for the map function Config mockConfig = mock(Config.class); TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, new HashSet<>(), null, null, null, null, null, null); @@ -183,7 +183,7 @@ public class TestProjectTranslator extends TranslatorTestBase { flattenProjects.add(mockFlattenProject); when(mockProject.getProjects()).thenReturn(flattenProjects); - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class); OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") { @Override @@ -197,7 +197,7 @@ public class TestProjectTranslator extends TranslatorTestBase { } }; - MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp); + MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp); when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream); 
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class)); RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class); @@ -248,7 +248,7 @@ public class TestProjectTranslator extends TranslatorTestBase { assertNotNull(projectSpec); assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP); - // Verify that the init() method will establish the context for the map function + // Verify that the init() method will establish the context for the map function Config mockConfig = mock(Config.class); TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null, new HashSet<>(), null, null, null, null, null, null); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index 1776067..c9f59e6 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -19,20 +19,20 @@ package org.apache.samza.sql.translator; -import java.util.HashSet; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.StreamGraphSpec; -import org.apache.samza.sql.data.SamzaSqlExecutionContext; 
import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; @@ -49,8 +49,7 @@ public class TestQueryTranslator { private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) { Assert.assertNotEquals(originContext, clonedContext); Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler()); - Assert.assertTrue(originContext.getStreamGraph() == clonedContext.getStreamGraph()); - Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler()); + Assert.assertTrue(originContext.getStreamAppDescriptor() == clonedContext.getStreamAppDescriptor()); Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters")); Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams")); Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes")); @@ -85,10 +84,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(descriptor -> { },samzaConfig); + + translator.translate(queryInfo, appDesc); + 
OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -97,29 +96,29 @@ public class TestQueryTranslator { String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); String outputSystem = streamConfig.getSystem(outputStreamId); String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId); - + Assert.assertEquals(1, specGraph.getOutputStreams().size()); Assert.assertEquals("testavro", outputSystem); Assert.assertEquals("outputTopic", outputPhysicalName); Assert.assertEquals(1, specGraph.getInputOperators().size()); - + Assert.assertEquals("testavro", inputSystem); Assert.assertEquals("SIMPLE1", inputPhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(appDesc, samzaConfig); } - private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) { + private void validatePerTaskContextInit(StreamApplicationDescriptorImpl appDesc, Config samzaConfig) { // make sure that each task context would have a separate instance of cloned TranslatorContext TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 1"), null, null, new HashSet<>(), null, null, null, null, null, null); - // call ContextManager.init() to instantiate the per-task TranslatorContext - graphSpec.getContextManager().init(samzaConfig, testContext); + // call ContextManager.bootstrap() to instantiate the per-task TranslatorContext + appDesc.getContextManager().init(samzaConfig, testContext); Assert.assertNotNull(testContext.getUserContext()); Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext); TranslatorContext contextPerTaskOne = (TranslatorContext) testContext.getUserContext(); - // call ContextManager.init() second time to instantiate another clone of TranslatorContext - 
graphSpec.getContextManager().init(samzaConfig, testContext); + // call ContextManager.bootstrap() second time to instantiate another clone of TranslatorContext + appDesc.getContextManager().init(samzaConfig, testContext); Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext); // validate the two copies of TranslatorContext are clones of each other validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) testContext.getUserContext()); @@ -137,9 +136,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -156,7 +156,7 @@ public class TestQueryTranslator { Assert.assertEquals("testavro", inputSystem); Assert.assertEquals("COMPLEX1", inputPhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(streamAppDesc, samzaConfig); } @Test @@ -168,10 +168,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - 
graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -188,7 +188,7 @@ public class TestQueryTranslator { Assert.assertEquals("testavro", inputSystem); Assert.assertEquals("COMPLEX1", inputPhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(streamAppDesc, samzaConfig); } @Test (expected = SamzaException.class) @@ -204,9 +204,9 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -223,9 +223,9 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = 
new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); } @Test (expected = IllegalStateException.class) @@ -242,9 +242,9 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -261,9 +261,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -278,9 +277,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -297,9 +295,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -317,9 +314,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -336,9 +332,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { 
}, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -355,9 +350,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -374,9 +368,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -393,9 +386,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + 
translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -416,9 +408,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test (expected = SamzaException.class) @@ -435,9 +426,8 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } @Test @@ -454,10 +444,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + 
translator.translate(queryInfo, streamAppDesc); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -490,7 +480,7 @@ public class TestQueryTranslator { Assert.assertEquals("kafka", input3System); Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(streamAppDesc, samzaConfig); } @Test @@ -507,11 +497,11 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + translator.translate(queryInfo, streamAppDesc); + + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -544,7 +534,7 @@ public class TestQueryTranslator { Assert.assertEquals("kafka", input3System); Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(streamAppDesc, samzaConfig); } @Test @@ -561,11 +551,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); 
SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); StreamConfig streamConfig = new StreamConfig(samzaConfig); String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); @@ -598,7 +587,7 @@ public class TestQueryTranslator { Assert.assertEquals("kafka", input3System); Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); - validatePerTaskContextInit(graphSpec, samzaConfig); + validatePerTaskContextInit(streamAppDesc, samzaConfig); } @Test @@ -615,10 +604,10 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + + translator.translate(queryInfo, streamAppDesc); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); Assert.assertEquals(1, specGraph.getInputOperators().size()); Assert.assertEquals(1, specGraph.getOutputStreams().size()); @@ -639,8 +628,7 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); 
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); - translator.translate(queryInfo, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, samzaConfig); + translator.translate(queryInfo, streamAppDesc); } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java index d7f805d..7d5e0d2 100644 --- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java @@ -19,15 +19,17 @@ package org.apache.samza.example; import java.time.Duration; +import java.util.HashMap; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -46,14 +48,14 @@ public class AppWithGlobalConfigExample implements 
StreamApplication { public static void main(String[] args) { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - AppWithGlobalConfigExample app = new AppWithGlobalConfigExample(); - runner.run(app); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new AppWithGlobalConfigExample(), config); + + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = @@ -63,12 +65,15 @@ public class AppWithGlobalConfigExample implements StreamApplication { trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class))); - graph.getInputStream(inputStreamDescriptor) - .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null) + appDesc.getInputStream(inputStreamDescriptor) + .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, + null, null) .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) - .setAccumulationMode(AccumulationMode.DISCARDING), "w1") + .setAccumulationMode(AccumulationMode.DISCARDING), "window1") .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m))) - .sendTo(graph.getOutputStream(outputStreamDescriptor)); + .sendTo(appDesc.getOutputStream(outputStreamDescriptor)); + + appDesc.withMetricsReporterFactories(new HashMap<>()); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java index 1c1b4be..4ef2402 100644 --- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java @@ -20,11 +20,12 @@ package org.apache.samza.example; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -43,16 +44,13 @@ public class BroadcastExample implements StreamApplication { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - - StreamApplication app = new BroadcastExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - - runner.run(app); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new BroadcastExample(), config); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent = @@ -64,10 
+62,10 @@ public class BroadcastExample implements StreamApplication { KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 = trackingSystem.getOutputDescriptor("outStream3", serde); - MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream(pageViewEvent); - inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream(outStream1)); - inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream(outStream2)); - inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream(outStream3)); + MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageViewEvent); + inputStream.filter(m -> m.key.equals("key1")).sendTo(appDesc.getOutputStream(outStream1)); + inputStream.filter(m -> m.key.equals("key2")).sendTo(appDesc.getOutputStream(outStream2)); + inputStream.filter(m -> m.key.equals("key3")).sendTo(appDesc.getOutputStream(outStream3)); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java index 4d3307b..dfc4b42 100644 --- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java @@ -18,19 +18,19 @@ */ package org.apache.samza.example; - import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; 
-import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -51,15 +51,14 @@ public class KeyValueStoreExample implements StreamApplication { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - KeyValueStoreExample app = new KeyValueStoreExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new KeyValueStoreExample(), config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = @@ -69,9 +68,9 @@ public class KeyValueStoreExample implements StreamApplication { trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class))); - graph.setDefaultSystem(trackingSystem); - MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor); - OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor); + appDesc.withDefaultSystem(trackingSystem); + MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor); + OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor); pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, 
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/MergeExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java index 33d60d6..fe018f3 100644 --- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java @@ -22,11 +22,12 @@ package org.apache.samza.example; import com.google.common.collect.ImmutableList; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; +import org.apache.samza.operators.KV; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -41,15 +42,14 @@ public class MergeExample implements StreamApplication { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - MergeExample app = new MergeExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new MergeExample(), config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new 
JsonSerdeV2<>(PageViewEvent.class)); KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); @@ -64,8 +64,8 @@ public class MergeExample implements StreamApplication { trackingSystem.getOutputDescriptor("mergedStream", serde); MessageStream - .mergeAll(ImmutableList.of(graph.getInputStream(isd1), graph.getInputStream(isd2), graph.getInputStream(isd3))) - .sendTo(graph.getOutputStream(osd)); + .mergeAll(ImmutableList.of(appDesc.getInputStream(isd1), appDesc.getInputStream(isd2), appDesc.getInputStream(isd3))) + .sendTo(appDesc.getOutputStream(osd)); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java index 34b5fc6..8d3812b 100644 --- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -20,11 +20,12 @@ package org.apache.samza.example; import java.time.Duration; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -43,15 +44,13 @@ public class OrderShipmentJoinExample implements StreamApplication { public static void main(String[] 
args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - OrderShipmentJoinExample app = new OrderShipmentJoinExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); - - runner.run(app); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new OrderShipmentJoinExample(), config); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<OrderRecord> orderStreamDescriptor = @@ -62,12 +61,12 @@ public class OrderShipmentJoinExample implements StreamApplication { trackingSystem.getOutputDescriptor("fulfilledOrders", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class))); - graph.getInputStream(orderStreamDescriptor) - .join(graph.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(), + appDesc.getInputStream(orderStreamDescriptor) + .join(appDesc.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(), new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), Duration.ofMinutes(1), "join") .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) - .sendTo(graph.getOutputStream(fulfilledOrdersStreamDescriptor)); + .sendTo(appDesc.getOutputStream(fulfilledOrdersStreamDescriptor)); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java index dc5eb74..b540585 100644 --- 
a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java @@ -19,19 +19,20 @@ package org.apache.samza.example; import java.time.Duration; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -51,14 +52,14 @@ public class PageViewCounterExample implements StreamApplication { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); PageViewCounterExample app = new PageViewCounterExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = @@ -68,8 +69,8 @@ public class 
PageViewCounterExample implements StreamApplication { trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class))); - MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor); - OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = graph.getOutputStream(outputStreamDescriptor); + MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor); + OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDesc.getOutputStream(outputStreamDescriptor); SupplierFunction<Integer> initialValue = () -> 0; FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java index b776c7d..8a0ca28 100644 --- a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java @@ -20,14 +20,15 @@ package org.apache.samza.example; import java.time.Duration; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import 
org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; @@ -46,15 +47,14 @@ public class RepartitionExample implements StreamApplication { public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - RepartitionExample app = new RepartitionExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new RepartitionExample(), config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = @@ -64,9 +64,9 @@ public class RepartitionExample implements StreamApplication { trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class))); - graph.setDefaultSystem(trackingSystem); - MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor); - OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor); + appDesc.withDefaultSystem(trackingSystem); + MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor); + OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor); pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, @@ -75,7 +75,6 @@ public class RepartitionExample implements StreamApplication { KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), "window") .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane))) 
.sendTo(pageViewEventPerMember); - } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java new file mode 100644 index 0000000..73dc10a --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import org.apache.samza.application.TaskApplicationDescriptor; +import org.apache.samza.application.TaskApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.RocksDbTableDescriptor; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.CommandLine; + + +/** + * Test example of a low-level API application (i.e. {@link TaskApplication}) + */ +public class TaskApplicationExample implements TaskApplication { + + public class MyStreamTask implements StreamTask { + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + // processing logic here + } + } + + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TaskApplicationExample(), config); + runner.run(); + runner.waitForFinish(); + } + + @Override + public void describe(TaskApplicationDescriptor appDesc) { + // add input and output streams + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("tracking"); + KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde()); + KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde()); 
+ TableDescriptor td = new RocksDbTableDescriptor("mytable"); + + appDesc.addInputStream(isd); + appDesc.addOutputStream(osd); + appDesc.addTable(td); + // create the task factory based on configuration + appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java index cbc1e8e..2f4c19c 100644 --- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java @@ -21,16 +21,17 @@ package org.apache.samza.example; import java.time.Duration; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.kafka.KafkaInputDescriptor; @@ -49,15 +50,14 @@ public class WindowExample implements StreamApplication { public static void main(String[] args) throws Exception { CommandLine cmdLine = new 
CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - WindowExample app = new WindowExample(); - LocalApplicationRunner runner = new LocalApplicationRunner(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new WindowExample(), config); - runner.run(app); + runner.run(); runner.waitForFinish(); } @Override - public void init(StreamGraph graph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = @@ -65,11 +65,10 @@ public class WindowExample implements StreamApplication { KafkaOutputDescriptor<Integer> outputStreamDescriptor = trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde()); - MessageStream<PageViewEvent> inputStream = graph.getInputStream(inputStreamDescriptor); - OutputStream<Integer> outputStream = graph.getOutputStream(outputStreamDescriptor); - SupplierFunction<Integer> initialValue = () -> 0; FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1; + MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(inputStreamDescriptor); + OutputStream<Integer> outputStream = appDesc.getOutputStream(outputStreamDescriptor); // create a tumbling window that outputs the number of message collected every 10 minutes. 
// also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive @@ -80,7 +79,6 @@ public class WindowExample implements StreamApplication { Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window") .map(WindowPane::getMessage) .sendTo(outputStream); - } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java index a282dbb..2e27a4c 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java @@ -34,7 +34,7 @@ import org.apache.samza.util.Clock; * MockSystemConsumer is a class that simulates a multi-threaded consumer that * uses BlockingEnvelopeMap. The primary use for this class is to do performance * testing. - * + * * This class works by starting up (threadCount) threads. Each thread adds * (messagesPerBatch) to the BlockingEnvelopeMap, then sleeps for * (brokerSleepMs). The sleep is important to simulate network latency when @@ -57,7 +57,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap { private List<Thread> threads; /** - * + * * @param messagesPerBatch * The number of messages to add to the BlockingEnvelopeMap before * sleeping. 
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 5ca497a..477d5b8 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -31,7 +31,9 @@ import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.samza.SamzaException; +import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.TaskApplication; import org.apache.samza.config.Config; import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; @@ -56,7 +58,10 @@ import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.inmemory.InMemorySystemFactory; import org.apache.samza.task.AsyncStreamTask; +import org.apache.samza.task.AsyncStreamTaskFactory; import org.apache.samza.task.StreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskFactory; import org.apache.samza.test.framework.stream.CollectionStream; import org.apache.samza.test.framework.system.CollectionStreamSystemSpec; import org.junit.Assert; @@ -284,17 +289,13 @@ public class TestRunner { public void run(Duration timeout) { Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null), "TestRunner should run for Low Level Task api or High Level Application Api"); - Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), - "Timeouts should be 
positive"); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - if (app == null) { - runner.runTask(); - } else { - runner.run(app); - } + Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive"); + SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> appDesc.setTaskFactory(createTaskFactory()) : app; + final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, new MapConfig(configs)); + runner.run(); boolean timedOut = !runner.waitForFinish(timeout); Assert.assertFalse("Timed out waiting for application to finish", timedOut); - ApplicationStatus status = runner.status(app); + ApplicationStatus status = runner.status(); if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) { throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable())); } @@ -364,4 +365,26 @@ public class TestRunner { .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(), entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList()))); } + + private TaskFactory createTaskFactory() { + if (StreamTask.class.isAssignableFrom(taskClass)) { + return (StreamTaskFactory) () -> { + try { + return (StreamTask) taskClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new SamzaException(String.format("Failed to instantiate StreamTask class %s", taskClass.getName()), e); + } + }; + } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) { + return (AsyncStreamTaskFactory) () -> { + try { + return (AsyncStreamTask) taskClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new SamzaException(String.format("Failed to instantiate AsyncStreamTask class %s", taskClass.getName()), e); + } + }; + } + throw new SamzaException(String.format("Not supported task.class %s. 
task.class has to implement either StreamTask " + + "or AsyncStreamTask", taskClass.getName())); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java index e8be592..34b264f 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java @@ -20,19 +20,18 @@ package org.apache.samza.test.integration; import joptsimple.OptionSet; -import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunnerMain; -import org.apache.samza.runtime.ApplicationRunnerOperation; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG; - /** - * {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn + * {@link ApplicationRunnerMain} was designed for deploying {@link SamzaApplication} in yarn * and doesn't work for in standalone. * * This runner class is built for standalone failure tests and not recommended for general use. 
@@ -47,17 +46,15 @@ public class LocalApplicationRunnerMain { Config orgConfig = cmdLine.loadConfig(options); Config config = Util.rewriteConfig(orgConfig); - ApplicationRunner runner = ApplicationRunner.fromConfig(config); - StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance(); - - ApplicationRunnerOperation op = cmdLine.getOperation(options); + SamzaApplication app = ApplicationUtil.fromConfig(config); + ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); try { LOGGER.info("Launching stream application: {} to run.", app); - runner.run(app); + runner.run(); runner.waitForFinish(); } catch (Exception e) { - LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e); + LOGGER.error("Exception occurred when running application: {}.", app, e); } } }