-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


Hi Milinda, sorry for the late review. My comments are below. 
Overall, there are two things to discuss:
1) Adding an OperatorBuilder interface as well. It serves two purposes:
   a) As I recall, we discussed the need for this because, in the 
parsing/planning phase, there are cases where the required parameters for an 
operator are not yet generated or finalized (hence the setter functions you 
added to OperatorSpec as a workaround). With OperatorBuilder, we can simply 
keep setting parameters and defer the call to build() until they are all known.
   b) In user code that uses the operator-layer API directly, OperatorBuilder 
makes the TopologyBuilder code more intuitive and hides unnecessary specs such 
as intermediate stream/table names and/or operator names.
2) The implementation details of TopologyBuilder. I would still prefer to keep 
a graph-based implementation of TopologyBuilder internally, instead of a 
stack-based one, because a graph can represent more flexible topologies. At the 
API level, we should first focus on DAG-like operators. However, I would prefer 
to keep the implementation flexible so that we do not have to rewrite the 
TopologyBuilder class later, when we need to support non-DAG-like operators.

P.S. It would be good if you could modify the example tasks to use the 
fluent-style APIs, to illustrate what the user experience looks like. With the 
help of OperatorBuilder, the TopologyBuilder implementation can achieve this: 
if the user does not specify the input/output streams/tables (as with DAG-like 
operators), TopologyBuilder should be able to generate the intermediate 
stream/table names and connect the operators via those intermediate 
streams/tables. This is a step we must do anyway for DAG-like operators. If the 
user specifies the input/output streams in the OperatorBuilder, the named 
streams/tables are created as vertices in the graph, and operators are 
connected to those vertices if they consume from those streams/tables. This is 
a simple extension of the DAG model that does not need a structural change in 
the TopologyBuilder.
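
A minimal sketch of point 1a, assuming a builder whose parameters can be set in 
any order during planning and are only validated when build() is called. The 
names WindowSpec, WindowOperatorBuilder and their methods are hypothetical and 
are not taken from the patch under review.

```java
import java.util.Objects;

// Hypothetical immutable spec: nothing can change once build() has run.
final class WindowSpec {
    final String source;
    final int size;
    final String output; // null means "let TopologyBuilder generate a name"

    WindowSpec(String source, int size, String output) {
        this.source = source;
        this.size = size;
        this.output = output;
    }
}

// Hypothetical builder: the planner sets parameters as they become known,
// so no setters are needed on the spec itself.
final class WindowOperatorBuilder {
    private String source;
    private Integer size;
    private String output;

    WindowOperatorBuilder source(String source) { this.source = source; return this; }
    WindowOperatorBuilder size(int size) { this.size = size; return this; }
    WindowOperatorBuilder output(String output) { this.output = output; return this; }

    WindowSpec build() {
        // Validation happens only here, after planning has filled everything in.
        Objects.requireNonNull(source, "source must be set before build()");
        if (size == null || size <= 0) {
            throw new IllegalStateException("size must be a positive number");
        }
        return new WindowSpec(source, size, output);
    }
}

class OperatorBuilderSketch {
    public static void main(String[] args) {
        WindowOperatorBuilder builder = new WindowOperatorBuilder().size(10);
        // ... later in planning, once the input stream has been resolved ...
        builder.source("kafka:inputstream1");
        WindowSpec spec = builder.build();
        System.out.println(spec.source + " size=" + spec.size + " output=" + spec.output);
    }
}
```

The spec stays immutable, and a half-populated builder is a legal intermediate 
state rather than an invalid OperatorSpec.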

Just my two cents. Thanks!


samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
(line 161)
<https://reviews.apache.org/r/37506/#comment151391>

    My original intention in introducing the anonymous stream here was to 
represent the intermediate streams/tables. If we explicitly introduce the 
intermediate streams and tables in the following methods, I think that we can 
drop the anonymous ones.



samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
(line 174)
<https://reviews.apache.org/r/37506/#comment151392>

    Could you elaborate more on what needs to be fixed here?



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java
 (line 28)
<https://reviews.apache.org/r/37506/#comment151393>

    Is this going to be the interface exposed to users who are writing SQL 
tasks? It would be good to avoid the generic Object class in the interfaces 
between the Samza framework and user code, to follow the spirit of SAMZA-697.
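
    One possible shape, as a sketch only: make the result type a type parameter 
of the expression interface, so user code never has to cast from Object. Row, 
TypedScalarExpression and evaluate() are hypothetical names for illustration; 
they are not the interfaces in this patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the framework's tuple type.
final class Row {
    private final Map<String, Object> fields = new HashMap<>();

    Row with(String name, Object value) { fields.put(name, value); return this; }

    // The single place where a checked cast happens.
    <T> T get(String name, Class<T> type) { return type.cast(fields.get(name)); }
}

// Hypothetical typed variant: the result type is part of the interface.
interface TypedScalarExpression<T> {
    T evaluate(Row input);
}

class TypedExpressionSketch {
    public static void main(String[] args) {
        TypedScalarExpression<Integer> doubled =
            row -> row.get("count", Integer.class) * 2;
        int result = doubled.evaluate(new Row().with("count", 21));
        System.out.println(result); // prints 42
    }
}
```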



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java
 (line 28)
<https://reviews.apache.org/r/37506/#comment151394>

    Same here.



samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
 (line 19)
<https://reviews.apache.org/r/37506/#comment151395>

    I think that with the new TopologyBuilder + OperatorBuilder, there is a way 
to remove the OperatorSink and OperatorSource interfaces. The main reason those 
interfaces exist is the need to refer to a partial topology that a) has one 
output, or b) has one input that has not been bound to a system stream/table or 
an intermediate stream/table. My thinking is that if we follow an API similar 
to Trident, immediately connected operators won't require the sink/source 
interfaces, and operators that are not immediately connected will need to 
connect via a named intermediate stream/table. That removes the need to create 
OperatorSink/OperatorSource classes.



samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
 
<https://reviews.apache.org/r/37506/#comment151396>

    nit: I still think that a note here stressing the need to get the real 
"event time" instead of the message's receive time based on local system is 
important.
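
    A small sketch of why the distinction matters, assuming hypothetical 
eventTimeMs/receiveTimeMs fields (not the actual fields of 
IncomingMessageTuple) and non-negative timestamps: a delayed message lands in a 
different window depending on which clock is used.

```java
// Hypothetical message carrying both timestamps.
final class TimedMessage {
    final long eventTimeMs;   // set by the producer when the event happened
    final long receiveTimeMs; // local system clock when the message was consumed

    TimedMessage(long eventTimeMs, long receiveTimeMs) {
        this.eventTimeMs = eventTimeMs;
        this.receiveTimeMs = receiveTimeMs;
    }
}

class EventTimeSketch {
    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        // The event happened at 9.5s but was only consumed at 12.3s.
        TimedMessage msg = new TimedMessage(9_500L, 12_300L);
        long size = 10_000L;
        System.out.println(windowStart(msg.eventTimeMs, size));   // prints 0
        System.out.println(windowStart(msg.receiveTimeMs, size)); // prints 10000
    }
}
```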



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
 (line 77)
<https://reviews.apache.org/r/37506/#comment151397>

    Why do we need this? I thought that we could produce directly to the 
system streams, using the isSystemStream flag in the EntityName class?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
 (line 100)
<https://reviews.apache.org/r/37506/#comment151398>

    nit: why don't we call it addOperator() directly?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
 (line 133)
<https://reviews.apache.org/r/37506/#comment151399>

    Me neither. I don't see the need to emit the table to a stream either.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
 (line 136)
<https://reviews.apache.org/r/37506/#comment151400>

    So, I assume that the stack is used as the intermediate context for DAG 
computation? It works for algebra-like computations. What I am worried about is 
that when non-algebraic operators are needed (such as the split operator in my 
previous examples, or a case where one intermediate result is used as input by 
multiple downstream operators), this builder will need to be completely 
rewritten, because the strict stack implementation limits the types of 
computation it can support. I would prefer a generic implementation that can 
support more than DAG-type computation, while keeping the API fluent-style for 
DAGs.
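
    A minimal sketch of the graph-based alternative, assuming streams and 
operators are both tracked by name. TopologyGraph and its methods are 
hypothetical. The point is that one intermediate stream can feed several 
downstream operators, which a pure stack cannot express.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical graph: stream vertices and operator vertices, keyed by name.
final class TopologyGraph {
    // stream name -> operators that consume it
    private final Map<String, List<String>> consumers = new LinkedHashMap<>();
    // operator name -> streams it produces
    private final Map<String, List<String>> outputs = new LinkedHashMap<>();

    TopologyGraph addOperator(String op, List<String> ins, List<String> outs) {
        for (String in : ins) {
            consumers.computeIfAbsent(in, k -> new ArrayList<>()).add(op);
        }
        outputs.computeIfAbsent(op, k -> new ArrayList<>()).addAll(outs);
        for (String out : outs) {
            // Register the output stream as a vertex even if nobody reads it yet.
            consumers.computeIfAbsent(out, k -> new ArrayList<>());
        }
        return this;
    }

    List<String> consumersOf(String stream) {
        return consumers.getOrDefault(stream, new ArrayList<>());
    }

    List<String> outputsOf(String op) {
        return outputs.getOrDefault(op, new ArrayList<>());
    }
}

class GraphTopologySketch {
    public static void main(String[] args) {
        TopologyGraph graph = new TopologyGraph()
            .addOperator("window1", Arrays.asList("kafka:inputstream1"), Arrays.asList("wndOut"))
            .addOperator("filterA", Arrays.asList("wndOut"), Arrays.asList("outA"))
            .addOperator("filterB", Arrays.asList("wndOut"), Arrays.asList("outB"));
        // One intermediate result, two downstream consumers.
        System.out.println(graph.consumersOf("wndOut")); // prints [filterA, filterB]
    }
}
```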



samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java
 (line 16)
<https://reviews.apache.org/r/37506/#comment151401>

    Question: I am not quite sure why we need this. Is it simply a projection 
operator that sends its output directly to the system streams?



samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java (line 
89)
<https://reviews.apache.org/r/37506/#comment151402>

    The goal here is to use the topology builder to generate the query. Can you 
update the code here to use the topology builder?



samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
 (line 119)
<https://reviews.apache.org/r/37506/#comment151405>

    Based on our previous discussion, I think that using OperatorBuilder would 
be easier here:
    this.simpleRtr = TopologyBuilder.create()
        .join(OperatorBuilder.window()
            .size(10).source("kafka:inputstream2")
            .setCallback(this.wndCallback),
          OperatorBuilder.window()
            .size(10).source("kafka:inputstream1")
            .setCallback(this.wndCallback),
          OperatorBuilder.join().setJoinFields(
            new ArrayList<String>() {{ add("key1"); add("key2"); }}))
        .partition(OperatorBuilder.partition()
            .setPartitionKey("joinKey")
            .setPartitionNum(50)
            .setOutput("kafka:parOutputStrm1"))
        .build();
    
    Here, none of the intermediate streams that are immediately consumed by 
the downstream operators are named. Only the actual input/output streams are 
named. The OperatorBuilders are passed in as parameters to TopologyBuilder, 
so that intermediate stream/table names are generated and set on the 
OperatorBuilders within the topology, without the user being involved. Also, 
with the OperatorBuilder model, it would be easier to build a more flexible 
non-DAG topology later: users can name an operator's outputs so that they can 
be consumed by multiple downstream operators. I agree that implementing this 
should not be the first priority. But it would be nice to keep the door open, 
instead of having to re-implement the TopologyBuilder layer later.
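
    A minimal sketch of the name generation described above, assuming a simple 
linear chain. OpBuilder, ChainTopologyBuilder and the "intermediate-N" naming 
scheme are hypothetical and only illustrate how unnamed outputs could be wired 
to the next operator's input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator description with optional input/output names.
final class OpBuilder {
    final String name;
    String input;  // null until set by the user or by the topology builder
    String output; // null until set by the user or by the topology builder

    OpBuilder(String name) { this.name = name; }
    OpBuilder source(String input) { this.input = input; return this; }
    OpBuilder setOutput(String output) { this.output = output; return this; }
}

final class ChainTopologyBuilder {
    private final List<OpBuilder> ops = new ArrayList<>();
    private int nextId = 0;

    ChainTopologyBuilder then(OpBuilder op) {
        if (!ops.isEmpty() && op.input == null) {
            OpBuilder prev = ops.get(ops.size() - 1);
            if (prev.output == null) {
                // The user did not name this stream, so generate a name.
                prev.output = "intermediate-" + nextId++;
            }
            op.input = prev.output;
        }
        ops.add(op);
        return this;
    }

    List<String> describe() {
        List<String> edges = new ArrayList<>();
        for (OpBuilder op : ops) {
            edges.add(op.input + " -> " + op.name + " -> " + op.output);
        }
        return edges;
    }
}

class IntermediateNamingSketch {
    public static void main(String[] args) {
        List<String> edges = new ChainTopologyBuilder()
            .then(new OpBuilder("window").source("kafka:inputstream1"))
            .then(new OpBuilder("partition").setOutput("kafka:parOutputStrm1"))
            .describe();
        // prints:
        // kafka:inputstream1 -> window -> intermediate-0
        // intermediate-0 -> partition -> kafka:parOutputStrm1
        edges.forEach(System.out::println);
    }
}
```

Only the real input and output streams appear in user code. A user-supplied 
setOutput() name is left untouched, which is what allows a later operator to 
consume it explicitly.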


- Yi Pan (Data Infrastructure)


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for TopologyBuilder API proposed in rb34500 
> (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing 
> TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two 
> tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
> 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 
> 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 
> 7b4d984 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
> d6f6b57 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>  fb2aa89 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>  0759638 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>  c49a822 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>  72a59f2 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
>  c3d2266 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
>  cbc84d0 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
>  e66451f 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
>  56753b6 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>  e570897 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>  2854aeb 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
>  cc0aca0 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>  b93d789 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
>  c47eed9 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
>  d81cc93 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>  eec32ea 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>  b29838a 
>   
> samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>  20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
> 9124e3c 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
>  96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>
