[ 
https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233462#comment-14233462
 ] 

Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------

I had a discussion with [~criccomini] this morning. To me, the main take away 
from CQL paper is the modeling from stream to relation and relation to stream. 
I quickly checked Esper and found that Esper EPL also implements the concept of 
IStream, DStream, although in a different term as istream = IStream and rstream 
= DStream. From the paper describing the unified window definition between 
Oracle and StreamBase, it is obvious that both takes the windowing approach to 
convert stream to relation as well.

The one big concern [~criccomini] mentioned is the window of "unbounded" in the 
paper. I found that it might not be difficult to implement a single stream 
"unbounded" IStream, as mentioned in CQL paper (i.e. converting to RStream and 
window "Now"). However, if a SQL is a join between two unbounded streams, it 
does not seem to work, since by semantic definition, IStream now needs to keep 
the ever growing R(t) for both streams and perform join between them, while 
RStream on two streams with window "Now" only generate the immediate joins 
between tuples that arrives "simultaneously", not the tuples of join between 
one tuple from stream A at Now with another tuple from stream B at time t=0. 
Given that we have a set of big use cases that will join two or more continuous 
streams, it is a big issue if we can not keep unbounded history of a stream in 
a local box. I took a quick look into tigon.io and noticed one important 
tradeoff they have made in their implementation of GSQL: 
{quote}
In general, all joins must have a predicate which matches the timestamps of its 
sources, and every aggregation query must have a timestamp in its list of 
group-by fields.
Requiring pipelined query plans has three advantages. First, Tigon SQL can 
continually produce output. Second, internal tables (for joins and 
aggregations) remain small as they are continually cleaned of obsolete data. 
Third, queries can be connected together into complex data processing systems 
because the output of every stream is another data stream, suitable as the 
input to another query.
{quote}

The inspiration here is: the join between two streams should also be 
time-bounded. It actually makes sense in many of the use cases in 
time-sensative stream processing: the join is to find correlation between two 
sets of events that happened "simultaneously". Hence, if we can agree on the 
assumption that tuples in two streams that have timestamps not in a same time 
window should not or being meaningless in the join result, we can actually just 
focus on implementing a bounded window in our samza tasks and generate both 
IStream and DStream as window moves. In the ultimate store (i.e. be a 
persistent database or persistent log) that is the sink of the streamed 
results, we can opt to ignore the DStream to the store s.t. the user that 
queries the ultimate store for a time-varying sequence of join results, based 
on the tuples that are within the same time window (defined as "simultaneity"). 
I have a slightly different point of view on aggregation function mentioned in 
tigon.io's document, since there is a use case that you want *all* cumulative 
aggregated results from time t=0, as long as the total size of the relation 
generated from the aggregated function in the query is bounded over time.

A few more comments following the discussion with [~criccomini]:
Comment 4 and 5 from [~criccomini] are actually missing pieces from CQL as well 
as Esper. We are looking into how to enhance it, hoping to get something from 
MillWheel as well.
Comment 6 may be address by having the query planner as a centralized entity 
that identifies possibly same states (i.e. "synopses") and identify the task 
boundaries in the execution graph. If the synopses relationship can be kept 
within the same task boundary, it can be merged into a single one; if not, the 
state will need to be replicated. We can pound on optimization on this further 
in SAMZA-483

> High-Level Language for Samza
> -----------------------------
>
>                 Key: SAMZA-390
>                 URL: https://issues.apache.org/jira/browse/SAMZA-390
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Raul Castro Fernandez
>            Priority: Minor
>              Labels: project
>
> Discussion about high-level languages to define Samza queries. Queries are 
> defined in this language and transformed to a dataflow graph where the nodes 
> are Samza jobs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to