One more observation I wanted to add: any range-based query clause on an
ordered stream essentially implies a windowing step in the ordered stream
scan. Would it be possible for the query parser in Calcite to recognize a
common expression form for range-based clauses on a StreamScan operator and
extract the window spec, so that a plan is rewritten from:
LogicalProject(...)
   StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])

To:
LogicalProject(...)
   LogicalWindow(<WindowSpec from the range-defining expression in LogicalProject>)
      StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])

Any comments on that? Thanks!

-Yi

On Mon, May 4, 2015 at 12:22 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Julian,
>
> Thanks for the reply. I want to add a few more points here:
> {quote}
> Once you have computed that boundary and stored it in your data structure
> you can keep on adding rows until you see one rowtime 11:00:00 or higher.
> {quote}
> The above does not hold when incoming messages in a real distributed
> system can NOT be strictly ordered by "rowtime". That is why the current
> implementation of the window operator in Samza needs to know the window
> size: it sets the boundary of the window, mostly to calculate the end of
> the window.
> That said, passing a function that calculates the window boundary is still
> useful for deciding when to open a new window. If we call this function
> wndKey = FLOOR(rowtime TO HOUR), then for each incoming message we can
> easily compute the wndKey, and that is enough to determine whether we need
> to open a new window. In the tumbling window case, if we trust that the
> wndKey function does not generate overlapping time windows, we can also
> get by without keeping the end of the window.
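A minimal sketch of the wndKey idea above, in Python for brevity. It assumes rowtime is epoch milliseconds; `wnd_key`, `assign_to_windows` and `at` are hypothetical names, not Samza or Calcite APIs. It only shows how the key decides when a new window opens under out-of-order arrival. Deciding when a window can be closed would still need the window end or a timeout, as noted above.

```python
HOUR_MS = 60 * 60 * 1000  # one hour in milliseconds


def wnd_key(rowtime_ms):
    """Hypothetical stand-in for FLOOR(rowtime TO HOUR) on epoch milliseconds."""
    return rowtime_ms - rowtime_ms % HOUR_MS


def assign_to_windows(messages):
    """Group (rowtime_ms, value) pairs by wndKey, tolerating out-of-order arrival.

    Returns (windows, opened): the values collected per wndKey, and the
    wndKeys in the order in which their windows were opened.
    """
    windows = {}
    opened = []
    for rowtime_ms, value in messages:
        key = wnd_key(rowtime_ms)
        if key not in windows:      # unseen wndKey: open a new window
            windows[key] = []
            opened.append(key)
        windows[key].append(value)  # a late row still lands in its own window
    return windows, opened


def at(hour, minute):
    """Helper for the example: time of day as milliseconds."""
    return (hour * 60 + minute) * 60 * 1000


# The 10:59 row arrives after the 11:01 row but is still keyed to 10:00.
msgs = [(at(10, 3), "a"), (at(11, 1), "b"), (at(10, 59), "c")]
windows, opened = assign_to_windows(msgs)
```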
>
> However, this method has one problem I can't see how to overcome: for
> sliding windows, a single rowtime may belong to multiple windows, each with
> a different wndKey. How do we define a function that maps a single rowtime
> to multiple wndKeys in this case? Any suggestions?
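One possible shape for such a function, offered only as an illustration and not something proposed in this thread: if window start times are assumed to be aligned to multiples of a slide interval, a rowtime can be mapped to the list of start times of all windows that contain it. The name `sliding_wnd_keys` and the `size`/`slide` parameters are hypothetical.

```python
def sliding_wnd_keys(rowtime, size, slide):
    """Start times of every window [start, start + size) that contains rowtime.

    Assumes window starts are multiples of `slide`, that rowtime is a
    non-negative integer, and that all arguments share one time unit.
    """
    keys = []
    start = rowtime - rowtime % slide  # latest window start at or before rowtime
    while start > rowtime - size:      # window still covers rowtime
        keys.append(start)
        start -= slide
    return keys[::-1]                  # oldest window first


# One-hour windows sliding every 15 minutes, times in minutes since midnight:
# 10:03 (603) belongs to the windows starting at 09:15, 09:30, 09:45 and 10:00.
keys = sliding_wnd_keys(603, size=60, slide=15)
```

With size equal to slide this degenerates to the tumbling case and returns a single key.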
>
> Thanks!
>
> -Yi
>
> On Thu, Apr 30, 2015 at 1:43 PM, Julian Hyde <jul...@hydromatic.net>
> wrote:
>
>> When I implemented this in the past, I found out that what I needed was
>> not the window size exactly, but the next boundary value. For example, if I
>> am using FLOOR(rowtime TO HOUR) and the current row is 10:03:00 then the
>> next boundary will be 11:00:00. Once you have computed that boundary and
>> stored it in your data structure you can keep on adding rows until you see
>> one rowtime 11:00:00 or higher.
>>
>> For FLOOR(x TO timeUnit) there is an easy expression, namely CEIL(x TO
>> timeUnit). There are analytic solutions for a lot of common monotonic
>> expressions, and these can be discovered at query planning time.
>>
>> But for arbitrary monotonic functions (even UDFs, as long as you
>> know/trust that they are monotonic) there is a solution. It is similar to
>> the bisection method[1] of finding roots. Suppose that your value is
>> 100100010 (I give the value in binary so that it is easier to see how the
>> algorithm works) and f(100100010) = 34.
>>
>> You know: f(100100010) = 34
>> Try: f(100100011) and get 34, so continue
>> Try: f(100100100) and get 34, so continue
>> Try: f(100101000) and get 36, so backtrack
>> Try: f(100100111) and get 35, so backtrack
>> Try: f(100100110) and get 34, so stop.
>>
>> You now know that 100100111 is the lowest value of x such that f(x) gives
>> a value greater than 34. This approach takes at most one iteration for each
>> bit (so, at most 64 iterations for 64-bit timestamp values), and works for
>> any monotonic function. This effort occurs at runtime, but 64 iterations
>> for each tumbling window seems to me to be a negligible cost.
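A rough Python sketch of the boundary search Julian describes. This is a variant, not his exact bit-by-bit trace: it doubles the step until f changes, then bisects between the last unchanged probe and the first changed one. It assumes f is monotonic non-decreasing over integers and that a boundary exists at or below `limit`; `next_boundary` and `limit` are hypothetical names.

```python
def next_boundary(f, x, limit=(1 << 63) - 1):
    """Smallest y > x with f(y) > f(x), for a monotonic non-decreasing f."""
    base = f(x)
    lo = x        # invariant: f(lo) == base
    step = 1
    while True:   # gallop: probe x+1, x+2, x+4, ... until f changes
        hi = x + step
        if hi > limit:
            raise ValueError("no boundary found up to limit")
        if f(hi) > base:
            break
        lo = hi
        step <<= 1
    while hi - lo > 1:  # bisect: f(lo) == base and f(hi) > base
        mid = (lo + hi) // 2
        if f(mid) > base:
            hi = mid
        else:
            lo = mid
    return hi


HOUR_MS = 60 * 60 * 1000


def floor_to_hour(rowtime_ms):
    """Stand-in for FLOOR(rowtime TO HOUR) on epoch milliseconds."""
    return rowtime_ms // HOUR_MS


# A row at 10:03:00 should yield 11:00:00 as the next boundary.
row = (10 * 60 + 3) * 60 * 1000
boundary = next_boundary(floor_to_hour, row)
```

Both phases take a number of probes proportional to the bit width of the distance to the boundary, which is in line with the per-bit cost estimate above.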
>>
>> Julian
>>
>> [1] http://en.wikipedia.org/wiki/Root-finding_algorithm#Bisection_method
>>
>> On Apr 30, 2015, at 7:29 AM, Milinda Pathirage <mpath...@umail.iu.edu>
>> wrote:
>>
>> > Hi Julian,
>> >
>> > You are correct, I was referring to
>> > https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows.
>> > What I meant by "tumbling window size" was the size of the window in
>> > seconds, minutes or hours. It's possible to deduce the size for an
>> > expression like "FLOOR(rowtime TO HOUR)", but I still couldn't find a
>> > straightforward way to do it by traversing the logical query plan. And if
>> > the expression involves other date/time calculations (for example, a
>> > 15-minute tumbling window defined with this MySQL expression:
>> > "ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60)) AS timekey"), how can we do
>> > the inference at physical planning time? One way I can think of is running
>> > some known ordered timestamps through the expression and using the output
>> > values to figure out the interval. But I am not sure whether this will
>> > always work.
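A small Python sketch of the probing idea Milinda describes: feed ordered timestamps through the expression, record where its output changes, and accept an interval only if the gaps between changes are uniform. `infer_interval` and `timekey` are hypothetical names. `timekey` approximates the MySQL expression with integer arithmetic (round half up on non-negative seconds), which is an assumption about its behaviour.

```python
def infer_interval(expr, start, stop, step=1):
    """Probe expr over ordered timestamps and return the uniform gap between
    output changes, or None if the gaps differ or fewer than two changes occur."""
    changes = []
    prev = expr(start)
    for t in range(start + step, stop, step):
        cur = expr(t)
        if cur != prev:          # output changed: t starts a new window
            changes.append(t)
            prev = cur
    gaps = {b - a for a, b in zip(changes, changes[1:])}
    return gaps.pop() if len(gaps) == 1 else None


def timekey(ts_seconds):
    """Approximation of ROUND(UNIX_TIMESTAMP(ts) / (15 * 60))."""
    return (ts_seconds + 450) // 900


interval = infer_interval(timekey, 0, 4000)                    # 15 minutes, in seconds
irregular = infer_interval(lambda t: int(t ** 0.5), 0, 100)    # no uniform window
```

The second call shows the caveat raised above: a monotonic expression need not produce fixed-size windows, and the result is only as fine as the probing step and range, so probing can reject an expression but cannot prove it well-behaved.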
>> >
>> > Another scenario is using 'milliseconds since epoch' to achieve the
>> > same tumbling window queries. Given that Avro doesn't support date or time
>> > values, I was trying to get this to work by representing the timestamp as
>> > a long value. In this case any computation on the timestamp value is
>> > possible (though not every computation is meaningful in the context of
>> > window aggregation), and not every possible expression will work. So I
>> > am not sure whether inferring this at query planning time is possible or
>> > the right thing to do. Maybe we do as many validations as possible in the
>> > query planner and let the queries fail at runtime for the other cases.
>> >
>> > I agree that re-issuing totals can be problematic. Maybe we wait until
>> > the timeout to issue the result. But I think we will also need more
>> > metadata and flags to handle these scenarios. I think Yi may have a better
>> > idea about this because he has been working on the window operator design.
>> > You can find his design document here [1].
>> >
>> > CALCITE-704 looks interesting. I'll have a look at it.
>> >
>> > Thanks
>> > Milinda
>> >
>> > [1]
>> > https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf
>> >
>> > On Wed, Apr 29, 2015 at 8:29 PM, Julian Hyde <jul...@hydromatic.net>
>> > wrote:
>> > Can you give an example of the SQL syntax you are using for tumbling
>> > windows? Does it use GROUP BY and FLOOR, as in
>> > https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows
>> > ?
>> >
>> > What do you mean by "tumbling window size"? You can easily deduce that
>> > "FLOOR(rowtime TO HOUR)" covers an hour range of the rowtime column, but to
>> > compute the number of rows or bytes you'd have to make assumptions about
>> > data rates and then you'd only get an estimate.
>> >
>> > Regarding re-issuing totals to incorporate late arrivals: it sounds
>> > useful, but you'll have to be careful that it doesn't screw up other
>> > operators downstream. Imagine that you have an aggregate followed by
>> > another aggregate that rolls it up. If the downstream operator isn't
>> > expecting duplicates then it may double-count.
>> >
>> > I think it may be OK if the stream defines a primary key, specifies
>> > that there may be duplicates and the duplicates will be compacted. But in
>> > short, we need more metadata, because the consumer is a dumb operator, not
>> > a smart human.
>> >
>> > Do you have a URL for Yi’s design doc?
>> >
>> > By the way, I am just about to check in a patch for
>> > https://issues.apache.org/jira/browse/CALCITE-704 "FILTER clause for
>> > aggregate functions". I think it would be really useful for streaming
>> > queries, because you can't afford to re-run the query for a subset of the
>> > data. Samza-sql should get this virtually for free when it gets the next
>> > Calcite release.
>> >
>> >
>> >
>> >> On Apr 28, 2015, at 7:40 AM, Milinda Pathirage <mpath...@umail.iu.edu>
>> >> wrote:
>> >>
>> >> Hi Julian,
>> >>
>> >> I am working on tumbling windows and hoping to look at other types of
>> >> window aggregates next. I was trying to extract the window spec from the
>> >> aggregate operator (for tumbling windows) and figured out that it's
>> >> impossible to infer the tumbling window size from date/time expressions or
>> >> from an expression over any other type of monotonic field (such as row
>> >> number for tuple-based windows). So we were thinking of implementing
>> >> aggregates the way stream aggregates are normally implemented in standard
>> >> SQL (assuming the group-by fields are sorted) but with support for handling
>> >> out-of-order arrivals. One difference in this method compared to stream
>> >> aggregate from SQL is that input rows can contribute to multiple outputs
>> >> due to late arrivals. My plan is to emit the first result for a tumbling
>> >> window aggregate when we see a new tuple from the next window, and to emit
>> >> the result again if we get a tuple for an old window. We'll have a window
>> >> closing policy under which we will not handle tuples arriving after the
>> >> window timeout. Yi's window operator design document contains most of the
>> >> details required. What do you think about this approach to implementing
>> >> tumbling windows? We would highly appreciate your feedback on this.
>> >>
>> >> Thanks
>> >> Milinda
>> >>
>> >> On Mon, Apr 27, 2015 at 6:15 PM, Julian Hyde <jul...@hydromatic.net>
>> >> wrote:
>> >> Milinda,
>> >>
>> >> I have seen your work adding initial streaming SQL to Samza. Good
>> >> stuff.
>> >>
>> >> Which types of query are you thinking of doing next?
>> >>
>> >> As of calcite-1.2, the streaming extensions are in Calcite's master
>> >> branch. (See
>> >> https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md.)
>> >> We are a couple of weeks away from the next Calcite release. If you need
>> >> some work done in Calcite, now would be a good time.
>> >>
>> >> Julian
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Milinda Pathirage
>> >>
>> >> PhD Student | Research Assistant
>> >> School of Informatics and Computing | Data to Insight Center
>> >> Indiana University
>> >>
>> >> twitter: milindalakmal
>> >> skype: milinda.pathirage
>> >> blog: http://milinda.pathirage.org
>> >
