[ https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225442#comment-14225442 ]

Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------

[~milinda] Great discussion! Here are my current thoughts:
{quote}
1. Lack of tuple-based sliding windows in CQL: Tuple-based sliding windows are 
there in CQL. In addition to tuple- and time-based windows, it introduces a 
partitioned window, where the stream is partitioned during window creation.
{quote}
Agreed.
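To keep the terminology concrete, the three CQL window types could be sketched roughly as below (a hypothetical Python illustration of the semantics only, not Freshet or Samza code):

```python
from collections import deque, defaultdict

def rows_window(stream, n):
    """Tuple-based (ROWS) sliding window: keep the last n tuples."""
    win = deque(maxlen=n)
    for t in stream:
        win.append(t)
        yield list(win)

def range_window(stream, seconds):
    """Time-based (RANGE) sliding window over (timestamp, value) tuples:
    keep tuples whose timestamp is within `seconds` of the newest tuple."""
    win = deque()
    for t in stream:
        win.append(t)
        while win and win[0][0] <= t[0] - seconds:
            win.popleft()
        yield list(win)

def partitioned_rows_window(stream, key, n):
    """Partitioned window: split the stream by key(t), keep the
    last n tuples independently inside each partition."""
    parts = defaultdict(lambda: deque(maxlen=n))
    for t in stream:
        parts[key(t)].append(t)
        yield {k: list(v) for k, v in parts.items()}
```

For example, `rows_window([1, 2, 3, 4], 2)` ends with the window `[3, 4]`, while the partitioned variant keeps a separate last-n buffer per key.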

{quote}
2. If we are going for SQL-like semantics, it doesn't matter whether we decide 
to go with an embedded DSL or plain SQL. We can decouple the operator layer and 
the language layer. That's how I have designed Freshet, mentioned in an earlier 
comment. In my implementation I have several Samza tasks which implement 
different operators such as window, select, project and aggregate. What the DSL 
layer does is generate a relational-algebra-like expression, which is then 
converted to a Samza job graph. Each individual node in this job graph is a 
Samza job with its properties, such as input stream, output stream and other 
operator-specific configuration parameters. In the final phase, these nodes get 
converted into a properties file which describes the Samza job. IMHO, we should 
first decide what type of semantics we are going to support and then design the 
operator layer based on those semantics.
{quote}
Great! We are thinking in the same direction. My thought is to first identify 
the set of primitives (i.e. operators) for popular SQL use cases and implement 
them at the operator layer. The SQL parser layer can then be built on top of it, 
generating the same relational-algebra expression as your DSL parser and 
converting it to the Samza job graph. Hence, I think that in addition to 
agreeing on the set of operators in the operator layer, we should also define a 
common relational-algebra expression s.t. we can re-use the translation to the 
Samza job graph in your work. 
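The shared translation step could look roughly like the following sketch: walk the relational-algebra tree bottom-up and emit one job spec per operator, wiring each node's output stream into its parent's input. (All names here — the node/spec shapes, the stream-naming convention — are hypothetical, for illustration only.)

```python
# Hypothetical sketch: convert a relational-algebra tree into a flat list of
# per-operator Samza job specs (dicts of config-style properties).

def to_job_graph(node, jobs):
    """Post-order walk; returns the name of the stream this node produces."""
    inputs = [to_job_graph(child, jobs) for child in node.get("children", [])]
    out = "stream-%d" % len(jobs)  # intermediate stream name (hypothetical convention)
    jobs.append({
        "job.name": "%s-%d" % (node["op"], len(jobs)),
        "task.inputs": inputs or [node["source"]],  # leaves read the source stream
        "task.output": out,
        "operator.params": node.get("params", {}),
    })
    return out

# Example: SELECT a FROM InStream WHERE a > 5  ->  project(filter(scan))
expr = {"op": "project", "params": {"fields": ["a"]},
        "children": [{"op": "filter", "params": {"predicate": "a > 5"},
                      "children": [], "source": "InStream"}]}
jobs = []
to_job_graph(expr, jobs)
```

Both the SQL parser and the DSL front end could then target the same `expr` shape, so this one translation is shared.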

{quote}
3. I am not sure whether we should add the concept of time to Samza. We can 
implement the concept of time in our operator layer and query layer, without 
integrating it into Samza itself. Currently in Freshet [1], I don't have 
the time concept implemented. But I have designed the DSL in a way that the 
query writer can specify how to timestamp individual tuples. For example, the 
timestamp can be a field in the tuple for some scenarios, so in the Freshet DSL 
the user can specify which field contains the timestamp. Otherwise, Freshet 
uses the time when the tuple first appeared in Freshet as the tuple's timestamp.
{quote}
The nuances show up when actually implementing the windowing technique in a 
distributed environment, where: a) different tasks running on different 
machines may see different times; b) in a stream of timestamped tuples, there 
is no explicit termination of a time window, since streams can be paused for 
arbitrary periods. IMO, the only meaningful solution to this seems to be: i) 
embed the original timestamp (or, if missing, the injection timestamp as in 
Freshet) in the data model of the tuple in the stream (i.e. some envelope info 
tagged to the tuple) and propagate this timestamp down the whole stream 
processing pipeline. CQL [1] discusses some interesting points regarding how to 
propagate the input tuple's timestamp to the output tuple; ii) adopt an 
explicit window-termination token in the stream for time-based windows, similar 
to MillWheel [2]. Leaving it to users to specify the timestamp field may not be 
necessary and would complicate the logic. 
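Point (ii) could be sketched as below: interleave punctuation tokens in the stream, where `("punct", t)` asserts that no tuple with timestamp below t will arrive later, so a time-based window can be closed even while the stream is paused. (Hypothetical illustration of the idea, not MillWheel's actual low-watermark protocol.)

```python
# Hypothetical sketch of explicit window-termination tokens closing
# tumbling time windows of width `size`.

def tumbling_windows(stream, size):
    """Yield (window_start, tuples) once a punctuation proves the window is complete."""
    pending = {}  # window_start -> list of (timestamp, value) tuples
    for kind, payload in stream:
        if kind == "tuple":
            ts, value = payload
            start = (ts // size) * size
            pending.setdefault(start, []).append((ts, value))
        else:  # kind == "punct": close every window ending at or before `payload`
            for start in sorted(s for s in pending if s + size <= payload):
                yield start, pending.pop(start)

events = [("tuple", (1, "a")), ("tuple", (3, "b")),
          ("punct", 5),                       # closes window [0, 5)
          ("tuple", (7, "c")), ("punct", 10)]  # closes window [5, 10)
closed = list(tumbling_windows(events, size=5))
```

Note that without the punctuation tokens, the window `[0, 5)` would stay open indefinitely if the stream paused after the tuple at t=3 — which is exactly the termination problem in (b).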

{quote}
4. I am not sure I completely understood Raul Castro Fernandez's comment about 
moving the concept of windows out of the query. But if we are doing that, what 
we can do alternatively is keep the concept of windows in the query but 
implement the operator layer/physical query plan in a way that separates out 
the windowing and query logic. In my experience with CQL, the same thing 
happens in CQL to some extent. In most/all scenarios, what happens first in a 
CQL query is window generation as an insert/delete stream. The insert/delete 
stream assumes each tuple can be uniquely identified. When a tuple is added to 
the window, it gets emitted to the output stream as an 'insert' tuple, and when 
a tuple gets removed from the window, it gets emitted to the output stream as a 
'delete' tuple. The downstream operator should handle these insert/delete 
tuples according to its logic. I found this concept of an insert/delete stream 
really simplifies window handling. The "CQL Implementation in STREAM" section 
in [2] contains some interesting details about stream query execution and how 
sliding windows can be implemented as I described above.
{quote}
I personally like the concept of a separate window operator, due to the point 
you mentioned regarding computation on insert/delete streams in CQL 
(i.e. incremental processing). However, the window operator can be implemented 
such that we are running a query on top of the Samza stores hosting the 
timestamped tuples (e.g. insert/delete tuples are generated by queries based on 
the difference between the previous and the current windows). Generalizing the 
storage model for the internal data model of tuples can help address the memory 
and debugging issues [~raulcf] mentioned. 
The key issue raised in the comparison of Oracle vs. StreamBase in [3] is a 
different one, and it does not matter whether we implement the window operator 
directly as STREAM does, or implement it as a query. The key issue in [3] seems 
to be the ambiguity of ordering when incoming tuples have the same timestamp, 
which messes up the ordering of the tuples in and across streams and hence 
causes non-deterministic results in different types of windows. Whether or not 
we embed the timestamp into the data model, we have to make sure that: a) a 
window operation on tuples in the same stream w/ the same timestamps is 
deterministic; b) the ordering between tuples from different streams w/ the 
same timestamps is deterministic. [3] addresses those two points via a) a 
batchId and b) a SPREAD operator implementing a consistent ordering policy in 
a stream group (i.e. this can be implemented as a consistent policy in a 
MessageChooser). What I am still not sure about is how this ordering info can 
be kept downstream throughout the whole stream processing pipeline.
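The "window operator as a query over stored tuples" variant could be sketched as below: the insert/delete stream falls out of diffing consecutive window snapshots, relying (as CQL does) on each tuple having a unique id so that deletes can name what they retract. (Hypothetical illustration; the snapshot shape and names are made up for the sketch.)

```python
# Hypothetical sketch: derive an insert/delete stream by diffing consecutive
# window contents, e.g. as read back from a store of timestamped tuples.

def diff_windows(windows):
    """windows: iterable of dicts {tuple_id: value}.
    Yields ('+', id, value) when a tuple enters the window and
    ('-', id, value) when it leaves, i.e. a CQL-style insert/delete stream."""
    prev = {}
    for cur in windows:
        for tid in cur.keys() - prev.keys():
            yield ("+", tid, cur[tid])   # insert: entered the window
        for tid in prev.keys() - cur.keys():
            yield ("-", tid, prev[tid])  # delete: left the window
        prev = cur

# A ROWS 2 window sliding over tuples t1, t2, t3:
wins = [{"t1": 10}, {"t1": 10, "t2": 20}, {"t2": 20, "t3": 30}]
changes = list(diff_windows(wins))
```

A downstream incremental aggregate then only has to apply each `+`/`-` delta instead of recomputing over the full window.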

In addition, I want to bring up the following item for discussion:
1) Parallelism. We should incorporate the partitioned window on the input 
streams to help generate partitions and parallel Samza tasks in the execution 
graph. The partitioning of the output stream should also be considered. I have 
been thinking of introducing some syntax like IStream(SELECT * FROM 
InStream[partitioned by A into 100 rows 1]) PARTITIONED BY B INTO 200 that 
allows us to specify the partition keys and the number of partitions on the 
input and output streams. The tricky part is that the downstream tasks in the 
execution graph may now see out-of-order arrival of tuples w.r.t. the original 
timestamp. There is an interesting research paper on this out-of-order arrival 
and skewed window time from Stanford [4].
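The PARTITIONED BY clause above implies a repartitioning step between the input-side windows (keyed by A) and the output stream (keyed by B), which could be sketched as below. (Hypothetical illustration: Samza itself delegates the key-to-partition mapping to the system producer's partitioner, and the hash choice here is arbitrary.)

```python
import zlib

def partition(key, num_partitions):
    """Deterministic key -> partition assignment (hypothetical hash choice)."""
    return zlib.crc32(str(key).encode()) % num_partitions

def repartition(tuples, key_field, num_partitions):
    """Re-key tuples by `key_field` into `num_partitions` output partitions,
    so all tuples sharing a key land in the same downstream task."""
    out = {}
    for t in tuples:
        out.setdefault(partition(t[key_field], num_partitions), []).append(t)
    return out

# Tuples previously partitioned by A, re-keyed by B for the output stream:
rows = [{"A": 1, "B": "x"}, {"A": 2, "B": "x"}, {"A": 3, "B": "y"}]
by_b = repartition(rows, "B", 4)
```

Because tuples with the same B now arrive from many A-partitioned upstream tasks, their original timestamps interleave arbitrarily at the consumer, which is exactly the out-of-order arrival problem [4] studies.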

[1] CQL: https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
[2] MillWheel: http://research.google.com/pubs/pub41378.html
[3] "Towards a Streaming SQL Standard": http://cs.brown.edu/~ugur/streamsql.pdf
[4] Flexible time management in data stream systems: 
http://dl.acm.org/citation.cfm?id=1055596

> High-Level Language for Samza
> -----------------------------
>
>                 Key: SAMZA-390
>                 URL: https://issues.apache.org/jira/browse/SAMZA-390
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Raul Castro Fernandez
>            Priority: Minor
>              Labels: project
>
> Discussion about high-level languages to define Samza queries. Queries are 
> defined in this language and transformed to a dataflow graph where the nodes 
> are Samza jobs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
