[
https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233462#comment-14233462
]
Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------
I had a discussion with [~criccomini] this morning. To me, the main takeaway
from the CQL paper is the modeling from stream to relation and from relation to
stream. I quickly checked Esper and found that Esper EPL also implements the
IStream and DStream concepts, although under different names: Esper's istream
corresponds to IStream and its rstream to DStream. From the paper describing
the unified window definition between Oracle and StreamBase, it is obvious that
both take the windowing approach to convert streams to relations as well.
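To make the stream/relation modeling concrete, here is a minimal sketch of CQL's relation-to-stream operators (the function names are my own illustration, not any Samza or Esper API): given the relation's contents at successive instants, IStream emits the insertions and DStream the deletions.

```python
# Sketch of CQL relation-to-stream semantics (names are illustrative):
# given the relation instants R(t-1) and R(t), IStream carries the tuples
# inserted at t and DStream the tuples deleted at t.
def istream(r_prev, r_now):
    """Tuples present now but not at the previous instant (insertions)."""
    return r_now - r_prev

def dstream(r_prev, r_now):
    """Tuples present at the previous instant but not now (deletions)."""
    return r_prev - r_now

# Example: as a sliding window moves, tuple "a" expires and "c" arrives.
r_prev = {"a", "b"}
r_now = {"b", "c"}
assert istream(r_prev, r_now) == {"c"}
assert dstream(r_prev, r_now) == {"a"}
```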
The one big concern [~criccomini] mentioned is the "Unbounded" window in the
paper. I found that it might not be difficult to implement a single-stream
"unbounded" IStream as mentioned in the CQL paper (i.e. converting to an
RStream with a "Now" window). However, if a query is a join between two
unbounded streams, this does not seem to work: by the semantic definition,
IStream now needs to keep the ever-growing R(t) for both streams and perform
the join between them, while an RStream over two streams with a "Now" window
only generates the immediate joins between tuples that arrive
"simultaneously", not joins between a tuple from stream A arriving now and a
tuple from stream B from time t=0.
Given that we have a set of big use cases that will join two or more continuous
streams, it is a big issue if we cannot keep the unbounded history of a stream
on a local box. I took a quick look at tigon.io and noticed one important
tradeoff they have made in their implementation of GSQL:
{quote}
In general, all joins must have a predicate which matches the timestamps of its
sources, and every aggregation query must have a timestamp in its list of
group-by fields.
Requiring pipelined query plans has three advantages. First, Tigon SQL can
continually produce output. Second, internal tables (for joins and
aggregations) remain small as they are continually cleaned of obsolete data.
Third, queries can be connected together into complex data processing systems
because the output of every stream is another data stream, suitable as the
input to another query.
{quote}
The inspiration here is: the join between two streams should also be
time-bounded. This actually makes sense in many use cases in time-sensitive
stream processing: the join is to find correlations between two sets of events
that happened "simultaneously". Hence, if we can agree on the assumption that
tuples from two streams whose timestamps do not fall in the same time window
should not appear (or would be meaningless) in the join result, we can focus
on implementing a bounded window in our samza tasks and generate both the
IStream and the DStream as the window moves. In the ultimate store (i.e. a
persistent database or persistent log) that is the sink of the streamed
results, we can opt to not apply the DStream, so that a user querying the
ultimate store sees a time-varying sequence of join results based on the
tuples that are within the same time window (defined as "simultaneity").
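A rough sketch of such a time-bounded join (my own illustration, not a Samza API): a symmetric join where two tuples join only if their timestamps fall within the window of each other, in the spirit of Tigon SQL's timestamp-matching requirement. State stays bounded because expired tuples are evicted as time advances, and each new join result is an IStream emission.

```python
from collections import deque

# Hypothetical time-bounded symmetric join (illustrative, not Samza code).
# Tuples from sides "A" and "B" join only when their timestamps differ by
# at most `window`; older tuples are evicted, keeping state bounded.
class WindowedJoin:
    def __init__(self, window):
        self.window = window
        self.buffers = {"A": deque(), "B": deque()}

    def _evict(self, now):
        for buf in self.buffers.values():
            while buf and buf[0][0] < now - self.window:
                buf.popleft()  # drop tuples outside the join window

    def on_tuple(self, side, ts, value):
        """Feed one (timestamp, value) tuple; return new join results (IStream)."""
        self._evict(ts)
        other = self.buffers["B" if side == "A" else "A"]
        results = [(value, v) if side == "A" else (v, value)
                   for (t, v) in other if abs(t - ts) <= self.window]
        self.buffers[side].append((ts, value))
        return results

j = WindowedJoin(window=10)
j.on_tuple("A", 0, "a0")
assert j.on_tuple("B", 5, "b5") == [("a0", "b5")]   # within window: joins
assert j.on_tuple("B", 20, "b20") == []             # "a0" has expired
```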
I have a slightly different point of view on the aggregation functions
mentioned in tigon.io's documentation, since there are use cases where you
want *all* cumulative aggregated results from time t=0, as long as the total
size of the relation generated by the aggregation function in the query is
bounded over time.
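To illustrate the cumulative case (my own example, not a quoted design): a running per-key count from t=0. Although the input stream is unbounded, the relation the aggregation produces is bounded by the number of distinct keys, so keeping it from t=0 is feasible.

```python
from collections import defaultdict

# Cumulative aggregation with bounded state: the relation's size is the
# number of distinct keys, regardless of how many events have arrived.
counts = defaultdict(int)

def on_event(key):
    counts[key] += 1
    return dict(counts)  # the full cumulative relation at this instant

on_event("page_view")
on_event("click")
assert on_event("page_view") == {"page_view": 2, "click": 1}
```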
A few more comments following the discussion with [~criccomini]:
Comments 4 and 5 from [~criccomini] are actually missing pieces from CQL as
well as Esper. We are looking into how to enhance this, hoping to take
something from MillWheel as well.
Comment 6 may be addressed by having the query planner as a centralized entity
that identifies potentially shared states (i.e. "synopses") and the task
boundaries in the execution graph. If related synopses can be kept within the
same task boundary, they can be merged into one; if not, the state will need
to be replicated. We can pursue further optimization on this in SAMZA-483.
> High-Level Language for Samza
> -----------------------------
>
> Key: SAMZA-390
> URL: https://issues.apache.org/jira/browse/SAMZA-390
> Project: Samza
> Issue Type: New Feature
> Reporter: Raul Castro Fernandez
> Priority: Minor
> Labels: project
>
> Discussion about high-level languages to define Samza queries. Queries are
> defined in this language and transformed to a dataflow graph where the nodes
> are Samza jobs.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)