Hi, Julian,

Thanks for the reply. I want to add a few more points here:
{quote}
Once you have computed that boundary and stored it in your data structure
you can keep on adding rows until you see one rowtime 11:00:00 or higher.
{quote}
The above does not hold when the incoming messages in a real distributed
system can NOT be strictly ordered by "rowtime": a row with rowtime
11:00:00 may still be followed by rows that belong to the 10:00 window.
That's why, in the current implementation of the window operator in Samza,
we need to know the window size to set the boundary of the window, mostly
to calculate the end of the window.
That said, the idea of passing a function to calculate the window
boundary is still useful for determining when to open a new window. If
we define this function as wndKey = FLOOR(rowtime TO HOUR), then for each
incoming message we can easily compute the wndKey, and that is enough for
us to determine whether we need to open a new window. In the tumbling
window case, if we trust that the wndKey function does not generate
overlapping time windows, we can also get by without keeping the end of
the window.
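
To make this concrete, here is a minimal sketch in Python (not the actual
Samza operator code; wnd_key and assign are hypothetical names made up for
illustration) of using the wndKey alone to decide whether a message opens a
new window, even when messages arrive out of order:

```python
from datetime import datetime


def wnd_key(rowtime: datetime) -> datetime:
    """Hypothetical stand-in for wndKey = FLOOR(rowtime TO HOUR)."""
    return rowtime.replace(minute=0, second=0, microsecond=0)


def assign(windows: dict, rowtime: datetime, value) -> bool:
    """Add value to the window keyed by wnd_key(rowtime).

    Returns True if this message opened a new window, False if the
    window already existed (for example, a late arrival).
    """
    key = wnd_key(rowtime)
    opened = key not in windows
    windows.setdefault(key, []).append(value)
    return opened
```

Note that the sketch never closes a window. Deciding when a window can be
closed is still the part that needs either the window end or some timeout
policy.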

However, this method has one problem I couldn't see how to overcome: for
sliding windows, a single rowtime may belong to multiple windows, each w/ a
different wndKey. How do we define a function to map a single rowtime to
multiple wndKeys in this case? Any suggestions?
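
To illustrate what such a mapping would have to return, here is a rough
sketch in Python. It assumes a hopping window with a fixed size and slide
that are both known up front (which is exactly the information we were
hoping not to need), and it assumes rowtime is a non-negative long in
milliseconds. The name wnd_keys and its parameters are hypothetical:

```python
from typing import List


def wnd_keys(rowtime_ms: int, size_ms: int, slide_ms: int) -> List[int]:
    """Return the start times of all windows [start, start + size_ms)
    that contain rowtime_ms, where starts are multiples of slide_ms."""
    # Latest window start that is <= rowtime_ms.
    start = rowtime_ms - rowtime_ms % slide_ms
    keys = []
    # Walk backwards while the window still covers rowtime_ms.
    while start > rowtime_ms - size_ms:
        keys.append(start)
        start -= slide_ms
    return sorted(keys)
```

With size equal to slide this degenerates to the tumbling case and returns
a single key. What I don't see is how to get the same set of keys out of a
single-valued expression like FLOOR.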

Thanks!

-Yi

On Thu, Apr 30, 2015 at 1:43 PM, Julian Hyde <jul...@hydromatic.net> wrote:

> When I implemented this in the past, I found out that what I needed was
> not the window size exactly, but the next boundary value. For example, if I
> am using FLOOR(rowtime TO HOUR) and the current row is 10:03:00 then the
> next boundary will be 11:00:00. Once you have computed that boundary and
> stored it in your data structure you can keep on adding rows until you see
> one rowtime 11:00:00 or higher.
>
> For FLOOR(x TO timeUnit) there is an easy expression, namely CEIL(x TO
> timeUnit). There are analytic solutions for a lot of common monotonic
> expressions, and these can be discovered at query planning time.
>
> But for arbitrary monotonic functions (even UDFs — as long as you
> know/trust that they are monotonic) there is a solution. It is similar to
> the bisection method[1] of finding roots. Suppose that your value is
> 100100010 (I give the value in binary so that it is easier to see how the
> algorithm works) and f(100100010) = 34.
>
> You know: f(100100010) = 34
> Try: f(100100011) and get 34, so continue
> Try: f(100100010) and get 34, so continue
> Try: f(100100100) and get 34, so continue
> Try: f(100101000) and get 36, so backtrack
> Try: f(100100111) and get 35, so backtrack
> Try: f(100100110) and get 34, so stop.
>
> You now know that 100100111 is the lowest value of x such that f(x) gives a
> value greater than 34. This approach takes at most one iteration for each
> bit (so, 64 iterations for 64-bit timestamp values), and works for any
> monotonic function. This effort occurs at runtime, but 64 iterations for
> each tumbling window seems to me to be a negligible cost.
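
A rough sketch of this kind of search in Python, for reference. It is a
variant rather than the exact bit-by-bit procedure described above: it
probes with doubling steps and then bisects, so it may need up to about two
evaluations of f per bit instead of one. It assumes f is monotonic
(non-decreasing) over integers; next_boundary and max_value are hypothetical
names:

```python
def next_boundary(f, x, max_value=2**63 - 1):
    """Return the smallest y > x with f(y) > f(x), assuming f is
    non-decreasing, or None if there is no such y up to max_value."""
    base = f(x)
    lo = x      # invariant: f(lo) == base
    step = 1
    # Phase 1: double the step until f changes (or we hit max_value).
    while True:
        hi = min(x + step, max_value)
        if f(hi) > base:
            break
        if hi == max_value:
            return None
        lo = hi
        step *= 2
    # Phase 2: bisect (lo, hi]; f(lo) == base and f(hi) > base.
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if f(mid) > base:
            hi = mid
        else:
            lo = mid
    return hi
```

For example, with f = lambda ms: ms // 3600000 (an hour index over
milliseconds) and x corresponding to 10:03:00, the result corresponds to
11:00:00.
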
>
> Julian
>
> [1] http://en.wikipedia.org/wiki/Root-finding_algorithm#Bisection_method
>
> On Apr 30, 2015, at 7:29 AM, Milinda Pathirage <mpath...@umail.iu.edu>
> wrote:
>
> > Hi Julian,
> >
> > You are correct, I was referring to
> https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows.
> What I meant by "tumbling window size" was the size of the window in seconds,
> minutes or hours. It's possible to deduce the size for an expression like
> "FLOOR(rowtime TO HOUR)". But I still couldn't find a straightforward way
> to do it by traversing the logical query plan. But if this expression
> involves some other date/time calculations (for example, defining a 15-minute
> tumbling window with a MySQL expression like
> "ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60)) AS timekey"), how can we do the
> inference at physical planning time? One way I can think of is running
> some known ordered timestamps through this expression and using the output
> values to figure out the interval. But I am not sure whether this will
> always work.
> >
> > Another scenario is using 'milliseconds since epoch' to achieve the same
> tumbling window queries. Given that Avro doesn't support date or time
> values, I was trying to get this to work by having timestamp represented as
> a long value. In this case we can have any computation (not every
> computation is meaningful in the context of window aggregation, but it's a
> possibility) on the timestamp value even though not every possible
> expression will work. So I am not sure whether inferring this at query
> planning time is possible or the right thing to do. Maybe we do as much
> validation as possible in the query planner and let queries fail at
> runtime for the other cases.
> >
> > I agree that re-issuing totals can be problematic. Maybe we wait until
> the timeout to issue the result. But I am sure that we will also need more
> metadata and flags to handle these scenarios. I think Yi may have a better
> idea about this because he was working on the window operator design. You
> can find his design document here [1].
> >
> > CALCITE-704 looks interesting. I'll have a look at it.
> >
> > Thanks
> > Milinda
> >
> > [1]
> https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf
> >
> > On Wed, Apr 29, 2015 at 8:29 PM, Julian Hyde <jul...@hydromatic.net>
> wrote:
> > Can you give an example of the SQL syntax you are using for tumbling
> windows? Does it use GROUP BY and FLOOR, as in
> https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows
> ?
> >
> > What do you mean by “tumbling window size”? You can easily deduce that
> "FLOOR(rowtime TO HOUR)" covers an hour range of the rowtime column, but to
> compute the number of rows or bytes you’d have to make assumptions about
> data rates and then you’d only get an estimate.
> >
> > Regarding re-issuing totals to incorporate late arrivals. It sounds
> useful, but you’ll have to be careful that it doesn’t screw up other
> operators downstream. Imagine that you have an aggregate followed by
> another aggregate that rolls it up. If the downstream operator isn’t
> expecting duplicates then it may double-count.
> >
> > I think it may be OK if the stream defines a primary key, specifies that
> there may be duplicates and the duplicates will be compacted. But in short,
> we need more metadata, because the consumer is a dumb operator not a smart
> human.
> >
> > Do you have a URL for Yi’s design doc?
> >
> > By the way, I am just about to check in a patch for
> https://issues.apache.org/jira/browse/CALCITE-704 “FILTER clause for
> aggregate functions”. I think it would be really useful for streaming
> queries, because you can’t afford to re-run the query for a subset of the
> data. Samza-sql should get this virtually for free when it gets the next
> Calcite release.
> >
> >
> >
> >> On Apr 28, 2015, at 7:40 AM, Milinda Pathirage <mpath...@umail.iu.edu>
> wrote:
> >>
> >> Hi Julian,
> >>
> >> I am working on tumbling windows and hoping to have a look at other
> types of window aggregates next. I was trying to extract the window spec
> out from the aggregate operator (for tumbling window) and figured out that
> it's impossible to infer tumbling window size from date time expressions or
> from an expression over any other type of monotonic field (such as row
> number for tuple based windows). So we were thinking of implementing
> aggregates like we normally implement stream aggregate in standard SQL
> (assuming group by fields are sorted) but with support for handling out of
> order arrivals. One difference in this method compared to stream aggregate
> from SQL is that an input row(s) can contribute to multiple outputs due to
> late arrivals. My plan is to emit the first result for tumbling window
> aggregate when we see a new tuple from the next window and emit result
> again if we get a tuple for an old window. We'll have a window closing
> policy where we will not handle tuples arriving after the window timeout.
> Yi's window operator design document contains most of the details required.
> What do you think about this approach to implement tumbling windows? We
> highly appreciate your feedback on this.
> >>
> >> Thanks
> >> Milinda
> >>
> >> On Mon, Apr 27, 2015 at 6:15 PM, Julian Hyde <jul...@hydromatic.net>
> wrote:
> >> Milinda,
> >>
> >> I have seen your work adding initial streaming SQL to Samza. Good stuff.
> >>
> >> Which types of query are you thinking of doing next?
> >>
> >> As of calcite-1.2, the streaming extensions are in Calcite’s master
> branch. (See
> https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md.)
> We are a couple of weeks away from the next Calcite release. If you need
> some work done in Calcite, now would be a good time.
> >>
> >> Julian
> >>
> >>
> >>
> >>
> >> --
> >> Milinda Pathirage
> >>
> >> PhD Student | Research Assistant
> >> School of Informatics and Computing | Data to Insight Center
> >> Indiana University
> >>
> >> twitter: milindalakmal
> >> skype: milinda.pathirage
> >> blog: http://milinda.pathirage.org
> >
>
>
