Re: Query on retract stream

2019-01-27 Thread Gagan Agrawal
Thanks Hequn for sharing those details. Looking forward to the Blink
integration.
I have one doubt around one of your earlier statements

*> Also, currently, the window doesn't have the ability to handle
retraction messages*

When we use multi-window (as you suggested), it is able to handle updates.
So what does this statement really mean? Does it mean that using multi-window
is just a workaround, since a single window is not able to handle
retraction messages?

Also, I wanted to confirm whether a tumbling window in the Table/SQL API can
handle late data (i.e. data arriving after the window has closed). Do we have
something similar to the DataStream API's allowedLateness feature? You already
mentioned that a sliding window cannot handle late data, but does that apply
to a tumbling window as well?
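
For context, the DataStream feature I am referring to is roughly the following
(just a sketch with assumed field names, not code from this job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Order(userId: String, orderId: String, status: String, orderTime: Long)

// assumes timestamps/watermarks are already assigned on `orders`
def pendingPerHour(orders: DataStream[Order]): DataStream[(String, Long)] =
  orders
    .filter(_.status == "PENDING")
    .map(o => (o.userId, 1L))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    // keep window state around so late events can still update and
    // re-fire the already-closed window
    .allowedLateness(Time.minutes(10))
    .sum(1)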

One of the challenges in using unbounded aggregates in the Table API is state
retention. As I understand it, the only way to clear old state is via the
query config's idle state retention time. However, it's a global parameter and
not a per-aggregate parameter. So in my Flink job, if I want a mix of minute,
hourly and daily aggregates, I will have to keep the idle state retention at a
minimum of one day, which means all minute-level aggregations will also exist
for the entire day and hence lead to increased state size.
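
For reference, this is the kind of configuration I mean (a minimal sketch of
the query config API; table and field names are assumed):

import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.api.scala.StreamTableEnvironment

def configureRetention(tableEnv: StreamTableEnvironment): StreamQueryConfig = {
  // global for every unbounded aggregate that uses this config: idle state is
  // kept between 25 and 26 hours, so minute-level aggregates also pay the
  // daily retention cost
  tableEnv.queryConfig.withIdleStateRetentionTime(Time.hours(25), Time.hours(26))
}

// the config is then passed when emitting the result, e.g.
// tableEnv.toRetractStream[org.apache.flink.types.Row](resultTable, qConfig)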

Gagan


Re: Query on retract stream

2019-01-26 Thread Hequn Cheng
Hi Gagan,

Besides the event-time vs. processing-time difference, there is another
difference between the two ways: the window aggregates over bounded data,
while the unbounded aggregate works on unbounded data, i.e., newly arriving
data can update a very old result.

As for performance, I think the two ways may show no big difference in the
current Flink version. Maybe you can run some tests between them on your
own scenarios if both of them can solve your problem. FYI: There is a nice
discussion[1] raised by Timo recently. Once Blink is merged into Flink, the
unbounded aggregate will be much faster than the window.

Best,
Hequn

[1] https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-32



Re: Query on retract stream

2019-01-26 Thread Gagan Agrawal
Thanks Hequn for the suggested solutions; I think this should really work and
I will give it a try. As I understand it, the first solution of using multiple
windows will be good for those scenarios where I want output to be generated
once the window is materialized (i.e. the watermark reaches the end of the
window), and the second will be good if I want it to be fired on a per-event
basis (i.e. no watermarking). Apart from this, do you see any difference from
a performance perspective in choosing between the two, or should both be
equally performant?

Gagan


Re: Query on retract stream

2019-01-25 Thread Hequn Cheng
Hi Gagan,

Time attribute fields will be materialized by the unbounded groupby. Also,
currently, the window doesn't have the ability to handle retraction
messages. I see two ways to solve the problem.

- Use multi-window.  The first window performs lastValue, the second
performs count.
- Use two non-window aggregates. In this case, you don't have to change
anything for the first aggregate. For the second one, you can group by an
hour field and perform count(). The code looks like:

SELECT userId,
 count(orderId)
FROM
(SELECT orderId,
 getHour(orderTime) as myHour,
 lastValue(userId) AS userId,
 lastValue(status) AS status
FROM orders
GROUP BY  orderId, orderTime)
WHERE status='PENDING'
GROUP BY myHour, userId
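
Note that lastValue and getHour above are user-defined functions, not
built-ins. A getHour helper could be sketched like this (a hypothetical
implementation, Scala Table API):

import java.sql.Timestamp
import org.apache.flink.table.functions.ScalarFunction

class GetHour extends ScalarFunction {
  // truncate the event timestamp to its hour so it can be used as a plain
  // grouping key in the outer, non-windowed aggregate
  def eval(ts: Timestamp): Long = ts.getTime / (60L * 60 * 1000)
}

// registered with e.g. tableEnv.registerFunction("getHour", new GetHour)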

Best,
Hequn





Re: Query on retract stream

2019-01-25 Thread Gagan Agrawal
Based on the suggestions in this mail thread, I tried out a few experiments
on upsert streams with Flink 1.7.1, and here is the issue I am facing with
the windowed stream.

*1. Global Pending order count. *
The following query works fine and is able to handle updates as per the
original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status
from orders group by orderId)
where status='PENDING' group by userId

*2. Last 1 Hour tumbling window count (Append stream)*
Though the following query doesn't handle the upsert stream, I just tried it
to make sure the time column is working fine. It works, but, as expected, it
doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. *Last 1 Hour tumbling window count (With upsert stream)*
Now I tried a combination of the above two, where the input stream is
converted to an upsert stream (via the lastValue aggregate function) and the
pending count is then calculated over the last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status)
as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me the following error. Is this because I have added orderTime
to the group by/select clause and hence its time characteristic has changed?
What is the workaround here, as without adding orderTime I cannot perform
window aggregation on the upsert stream?

[error] Exception in thread "main"
org.apache.flink.table.api.ValidationException:* Window can only be defined
over a time attribute column.*
[error] at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error] at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error] at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error] at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error] at
org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error] at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error] at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error] at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error] at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error] at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error] at
org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error] at
org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error] at
org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error] at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error] at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error] at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error] at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan


Re: Query on retract stream

2019-01-22 Thread Gagan Agrawal
Thanks Hequn for your response. I initially thought of trying the "over
window" clause, however as per the documentation there seems to be a
limitation in the "orderBy" clause, which allows only a single
event-time/processing-time attribute. In my case events are generated from
the MySQL binlog, where I have seen multiple event updates generated with the
same timestamp (maybe because they are part of the same transaction), and
hence I will need the binlog offset along with the timestamp to be able to
sort them correctly. So it looks like I can't use "over window" until it
allows multiple columns in "orderBy". I am exploring the option of creating
my own window, as you suggested, to be more flexible.

Gagan


Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

> But I also have a requirement for event time based sliding window
aggregation

Yes, you can achieve this with Flink TableAPI/SQL. However, currently,
sliding windows don't support early fire, i.e., only output results when
event time reaches the end of the window. Once window fires, the window
state will be cleared and late data belonging to this window will be
ignored. In order to wait for late events, you can extract the watermark with
an offset from the timestamp, for example: watermark = timestamp - 5min.
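
A sketch of that watermark offset with the DataStream API (assuming an Order
type whose orderTime is an epoch-millis field; names are illustrative):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class Order(userId: String, orderId: String, status: String, orderTime: Long)

class OrderWatermarks
    extends BoundedOutOfOrdernessTimestampExtractor[Order](Time.minutes(5)) {
  // watermarks trail the max seen orderTime by 5 minutes, so events up to
  // 5 minutes late still fall into a window that has not fired yet
  override def extractTimestamp(order: Order): Long = order.orderTime
}

// usage: orders.assignTimestampsAndWatermarks(new OrderWatermarks)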

If event time and early firing are both strong requirements in your scenarios,
you can probably use an over window[1] to solve your problem, say an over
window with 1h preceding. An over window outputs a result for each input row.
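
A rough sketch of such an over window (assuming a registered `orders` table
with a rowtime attribute `orderTime` and a pre-computed `pendingFlag` column
that is 1 for PENDING rows and 0 otherwise; these names are assumptions):

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment

def pendingLastHour(tableEnv: StreamTableEnvironment): Table =
  tableEnv.sqlQuery(
    """
      |SELECT userId,
      |       SUM(pendingFlag) OVER (
      |         PARTITION BY userId
      |         ORDER BY orderTime
      |         RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      |       ) AS pendingLastHour
      |FROM orders
    """.stripMargin)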

If the above solutions can't meet your requirements, you can write a
DataStream job in which define your own window logic[2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html




Re: Query on retract stream

2019-01-21 Thread Gagan Agrawal
Thank you guys. It's great to hear multiple solutions to achieve this. I
understand that records once emitted to Kafka cannot be deleted, and that's
acceptable for our use case as the last updated value should always be correct.
However, as I understand it, most of these solutions will work for the global
aggregation which was asked about in the original question. But I also have a
requirement for event-time based sliding window aggregation, where the same
order count needs to be maintained over the past x hours (sliding at, say,
every 5 minutes). Is it possible to achieve this with the Table API / SQL at
the moment, or will it require some custom implementation?

For a window-based upsert stream, there can be a few scenarios.

1. An update to a record key comes in the same window, e.g. Pending (t1) ->
Success (t2) happens in the same window w1. In this case, once the window
aggregation is triggered/emitted, such records will be counted as 0.
2. An update to a record key belongs to the same window but arrives late. In
this case the old (and already emitted) window (w1) needs to be re-emitted
with a decreased value.
3. An update to a record key comes in a different window, e.g. Pending (t1) in
window w1 and Success (t2) in w2. I think in this case it may not be necessary
to re-emit the old window w1, as it represents the pending count up to that
window time (w1), which is still valid since the record moved to Success in
the next window w2 (based on event time).

Gagan



Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
@Jeff: It depends on whether the user can define a time window for his condition. As Gagan 
described his problem, it was about a “global” threshold of pending orders.



I have just thought of another solution that should work without any custom 
code: converting the “status” field to a status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running the following query on top of such a stream:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

The conversion to integers could be made by using a `CASE` expression.
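
A sketch of what that could look like end to end (assuming a registered
`changelog` table; the exact status values are illustrative):

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment

def pendingPerUser(tableEnv: StreamTableEnvironment): Table =
  tableEnv.sqlQuery(
    """
      |SELECT uid, SUM(status_value) AS pending_count
      |FROM (
      |  SELECT uid,
      |         CASE status
      |           WHEN 'pending' THEN 1
      |           WHEN 'success' THEN -1
      |           WHEN 'failed'  THEN -1
      |           ELSE 0
      |         END AS status_value
      |  FROM changelog)
      |GROUP BY uid
    """.stripMargin)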

One thing to note here is that probably all of the proposed solutions would 
work based on the order of the records, not based on the event_time.

Piotrek




Re: Query on retract stream

2019-01-21 Thread Jeff Zhang
I am thinking of another approach instead of a retract stream. Is it possible
to define a custom window to do this? The window would be defined for each
order, and then you just need to analyze the events in this window.


-- 
Best Regards

Jeff Zhang


Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction 
streams as the input (or conversions from append stream to retraction stream) 
at the moment. With that your problem would simplify to one simple `SELECT uid, 
count(*) FROM Changelog GROUP BY uid`. There is ongoing related work [1], so 
this might be supported in the next couple of months.

There might be a workaround at the moment that could work. I think you would need 
to write your own custom `LAST_ROW(x)` aggregation function, which would just 
return the value of the most recent aggregated row. With that you could write a 
query like this:

SELECT
    uid, count(*)
FROM (
    SELECT
        *
    FROM (
        SELECT
            uid, LAST_ROW(status)
        FROM
            changelog
        GROUP BY
            uid, oid)
    WHERE status = `pending`)
GROUP BY
    uid

Where `changelog` is an append only stream with the following content:

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
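
The `LAST_ROW(x)` function itself could be sketched roughly like this (an
assumed implementation for a String column, using Flink's AggregateFunction
interface; not existing Flink code):

import org.apache.flink.table.functions.AggregateFunction

// mutable accumulator holding the most recently seen value
class LastRowAcc {
  var value: String = _
}

class LastRow extends AggregateFunction[String, LastRowAcc] {
  override def createAccumulator(): LastRowAcc = new LastRowAcc
  override def getValue(acc: LastRowAcc): String = acc.value

  // called once per input row; the last call wins
  def accumulate(acc: LastRowAcc, in: String): Unit = {
    acc.value = in
  }
}

// registered with e.g. tableEnv.registerFunction("LAST_ROW", new LastRow)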


Besides that, you could also write your own relatively simple DataStream 
application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577




Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

Yes, you can achieve this with Flink TableAPI/SQL. However, you have to pay
attention to the following things:
1) Currently, Flink only ingests append streams. In order to ingest upsert
streams (streams with keys), you can use groupBy with a user-defined
LAST_VALUE aggregate function. For the implementation, you can refer to the MAX
AggregateFunction (MAX always returns the max value while LAST_VALUE always
returns the latest value). The SQL may look like:

SELECT user, COUNT(*)
FROM (
    SELECT order, LAST_VALUE(user), LAST_VALUE(status), LAST_VALUE(event_time)
    FROM SourceTable
    GROUP BY order
)
WHERE status = 'pending'
GROUP BY user

You have to note that the query will be processed under processing time
instead of event time. But I think it would be fine for you, as the final
result will be right.

As for the upsert source, there is already a pr[1] on it, and it is under
review now.

2) You have to note that once you output results to Kafka according to a
configured threshold, the output record cannot be deleted anymore even if the
count value decreases, because Kafka doesn't support deleting messages. Also,
this issue[2] makes things worse. You can take a detailed look if you are
interested in it.

Best, Hequn

[1] https://github.com/apache/flink/pull/6787
[2] https://issues.apache.org/jira/browse/FLINK-9528


On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal 
wrote:

> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have running count in flink as below at respective event times. Here
> Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's
> order status was changed from Pending to Success. Similarly for user u2, at
> time t6, there was no change in running count as there was no change in
> status for order o4
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>