> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> > I thought TopologyBuilder was meant to abstract away the spec and provide a
> > simplified API for a user implementing a simple SQL query.
> > IMO, this still seems pretty involved for a user concerned with just
> > defining a simple join query.
> >
> > I assumed we could have a builder pattern as below:
> >
> > ```
> > TopologyBuilder builder = TopologyBuilder
> >     .create()
> >     .join(window("stream1", 10),
> >           window("stream2", 10), List{joinKey1, joinKey2, ...})
> >     .partition(partitionKey)
> >     .build()
> > ```
> >
> > The idea here is that the order of the build statements determines the
> > topology. The builder just validates and chains them together.
> > I can see that this can be a problem with running operators in parallel
> > and may make it hard for the user to understand the correct sequence
> > of operators.
> > I am wondering if you think this kind of model is possible. It would
> > greatly simplify the API for most users.
> > Just wanted to put this comment out so that we can discuss further.
>
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think we should make building topologies
>     simple with the builder API. One complexity of the current OperatorSpec-based
>     API is that you need to create intermediate streams (EntityNames) to wire
>     operators together. I think we should try to hide that complexity through the
>     builder API. Even though source and sink hide that complexity to some extent,
>     it's better if we can remove it completely.
>
> Yi Pan (Data Infrastructure) wrote:
>     Thank you both for the good points here. @Navina, yes, the basic idea for
>     the topology builder is exactly what you mentioned and the model you
>     illustrated is much simpler and very attractive.
>     The issue I saw is that the topology may not be completely linear, or a
>     tree. It is not easy to describe a network of operators like the
>     following without introducing the concept of intermediate streams.
>     window-->aggregate--+---------------------+
>     window-->aggregate----> join --+ | +-->aggregate --+
>     project-->window--> join -->|split -+ |
>     +-------->join
>     ---------->join --> partition
>     There are three issues in the above example:
>     1. the join input may be intermediate streams, which essentially could be
>        an output from a sub-topology
>     2. a multi-output operator makes the downstream expression branch
>        off, so it is not easily expressible in linear format
>     3. the output of a single operator may be used by multiple downstream
>        operators, again forking off the linear expression
>
>     Maybe we can adopt the simple join builder as you illustrated for simple
>     queries, although I would like to add an OperatorBuilder as well here:
>     OperatorRouter simpleJoinQuery = TopologyBuilder.create()
>         .join(OperatorBuilder.window("stream1", 10),
>               OperatorBuilder.window("stream2", 10), joinKeys)
>         .partition(partitionKey).build();
>
>     For more complex queries, I think the following method may work better:
>     OperatorRouter router =
>         TopologyBuilder.create().beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
>             .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
>             .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>             .partition(partitionKey, number, "outstream1").build();
>
>     Here, beginStream() always signifies one linear path of operators and
>     adds it to the topology. The following join operator will join the latest two
>     streams and create one joined stream.
>     This model may even solve issue 2 by allowing:
>     ...split().beginStream(1).aggregate().beginStream(2)
>         .beginStream("stream4").filter().window()
>         .join().join().partition().build();
>
>     For issue 3, i.e. reusing a certain intermediate stream in multiple
>     downstream operators, we can introduce beginReuseStream(source, "name") as
>     follows:
>     OperatorRouter router =
>         TopologyBuilder.create().beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
>             .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
>             .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>             .join("reuseStream1", joinCondition)
>             .partition(partitionKey, number, "outstream1").build();
>     in which the downstream operator refers to the name of the reuse
>     stream in order to use the intermediate stream multiple times.
>
>     In addition, from the review of the SAMZA-561 changes, I think that we will
>     need to include an OperatorBuilder interface as well, to help create
>     operators easily, e.g.
>     OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build().
>     This can be used inside the TopologyBuilder, which allows us to hide all the
>     intermediate input/output stream and entity names, so there will be no
>     OperatorSpec that the programmer needs to handle directly.
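
To make the "join the latest two streams" rule of the beginStream() proposal above concrete, here is a minimal sketch of one way such a builder could track open linear paths with a stack. Everything in it is an assumption for illustration: SketchTopologyBuilder, then(), and the string-based plan are hypothetical and are not part of the Samza API or of the patch under review. It also leaves out split() and beginReuseStream().

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: class and method names are illustrative, not Samza's API.
final class SketchTopologyBuilder {
    // Open linear paths, most recent on top. Each entry is a textual plan.
    private final Deque<String> paths = new ArrayDeque<>();

    static SketchTopologyBuilder create() {
        return new SketchTopologyBuilder();
    }

    // Starts a new linear path rooted at the given source stream.
    SketchTopologyBuilder beginStream(String source) {
        paths.push(source);
        return this;
    }

    // Appends a single-input operator (window, aggregate, ...) to the latest path.
    SketchTopologyBuilder then(String operator) {
        if (paths.isEmpty()) {
            throw new IllegalStateException("call beginStream() before adding operators");
        }
        paths.push(paths.pop() + " -> " + operator);
        return this;
    }

    // Joins the two most recent paths and replaces them with one joined path.
    SketchTopologyBuilder join(String condition) {
        if (paths.size() < 2) {
            throw new IllegalStateException("join needs two open streams");
        }
        String right = paths.pop();
        String left = paths.pop();
        paths.push("join[" + condition + "](" + left + ", " + right + ")");
        return this;
    }

    // Succeeds only when every open path has been merged into a single output.
    String build() {
        if (paths.size() != 1) {
            throw new IllegalStateException("expected 1 output stream, found " + paths.size());
        }
        return paths.peek();
    }

    public static void main(String[] args) {
        String plan = SketchTopologyBuilder.create()
            .beginStream("stream1").then("window(10)").then("aggregate(sum)")
            .beginStream("stream2").then("window(10)").then("aggregate(avg)")
            .join("c1")
            .beginStream("stream3").then("project").then("window(10)")
            .join("c2")
            .then("partition")
            .build();
        System.out.println(plan);
    }
}
```

Because the intermediate streams live only on the builder's internal stack, the caller never names them, which is the property the thread is after. The check in build() also shows where a single-output restriction could be enforced.
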
For the last line, I meant /opId/-output-1


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34500/
> -----------------------------------------------------------
>
> (Updated May 20, 2015, 11:13 p.m.)
>
>
> Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda
> Pathirage, Navina Ramesh, and Naveen Somasundaram.
>
>
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-552: added operator builder API
> - The current operator builder only supports single-output DAG topologies so far
>
>
> Diffs
> -----
>
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>
> Diff: https://reviews.apache.org/r/34500/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build passed
>
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>
>