[
https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225442#comment-14225442
]
Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------
[~milinda] Great discussion! Here are my current thoughts:
{quote}
1. Lack of tuple-based sliding windows in CQL: Tuple-based sliding windows are
there in CQL. In addition to tuple- and time-based windows, it introduces
partitioned windows, where we partition the stream during window creation.
{quote}
Agreed.
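To make the distinction concrete, here is a minimal Python sketch (illustrative only, not Freshet or CQL code) of a tuple-based sliding window and its partitioned variant:

```python
from collections import deque, defaultdict

class TupleWindow:
    """Tuple-based sliding window: keeps the last `size` tuples (CQL's [Rows N])."""
    def __init__(self, size):
        self.size = size
        self.buf = deque()

    def insert(self, tup):
        self.buf.append(tup)
        if len(self.buf) > self.size:
            self.buf.popleft()
        return list(self.buf)  # current window contents after the insert

class PartitionedWindow:
    """Partitioned window: one sliding window per value of the partition key."""
    def __init__(self, key_fn, size):
        self.key_fn = key_fn
        self.windows = defaultdict(lambda: TupleWindow(size))

    def insert(self, tup):
        # Route the tuple to the window for its partition key.
        return self.windows[self.key_fn(tup)].insert(tup)
```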
{quote}
2. If we are going for SQL-like semantics, it doesn't matter whether we decide
to go with an embedded DSL or plain SQL. We can decouple the operator layer and
the language layer. That's how I have designed Freshet, mentioned in an earlier
comment. In my implementation I have several Samza tasks which implement
different operators such as window, select, project and aggregate. What the DSL
layer does is generate a relational-algebra-like expression, which is converted
to a Samza job graph. Each individual node in this job graph is a Samza job
with its properties, such as input stream, output stream and other
operator-specific configuration parameters. In the final phase, these nodes get
converted into a properties file which describes the Samza job. IMHO, we should
first decide what type of semantics we are going to support and then design the
operator layer based on those semantics.
{quote}
Great! We are thinking in the same direction. My thought is to first identify
the set of primitives (i.e. operators) for popular SQL use cases and implement
them at the operator layer. The SQL parser layer can be built on top of it,
generating the same relational algebra expression as your DSL parser and
converting it to the Samza job graph. Hence, I think that in addition to
agreeing on the set of operators in the operator layer, we should also define a
common relational algebra expression, so that we can re-use the translation to
the Samza job graph in your work.
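As a strawman for such a common representation, the sketch below (all field and operator names hypothetical, not an agreed format) shows operator nodes of a relational algebra expression being translated into the flat properties that would describe Samza jobs:

```python
def to_job_properties(node):
    """Translate one operator node of the shared expression tree into the
    flat properties that would describe one Samza job in the execution graph."""
    props = {
        "job.name": f"{node['op']}-{node['output']}",
        "task.inputs": ",".join(node["inputs"]),
        "task.output": node["output"],
    }
    # Operator-specific configuration is namespaced under the operator name.
    for k, v in node.get("config", {}).items():
        props[f"operator.{node['op']}.{k}"] = str(v)
    return props

# A two-node plan: a tuple-based window feeding a projection.
plan = [
    {"op": "window", "inputs": ["Orders"], "output": "OrdersWin",
     "config": {"type": "rows", "size": 100}},
    {"op": "project", "inputs": ["OrdersWin"], "output": "OrderIds",
     "config": {"fields": "orderId"}},
]
jobs = [to_job_properties(n) for n in plan]
```

The point is that both a SQL parser and an embedded DSL could emit the same `plan` structure, so the translation to job properties is written once.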
{quote}
3. I am not sure whether we should add the concept of time to Samza. We can
implement the concept of time in our operator layer and query layer, without
integrating it into Samza itself. Currently in Freshet [1], I don't have the
time concept implemented. But I have designed the DSL in a way that the query
writer can specify how to timestamp individual tuples. For example, the
timestamp can be a field in the tuple in some scenarios. So in the Freshet DSL,
the user can specify which field contains the timestamp. Otherwise, Freshet
uses the time when the tuple first appeared in Freshet as the tuple's timestamp.
{quote}
The nuances appear when actually implementing the windowing technique in a
distributed environment, where: a) different tasks running on different
machines may see different times; b) in a stream of timestamped tuples, there
is no explicit termination of a time window, since streams can be paused for
arbitrary periods. IMO, the only meaningful solution seems to be: i) embed the
original timestamp (or, if missing, an injection timestamp as in Freshet) in
the data model of the tuple in the stream (i.e. some envelope info tagged to
the tuple) and propagate this timestamp down the whole stream processing
pipeline. CQL [1] discusses some interesting points regarding how to propagate
the input tuple's timestamp to the output tuple; ii) adopt an explicit window
termination token in the stream for time-based windows, similar to MillWheel
[2]. Leaving it to users to specify a timestamp field may be unnecessary and
complicates the logic.
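A rough sketch of (i) and (ii) together, with the envelope format and watermark handling invented purely for illustration:

```python
import time

def envelope(payload, ts=None):
    """(i) Tag a tuple with a timestamp at injection if the source
    did not already provide one."""
    return {"ts": ts if ts is not None else time.time(), "payload": payload}

class TumblingTimeWindow:
    """(ii) Fixed time window [k*length, (k+1)*length) that is emitted only
    when an explicit termination token (a watermark, as in MillWheel) says no
    earlier tuples can still arrive -- never on local wall-clock time."""
    def __init__(self, length):
        self.length = length
        self.buckets = {}  # window start time -> buffered envelopes

    def on_tuple(self, env):
        start = int(env["ts"] // self.length) * self.length
        self.buckets.setdefault(start, []).append(env)

    def on_watermark(self, wm):
        # Emit every window whose end lies at or before the watermark.
        done = sorted(s for s in self.buckets if s + self.length <= wm)
        return [(s, self.buckets.pop(s)) for s in done]
```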
{quote}
4. I am not sure I completely understood Raul Castro Fernandez's comment about
moving the concept of windows out of the query. But if we are doing that, what
we can do alternatively is keep the concept of windows in the query but
implement the operator layer/physical query plan in a way that separates the
windowing and query logic. In my experience with CQL, the same thing happens to
some extent. In most (if not all) scenarios, the first thing that happens in a
CQL query is window generation as an insert/delete stream. The insert/delete
stream assumes each tuple can be uniquely identified. When a tuple is added to
the window, it gets emitted to the output stream as an 'insert' tuple, and
when a tuple gets removed from the window, it gets emitted to the output
stream as a 'delete' tuple. Downstream operators should handle these
insert/delete tuples according to their logic. I found this concept of an
insert/delete stream really simplifies window handling. The "CQL Implementation
in STREAM" section in [2] contains some interesting details about stream query
execution and how sliding windows can be implemented as I described above.
{quote}
I personally like the concept of a separate window operator, due to the point
you mentioned regarding computation on insert/delete streams in CQL (i.e.
incremental processing). However, the window operator can be implemented such
that we are running a query on top of the Samza stores hosting the timestamped
tuples (e.g. insert/delete tuples are generated by queries based on the
difference between the previous and the current windows). Generalizing the
storage model for the internal data model of tuples can help address the
memory and debugging issues [~raulcf] mentioned.
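A minimal sketch of that diff-based approach, assuming window snapshots keyed by a unique tuple id (which the insert/delete model requires anyway):

```python
def window_delta(prev, curr):
    """prev/curr: dicts mapping unique tuple id -> tuple, i.e. two consecutive
    window snapshots (e.g. read from a Samza store). Returns the CQL-style
    insert/delete tuples derived from their difference."""
    inserts = [("insert", t) for tid, t in curr.items() if tid not in prev]
    deletes = [("delete", t) for tid, t in prev.items() if tid not in curr]
    return inserts + deletes
```

Downstream operators then consume only these deltas instead of re-reading the whole window, which is the incremental-processing benefit.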
The key issue mentioned in the comparison of Oracle vs. StreamBase in [3] is a
different one, and it does not matter whether we implement the window operator
directly as STREAM does, or implement it as a query. The key issue in [3]
seems to be the ambiguity of ordering when incoming tuples have the same
timestamp, which messes up the ordering of the tuples in/across the streams
and hence causes non-deterministic results in different types of windows.
Whether or not we embed the timestamp into the data model, we have to make
sure that: a) a window operation on tuples in the same stream with the same
timestamps is deterministic; b) the ordering between tuples from different
streams with the same timestamps is deterministic. [3] addressed those two
points via a) a batchId and b) a SPREAD operator implementing a consistent
ordering policy in a stream group (which could, e.g., be implemented as a
consistent policy in a MessageChooser). What I am still not sure about is how
this ordering info can be kept downstream throughout the whole stream
processing pipeline.
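As an illustration of such a consistent ordering policy (field names hypothetical), ties on the timestamp can be broken deterministically by stream name, batchId and arrival offset, so every task orders the same set of tuples identically:

```python
def choose_order(tuples):
    """Deterministic ordering over a batch of tuples: primary key is the
    timestamp; ties are broken by (stream, batch_id, offset) so the result
    is independent of arrival order on any given machine."""
    return sorted(tuples,
                  key=lambda t: (t["ts"], t["stream"], t["batch_id"], t["offset"]))
```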
In addition, I want to bring up the following item for discussion:
1) Parallelism. We should incorporate the partitioned window in the input
streams to help generate partitions and parallel Samza tasks in the execution
graph. The partitioning of the output stream should also be considered. I have
been thinking of introducing some syntax like IStream(SELECT * FROM
InStream[partitioned by A into 100 rows 1]) PARTITIONED BY B INTO 200 that
allows us to specify the partition keys and the number of partitions in the
input and output streams. The tricky part is that the downstream tasks in the
execution graph may now get out-of-order arrival of tuples w.r.t. the original
timestamp. There is an interesting research paper from Stanford on this
out-of-order arrival and skewed window time [4].
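To illustrate the output half of that proposal, here is a hypothetical router mirroring a PARTITIONED BY B INTO 200 clause: it hashes the partition key so the same key always lands in the same output partition (a stable hash is used deliberately, since Python's built-in hash() is randomized across runs):

```python
import hashlib

def output_partition(tup, key_field, num_partitions):
    """Route a tuple to an output partition by a stable hash of its
    partition key; key_field and the dict-based tuple are illustrative."""
    digest = hashlib.md5(str(tup[key_field]).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_partitions
```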
[1] CQL: https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
[2] MillWheel: http://research.google.com/pubs/pub41378.html
[3] "Towards a Streaming SQL Standard": http://cs.brown.edu/~ugur/streamsql.pdf
[4] Flexible time management in data stream systems:
http://dl.acm.org/citation.cfm?id=1055596
> High-Level Language for Samza
> -----------------------------
>
> Key: SAMZA-390
> URL: https://issues.apache.org/jira/browse/SAMZA-390
> Project: Samza
> Issue Type: New Feature
> Reporter: Raul Castro Fernandez
> Priority: Minor
> Labels: project
>
> Discussion about high-level languages to define Samza queries. Queries are
> defined in this language and transformed to a dataflow graph where the nodes
> are Samza jobs.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)