Yeah, I am still thinking about it. Jay pointed out that for an event-time window, the window start time may be derivable if we just keep a single starting value for fixed-length windows. I have yet to think about the tuple window case and windows with dynamic length (e.g. the session window example in MillWheel).
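To make the fixed-length case concrete, here is a minimal sketch (the class and constant names are hypothetical, not from Samza): given a single stored origin value and a fixed window length, the start of the window containing any event time falls out of integer arithmetic, so no per-window state is needed. Tuple windows and dynamic-length session windows do not have this property, since no single origin value determines their boundaries.

    public final class FixedWindows {
      // Hypothetical constants; in practice these would come from the window
      // definition in the query. The origin is the single starting value we keep.
      static final long WINDOW_ORIGIN_MS = 0L;
      static final long WINDOW_LENGTH_MS = 60 * 60 * 1000L; // fixed one-hour windows

      /** Derives the start of the fixed-length window containing eventTimeMs. */
      static long windowStartFor(long eventTimeMs) {
        long widx = Math.floorDiv(eventTimeMs - WINDOW_ORIGIN_MS, WINDOW_LENGTH_MS);
        return WINDOW_ORIGIN_MS + widx * WINDOW_LENGTH_MS;
      }

      public static void main(String[] args) {
        long tenFortyTwo = (10 * 60 + 42) * 60 * 1000L; // 10:42 relative to the origin
        // The event falls into the 10:00-10:59 window; prints "true".
        System.out.println(windowStartFor(tenFortyTwo) == 10 * 60 * 60 * 1000L);
      }
    }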
On Fri, Mar 6, 2015 at 7:24 AM, Milinda Pathirage <[email protected]> wrote:

> I think my previous comment about maintaining start and end offsets as the
> window state will not work when there are delays. We may need to keep
> multiple such offsets, but that may not be a clean solution.
>
> On Thu, Mar 5, 2015 at 2:42 PM, Milinda Pathirage <[email protected]>
> wrote:
>
> > Hi Yi,
> >
> > Please find my comments inline.
> >
> > On Thu, Mar 5, 2015 at 1:18 PM, Yi Pan <[email protected]> wrote:
> >
> >> Hi, Milinda,
> >>
> >> We have recently had some discussions on the MillWheel model:
> >> http://www.infoq.com/presentations/millwheel.
> >
> > Yes, that is a very interesting talk. I asked the above question
> > regarding the language just after watching it. I was under the
> > impression that we need to specify these details (handling delays)
> > explicitly in the query.
> >
> >> It is a very interesting talk and has one striking point that we did
> >> not think about before: handling late arrivals as a "correction" to
> >> the earlier results. Hence, if we follow that model, the late arrival
> >> problem that you described can be addressed as follows:
> >>
> >> a) Each window will have a closing policy: either a wall-clock-based
> >> timeout, or the arrival of messages indicating that we have received
> >> all messages in the corresponding event-time window
> >
> > Given that the closing policy is not explicit in the query, how are we
> > going to handle this? Is this policy going to be specific to a query
> > or a system-wide thing? I think I was not clear about this in the
> > previous mail.
> >
> >> b) Each window also keeps all the messages it received in past
> >> windows, up to a large retention size that covers all possible late
> >> arrivals
> >
> > Are we going to keep this in local storage? Is this (keeping past
> > messages) really necessary in the case of monotonic queries? Maybe you
> > meant that we just keep metadata about offsets, so we can replay from
> > Kafka (I don't have that much experience with Kafka, but I think we
> > can start consuming from arbitrary offsets).
> >
> >> c) When a window's closing policy is satisfied, the window operator
> >> always emits the current window results
> >
> > Does this mean we wait for the window to be closed before sending new
> > messages downstream? This may have performance implications, but it
> > will make the query processing easy to implement. I think the current
> > operator layer can support this style without any changes.
> >
> >> d) When a late arrival message comes, the window operator will
> >> re-emit the past window results to correct the previous window
> >> results
> >
> > It would be better if we could do incremental updates without
> > replaying the whole window. But I believe there are advantages to this
> > approach.
> >
> >> In your example, the aggregated counter for the window from
> >> 10:00-10:59 will have a "wrong" value when the window is closed by
> >> the arrival of a message with an 11:00 timestamp, but it will be
> >> corrected later by the late arrival of another message in the
> >> 10:00-10:59 window. I.e., if we keep all the previous window states,
> >> late arrival messages simply trigger a re-computation of the
> >> aggregated counter for the window 10:00-10:59 and overwrite the
> >> previous result.
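> >>
> >> To sketch that flow (a toy example with made-up names, not our actual
> >> operator layer): an operator that buffers messages per event-time
> >> window, emits a count when a message from a later window closes a
> >> window, and re-emits a corrected count when a late arrival lands in an
> >> already-closed window:
> >>
> >>   import java.util.ArrayList;
> >>   import java.util.HashMap;
> >>   import java.util.List;
> >>   import java.util.Map;
> >>
> >>   public class CorrectingWindowOperator {
> >>     static final long WINDOW_MS = 60 * 60 * 1000L; // 1-hour windows
> >>     // windowStart -> buffered event times (trimming entries older
> >>     // than the retention size is omitted here)
> >>     private final Map<Long, List<Long>> buffered = new HashMap<>();
> >>     private long currentWindow = -1; // newest window seen so far
> >>
> >>     void onMessage(long eventTimeMs) {
> >>       long windowStart = (eventTimeMs / WINDOW_MS) * WINDOW_MS;
> >>       buffered.computeIfAbsent(windowStart, k -> new ArrayList<>())
> >>               .add(eventTimeMs);
> >>       if (currentWindow < 0) {
> >>         currentWindow = windowStart;
> >>       } else if (windowStart > currentWindow) {
> >>         // closing policy (simplified): a message in a later window
> >>         // closes every earlier open window, which emits its result
> >>         for (long w = currentWindow; w < windowStart; w += WINDOW_MS) {
> >>           List<Long> msgs = buffered.get(w);
> >>           if (msgs != null) emit(w, msgs.size(), false);
> >>         }
> >>         currentWindow = windowStart;
> >>       } else if (windowStart < currentWindow) {
> >>         // late arrival into a closed window: recompute from the
> >>         // buffered state and re-emit, overwriting the earlier result
> >>         emit(windowStart, buffered.get(windowStart).size(), true);
> >>       }
> >>     }
> >>
> >>     void emit(long windowStart, int count, boolean correction) {
> >>       System.out.printf("window %d -> count=%d%s%n", windowStart,
> >>           count, correction ? " (correction)" : "");
> >>     }
> >>   }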
> >> In this model, the final result is always correct, as long as the
> >> late arrivals fall within the large retention size.
> >>
> >> I have been thinking about this model and had a discussion with
> >> Julian yesterday. The following now seems more reasonable to me:
> >> 1) The window operator will have a full buffered state of the stream,
> >> similar to a time-varying materialized view over the retention size
> >> 2) Window size and termination (i.e. sliding/tumbling/hopping
> >> windows) will now determine when we emit window results (i.e. new
> >> messages/updates to the current window) to the downstream operator,
> >> s.t. the operators can calculate results in time
> >> 3) Late arrivals will be sent to the downstream operator and will
> >> trigger a re-computation of the past result based on the full
> >> buffered state
> >>
> >> In the above model, the window operator becomes a system feature, or
> >> an implementation of "StreamScan" in Calcite's terms, and we do not
> >> need specific language support for the window semantics, given a
> >> default time window operator implementation that serves as the
> >> "StreamScan". All window definitions in the query language now only
> >> dictate the semantic meaning of aggregation and join on top of the
> >> physical window operator, which provides: a) a varying/growing
> >> materialized view; b) a driver that tells the aggregation/join to
> >> compute/re-compute results on top of the materialized view.
> >
> > I will think more about this model and may have more questions about
> > it in the future :).
> >
> > Thanks
> > Milinda
>
> >> On Wed, Mar 4, 2015 at 10:28 AM, Milinda Pathirage
> >> <[email protected]> wrote:
> >>
> >> > Hi Julian,
> >> >
> >> > I went through the draft and it covers most of our requirements.
> >> > But aggregation over a window will not be as simple as described in
> >> > the draft.
> >> >
> >> > In the stream extension draft we have the following:
> >> >
> >> > 'How did Calcite know that the 10:00:00 sub-totals were complete at
> >> > 11:00:00, so that it could emit them? It knows that rowtime is
> >> > increasing, and it knows that FLOOR(rowtime TO HOUR) is also
> >> > increasing. So, once it has seen a row at or after 11:00:00, it
> >> > will never see a row that will contribute to a 10:00:00 total.'
> >> >
> >> > When there are delays, we can't do the above, because observing a
> >> > row with rowtime greater than 11:00:00 doesn't mean that events
> >> > from the 10:00:00 to 10:59:59 time window will not arrive after
> >> > this observation. We have discussed this in
> >> > https://issues.apache.org/jira/browse/SAMZA-552. Even if we
> >> > consider the 'system time/stream time' mentioned in SAMZA-552, it
> >> > doesn't guarantee the absence of delays in a distributed setting.
> >> > So we may need additional hints/extensions to specify the extra
> >> > information required to handle these complexities in window
> >> > calculations.
> >> >
> >> > Maybe there are ways to handle this at the Samza level, not in the
> >> > query language.
> >> >
> >> > @Chris, @Yi
> >> > I got the query planner working with some dummy operators and
> >> > re-writing the query to add default window operators.
> >> > But Julian's comments about handling defaults and optimizing the
> >> > query plan (moving the Delta down and removing both Delta and Chi)
> >> > got me thinking about whether enforcing CQL semantics, as we do in
> >> > our current operator layer, limits flexibility and increases the
> >> > complexity of the query-plan-to-operator-router generation. Anyway,
> >> > I am going to take a step back and think more about Julian's
> >> > comments. I'll put my thoughts into a design document for the query
> >> > planner.
> >> >
> >> > Thanks
> >> > Milinda
> >> >
> >> > On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <[email protected]>
> >> > wrote:
> >> >
> >> > > Sorry to show up late to this party. I've had my head down
> >> > > writing a description of streaming SQL which I hoped would answer
> >> > > questions like this. Here is the latest draft:
> >> > > https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
> >> > >
> >> > > I've been avoiding windows for now. They are not needed for
> >> > > simple queries (project, filter, windowed aggregate) and I wanted
> >> > > to write the specification of more complex queries before I
> >> > > introduce them.
> >> > >
> >> > > Let's look at a simple query, filter. According to CQL, to
> >> > > evaluate
> >> > >
> >> > >   select stream *
> >> > >   from orders
> >> > >   where productId = 10   (query 1)
> >> > >
> >> > > you need to convert orders to a relation over a particular
> >> > > window, apply the filter, then convert back to a stream. We could
> >> > > write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between unbounded
> >> > >     preceding and current row)
> >> > >   where productId = 10   (query 2)
> >> > >
> >> > > or we could write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between current row
> >> > >     and current row)
> >> > >   where productId = 10   (query 3)
> >> > >
> >> > > Very different windows, but they produce the same result, because
> >> > > of the stateless nature of Filter. So, let's suppose that the
> >> > > default window is the one I gave first, "(order by rowtime range
> >> > > between unbounded preceding and current row)", and so query 1 is
> >> > > just short-hand for query 2.
> >> > >
> >> > > I currently translate query 1 to
> >> > >
> >> > >   Delta
> >> > >     Filter($1 = 10)
> >> > >       Scan(orders)
> >> > >
> >> > > but I should really be translating to
> >> > >
> >> > >   Delta
> >> > >     Filter($1 = 10)
> >> > >       Chi(order by $0 range between unbounded preceding and current row)
> >> > >         Scan(orders)
> >> > >
> >> > > Delta is the "differentiation" operator and Chi is the
> >> > > "integration" operator. After we apply rules to push the Delta
> >> > > through the Filter, the Delta and Chi will collide and cancel
> >> > > each other out (a toy sketch of this rewrite appears below).
> >> > >
> >> > > Why have I not yet introduced the Chi operator? Because I have
> >> > > not yet dealt with a query where it makes any difference.
> >> > >
> >> > > Where it will make a difference is joins. But even for joins, I
> >> > > hold out hope that we can avoid explicit windows, most of the
> >> > > time.
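> >> > >
> >> > > To make the Delta/Chi cancellation above concrete, here is a toy
> >> > > rewrite over made-up plan node classes (this is not Calcite's
> >> > > RelNode API, just an illustration; it assumes Java 17 records and
> >> > > sealed types):
> >> > >
> >> > >   public class DeltaChiRewrite {
> >> > >     // Made-up plan nodes, purely for illustration.
> >> > >     sealed interface Node permits Delta, Chi, Filter, Scan {}
> >> > >     record Delta(Node input) implements Node {}  // differentiation
> >> > >     record Chi(Node input) implements Node {}    // integration
> >> > >     record Filter(String condition, Node input) implements Node {}
> >> > >     record Scan(String stream) implements Node {}
> >> > >
> >> > >     static Node rewrite(Node node) {
> >> > >       if (node instanceof Delta d) {
> >> > >         Node in = rewrite(d.input());
> >> > >         if (in instanceof Filter f) // push Delta through stateless Filter
> >> > >           return new Filter(f.condition(), rewrite(new Delta(f.input())));
> >> > >         if (in instanceof Chi chi)  // Delta(Chi(x)) cancels to x
> >> > >           return chi.input();
> >> > >         return new Delta(in);
> >> > >       }
> >> > >       if (node instanceof Filter f)
> >> > >         return new Filter(f.condition(), rewrite(f.input()));
> >> > >       if (node instanceof Chi c) return new Chi(rewrite(c.input()));
> >> > >       return node; // Scan
> >> > >     }
> >> > >
> >> > >     public static void main(String[] args) {
> >> > >       Node plan = new Delta(new Filter("$1 = 10",
> >> > >           new Chi(new Scan("orders"))));
> >> > >       // Delta and Chi cancel: Filter($1 = 10) over Scan(orders)
> >> > >       System.out.println(rewrite(plan));
> >> > >     }
> >> > >   }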
> >> > > One could write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between current row
> >> > >     and interval '1' hour following)
> >> > >   join shipments
> >> > >   on orders.orderId = shipments.orderId   (query 4)
> >> > >
> >> > > but I think most people would find the following clearer:
> >> > >
> >> > >   select stream *
> >> > >   from orders
> >> > >   join shipments
> >> > >   on orders.orderId = shipments.orderId   (query 5)
> >> > >   and shipments.rowtime between orders.rowtime and
> >> > >     orders.rowtime + interval '1' hour
> >> > >
> >> > > Under the covers there are still the implicit windows:
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between unbounded
> >> > >     preceding and current row)
> >> > >   join shipments over (order by rowtime range between unbounded
> >> > >     preceding and current row)
> >> > >   on orders.orderId = shipments.orderId   (query 6)
> >> > >   and shipments.rowtime between orders.rowtime and
> >> > >     orders.rowtime + interval '1' hour
> >> > >
> >> > > Query 6 is equivalent to query 5. But the system can notice the
> >> > > join condition involving the two streams' rowtimes and trim down
> >> > > the windows (one window to an hour, the other window to just the
> >> > > current row) without changing semantics:
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between interval '1'
> >> > >     hour preceding and current row)
> >> > >   join shipments over (order by rowtime range between current row
> >> > >     and current row)
> >> > >   on orders.orderId = shipments.orderId   (query 7)
> >> > >   and shipments.rowtime between orders.rowtime and
> >> > >     orders.rowtime + interval '1' hour
> >> > >
> >> > > So, my hope is that end-users will rarely need to use an explicit
> >> > > window.
> >> > >
> >> > > In the algebra, we will start introducing Chi. It will evaporate
> >> > > for simple queries such as Filter. It will remain for more
> >> > > complex queries such as stream-to-stream join, because you are
> >> > > joining the current row of one stream to a time-varying relation
> >> > > based on the other, and Chi represents that "recent history of a
> >> > > stream" relation.
> >> > >
> >> > > Julian
> >> > >
> >> > > > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage
> >> > > > <[email protected]> wrote:
> >> > > >
> >> > > > Hi Yi,
> >> > > >
> >> > > > As I understand it, rules and re-writes basically do the same
> >> > > > thing (changing/re-writing the operator tree). But in the case
> >> > > > of rules this happens during planning, based on the query
> >> > > > planner configuration, whereas re-writing is done on the
> >> > > > planner output, after the query goes through the planner. In
> >> > > > Calcite, re-writing happens inside the interpreter, and in our
> >> > > > case it will be inside the query-plan-to-operator-router
> >> > > > conversion phase.
> >> > > >
> >> > > > Thanks
> >> > > > Milinda
> >> > > >
> >> > > > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <[email protected]>
> >> > > > wrote:
> >> > > >
> >> > > >> Hi, Milinda,
> >> > > >>
> >> > > >> +1 on your default window idea. One question: what's the
> >> > > >> difference between a rule and a re-write?
> >> > > >>
> >> > > >> Thanks!
> >> > > >>
> >> > > >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage
> >> > > >> <[email protected]> wrote:
> >> > > >>
> >> > > >>> @Chris
> >> > > >>> Yes, I was referring to that mail. Actually, I was wrong
> >> > > >>> about the ‘Now’ window: it should be an ‘Unbounded’ window
> >> > > >>> for most of the default scenarios (Section 6.4 of
> >> > > >>> https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf), because
> >> > > >>> applying a ‘Now’ window with a size of 1 will double the
> >> > > >>> number of events generated if we consider insert/delete
> >> > > >>> streams, whereas ‘Unbounded’ will only generate insert
> >> > > >>> events.
> >> > > >>>
> >> > > >>> @Yi
> >> > > >>> 1. You are correct about Calcite. There is no
> >> > > >>> stream-to-relation conversion happening. But as I understand
> >> > > >>> it, we don’t need Calcite to support this. We can add it to
> >> > > >>> our query planner as a rule or a re-write. What I am not sure
> >> > > >>> about is whether to use a rule or a re-write.
> >> > > >>> 2. There is a rule in Calcite which extracts the Window out
> >> > > >>> of the Project, but I am not sure why that didn’t happen in
> >> > > >>> my test; this rule is added to the planner by default. I’ll
> >> > > >>> ask about this on the Calcite mailing list.
> >> > > >>>
> >> > > >>> I think we can figure out a way to move the window to the
> >> > > >>> input stream if Calcite can move the window out of the
> >> > > >>> Project. I’ll see how we can do this.
> >> > > >>>
> >> > > >>> Also, I’ll go ahead and implement default windows. We can
> >> > > >>> change it later if Julian or someone from Calcite comes up
> >> > > >>> with a better suggestion.
> >> > > >>>
> >> > > >>> Thanks
> >> > > >>> Milinda
> >> > > >>>
> >> > > >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <[email protected]>
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>>> Hi, Milinda,
> >> > > >>>>
> >> > > >>>> Sorry to reply late on this. Here are some of my comments:
> >> > > >>>> 1) In Calcite's model, it seems that there is no
> >> > > >>>> stream-to-relation conversion step. In the first example,
> >> > > >>>> where the window specification is missing, I like your
> >> > > >>>> solution of adding the default LogicalNowWindow operator
> >> > > >>>> s.t. the physical operator matches the query plan. However,
> >> > > >>>> if the Calcite community does not agree to add the default
> >> > > >>>> LogicalNowWindow, it would be fine for us to always insert a
> >> > > >>>> default "now" window on a stream when we generate the Samza
> >> > > >>>> configuration.
> >> > > >>>> 2) I am more concerned about the other cases, where the
> >> > > >>>> window operator is used in aggregation and join. In your
> >> > > >>>> example of windowed aggregation in Calcite, the window spec
> >> > > >>>> seems to be a decoration on the LogicalProject operator,
> >> > > >>>> instead of defining a data source for the LogicalProject
> >> > > >>>> operator. In the CQL model we followed, the window operator
> >> > > >>>> is considered a query primitive that generates a data source
> >> > > >>>> for other relation operators to consume. How exactly is the
> >> > > >>>> window operator used in the Calcite planner? Isn't it much
> >> > > >>>> clearer if the following is used?
> >> > > >>>>
> >> > > >>>>   LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0),
> >> > > >>>>       CAST($SUM0($2)):INTEGER, null)])
> >> > > >>>>     LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> >> > > >>>>
> >> > > >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage
> >> > > >>>> <[email protected]> wrote:
> >> > > >>>>
> >> > > >>>>> Hi devs,
> >> > > >>>>>
> >> > > >>>>> I asked about $subject on calcite-dev. You can find the
> >> > > >>>>> archived discussion at [1]. I think your thoughts would
> >> > > >>>>> also be valuable in that discussion on the Calcite list.
> >> > > >>>>>
> >> > > >>>>> I discovered the requirement for a default window operator
> >> > > >>>>> when I tried to integrate streamscan (I was using tablescan
> >> > > >>>>> previously) into the physical plan generation logic.
> >> > > >>>>> Because of the way we have written the OperatorRouter API,
> >> > > >>>>> we always need a stream-to-relation operator at the input.
> >> > > >>>>> But Calcite generates a query plan like the following:
> >> > > >>>>>
> >> > > >>>>>   LogicalDelta
> >> > > >>>>>     LogicalProject(id=[$0], product=[$1], quantity=[$2])
> >> > > >>>>>       LogicalFilter(condition=[>($2, 5)])
> >> > > >>>>>         StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> >> > > >>>>>
> >> > > >>>>> If we consider LogicalFilter a relation operator, we need
> >> > > >>>>> something to convert the input stream to a relation before
> >> > > >>>>> sending the tuples downstream. In addition to this, there
> >> > > >>>>> is an optimization where we consider the filter operator a
> >> > > >>>>> tuple operator and place it between the StreamScan and the
> >> > > >>>>> stream-to-relation operator as a way of reducing the number
> >> > > >>>>> of messages going downstream.
> >> > > >>>>>
> >> > > >>>>> The other scenario is windowed aggregates. Currently the
> >> > > >>>>> window spec is attached to the LogicalProject in the query
> >> > > >>>>> plan, like the following:
> >> > > >>>>>
> >> > > >>>>>   LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS
> >> > > >>>>>       BETWEEN 2 PRECEDING AND 2 FOLLOWING), 0),
> >> > > >>>>>       CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> >> > > >>>>>       FOLLOWING)):INTEGER, null)])
> >> > > >>>>>
> >> > > >>>>> I wanted to know from them whether it is possible to move
> >> > > >>>>> the window operation to just after the stream scan, so that
> >> > > >>>>> it is compatible with our operator layer.
> >> > > >>>>> Maybe there are better or easier ways to do this, so your
> >> > > >>>>> comments are always welcome.
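> >> > > >>>>>
> >> > > >>>>> As a rough sketch of the rewrite I have in mind (made-up
> >> > > >>>>> node classes, not the actual Calcite or Samza types;
> >> > > >>>>> assumes Java 17), inserting a default unbounded window
> >> > > >>>>> above each bare stream scan so that every relation operator
> >> > > >>>>> sees a relation:
> >> > > >>>>>
> >> > > >>>>>   import java.util.List;
> >> > > >>>>>
> >> > > >>>>>   public class DefaultWindowRewrite {
> >> > > >>>>>     // Made-up plan nodes for illustration; not Calcite's classes.
> >> > > >>>>>     sealed interface Node permits Project, Filter, Window, StreamScan {}
> >> > > >>>>>     record Project(List<String> fields, Node input) implements Node {}
> >> > > >>>>>     record Filter(String condition, Node input) implements Node {}
> >> > > >>>>>     record Window(String spec, Node input) implements Node {} // stream-to-relation
> >> > > >>>>>     record StreamScan(String table) implements Node {}
> >> > > >>>>>
> >> > > >>>>>     // Insert a default unbounded window directly above every
> >> > > >>>>>     // bare StreamScan, so downstream relation operators
> >> > > >>>>>     // receive a relation rather than a raw stream.
> >> > > >>>>>     static Node addDefaultWindows(Node node) {
> >> > > >>>>>       if (node instanceof StreamScan scan)
> >> > > >>>>>         return new Window("UNBOUNDED PRECEDING TO CURRENT ROW", scan);
> >> > > >>>>>       if (node instanceof Filter f)
> >> > > >>>>>         return new Filter(f.condition(), addDefaultWindows(f.input()));
> >> > > >>>>>       if (node instanceof Project p)
> >> > > >>>>>         return new Project(p.fields(), addDefaultWindows(p.input()));
> >> > > >>>>>       return node; // an explicit Window is already present
> >> > > >>>>>     }
> >> > > >>>>>
> >> > > >>>>>     public static void main(String[] args) {
> >> > > >>>>>       Node plan = new Project(List.of("id", "product", "quantity"),
> >> > > >>>>>           new Filter("$2 > 5", new StreamScan("KAFKA.ORDERS")));
> >> > > >>>>>       // The Window lands between the Filter and the StreamScan.
> >> > > >>>>>       System.out.println(addDefaultWindows(plan));
> >> > > >>>>>     }
> >> > > >>>>>   }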
> >> > > >>>>>
> >> > > >>>>> [1]
> >> > > >>>>> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> >> > > >>>>>
> >> > > >>>>> --
> >> > > >>>>> Milinda Pathirage
> >> > > >>>>>
> >> > > >>>>> PhD Student | Research Assistant
> >> > > >>>>> School of Informatics and Computing | Data to Insight Center
> >> > > >>>>> Indiana University
> >> > > >>>>>
> >> > > >>>>> twitter: milindalakmal
> >> > > >>>>> skype: milinda.pathirage
> >> > > >>>>> blog: http://milinda.pathirage.org
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
