Just one more observation I wanted to add: any range-based query clause on an ordered stream essentially implies the need for a windowing method in the ordered stream scan. Is it possible for the query parser in Calcite to identify a common syntax expression for range-based clauses on a StreamScan operator and extract the window spec, going from:

LogicalProject(...)
  StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])
To:

LogicalProject(...)
  LogicalWindow(<WindowSpec from the range-defining expression in LogicalProject>)
    StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])

Any comments on that?

Thanks!

-Yi

On Mon, May 4, 2015 at 12:22 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Julian,
>
> Thanks for the reply. I want to add a few more points here:
> {quote}
> Once you have computed that boundary and stored it in your data structure you can keep on adding rows until you see one rowtime 11:00:00 or higher.
> {quote}
> The above is not true when the incoming messages in a real distributed system can NOT be strictly ordered by "rowtime". That's why, in the current implementation of the window operator in Samza, we need to know the window size to set the boundary of the window, mostly to calculate the end of the window.
>
> That said, the idea of passing in a function that calculates the window boundary is still useful for deciding when to open a new window. If we call this function wndKey = FLOOR(rowtime TO HOUR), then for each incoming message we can easily compute the wndKey, and that is enough to determine whether we need to open a new window. In the tumbling window case, if we trust that the wndKey function does not generate overlapping time windows, we can also get by without keeping the end of the window.
>
> However, this method has one problem I can't see how to overcome: for sliding windows, a single rowtime may belong to multiple windows, each with a different wndKey. How do we define a function that maps a single rowtime to multiple wndKeys in this case? Any suggestions?
>
> Thanks!
>
> -Yi
>
> On Thu, Apr 30, 2015 at 1:43 PM, Julian Hyde <jul...@hydromatic.net> wrote:
>
>> When I implemented this in the past, I found out that what I needed was not the window size exactly, but the next boundary value.
>> For example, if I am using FLOOR(rowtime TO HOUR) and the current row is 10:03:00, then the next boundary will be 11:00:00. Once you have computed that boundary and stored it in your data structure, you can keep on adding rows until you see one with rowtime 11:00:00 or higher.
>>
>> For FLOOR(x TO timeUnit) there is an easy expression, namely CEIL(x TO timeUnit). There are analytic solutions for a lot of common monotonic expressions, and these can be discovered at query planning time.
>>
>> But for arbitrary monotonic functions (even UDFs, as long as you know/trust that they are monotonic) there is a solution. It is similar to the bisection method[1] of finding roots. Suppose that your value is 100100010 (I give the value in binary so that it is easier to see how the algorithm works) and f(100100010) = 34. Then:
>>
>> You know: f(100100010) = 34
>> Try: f(100100011) and get 34, so continue
>> Try: f(100100010) and get 34, so continue
>> Try: f(100100100) and get 34, so continue
>> Try: f(100101000) and get 36, so backtrack
>> Try: f(100100111) and get 35, so backtrack
>> Try: f(100100110) and get 34, so stop.
>>
>> You now know that 100100111 is the lowest value of x such that f(x) gives a value greater than 34. This approach takes at most one iteration for each bit (so, 64 bits for timestamp values), and works for any monotonic function. This effort occurs at runtime, but 64 iterations for each tumbling window seems to me to be a negligible cost.
>>
>> Julian
>>
>> [1] http://en.wikipedia.org/wiki/Root-finding_algorithm#Bisection_method
>>
>> On Apr 30, 2015, at 7:29 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
>>
>> > Hi Julian,
>> >
>> > You are correct, I was referring to https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows. What I meant by "tumbling window size" was the size of the window in seconds, minutes or hours.
>> > It's possible to deduce the size for an expression like "FLOOR(rowtime TO HOUR)", but I still couldn't find a straightforward way to do it by traversing the logical query plan. And if the expression involves some other date/time calculations (for example, a 15-minute tumbling window defined with a MySQL expression like "ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60)) AS timekey"), how can we infer the size at physical planning time? One way I can think of is running some known, ordered timestamps through the expression and using the output values to figure out the interval. But I am not sure whether this will always work.
>> >
>> > Another scenario is using 'milliseconds since epoch' to express the same tumbling window queries. Since Avro doesn't support date or time values, I was trying to get this to work by representing the timestamp as a long value. In this case any computation can be applied to the timestamp value (not every computation is meaningful in the context of window aggregation, but it's a possibility), and not every possible expression will work. So I am not sure whether inferring this at query planning time is possible or the right thing to do. Maybe we do as much validation as possible in the query planner and let the queries fail at runtime for the other cases.
>> >
>> > I agree that re-issuing totals can be problematic. Maybe we wait until the timeout to issue the result. But I am sure we will also need more metadata and flags to handle these scenarios. I think Yi may have a better idea about this because he was working on the window operator design. You can find his design document here [1].
>> >
>> > CALCITE-704 looks interesting. I'll have a look at it.
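Julian's boundary search, quoted earlier in the thread, suggests one way to do what Milinda describes: probe a monotonic key expression at runtime instead of analyzing it at planning time. The following is a minimal Python sketch, assuming integer rowtimes in seconds since the epoch and a non-decreasing key function. `next_boundary`, `hour_key` and `quarter_hour_round_key` are hypothetical names, the probe order (galloping out from x, then bisection) differs slightly from the trace in Julian's message, and modelling MySQL's ROUND as round-half-up on non-negative values is an assumption.

```python
from typing import Callable


def next_boundary(f: Callable[[int], int], x: int, max_bits: int = 64) -> int:
    """Smallest y > x with f(y) > f(x), for a monotonic non-decreasing f.

    Galloping search (x+1, x+2, x+4, ...) until the key changes, then bisection
    between the last unchanged probe and the first changed one.
    """
    target = f(x)
    lo, step = x, 1  # invariant: f(lo) == target
    while f(x + step) <= target:
        lo = x + step
        step *= 2
        if step > 2 ** max_bits:
            raise ValueError("key does not change within the search range")
    hi = x + step  # invariant: f(hi) > target
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if f(mid) <= target:
            lo = mid
        else:
            hi = mid
    return hi


def hour_key(t: int) -> int:
    # Hypothetical helper: FLOOR(rowtime TO HOUR) as an hour index, rowtime in epoch seconds
    return t // 3600


def quarter_hour_round_key(t: int) -> int:
    # Hypothetical helper: models ROUND(t / (15 * 60)) for t >= 0, rounding halves up
    return (t + 450) // 900


t = 10 * 3600 + 3 * 60  # 10:03:00 on day 0
print(next_boundary(hour_key, t))  # 39600, i.e. 11:00:00
b1 = next_boundary(quarter_hour_round_key, t)
b2 = next_boundary(quarter_hour_round_key, b1)
print(b1, b2 - b1)  # 36450 900
```

Two successive boundaries give the interval (900 s) without inspecting the expression, and the probe also reveals that ROUND shifts the boundaries to 7.5 minutes past each quarter hour (36450 is 10:07:30). This variant can use up to about two probes per bit rather than one, because both the galloping phase and the bisection phase cost roughly log2 of the distance to the boundary.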
>> >
>> > Thanks
>> > Milinda
>> >
>> > [1] https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf
>> >
>> > On Wed, Apr 29, 2015 at 8:29 PM, Julian Hyde <jul...@hydromatic.net> wrote:
>> > Can you give an example of the SQL syntax you are using for tumbling windows? Does it use GROUP BY and FLOOR, as in https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows ?
>> >
>> > What do you mean by "tumbling window size"? You can easily deduce that "FLOOR(rowtime TO HOUR)" covers an hour range of the rowtime column, but to compute the number of rows or bytes you'd have to make assumptions about data rates, and then you'd only get an estimate.
>> >
>> > Regarding re-issuing totals to incorporate late arrivals: it sounds useful, but you'll have to be careful that it doesn't screw up other operators downstream. Imagine that you have an aggregate followed by another aggregate that rolls it up. If the downstream operator isn't expecting duplicates, then it may double-count.
>> >
>> > I think it may be OK if the stream defines a primary key and specifies that there may be duplicates and that the duplicates will be compacted. But in short, we need more metadata, because the consumer is a dumb operator, not a smart human.
>> >
>> > Do you have a URL for Yi's design doc?
>> >
>> > By the way, I am just about to check in a patch for https://issues.apache.org/jira/browse/CALCITE-704 "FILTER clause for aggregate functions". I think it would be really useful for streaming queries, because you can't afford to re-run the query for a subset of the data. Samza-sql should get this virtually for free when it gets the next Calcite release.
>> >
>> >> On Apr 28, 2015, at 7:40 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
>> >>
>> >> Hi Julian,
>> >>
>> >> I am working on tumbling windows and hoping to have a look at other types of window aggregates next.
>> >> I was trying to extract the window spec from the aggregate operator (for tumbling windows) and figured out that it's impossible to infer the tumbling window size from date/time expressions, or from an expression over any other type of monotonic field (such as row number for tuple-based windows). So we were thinking of implementing aggregates the way a stream aggregate is normally implemented in standard SQL (assuming the GROUP BY fields are sorted), but with support for handling out-of-order arrivals. One difference from the SQL stream aggregate is that an input row can contribute to multiple outputs due to late arrivals. My plan is to emit the first result for a tumbling window aggregate when we see a new tuple from the next window, and to emit the result again if we get a tuple for an old window. We'll have a window closing policy under which we will not handle tuples arriving after the window timeout. Yi's window operator design document contains most of the details required. What do you think about this approach to implementing tumbling windows? We would highly appreciate your feedback on this.
>> >>
>> >> Thanks
>> >> Milinda
>> >>
>> >> On Mon, Apr 27, 2015 at 6:15 PM, Julian Hyde <jul...@hydromatic.net> wrote:
>> >> Milinda,
>> >>
>> >> I have seen your work adding initial streaming SQL to Samza. Good stuff.
>> >>
>> >> Which types of query are you thinking of doing next?
>> >>
>> >> As of calcite-1.2, the streaming extensions are in Calcite's master branch. (See https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md.) We are a couple of weeks away from the next Calcite release. If you need some work done in Calcite, now would be a good time.
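The emission policy Milinda proposes in his April 28 message (emit a tumbling window's result when the first row of a later window arrives, re-emit when a late row updates an already-emitted window, and ignore rows that arrive after the window timeout) could look roughly like the Python sketch below. The SUM aggregate, the class name `TumblingSumAggregator`, and the rule that a window closes once the largest rowtime seen reaches the window end plus the timeout are all assumptions for illustration, not details taken from Yi's design document.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class TumblingSumAggregator:
    """Hypothetical tumbling-window SUM that tolerates out-of-order rows.

    size and timeout share the unit of rowtime (e.g. seconds). Each output
    entry is (window_key, total, is_update).
    """
    size: int
    timeout: int
    totals: Dict[int, int] = field(default_factory=dict)  # state of open windows
    emitted: Set[int] = field(default_factory=set)  # windows reported at least once
    max_key: int = -1  # newest window seen so far
    max_rowtime: int = -1  # largest rowtime seen (assumed closing clock)
    dropped: int = 0
    output: List[Tuple[int, int, bool]] = field(default_factory=list)

    def _closed(self, key: int) -> bool:
        # Assumed closing policy: event time has passed the window end plus timeout.
        return self.max_rowtime >= (key + 1) * self.size + self.timeout

    def _emit(self, key: int) -> None:
        self.output.append((key, self.totals[key], key in self.emitted))
        self.emitted.add(key)

    def on_row(self, rowtime: int, value: int) -> None:
        key = rowtime // self.size  # wndKey, i.e. FLOOR(rowtime / size)
        if self._closed(key):
            self.dropped += 1  # arrived after the window timeout: ignore it
            return
        self.max_rowtime = max(self.max_rowtime, rowtime)
        self.totals[key] = self.totals.get(key, 0) + value
        if key > self.max_key:
            # First row of a newer window: emit earlier windows not yet emitted.
            for k in sorted(self.totals):
                if k < key and k not in self.emitted:
                    self._emit(k)
            self.max_key = key
        elif key < self.max_key:
            # Late row for an older window: emit (or re-emit) its updated total.
            self._emit(key)
        # Discard state of windows that can no longer accept rows.
        for k in [k for k in self.totals if self._closed(k)]:
            del self.totals[k]


agg = TumblingSumAggregator(size=10, timeout=5)
for rowtime, value in [(1, 1), (3, 2), (12, 5), (7, 4), (25, 1), (8, 10)]:
    agg.on_row(rowtime, value)
print(agg.output)  # [(0, 3, False), (0, 7, True), (1, 5, False)]
print(agg.dropped)  # 1
```

The `is_update` flag is one way to respond to Julian's double-counting concern: a downstream operator that sees `True` would replace the earlier total for that window rather than add to it, although, as the thread notes, that contract needs metadata the consumer can rely on.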
>> >>
>> >> Julian
>> >>
>> >> --
>> >> Milinda Pathirage
>> >>
>> >> PhD Student | Research Assistant
>> >> School of Informatics and Computing | Data to Insight Center
>> >> Indiana University
>> >>
>> >> twitter: milindalakmal
>> >> skype: milinda.pathirage
>> >> blog: http://milinda.pathirage.org
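Yi's question of how to map a single rowtime to multiple wndKeys for sliding windows is left open in this thread. One possible direction, assuming windows of a fixed `size` that start at every multiple of a fixed `slide` (both in rowtime units, an assumption since the thread does not define the sliding window), is to let the key function return a list of window start times instead of a single key; with `slide == size` it reduces to the tumbling FLOOR key. `tumbling_key`, `sliding_keys` and `count_per_window` are hypothetical names in this Python sketch.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def tumbling_key(rowtime: int, size: int) -> int:
    # Start of the single tumbling window containing rowtime, like FLOOR(rowtime TO <size>)
    return (rowtime // size) * size


def sliding_keys(rowtime: int, size: int, slide: int) -> List[int]:
    # Start of every window [s, s + size) where s is a multiple of slide and s <= rowtime < s + size
    first = (rowtime - size) // slide + 1  # smallest k with k * slide > rowtime - size
    last = rowtime // slide  # largest k with k * slide <= rowtime
    return [k * slide for k in range(first, last + 1)]


def count_per_window(rowtimes: Iterable[int], size: int, slide: int) -> Dict[int, int]:
    # Each row is counted once in every window that contains it.
    counts: Dict[int, int] = defaultdict(int)
    for t in rowtimes:
        for start in sliding_keys(t, size, slide):
            counts[start] += 1
    return dict(counts)


print(sliding_keys(36180, size=3600, slide=900))  # [33300, 34200, 35100, 36000]
print(sliding_keys(36180, size=3600, slide=3600) == [tumbling_key(36180, 3600)])  # True
```

Each row is then added to every window in the returned list, so when `size` is a multiple of `slide` one row contributes to `size // slide` windows. Windows with arbitrary or data-dependent boundaries would need a different approach.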