Re: What next for streaming SQL?

2015-05-05 Thread Yi Pan
Hi, Julian,

Great! I am looking forward to it. Could you help answer my question
regarding the sliding windows in my previous email?

Thanks a lot!

-Yi

On Tue, May 5, 2015 at 10:46 AM, Julian Hyde jul...@hydromatic.net wrote:


 On May 4, 2015, at 10:52 AM, Yi Pan nickpa...@gmail.com wrote:

  Just one observation I wanted to add: I noted that any range-based
  query clause on an ordered stream essentially implies the need for a
  windowing method in the ordered stream scan. Is it possible to
  identify a common syntax expression in Calcite's query parser for
  any range-based clause on a StreamScan operator and extract the window
  spec, like:
  LogicalProject(...)
StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])
 
  To:
  LogicalProject(...)
LogicalWindow(WindowSpec from the range defining expression in
  LogicalProject)
   StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])

 Yes, I plan to do this.

 Calcite treats LogicalWindow a bit differently from the other logical
 operators. A query that contains a windowed aggregate such as “SUM(units)
 OVER window” is first translated to a LogicalProject that contains
 windowed aggregates as if they were ordinary function calls, then
 ProjectToWindowRule converts that LogicalProject to a LogicalWindow.

 But by the time Samza sees the relational expressions you can assume that
 all windowed aggregates have been moved into a LogicalWindow.

 I have logged https://issues.apache.org/jira/browse/CALCITE-713.

 Julian




Re: What next for streaming SQL?

2015-05-05 Thread Julian Hyde
Sliding windows are so different from tumbling windows that it doesn’t make 
sense to give them a “window key”. With sliding windows, every row has its own 
window. So, given a row R you determine what rows are in R’s window based on 
its windowing criteria, e.g. “rows between 5 preceding and 5 following 
partition by productId”, or “order by rowtime range ‘1’ minute preceding”.

There is no instantaneous flush of a hash table, as occurs in tumbling windows.
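
To make the per-row nature concrete, here is a minimal Java sketch (hypothetical
names, not Samza or Calcite API) of collecting the rows that fall in R's window
for a spec like “order by rowtime range ‘1’ minute preceding”, assuming the rows
are buffered in rowtime order:

import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
  static class Row {
    final long rowtime;   // event time in millis
    final long units;
    Row(long rowtime, long units) { this.rowtime = rowtime; this.units = units; }
  }

  // Rows whose rowtime falls in [r.rowtime - range, r.rowtime]; every row r
  // gets its own window, so this is evaluated per row, not per window key.
  static List<Row> windowOf(List<Row> bufferedInRowtimeOrder, Row r, long rangeMillis) {
    List<Row> window = new ArrayList<>();
    for (Row other : bufferedInRowtimeOrder) {
      if (other.rowtime > r.rowtime) break;                      // buffer is rowtime-ordered
      if (other.rowtime >= r.rowtime - rangeMillis) window.add(other);
    }
    return window;
  }
}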

Julian


On May 5, 2015, at 4:04 PM, Yi Pan nickpa...@gmail.com wrote:

 Hi, Julian,
 
 Great! I am looking forward to it. Could you help answer my question
 regarding the sliding windows in my previous email?
 
 Thanks a lot!
 
 -Yi
 
 On Tue, May 5, 2015 at 10:46 AM, Julian Hyde jul...@hydromatic.net wrote:
 
 
 On May 4, 2015, at 10:52 AM, Yi Pan nickpa...@gmail.com wrote:
 
 Just one observation I wanted to add: I noted that any range-based
 query clause on an ordered stream essentially implies the need for a
 windowing method in the ordered stream scan. Is it possible to
 identify a common syntax expression in Calcite's query parser for
 any range-based clause on a StreamScan operator and extract the window
 spec, like:
 LogicalProject(...)
  StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])
 
 To:
 LogicalProject(...)
  LogicalWindow(WindowSpec from the range defining expression in
 LogicalProject)
 StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])
 
 Yes, I plan to do this.
 
 Calcite treats LogicalWindow a bit differently from the other logical
 operators. A query that contains a windowed aggregate such as “SUM(units)
 OVER window” is first translated to a LogicalProject that contains
 windowed aggregates as if they were ordinary function calls, then
 ProjectToWindowRule converts that LogicalProject to a LogicalWindow.
 
 But by the time Samza sees the relational expressions you can assume that
 all windowed aggregates have been moved into a LogicalWindow.
 
 I have logged https://issues.apache.org/jira/browse/CALCITE-713.
 
 Julian
 
 



Re: What next for streaming SQL?

2015-05-05 Thread Julian Hyde

On May 4, 2015, at 10:52 AM, Yi Pan nickpa...@gmail.com wrote:

 Just one observation I wanted to add: I noted that any range-based
 query clause on an ordered stream essentially implies the need for a
 windowing method in the ordered stream scan. Is it possible to
 identify a common syntax expression in Calcite's query parser for
 any range-based clause on a StreamScan operator and extract the window
 spec, like:
 LogicalProject(...)
   StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])
 
 To:
 LogicalProject(...)
   LogicalWindow(WindowSpec from the range defining expression in
 LogicalProject)
  StreamScan(table=[[STREAMS, ORDERS]], fields=[[0,1,2,3]])

Yes, I plan to do this.

Calcite treats LogicalWindow a bit differently from the other logical
operators. A query that contains a windowed aggregate such as “SUM(units) OVER
window” is first translated to a LogicalProject that contains windowed 
aggregates as if they were ordinary function calls, then ProjectToWindowRule 
converts that LogicalProject to a LogicalWindow.

But by the time Samza sees the relational expressions you can assume that all 
windowed aggregates have been moved into a LogicalWindow.
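
For what it's worth, a minimal Java sketch of what that guarantee buys a consumer
of the plan: a walker that only has to special-case LogicalWindow. The Calcite
class names are real; the walker itself is hypothetical, not actual Samza code.

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalWindow;

class PlanWalkerSketch {
  // After ProjectToWindowRule has run, windowed aggregates live only in
  // LogicalWindow nodes, so a translator never has to look for OVER
  // expressions inside a LogicalProject.
  void walk(RelNode rel) {
    if (rel instanceof LogicalWindow) {
      // translate the window spec(s) into a Samza window operator
    } else if (rel instanceof LogicalProject) {
      // plain projection: no windowed aggregates expected here
    } else if (rel instanceof TableScan) {
      // leaf scan, e.g. a StreamScan over [STREAMS, ORDERS]
    }
    for (RelNode input : rel.getInputs()) {
      walk(input);
    }
  }
}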

I have logged https://issues.apache.org/jira/browse/CALCITE-713.

Julian



Re: What next for streaming SQL?

2015-05-04 Thread Yi Pan
Hi, Julian,

Thanks for the reply. I want to add a few more points here:
{quote}
Once you have computed that boundary and stored it in your data structure
you can keep on adding rows until you see one with rowtime 11:00:00 or higher.
{quote}
 The above is not true when the incoming messages in a real distributed
system can NOT be strictly ordered by the rowtime. That's why, in the
current implementation of the window operator in Samza, we need to
know the window size to set the window boundary, mainly to calculate
the end of the window.
That said, the idea of passing a function to calculate the window
boundary is still useful for determining when to open a new window. If
we call this function as wndKey = FLOOR(rowtime TO HOUR), then for each incoming
message we can easily compute the wndKey, and that is enough for us
to determine whether we need to open a new window. In the tumbling-window
case, if we trust that the wndKey function does not generate overlapping
time windows, we can also get by without keeping the end of the
window.
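
A minimal Java sketch of that wndKey idea (hypothetical names; assumes rowtime is
epoch milliseconds, wndKey = FLOOR(rowtime TO HOUR), and that the wndKey function
yields non-overlapping windows):

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongUnaryOperator;

class WndKeySketch {
  // wndKey function: FLOOR(rowtime TO HOUR) over epoch millis.
  static final LongUnaryOperator WND_KEY = rowtime -> rowtime - (rowtime % 3_600_000L);

  // One open window per distinct key; an unseen key simply opens a new window,
  // so the end of the window never has to be stored.
  final Map<Long, Long> openWindows = new HashMap<>();   // wndKey -> running SUM(units)

  void onMessage(long rowtime, long units) {
    long key = WND_KEY.applyAsLong(rowtime);
    openWindows.merge(key, units, Long::sum);
  }
}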

However, this method has one problem I couldn't see how to overcome: for
sliding windows, a single rowtime may belong to multiple windows, each w/ a
different wndKey. How do we define a function to map a single rowtime to
multiple wndKeys in this case? Any suggestions?

Thanks!

-Yi

On Thu, Apr 30, 2015 at 1:43 PM, Julian Hyde jul...@hydromatic.net wrote:

 When I implemented this in the past, I found out that what I needed was
 not the window size exactly, but the next boundary value. For example, if I
 am using FLOOR(rowtime TO HOUR) and the current row is 10:03:00 then the
 next boundary will be 11:00:00. Once you have computed that boundary and
 stored it in your data structure you can keep on adding rows until you see
 one with rowtime 11:00:00 or higher.

 For FLOOR(x TO timeUnit) there is an easy expression, namely CEIL(x TO
 timeUnit). There are analytic solutions for a lot of common monotonic
 expressions, and these can be discovered at query planning time.

 But for arbitrary monotonic functions (even UDFs — as long as you
 know/trust that they are monotonic) there is a solution. It is similar to
 the bisection method[1] of finding roots. Suppose that your value is
 100100010 (I give the value in binary so it is easier to see how the
 algorithm works) and f(100100010) = 34.

 You know: f(100100010) = 34
 Try: f(100100011) and get 34, so continue
 Try: f(100100100) and get 34, so continue
 Try: f(100101000) and get 36, so backtrack
 Try: f(100100111) and get 35, so backtrack
 Try: f(100100110) and get 34, so stop.

 You now know that 100100111 is the lowest value of x such that f(x) gives a
 value greater than 34. This approach takes at most one iteration for each
 bit (so, 64 bits for timestamp values), and works for any monotonic function. This
 effort occurs at runtime, but 64 iterations for each tumbling window seems
 to me to be a negligible cost.

 Julian

 [1] http://en.wikipedia.org/wiki/Root-finding_algorithm#Bisection_method

 On Apr 30, 2015, at 7:29 AM, Milinda Pathirage mpath...@umail.iu.edu
 wrote:

  Hi Julian,
 
  You are correct, I was referring to
 https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows.
  What I meant by tumbling window size was the size of the window in seconds,
 minutes, or hours. It's possible to deduce the size for an expression like
 FLOOR(rowtime TO HOUR). But I still couldn't find a straightforward way
 to do it by traversing the logical query plan. But if this expression
 involves some other date/time calculations (for example, defining a 15-minute
 tumbling window with a MySQL-style expression like
 ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60)) AS timekey), how can we do the
 inference at physical planning time? One way I can think of is running
 some known, ordered timestamps through this expression and using the output
 values to figure out the interval. But I am not sure whether this will
 always work.
 
  Another scenario is using 'milliseconds since epoch' to achieve the same
 tumbling window queries. Given that Avro doesn't support date or time
 values, I was trying to get this to work by having the timestamp represented as
 a long value. In this case we can have any computation (not every
 computation is meaningful in the context of window aggregation, but it's a
 possibility) on the timestamp value, even though not every possible
 expression will work. So I am not sure whether inferring this at query
 planning time is possible or the right thing to do. Maybe we do as many
 validations as possible in the query planner and let the queries fail at
 runtime for other cases.
 
  I agree that re-issuing totals can be problematic. Maybe we wait till
 the timeout to issue the result. But I am sure that we will also need more
 metadata and flags to handle these scenarios. I think Yi may have a better
 idea about this because he was 

Re: What next for streaming SQL?

2015-04-30 Thread Milinda Pathirage
Hi Julian,

You are correct, I was referring to
https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows.
What I meant by tumbling window size was the size of the window in seconds,
minutes, or hours. It's possible to deduce the size for an expression
like FLOOR(rowtime TO HOUR). But I still couldn't find a straightforward
way to do it by traversing the logical query plan. But if this expression
involves some other date/time calculations (for example, defining a 15-minute
tumbling window with a MySQL-style expression like
ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60)) AS timekey), how can we do the
inference at physical planning time? One way I can think of is running some
known, ordered timestamps through this expression and using the output values
to figure out the interval. But I am not sure whether this will always work.
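
A rough Java sketch of that probing idea (hypothetical names; as noted, it only
produces an estimate and is not guaranteed to work for arbitrary expressions):

import java.util.function.LongUnaryOperator;

class IntervalProbeSketch {
  // Feed known, ordered timestamps through the window-key expression and look
  // at where its output changes to estimate the window interval.
  // Returns -1 if fewer than two key changes are observed in the probed range.
  static long estimateIntervalMillis(LongUnaryOperator keyExpr, long start, long end, long stepMillis) {
    long firstChange = -1;
    long previousKey = keyExpr.applyAsLong(start);
    for (long t = start + stepMillis; t <= end; t += stepMillis) {
      long key = keyExpr.applyAsLong(t);
      if (key != previousKey) {
        if (firstChange < 0) {
          firstChange = t;
        } else {
          return t - firstChange;   // distance between two consecutive key changes
        }
        previousKey = key;
      }
    }
    return -1;
  }
}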

Another scenario is using 'milliseconds since epoch' to achieve the same
tumbling window queries. Given that Avro doesn't support date or time
values, I was trying to get this to work by having the timestamp represented as
a long value. In this case we can have any computation (not every
computation is meaningful in the context of window aggregation, but it's a
possibility) on the timestamp value, even though not every possible
expression will work. So I am not sure whether inferring this at query
planning time is possible or the right thing to do. Maybe we do as many
validations as possible in the query planner and let the queries fail at
runtime for other cases.

I agree that re-issuing totals can be problematic. Maybe we wait till the
timeout to issue the result. But I am sure that we will also need more
metadata and flags to handle these scenarios. I think Yi may have a better
idea about this because he was working on the window operator design. You
can find his design document here [1].

CALCITE-704 looks interesting. I'll have a look at it.

Thanks
Milinda

[1]
https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf

On Wed, Apr 29, 2015 at 8:29 PM, Julian Hyde jul...@hydromatic.net wrote:

 Can you give an example of the SQL syntax you are using for tumbling
 windows? Does it use GROUP BY and FLOOR, as in
 https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows?

 What do you mean by “tumbling window size”? You can easily deduce that
 FLOOR(rowtime TO HOUR) covers an hour range of the rowtime column, but to
 compute the number of rows or bytes you’d have to make assumptions about
 data rates and then you’d only get an estimate.

 Regarding re-issuing totals to incorporate late arrivals. It sounds
 useful, but you’ll have to be careful that it doesn’t screw up other
 operators downstream. Imagine that you have an aggregate followed by
 another aggregate that rolls it up. If the downstream operator isn’t
 expecting duplicates then it may double-count.

 I think it may be OK if the stream defines a primary key, specifies that
 there may be duplicates and the duplicates will be compacted. But in short,
 we need more metadata, because the consumer is a dumb operator not a smart
 human.

 Do you have a URL for Yi’s design doc?

 By the way, I am just about to check in a patch for
 https://issues.apache.org/jira/browse/CALCITE-704 “FILTER clause for
 aggregate functions”. I think it would be really useful for streaming
 queries, because you can’t afford to re-run the query for a subset of the
 data. Samza-sql should get this virtually for free when it gets the next
 Calcite release.



 On Apr 28, 2015, at 7:40 AM, Milinda Pathirage mpath...@umail.iu.edu
 wrote:

 Hi Julian,

 I am working on tumbling windows and hoping to have a look at other types
 of window aggregates next. I was trying to extract the window spec from
 the aggregate operator (for a tumbling window) and figured out that it's
 impossible to infer the tumbling window size from date/time expressions or from
 an expression over any other type of monotonic field (such as row number
 for tuple-based windows). So we were thinking of implementing aggregates
 like we normally implement a stream aggregate in standard SQL (assuming group
 by fields are sorted) but with support for handling out-of-order arrivals.
 One difference in this method compared to a stream aggregate in SQL is that
 input rows can contribute to multiple outputs due to late arrivals. My
 plan is to emit the first result for a tumbling-window aggregate when we see
 a new tuple from the next window and emit the result again if we get a tuple
 for an old window. We'll have a window-closing policy where we will not
 handle tuples arriving after the window timeout. Yi's window operator
 design document contains most of the details required. What do you think
 about this approach to implementing tumbling windows? We highly appreciate
 your feedback on this.

 Thanks
 Milinda

 On Mon, Apr 27, 2015 at 6:15 PM, Julian Hyde jul...@hydromatic.net
 wrote:

 Milinda,

 I have seen your work adding initial 

Re: What next for streaming SQL?

2015-04-30 Thread Julian Hyde
When I implemented this in the past, I found out that what I needed was not the 
window size exactly, but the next boundary value. For example, if I am using 
FLOOR(rowtime TO HOUR) and the current row is 10:03:00 then the next boundary 
will be 11:00:00. Once you have computed that boundary and stored it in your 
data structure you can keep on adding rows until you see one with rowtime 11:00:00
or higher.

For FLOOR(x TO timeUnit) there is an easy expression, namely CEIL(x TO 
timeUnit). There are analytic solutions for a lot of common monotonic 
expressions, and these can be discovered at query planning time.

But for arbitrary monotonic functions (even UDFs — as long as you know/trust 
that they are monotonic) there is a solution. It is similar to the bisection 
method[1] of finding roots. Suppose that your value is 100100010 (I give the
value in binary so it is easier to see how the algorithm works) and
f(100100010) = 34.

You know: f(100100010) = 34
Try: f(100100011) and get 34, so continue
Try: f(100100100) and get 34, so continue
Try: f(100101000) and get 36, so backtrack
Try: f(100100111) and get 35, so backtrack
Try: f(100100110) and get 34, so stop.

You now know that 100100111 is the lowest value of x such that f(x) gives a
value greater than 34. This approach takes at most one iteration for each bit
(so, 64 bits for timestamp values), and works for any monotonic function. This effort
occurs at runtime, but 64 iterations for each tumbling window seems to me to be 
a negligible cost.
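
A minimal Java sketch of that bit-at-a-time search (hypothetical names; assumes f
is monotonic non-decreasing over non-negative 63-bit values and that a boundary
actually exists):

import java.util.function.LongUnaryOperator;

class NextBoundarySketch {
  // Smallest x such that f(x) > f(current), found with one probe per bit:
  // greedily build the largest value whose image still does not exceed
  // f(current), then add one. Works for any monotonic non-decreasing f,
  // including UDFs that we merely trust to be monotonic.
  static long nextBoundary(LongUnaryOperator f, long current) {
    final long fCurrent = f.applyAsLong(current);
    long below = 0;                                  // largest x found so far with f(x) <= f(current)
    for (long bit = 1L << 62; bit != 0; bit >>>= 1) {
      long candidate = below | bit;
      if (f.applyAsLong(candidate) <= fCurrent) {
        below = candidate;                           // keep this bit, keep probing lower bits
      }
    }
    return below + 1;
  }

  public static void main(String[] args) {
    // FLOOR(rowtime TO HOUR) over millis: for 10:03:00 the next boundary is 11:00:00.
    LongUnaryOperator floorToHour = t -> t - (t % 3_600_000L);
    long rowtime = 10 * 3_600_000L + 3 * 60_000L;    // 10:03:00 as millis since midnight
    System.out.println(nextBoundary(floorToHour, rowtime) == 11 * 3_600_000L);  // prints true
  }
}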

Julian

[1] http://en.wikipedia.org/wiki/Root-finding_algorithm#Bisection_method

On Apr 30, 2015, at 7:29 AM, Milinda Pathirage mpath...@umail.iu.edu wrote:

 Hi Julian,
 
 You are correct, I was referring to 
 https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows.
  What I meant by tumbling window size was the size of the window in seconds,
 minutes, or hours. It's possible to deduce the size for an expression like
 FLOOR(rowtime TO HOUR). But I still couldn't find a straightforward way to
 do it by traversing the logical query plan. But if this expression involves
 some other date/time calculations (for example, defining a 15-minute tumbling
 window with a MySQL-style expression like ROUND(UNIX_TIMESTAMP(timestamp)/(15 * 60))
 AS timekey), how can we do the inference at physical planning time? One
 way I can think of is running some known, ordered timestamps through this
 expression and using the output values to figure out the interval. But I am not
 sure whether this will always work.
 
  Another scenario is using 'milliseconds since epoch' to achieve the same
 tumbling window queries. Given that Avro doesn't support date or time values,
 I was trying to get this to work by having the timestamp represented as a long
 value. In this case we can have any computation (not every computation is
 meaningful in the context of window aggregation, but it's a possibility) on
 the timestamp value, even though not every possible expression will work. So I
 am not sure whether inferring this at query planning time is possible or
 the right thing to do. Maybe we do as many validations as possible in the query
 planner and let the queries fail at runtime for other cases.
 
  I agree that re-issuing totals can be problematic. Maybe we wait till the
 timeout to issue the result. But I am sure that we will also need more
 metadata and flags to handle these scenarios. I think Yi may have a better
 idea about this because he was working on the window operator design. You can 
 find his design document here [1]. 
 
 CALCITE-704 looks interesting. I'll have a look at it.
 
 Thanks
 Milinda
 
 [1] 
 https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf
 
 On Wed, Apr 29, 2015 at 8:29 PM, Julian Hyde jul...@hydromatic.net wrote:
 Can you give an example of the SQL syntax you are using for tumbling windows? 
 Does it use GROUP BY and FLOOR, as in 
 https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows?
 
  What do you mean by “tumbling window size”? You can easily deduce that
  FLOOR(rowtime TO HOUR) covers an hour range of the rowtime column, but to
 compute the number of rows or bytes you’d have to make assumptions about data 
 rates and then you’d only get an estimate.
 
 Regarding re-issuing totals to incorporate late arrivals. It sounds useful, 
 but you’ll have to be careful that it doesn’t screw up other operators 
 downstream. Imagine that you have an aggregate followed by another aggregate 
 that rolls it up. If the downstream operator isn’t expecting duplicates then 
 it may double-count.
 
 I think it may be OK if the stream defines a primary key, specifies that 
 there may be duplicates and the duplicates will be compacted. But in short, 
 we need more metadata, because the consumer is a dumb operator not a smart 
 human.
 
 Do you have a URL for Yi’s design doc?
 
 By the way, 

Re: What next for streaming SQL?

2015-04-29 Thread Julian Hyde
Can you give an example of the SQL syntax you are using for tumbling windows? 
Does it use GROUP BY and FLOOR, as in 
https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md#tumbling-windows?

What do you mean by “tumbling window size”? You can easily deduce that
FLOOR(rowtime TO HOUR) covers an hour range of the rowtime column, but to
compute the number of rows or bytes you’d have to make assumptions about data 
rates and then you’d only get an estimate.

Regarding re-issuing totals to incorporate late arrivals. It sounds useful, but 
you’ll have to be careful that it doesn’t screw up other operators downstream. 
Imagine that you have an aggregate followed by another aggregate that rolls it 
up. If the downstream operator isn’t expecting duplicates then it may 
double-count.

I think it may be OK if the stream defines a primary key, specifies that there 
may be duplicates and the duplicates will be compacted. But in short, we need 
more metadata, because the consumer is a dumb operator not a smart human.

Do you have a URL for Yi’s design doc?

By the way, I am just about to check in a patch for
https://issues.apache.org/jira/browse/CALCITE-704 “FILTER clause for
aggregate functions”. I think it would be really useful for streaming queries, 
because you can’t afford to re-run the query for a subset of the data. 
Samza-sql should get this virtually for free when it gets the next Calcite 
release.
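
For illustration, a sketch of the kind of query the FILTER clause enables
(hypothetical schema, and the exact samza-sql dialect is an assumption; the point
is computing a conditional count in the same pass instead of re-running the query
for the subset):

class FilterClauseSketch {
  // Hourly totals plus a conditional count, computed in one pass over the stream.
  static final String QUERY =
      "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime, "
      + "  COUNT(*) AS orders, "
      + "  COUNT(*) FILTER (WHERE units > 10) AS bigOrders "
      + "FROM Orders "
      + "GROUP BY FLOOR(rowtime TO HOUR)";
}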


 On Apr 28, 2015, at 7:40 AM, Milinda Pathirage mpath...@umail.iu.edu wrote:
 
 Hi Julian,
 
  I am working on tumbling windows and hoping to have a look at other types of
  window aggregates next. I was trying to extract the window spec from the
  aggregate operator (for a tumbling window) and figured out that it's impossible
  to infer the tumbling window size from date/time expressions or from an
  expression over any other type of monotonic field (such as row number for
  tuple-based windows). So we were thinking of implementing aggregates like we
  normally implement a stream aggregate in standard SQL (assuming group by fields
  are sorted) but with support for handling out-of-order arrivals. One
  difference in this method compared to a stream aggregate in SQL is that
  input rows can contribute to multiple outputs due to late arrivals. My plan
  is to emit the first result for a tumbling-window aggregate when we see a new
  tuple from the next window and emit the result again if we get a tuple for an old
  window. We'll have a window-closing policy where we will not handle tuples
  arriving after the window timeout. Yi's window operator design document
  contains most of the details required. What do you think about this approach
  to implementing tumbling windows? We highly appreciate your feedback on this.
 
 Thanks
 Milinda
 
  On Mon, Apr 27, 2015 at 6:15 PM, Julian Hyde jul...@hydromatic.net wrote:
 Milinda,
 
 I have seen your work adding initial streaming SQL to Samza. Good stuff.
 
 Which types of query are you thinking of doing next?
 
 As of calcite-1.2, the streaming extensions are in Calcite’s master branch. 
  (See https://github.com/apache/incubator-calcite/blob/master/doc/STREAM.md.) We
 are a couple of weeks away from the next Calcite release. If you need some 
 work done in Calcite, now would be a good time.
 
 Julian
 
 
 
 
 -- 
 Milinda Pathirage
 
 PhD Student | Research Assistant
 School of Informatics and Computing | Data to Insight Center
 Indiana University
 
 twitter: milindalakmal
 skype: milinda.pathirage
  blog: http://milinda.pathirage.org