Re: Query on retract stream

2019-01-27 Thread Gagan Agrawal
Thanks Hequn for sharing those details. Looking forward to the Blink
integration.
I have one doubt about one of your earlier statements:

*> Also, currently, the window doesn't have the ability to handle
retraction messages*

When we use multiple windows (as you suggested), the query is able to handle
updates. So what does this statement really mean? Does it mean that using
multiple windows is just a workaround, since a single window is not able to
handle retraction messages?
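
For reference, my understanding of the multi-window approach is roughly the
following sketch (assuming an orders table registered with orderTime as a
rowtime attribute, a registered lastValue UDAF, and tableEnv being the
StreamTableEnvironment; not tested):

// Sketch only: the inner window collapses updates per orderId and
// TUMBLE_ROWTIME propagates a time attribute, so the outer window can count on it.
Table hourlyPending = tableEnv.sqlQuery(
    "SELECT userId, count(orderId) AS pendingCount " +
    "FROM ( " +
    "  SELECT orderId, " +
    "         lastValue(userId) AS userId, " +
    "         lastValue(status) AS status, " +
    "         TUMBLE_ROWTIME(orderTime, INTERVAL '1' HOUR) AS rowtime " +
    "  FROM orders " +
    "  GROUP BY orderId, TUMBLE(orderTime, INTERVAL '1' HOUR)) " +
    "WHERE status = 'PENDING' " +
    "GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' HOUR)");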

Also wanted to confirm whether a tumbling window in the Table/SQL API can
handle late data (i.e. data arriving after the window has closed). Do we have
something similar to the DataStream API's allowedLateness feature? You already
mentioned that a sliding window cannot handle late data, but does that apply
to tumbling windows as well?
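
For context, the DataStream API feature I am referring to looks roughly like
this (a sketch; the Order type, CountPendingOrders function and the chosen
lateness are only illustrative placeholders):

// Sketch of allowedLateness in the DataStream API: late events arriving within
// the allowed lateness re-trigger the window, anything later goes to the side output.
OutputTag<Order> lateTag = new OutputTag<Order>("late-orders") {};

SingleOutputStreamOperator<Long> counts = orders
    .keyBy(order -> order.userId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(10))
    .sideOutputLateData(lateTag)
    .aggregate(new CountPendingOrders()); // hypothetical AggregateFunction<Order, ?, Long>

DataStream<Order> lateOrders = counts.getSideOutput(lateTag);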

One of the challenges in using unbounded aggregates in the Table API is state
retention. As I understand it, the only way to clear old state is via the
query config's idle state retention time. However, it's a global parameter
and not a per-aggregate parameter. So if my Flink job mixes minute, hourly
and daily aggregates, I will have to keep the idle state retention at a
minimum of a day, which means all minute-level aggregations will also be kept
for the entire day and hence lead to an increase in state size.
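
For reference, this is the global setting I mean (a sketch; the retention
values are just examples and resultTable is a placeholder):

// Sketch: idle state retention is configured once on the query config, not per aggregate.
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.hours(24), Time.hours(25)); // min / max retention

// the same config is then passed when emitting results, e.g.:
tableEnv.toRetractStream(resultTable, Row.class, queryConfig);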

Gagan



On Sun, Jan 27, 2019 at 9:42 AM Hequn Cheng  wrote:

> Hi Gagan,
>
> Besides the event-time and proctime difference, there is another difference
> between the two ways. The window aggregates over bounded data, while the
> unbounded aggregate works on unbounded data, i.e., newly arriving data can
> update very old data.
>
> As for the performance, I think the two ways may have no big difference in
> current Flink version. Maybe you can run some tests between them on your
> own scenarios if both of them can solve your problem. FYI: There is a nice
> discussion[1] raised by Timo recently. Once Blink is merged into Flink, the
> unbounded aggregate will be much faster than the window.
>
> Best,
> Hequn
>
> [1] https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-32
>
>
> On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal 
> wrote:
>
>> Thanks Hequn for the suggested solutions; I think this should really work
>> and I will give it a try. As I understand it, the first solution of using
>> multiple windows will be good for scenarios where I want output to be
>> generated once the window is materialized (i.e. the watermark reaches the
>> end of the window), and the second will be good if I want results fired on
>> a per-event basis (i.e. no watermarking). Apart from this, do you see any
>> difference from a performance perspective in choosing between the two, or
>> should both be equally performant?
>>
>> Gagan
>>
>> On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng 
>> wrote:
>>
>>> Hi Gagan,
>>>
>>> Time attribute fields will be materialized by the unbounded groupby.
>>> Also, currently, the window doesn't have the ability to handle retraction
>>> messages. I see two ways to solve the problem.
>>>
>>> - Use multi-window.  The first window performs lastValue, the second
>>> performs count.
>>> - Use two non-window aggregates. In this case, you don't have to change
>>> anything for the first aggregate. For the second one, you can group by an
>>> hour field and perform count(). The code looks like:
>>>
>>> SELECT userId,
>>>        count(orderId)
>>> FROM (
>>>     SELECT orderId,
>>>            getHour(orderTime) AS myHour,
>>>            lastValue(userId) AS userId,
>>>            lastValue(status) AS status
>>>     FROM orders
>>>     GROUP BY orderId, orderTime)
>>> WHERE status = 'PENDING'
>>> GROUP BY myHour, userId
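
(For completeness: getHour above is not a built-in function; a minimal sketch
of such a scalar UDF, registered under that name, might look like the
following — the exact bucketing logic is only an assumption.)

import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch of a possible getHour UDF: maps an event timestamp to an hour bucket
// so that rows can be grouped per hour in the outer aggregate.
public class GetHour extends ScalarFunction {
    public long eval(Timestamp ts) {
        return ts.getTime() / (60 * 60 * 1000L); // hours since epoch
    }
}

// registration (assumed): tableEnv.registerFunction("getHour", new GetHour());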
>>>
>>> Best,
>>> Hequn
>>>
>>>
>>>
>>>
>>> On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal 
>>> wrote:
>>>
>>>> Based on the suggestions in this mail thread, I tried out few
>>>> experiments on upsert stream with flink 1.7.1 and here is the issue I am
>>>> facing with window stream.
>>>>
>>>> *1. Global Pending order count. *
>>>> Following query works fine and it's able to handle updates as per
>>>> original requirement.
>>>>
>>>> select userId, count(orderId) from
>>>> (select orderId, lastValue(userId) as userId, lastValue(status) as
>>>> status from orders group by orderId)
>>>> where status='PENDING' group by userId
>>>>
>>>> *2. Last 1 Hour tumbling window count (Append stream)*
>>>> Though following query doesn't handle upsert strea

Re: Query on retract stream

2019-01-25 Thread Gagan Agrawal
Based on the suggestions in this mail thread, I tried out a few experiments
on upsert streams with Flink 1.7.1, and here is the issue I am facing with
window streams.

*1. Global Pending order count. *
The following query works fine and is able to handle updates as per the
original requirement.

select userId, count(orderId) from
(select orderId, lastValue(userId) as userId, lastValue(status) as status
from orders group by orderId)
where status='PENDING' group by userId
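
(For reference, lastValue above is a custom aggregate function along the lines
Piotr suggested earlier in this thread; a minimal sketch of such a UDAF could
look like the following — an illustration, not necessarily the exact
implementation used.)

import org.apache.flink.table.functions.AggregateFunction;

// Sketch of a lastValue UDAF: keeps the most recently accumulated value.
// Note: "last" here means last in arrival order, not last in event time.
public class LastValue extends AggregateFunction<String, LastValue.Acc> {

    public static class Acc {
        public String value;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String newValue) {
        acc.value = newValue;
    }

    @Override
    public String getValue(Acc acc) {
        return acc.value;
    }
}

// registration (assumed): tableEnv.registerFunction("lastValue", new LastValue());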

*2. Last 1 Hour tumbling window count (Append stream)*
Though the following query doesn't handle an upsert stream, I just tried it
to make sure the time column is working fine. It works, but as expected, it
doesn't handle updates on orderId.

select userId, count(orderId) from orders
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

3. *Last 1 Hour tumbling window count (With upsert stream)*
Now I tried a combination of the above two, where the input stream is
converted to an upsert stream (via the lastValue aggregate function) and the
pending count is then calculated over the last 1 hour window.

select userId, count(orderId) from
(select orderId, orderTime, lastValue(userId) as userId, lastValue(status)
as status from orders group by orderId, orderTime)
where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR), userId

This one gives me the following error. Is this because I have added orderTime
to the group by/select clause and hence its time characteristic has changed?
What is the workaround here, as without adding orderTime I cannot perform
window aggregation on the upsert stream?

[error] Exception in thread "main"
org.apache.flink.table.api.ValidationException:* Window can only be defined
over a time attribute column.*
[error] at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
[error] at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
[error] at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
[error] at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
[error] at
org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
[error] at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
[error] at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
[error] at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
[error] at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
[error] at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
[error] at
org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
[error] at
org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
[error] at
org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
[error] at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
[error] at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
[error] at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
[error] at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)

Gagan

On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal 
wrote:

> Thanks Hequn for your response. I initially thought of trying out "over
> window" clause, however as per documentation there seems to be limitation
> in "orderBy" clause where it allows only single time event/processing time
> attribute. Whereas in my case events are getting generated from mysql bin
> log where I have seen multiple event updates getting generated with same
> timestamp (may be because they are part of same transaction) and hence will
> need bin log offset along with timestamp to be able to sort them correctly.
> So looks like can't use "over window" until it allows multiple columns in
> "orderBy". I am exploring option of creating my own window as you suggested
> to be more flexible.
>
> Gagan
>
> On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng  wrote:
>
>> Hi Gagan,
>>
>> > But I also have a requirement for event time based sliding window
>> aggregation
>>
>> Yes, you can achieve this with Flink TableAPI/SQL. However, currently,
>> sliding windows don't support early fire, i.e., only output results when
>> event time reaches the end of the window. Once window 

Re: Query on retract stream

2019-01-22 Thread Gagan Agrawal
Thanks Hequn for your response. I initially thought of trying out the "over
window" clause, however as per the documentation there seems to be a
limitation in the "orderBy" clause, which allows only a single event-time or
processing-time attribute. In my case, events are generated from the MySQL
binlog, where I have seen multiple event updates generated with the same
timestamp (maybe because they are part of the same transaction), and hence I
will need the binlog offset along with the timestamp to be able to sort them
correctly. So it looks like I can't use "over window" until it allows multiple
columns in "orderBy". I am exploring the option of creating my own window, as
you suggested, to be more flexible.

Gagan

On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng  wrote:

> Hi Gagan,
>
> > But I also have a requirement for event time based sliding window
> aggregation
>
> Yes, you can achieve this with Flink TableAPI/SQL. However, currently,
> sliding windows don't support early fire, i.e., they only output results
> when event time reaches the end of the window. Once the window fires, the
> window state is cleared and late data belonging to this window will be
> ignored. In order to wait for late events, you can extract the watermark
> with an offset from the timestamp. For example, make watermark =
> timestamp - 5min.
>
> If event time and early fire is a strong requirement in your scenarios,
> you can probably use an over window[1] to solve your problem, say an over
> window with 1h preceding. Over window outputs a result for each input.
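
(My reading of the over-window suggestion, as a rough, untested sketch —
assuming orders has orderTime as its event-time attribute; filtering to
PENDING rows would still be needed on top of this:)

// Rough sketch: an over window emits one result per input row, counting the
// user's orders over the preceding hour of event time.
Table overCounts = tableEnv.sqlQuery(
    "SELECT userId, orderId, " +
    "       count(orderId) OVER ( " +
    "         PARTITION BY userId " +
    "         ORDER BY orderTime " +
    "         RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS lastHourCount " +
    "FROM orders");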
>
> If the above solutions can't meet your requirements, you can write a
> DataStream job in which you define your own window logic[2].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html
>
>
>
> On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal 
> wrote:
>
>> Thank you guys. It's great to hear multiple solutions to achieve this. I
>> understand that records once emitted to Kafka can not be deleted and that's
>> acceptable for our use case as last updated value should always be correct.
>> However as I understand most of these solutions will work for global
>> aggregation which was asked in original question. But I also have
>> requirement for event time based sliding window aggregation where same
>> order count needs to be maintained for past x hours window (sliding at say
>> every 5 minutes). Is it possible to achieve with Table Api / SQL at the
>> moment or will require some custom implementation?
>>
>> For window based upsert stream, there can be few scenarios.
>>
>> 1. An update to record key comes in same window. E.g Pending (t1) ->
>> Success (t2) happens in same window w1. In this case once window
>> aggregation is triggered/emitted, such records will be counted as 0
>> 2. An update to record key belongs to same window but arrives late. In
>> this case old(and already emitted)  window (w1) needs to be re-emitted with
>> decreased value.
>> 3. An update to record key comes in different window. E.g Pending (t1) in
>> window w1 and Success (t2) in w2. I think in this case it may not require
>> to re-emit old window w1 as it represents pending count till that window
>> time (w1) which is still valid as record moved to Success in next window w2
>> (based on event time).
>>
>> Gagan
>>
>>
>> On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
>> wrote:
>>
>>> @Jeff: It depends if user can define a time window for his condition.
>>> As Gagan described his problem it was about “global” threshold of pending
>>> orders.
>>>
>>>
>>>
>>> I have just thought about another solution that should work without any
>>> custom code. Converting “status” field to status_value int:
>>> - "+1” for pending
>>> - “-1” for success/failure
>>> - “0” otherwise
>>>
>>> Then running:
>>>
>>> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>>>
>>> Query on top of such stream. Conversion to integers could be made by
>>> using `CASE` expression.
>>>
>>> One thing to note here is that probably all of the proposed solutions
>>> would work based on the order of the records, not based on the event_time.
>>>
>>> Piotrek
>>>
>>> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
>>>
>>> I am thinking of another approach instead of retract stream. Is it
>>> possible to define a custom window to do thi

Re: Query on retract stream

2019-01-21 Thread Gagan Agrawal
Thank you guys. It's great to hear multiple solutions to achieve this. I
understand that records once emitted to Kafka cannot be deleted, and that's
acceptable for our use case as the last updated value should always be
correct. However, as I understand it, most of these solutions work for the
global aggregation that was asked about in the original question. But I also
have a requirement for event-time based sliding window aggregation, where the
same order count needs to be maintained for a past x-hours window (sliding at,
say, every 5 minutes). Is it possible to achieve this with the Table API / SQL
at the moment, or will it require some custom implementation?

For a window-based upsert stream, there can be a few scenarios.

1. An update to a record key comes in the same window, e.g. Pending (t1) ->
Success (t2) happens in the same window w1. In this case, once the window
aggregation is triggered/emitted, such records will be counted as 0.
2. An update to a record key belongs to the same window but arrives late. In
this case the old (and already emitted) window (w1) needs to be re-emitted
with a decreased value.
3. An update to a record key comes in a different window, e.g. Pending (t1) in
window w1 and Success (t2) in w2. I think in this case it may not be required
to re-emit the old window w1, as it represents the pending count up to that
window time (w1), which is still valid since the record moved to Success in
the next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
wrote:

> @Jeff: It depends on whether the user can define a time window for his
> condition. As Gagan described his problem, it was about a “global” threshold
> of pending orders.
>
>
>
> I have just thought about another solution that should work without any
> custom code. Converting “status” field to status_value int:
> - "+1” for pending
> - “-1” for success/failure
> - “0” otherwise
>
> Then running:
>
> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>
> Run this query on top of such a stream. The conversion to integers could be
> made using a `CASE` expression.
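
(Spelled out, the CASE-based conversion could look roughly like this — a
sketch over the same changelog schema, not tested:)

// Rough sketch of the +1/-1/0 conversion followed by the running SUM per user.
Table pendingCounts = tableEnv.sqlQuery(
    "SELECT uid, SUM(status_value) AS pending_count " +
    "FROM ( " +
    "  SELECT uid, " +
    "         CASE status " +
    "           WHEN 'pending' THEN 1 " +
    "           WHEN 'success' THEN -1 " +
    "           WHEN 'failed'  THEN -1 " +
    "           ELSE 0 " +
    "         END AS status_value " +
    "  FROM changelog) " +
    "GROUP BY uid");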
>
> One thing to note here is that probably all of the proposed solutions
> would work based on the order of the records, not based on the event_time.
>
> Piotrek
>
> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
>
> I am thinking of another approach instead of retract stream. Is it
> possible to define a custom window to do this ? This window is defined for
> each order. And then you just need to analyze the events in this window.
>
> Piotr Nowojski  于2019年1月21日周一 下午8:44写道:
>
>> Hi,
>>
>> There is a missing feature in the Flink Table API/SQL of supporting
>> retraction streams as the input (or conversions from an append stream to a
>> retraction stream) at the moment. With that, your problem would simplify to
>> one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is
>> ongoing related work [1], so this might be supported in the next couple of
>> months.
>>
>> There might be a workaround at the moment that could work. I think you
>> would need to write your own custom `LAST_ROW(x)` aggregation function,
>> which would just return the value of the most recent aggregated row. With
>> that you could write a query like this:
>>
>> SELECT
>>     uid, count(*)
>> FROM (
>>     SELECT
>>         *
>>     FROM (
>>         SELECT
>>             uid, LAST_ROW(status)
>>         FROM
>>             changelog
>>         GROUP BY
>>             uid, oid)
>>     WHERE status = `pending`)
>> GROUP BY
>>     uid
>>
>> Where `changelog` is an append only stream with the following content:
>>
>> *user, order, status, event_time*
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> *u1, o3, pending, t3*
>> *u1, o3, success, t4*
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>>
>>
>>
>> Besides that, you could also write a relatively simple DataStream
>> application to do the same thing.
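
(A rough sketch of what such a DataStream job could look like — keyed by user,
remembering the last seen status per order and emitting the pending count
whenever it changes; OrderEvent is a hypothetical POJO and the code is
illustrative only:)

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: per user, remember the last status of every order and emit the new
// pending count whenever an order switches into or out of "pending".
public class PendingOrderCounter
        extends KeyedProcessFunction<String, OrderEvent, Tuple2<String, Long>> {

    private transient MapState<String, String> lastStatusPerOrder;

    @Override
    public void open(Configuration parameters) {
        lastStatusPerOrder = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastStatus", String.class, String.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        String previous = lastStatusPerOrder.get(event.orderId);
        lastStatusPerOrder.put(event.orderId, event.status);

        boolean wasPending = "pending".equals(previous);
        boolean isPending = "pending".equals(event.status);
        if (wasPending == isPending) {
            return; // no change in pending-ness, nothing to emit
        }

        long pending = 0;
        for (String status : lastStatusPerOrder.values()) {
            if ("pending".equals(status)) {
                pending++;
            }
        }
        out.collect(Tuple2.of(ctx.getCurrentKey(), pending));
    }
}

// wiring (sketch): changelog.keyBy(e -> e.userId).process(new PendingOrderCounter());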
>>
>> I’m CC’ing Timo, maybe he will have another better idea.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8577
>>
>> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
>>
>> Hi,
>> I have a requirement and need to understand if same can be achieved with
>> Flink retract stream. Let's say we have stream with 4 attributes userId,
>> orderId, status, event_time where orderId is unique and hence any change in
>> same orderId updates previous value as below
>>
>> *Changelog* *Event Stream*
>>
>> *user, order, status, event_time*
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> *u1, o3, pending, t3*
>> *u1, o3, success, t4*
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>>
>> *Snapshot view at time t6 (as viewed in mysql)*
>>

Query on retract stream

2019-01-18 Thread Gagan Agrawal
Hi,
I have a requirement and need to understand whether it can be achieved with a
Flink retract stream. Let's say we have a stream with 4 attributes: userId,
orderId, status, event_time, where orderId is unique and hence any change to
the same orderId updates the previous value, as below.

*Changelog* *Event Stream*

*user, order, status, event_time*
u1, o1, pending, t1
u2, o2, failed, t2
*u1, o3, pending, t3*
*u1, o3, success, t4*
u2, o4, pending, t5
u2, o4, pending, t6

*Snapshot view at time t6 (as viewed in mysql)*
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u2, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for
respective order ids)

What I need is to maintain a count of "Pending" orders per user, and if it
goes beyond a configured threshold, push that user and the pending count to
Kafka. There can be multiple updates to an order's status, e.g. Pending ->
Success or Pending -> Failed. Also, in some cases there may not be any change
in status but we may still get a row (maybe due to some other attribute update
which we are not concerned about). So is it possible to have a running count
in Flink, as below, at the respective event times? Here the pending count is
decreased from 2 to 1 for user u1 at t4, since one of its order statuses
changed from Pending to Success. Similarly for user u2, at time t6, there is
no change in the running count as there was no change in status for order o4.

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
*t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
decreased for u1)*
t5 -> u1 : 1, u2 : 1
*t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
change)*

As I understand it, maybe a retract stream can achieve this; however, I am
not sure how. Any samples around this would be of great help.

Gagan


Re: Multiple MapState vs single nested MapState in stateful Operator

2019-01-11 Thread Gagan Agrawal
This makes perfect sense to me. Thanks Congxian and Kostas for your inputs.

Gagan

On Thu, Jan 10, 2019 at 6:03 PM Kostas Kloudas 
wrote:

> Hi Gagan,
>
> I agree with Congxian!
> In MapState, when accessing the state/value associated with a key in the
> map, the whole value is de-serialized (and serialized in the case of a
> put()).
> Given this, it is more efficient to have many keys with small state than
> fewer keys with huge state.
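
(So, concretely, option 1 would look roughly like this inside the
KeyedProcessFunction — a sketch with hypothetical event types:)

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of option 1: one MapState per input stream. MergedEvent is a hypothetical
// common type of the unioned streams, tagged with its source stream.
public class FourStreamFunction extends KeyedProcessFunction<String, MergedEvent, MergedEvent> {

    private transient MapState<String, MergedEvent> streamAState;
    private transient MapState<String, MergedEvent> streamBState;
    // ... and similarly for streams C and D

    @Override
    public void open(Configuration parameters) {
        streamAState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("streamA", String.class, MergedEvent.class));
        streamBState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("streamB", String.class, MergedEvent.class));
        // each get()/put() then (de)serializes a single event, not a whole nested map
    }

    @Override
    public void processElement(MergedEvent event, Context ctx, Collector<MergedEvent> out)
            throws Exception {
        MapState<String, MergedEvent> target = "A".equals(event.source) ? streamAState : streamBState;
        target.put(event.id, event); // store the event in the map of its originating stream
        out.collect(event);          // actual join/enrichment logic would go here
    }
}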
>
> Cheers,
> Kostas
>
>
> On Thu, Jan 10, 2019 at 12:34 PM Congxian Qiu 
> wrote:
>
>> Hi, Gagan Agrawal
>>
>> In my opinion, I prefer the first.
>>
>> Here is the reason.
>>
>> In the RocksDB StateBackend, we serialize the key, namespace and user-key
>> into serialized bytes (key-bytes) and serialize the user-value into
>> serialized bytes (value-bytes), then insert the key-bytes/value-bytes into
>> RocksDB. When retrieving from RocksDB we can use get (for a single
>> key/value) or an iterator (for a key range).
>>
>> If we store four maps in a single MapState, we need to deserialize the
>> whole value-bytes (a Map) whenever we want to retrieve a single user-value.
>>
>>
>> Gagan Agrawal  于2019年1月10日周四 上午10:38写道:
>>
>>> Hi,
>>> I have a use case where 4 streams get merged (union) and grouped on
>>> common key (keyBy) and a custom KeyedProcessFunction is called. Now I need
>>> to keep state (RocksDB backend) for all 4 streams in my custom
>>> KeyedProcessFunction where each of these 4 streams would be stored as map.
>>> So I have 2 options
>>>
>>> 1. Create a separate MapStateDescriptor for each of these streams and
>>> store their events separately.
>>> 2. Create a single MapStateDescriptor where there will be only 4 keys
>>> (corresponding to 4 stream types) and value will be of type Map which
>>> further keep events from respective streams.
>>>
>>> I want to understand from performance perspective, would there be any
>>> difference in above approaches. Will keeping 4 different MapState cause 4
>>> lookups for RocksDB backend when they are accessed? Or all of these
>>> MapStates are internally stored within RocksDB in single row corresponding
>>> to respective key (as per keyedStream) and hence they are all fetched in
>>> single call before operator's processElement is called? If there are
>>> different lookups in RocksDB for each of MapStateDescriptor, then I think
>>> keeping them in single MapStateDescriptor would be more efficient minimize
>>> RocksDB calls? Please advise.
>>>
>>> Gagan
>>>
>>
>>
>> --
>> Best,
>> Congxian
>>
>


Custom Serializer for Avro GenericRecord

2019-01-09 Thread Gagan Agrawal
Hi,
I am using Avro GenericRecord for most of the IN/OUT types of my custom
functions. What I have noticed is that the default Avro GenericRecord
serializer also serializes the schema, which makes messages very heavy and
hence impacts overall performance. In my case I already know the schema
beforehand and hence can use it to serialize/deserialize records without the
schema, which should reduce the size by a great factor. Are there any
guidelines on how such a custom serializer can be registered? One way I see
could be to first call enableForceKryo() on the StreamExecutionEnvironment's
ExecutionConfig and then register a custom serializer for the GenericRecord
type as in [1]. However I am not sure if that's the right approach or whether
there is a better one to achieve the same. Also I would need some
customization while storing these messages in the state store (RocksDB in my
case), which I think can be achieved as in [2].

Please note I cannot use POJOs in my case, as it's a generalized framework on
top of Flink.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html
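
(For anyone searching later, here is a rough sketch of what I have in mind — a
Kryo serializer that writes GenericRecords against a fixed, pre-known schema
so the schema itself is never serialized, registered via the ExecutionConfig.
The schema and class names are made up, and I am not claiming this is the
recommended approach.)

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Sketch: serialize GenericRecord payloads against a schema known up front, so only
// the record data (plus a length prefix) goes over the wire or into state.
public class SchemalessGenericRecordSerializer extends Serializer<GenericData.Record> {

    // Assumed: the schema is fixed and identical on all nodes (example schema below).
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                    + "{\"name\":\"orderId\",\"type\":\"string\"},"
                    + "{\"name\":\"status\",\"type\":\"string\"}]}");

    private final GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(SCHEMA);
    private final GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(SCHEMA);

    @Override
    public void write(Kryo kryo, Output output, GenericData.Record record) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
            writer.write(record, encoder);
            encoder.flush();
            byte[] bytes = bos.toByteArray();
            output.writeInt(bytes.length);
            output.writeBytes(bytes);
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
        try {
            byte[] bytes = input.readBytes(input.readInt());
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Avro deserialization failed", e);
        }
    }
}

// registration (sketch):
// env.getConfig().enableForceKryo();
// env.getConfig().registerTypeWithKryoSerializer(GenericData.Record.class, SchemalessGenericRecordSerializer.class);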

Gagan


Multiple MapState vs single nested MapState in stateful Operator

2019-01-09 Thread Gagan Agrawal
Hi,
I have a use case where 4 streams get merged (union), grouped on a common key
(keyBy), and a custom KeyedProcessFunction is called. Now I need to keep state
(RocksDB backend) for all 4 streams in my custom KeyedProcessFunction, where
each of these 4 streams would be stored as a map. So I have 2 options:

1. Create a separate MapStateDescriptor for each of these streams and store
their events separately.
2. Create a single MapStateDescriptor where there will be only 4 keys
(corresponding to the 4 stream types) and the value will be of type Map, which
further keeps the events from the respective streams.

I want to understand, from a performance perspective, whether there would be
any difference between the above approaches. Will keeping 4 different
MapStates cause 4 lookups in the RocksDB backend when they are accessed? Or
are all of these MapStates internally stored within RocksDB in a single row
corresponding to the respective key (as per the keyed stream) and hence all
fetched in a single call before the operator's processElement is called? If
there are different lookups in RocksDB for each MapStateDescriptor, then I
think keeping them in a single MapStateDescriptor would be more efficient to
minimize RocksDB calls? Please advise.

Gagan


Re: Buffer stats when Back Pressure is high

2019-01-08 Thread Gagan Agrawal
Thanks Timo for the suggested solution. We will go with the idea of an
artificial key for our use case.

Gagan

On Mon, Jan 7, 2019 at 10:21 PM Timo Walther  wrote:

> Hi Gagan,
>
> a typical solution to such a problem is to introduce an artificial key
> (enrichment id + some additional suffix); you can then keyBy on this
> artificial key and thus spread the workload more evenly. Of course you need
> to make sure that records of the second stream are duplicated to all
> operators with the same artificial key.
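
(Roughly what I plan to do — salt the enrichment id on the high-volume stream
and fan each enrichment record out to all salt values; DataEvent,
EnrichmentEvent and the salt count are placeholders. Assumes
org.apache.flink.api.common.typeinfo.Types and java.util.concurrent.ThreadLocalRandom.)

// Sketch: spread one hot enrichment id over several artificial sub-keys.
final int salts = 10; // number of artificial sub-keys per enrichment id (example value)

DataStream<Tuple2<String, DataEvent>> saltedData = dataStream
        .map(e -> Tuple2.of(e.enrichmentId + "#" + ThreadLocalRandom.current().nextInt(salts), e))
        .returns(Types.TUPLE(Types.STRING, Types.POJO(DataEvent.class)));

DataStream<Tuple2<String, EnrichmentEvent>> fannedOutEnrichment = enrichmentStream
        .flatMap((EnrichmentEvent e, Collector<Tuple2<String, EnrichmentEvent>> out) -> {
            for (int i = 0; i < salts; i++) {
                out.collect(Tuple2.of(e.enrichmentId + "#" + i, e)); // duplicate to every sub-key
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.POJO(EnrichmentEvent.class)));

// both streams are then keyed by f0 (the artificial key) before the stateful process function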
>
> Depending on the frequency of the second stream, it might also worth to
> use a broadcast join that distributes the second stream to all operators
> such that all operators can perform the enrichment step in a round robin
> fashion.
>
> Regards,
> Timo
>
> Am 07.01.19 um 14:45 schrieb Gagan Agrawal:
>
> Flink Version is 1.7.
> Thanks Zhijiang for your pointer. Initially I was checking only for few.
> However I just checked for all and found couple of them having queue length
> of 40+ which seems to be due to skewness in data. Is there any general
> guide lines on how to handle skewed data? In my case I am taking union and
> then keyBy (with custom stateful Process function) on enrichment id of 2
> streams (1 enrichment stream with low volume and another regular data
> stream with high volume). I see that 30% of my data stream records have
> same enrichment Id and hence go to same tasks which results in skewness.
> Any pointers on how to handle skewness while doing keyBy would be of great
> help.
>
> Gagan
>
> On Mon, Jan 7, 2019 at 3:25 PM zhijiang 
> wrote:
>
>> Hi Gagan,
>>
>> What Flink version do you use? And have you checked
>> buffers.inputQueueLength
>> for all the related parallel instances of B (connected with A)? There may
>> exist a scenario where only one parallel instance of B has a full input
>> queue, which back-pressures A, while the input queues of the other parallel
>> instances of B are empty.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Gagan Agrawal 
>> Send Time:2019年1月7日(星期一) 12:06
>> To:user 
>> Subject:Buffer stats when Back Pressure is high
>>
>> Hi,
>> I want to understand does any of buffer stats help in debugging /
>> validating that downstream operator is performing slow when Back Pressure
>> is high? Say I have A -> B operators and A shows High Back Pressure which
>> indicates something wrong or not performing well on B side which is slowing
>> down operator A. However when I look at buffers.inputQueueLength for
>> operator B, it's 0. My understanding is that when B is processing slow,
>> it's input buffer will be full of incoming messages which ultimately
>> blocks/slows down upstream operator A. However it doesn't seem to be
>> happening in my case. Can someone throw some light on how should different
>> stats around buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
>> numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look like when
>> downstream operator is performing slow?
>>
>> Gagan
>>
>>
>>
>


Re: Buffer stats when Back Pressure is high

2019-01-07 Thread Gagan Agrawal
Flink Version is 1.7.
Thanks Zhijiang for your pointer. Initially I was checking only a few.
However, I just checked all of them and found a couple having a queue length
of 40+, which seems to be due to skewness in the data. Are there any general
guidelines on how to handle skewed data? In my case I am taking a union and
then keyBy (with a custom stateful process function) on the enrichment id of 2
streams (one enrichment stream with low volume and another regular data stream
with high volume). I see that 30% of my data stream records have the same
enrichment id and hence go to the same task, which results in skewness. Any
pointers on how to handle skewness while doing keyBy would be of great help.

Gagan

On Mon, Jan 7, 2019 at 3:25 PM zhijiang  wrote:

> Hi Gagan,
>
> What Flink version do you use? And have you checked
> buffers.inputQueueLength
> for all the related parallel instances of B (connected with A)? There may
> exist a scenario where only one parallel instance of B has a full input
> queue, which back-pressures A, while the input queues of the other parallel
> instances of B are empty.
>
> Best,
> Zhijiang
>
> --
> From:Gagan Agrawal 
> Send Time:2019年1月7日(星期一) 12:06
> To:user 
> Subject:Buffer stats when Back Pressure is high
>
> Hi,
> I want to understand does any of buffer stats help in debugging /
> validating that downstream operator is performing slow when Back Pressure
> is high? Say I have A -> B operators and A shows High Back Pressure which
> indicates something wrong or not performing well on B side which is slowing
> down operator A. However when I look at buffers.inputQueueLength for
> operator B, it's 0. My understanding is that when B is processing slow,
> it's input buffer will be full of incoming messages which ultimately
> blocks/slows down upstream operator A. However it doesn't seem to be
> happening in my case. Can someone throw some light on how should different
> stats around buffers (e.g buffers.inPoolUsage, buffers.inputQueueLength,
> numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) look like when
> downstream operator is performing slow?
>
> Gagan
>
>
>


Buffer stats when Back Pressure is high

2019-01-06 Thread Gagan Agrawal
Hi,
I want to understand whether any of the buffer stats help in debugging /
validating that a downstream operator is performing slowly when back pressure
is high. Say I have A -> B operators and A shows high back pressure, which
indicates something is wrong or not performing well on the B side and is
slowing down operator A. However, when I look at buffers.inputQueueLength for
operator B, it's 0. My understanding is that when B is processing slowly, its
input buffer will be full of incoming messages, which ultimately blocks/slows
down the upstream operator A. However, that doesn't seem to be happening in my
case. Can someone throw some light on what the different buffer stats (e.g.
buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond,
numBuffersInRemotePerSecond) should look like when a downstream operator is
performing slowly?

Gagan


Joining more than 2 streams

2018-11-24 Thread Gagan Agrawal
Hi,
I want to do window join on multiple Kafka streams (say a, b, c) on common
field in all 3 streams and apply some custom function on joined stream. As
I understand we can join only 2 streams at a time via DataStream api. So
may be I need to join a and b first and then join first joined stream with
c. I want to understand how would stream state be stored in backend? Since
I will be joining a and b stream first, I believe both streams will be
stored in state backend for window time. And then again join of first
joined stream (of a and b) with c will result storage of all 3 streams for
windowed period. Does that mean stream a and b are stored twice in state
backend?

Let's say that instead of using the built-in join API, I rather union all 3
streams (after transforming them to a common schema), keyBy the unioned stream
on the common field and apply a process function where I implement the joining
on my own and store the streams in the state backend. Will that be more
storage efficient, as I will be saving the 3 streams just once instead of
twice?
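
(For clarity, the wiring I have in mind is roughly the following sketch;
CommonEvent, the mapper classes and ThreeWayJoinFunction are placeholders:)

// Sketch: map each stream to a common schema, union them, key by the join field,
// and implement the join in one KeyedProcessFunction so each stream is stored only once.
DataStream<CommonEvent> unioned = streamA.map(new ToCommonEventA())
        .union(streamB.map(new ToCommonEventB()),
               streamC.map(new ToCommonEventC()));

unioned
        .keyBy(event -> event.joinKey)
        .process(new ThreeWayJoinFunction()); // KeyedProcessFunction holding per-stream MapState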

Gagan


Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-23 Thread Gagan Agrawal
Hi,
I am running a Flink job on YARN which ran fine so far (4-5 days) and has now
started failing with the following errors.

2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager
- Closing TaskExecutor connection
container_1542008917197_0038_01_06 because: Container
[pid=18380,containerID=container_1542008917197_0038_01_06] *is running
beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical
memory used; 5.0 GB of 15 GB virtual memory used. Killing container.*

This is a simple job where we read 2 Avro streams from Kafka, apply a custom
UDF after creating a keyed stream from the union of those 2 streams, and write
the output back to Kafka. The UDF internally uses MapState with the RocksDB
backend. Currently the checkpoint size is around 300 GB and we are running
this with 10 task managers with 3 GB memory each. I have also set
"containerized.heap-cutoff-ratio: 0.5" but still face the same issue. The
Flink version is 1.6.2.

Here is the flink command
./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar

I want to understand what the typical reasons for this issue are. Also, why
would Flink consume more memory than allocated, as JVM memory is fixed and
will not grow beyond the max heap? Can this be related to RocksDB, which may
be consuming memory outside the heap and hence exceeding the defined limits? I
didn't see this issue when the checkpoint size was small (<50 GB), but ever
since we reached a size of 300 GB, this issue has been occurring frequently. I
can try increasing the memory, but I am still interested in knowing the
typical reasons for this error if JVM heap memory cannot grow beyond the
defined limit.

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

2018-11-19 Thread Gagan Agrawal
Good to know that, Steven. It would be a useful feature to have separate
timeout configs for both.

Gagan

On Mon, Nov 5, 2018, 10:06 Steven Wu  wrote:
>
> FYI, here is the jira to support timeout in savepoint REST api
> https://issues.apache.org/jira/browse/FLINK-10360
>
> On Fri, Nov 2, 2018 at 6:37 PM Gagan Agrawal 
> wrote:
>
>> Great, thanks for sharing that info.
>>
>> Gagan
>>
>> On Thu, Nov 1, 2018 at 1:50 PM Yun Tang  wrote:
>>
>>> Haha, actually externalized checkpoint also support parallelism
>>> changes, you could read my email
>>> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html>
>>> posted in dev-mail-list.
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Gagan Agrawal 
>>> *Sent:* Thursday, November 1, 2018 13:38
>>> *To:* myas...@live.com
>>> *Cc:* happydexu...@gmail.com; user@flink.apache.org
>>> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
>>> completing"
>>>
>>> Thanks Yun for your inputs. Yes, increasing checkpoint helps and we are
>>> able to save save points now. In our case we wanted to increase parallelism
>>> so I believe savepoint is the only option as checkpoint doesn't support
>>> code/parallelism changes.
>>>
>>> Gagan
>>>
>>> On Wed, Oct 31, 2018 at 8:46 PM Yun Tang  wrote:
>>>
>>> Hi Gagan
>>>
>>> Savepoint would generally take more time than a usual incremental
>>> checkpoint; you could try to increase the checkpoint timeout [1]
>>>
>>>env.getCheckpointConfig().setCheckpointTimeout(90);
>>>
>>> If you just want to resume from the previous job without changing the
>>> state backend, I think you could also try to resume from a retained
>>> checkpoint without triggering a savepoint [2].
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>>
>>> Best
>>> Yun Tang
>>>
>>> --
>>> *From:* Gagan Agrawal 
>>> *Sent:* Wednesday, October 31, 2018 19:03
>>> *To:* happydexu...@gmail.com
>>> *Cc:* user@flink.apache.org
>>> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
>>> completing"
>>>
>>> Hi Henry,
>>> Thanks for your response. However we don't face this issue during normal
>>> run as we have incremental checkpoints. Only when we try to take savepoint
>>> (which tries to save entire state in one go), we face this problem.
>>>
>>> Gagan
>>>
>>> On Wed, Oct 31, 2018 at 11:41 AM 徐涛  wrote:
>>>
>>> Hi Gagan,
>>> I have met with the error the checkpoint timeout too.
>>> In my case, it is not due to big checkpoint size,  but due to
>>> slow sink then cause high backpressure to the upper operator. Then the
>>> barrier may take a long time to arrive to sink.
>>> Please check if it is the case you have met.
>>>
>>> Best
>>> Henry
>>>
>>> > 在 2018年10月30日,下午6:07,Gagan Agrawal  写道:
>>> >
>>> > Hi,
>>> > We have a flink job (flink version 1.6.1) which unions 2 streams to
>>> pass through custom KeyedProcessFunction with RocksDB state store which
>>> final creates another stream into Kafka. Current size of checkpoint is
>>> around ~100GB and checkpoints are saved to s3 with 5 mins interval and
>>> incremental checkpoint enabled. Checkpoints mostly finish in less than 1
>>> min. We are running this job on yarn with following parameters
>>> >
>>> > -yn 10  (10 task managers)
>>> > -ytm 2048 (2 GB each)
>>> > - Operator 

Re: Savepoint failed with error "Checkpoint expired before completing"

2018-11-02 Thread Gagan Agrawal
Great, thanks for sharing that info.

Gagan

On Thu, Nov 1, 2018 at 1:50 PM Yun Tang  wrote:

> Haha, actually externalized checkpoint also support parallelism changes,
> you could read my email
> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html>
> posted in dev-mail-list.
>
> Best
> Yun Tang
> ----------
> *From:* Gagan Agrawal 
> *Sent:* Thursday, November 1, 2018 13:38
> *To:* myas...@live.com
> *Cc:* happydexu...@gmail.com; user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
> completing"
>
> Thanks Yun for your inputs. Yes, increasing checkpoint helps and we are
> able to save save points now. In our case we wanted to increase parallelism
> so I believe savepoint is the only option as checkpoint doesn't support
> code/parallelism changes.
>
> Gagan
>
> On Wed, Oct 31, 2018 at 8:46 PM Yun Tang  wrote:
>
> Hi Gagan
>
> Savepoint would generally takes more time than usual incremental
> checkpoint, you could try to increase checkpoint timeout time [1]
>
>env.getCheckpointConfig().setCheckpointTimeout(90);
>
> If you just want to resume from previous job without change the 
> state-backend, I think you could also try to resume from a retained 
> checkpoint without trigger savepoint [2].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
>
> --
> *From:* Gagan Agrawal 
> *Sent:* Wednesday, October 31, 2018 19:03
> *To:* happydexu...@gmail.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
> completing"
>
> Hi Henry,
> Thanks for your response. However we don't face this issue during normal
> run as we have incremental checkpoints. Only when we try to take savepoint
> (which tries to save entire state in one go), we face this problem.
>
> Gagan
>
> On Wed, Oct 31, 2018 at 11:41 AM 徐涛  wrote:
>
> Hi Gagan,
> I have met with the error the checkpoint timeout too.
> In my case, it is not due to big checkpoint size,  but due to slow
> sink then cause high backpressure to the upper operator. Then the barrier
> may take a long time to arrive to sink.
> Please check if it is the case you have met.
>
> Best
> Henry
>
> > 在 2018年10月30日,下午6:07,Gagan Agrawal  写道:
> >
> > Hi,
> > We have a flink job (flink version 1.6.1) which unions 2 streams to pass
> through custom KeyedProcessFunction with RocksDB state store which final
> creates another stream into Kafka. Current size of checkpoint is around
> ~100GB and checkpoints are saved to s3 with 5 mins interval and incremental
> checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are
> running this job on yarn with following parameters
> >
> > -yn 10  (10 task managers)
> > -ytm 2048 (2 GB each)
> > - Operator parallelism is also 10.
> >
> > While trying to run savepoint on this job, it runs for ~10mins and then
> throws following error. Looks like checkpoint default timeout of 10mins is
> causing this. What is recommended way to run savepoint for such job? Should
> we increase checkpoint default timeout of 10mins? Also currently our state
> size is 100GB but it is expected to grow unto 1TB. Is flink good for
> usecases with that much of size? Also how much time savepoint is expected
> to take with such state size and parallelism on Yarn? Any other
> recommendation would be of great help.
> >
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 434398968e635a49329f59a019b41b6f failed.
> >   at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> >   at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:9

Re: Savepoint failed with error "Checkpoint expired before completing"

2018-10-31 Thread Gagan Agrawal
Thanks Yun for your inputs. Yes, increasing the checkpoint timeout helps and
we are able to take savepoints now. In our case we wanted to increase the
parallelism, so I believe a savepoint is the only option as checkpoints don't
support code/parallelism changes.

Gagan

On Wed, Oct 31, 2018 at 8:46 PM Yun Tang  wrote:

> Hi Gagan
>
> Savepoint would generally takes more time than usual incremental
> checkpoint, you could try to increase checkpoint timeout time [1]
>
>env.getCheckpointConfig().setCheckpointTimeout(90);
>
> If you just want to resume from previous job without change the 
> state-backend, I think you could also try to resume from a retained 
> checkpoint without trigger savepoint [2].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
>
> --
> *From:* Gagan Agrawal 
> *Sent:* Wednesday, October 31, 2018 19:03
> *To:* happydexu...@gmail.com
> *Cc:* user@flink.apache.org
> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
> completing"
>
> Hi Henry,
> Thanks for your response. However we don't face this issue during normal
> run as we have incremental checkpoints. Only when we try to take savepoint
> (which tries to save entire state in one go), we face this problem.
>
> Gagan
>
> On Wed, Oct 31, 2018 at 11:41 AM 徐涛  wrote:
>
> Hi Gagan,
> I have met with the error the checkpoint timeout too.
> In my case, it is not due to big checkpoint size,  but due to slow
> sink then cause high backpressure to the upper operator. Then the barrier
> may take a long time to arrive to sink.
> Please check if it is the case you have met.
>
> Best
> Henry
>
> > 在 2018年10月30日,下午6:07,Gagan Agrawal  写道:
> >
> > Hi,
> > We have a flink job (flink version 1.6.1) which unions 2 streams to pass
> through custom KeyedProcessFunction with RocksDB state store which final
> creates another stream into Kafka. Current size of checkpoint is around
> ~100GB and checkpoints are saved to s3 with 5 mins interval and incremental
> checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are
> running this job on yarn with following parameters
> >
> > -yn 10  (10 task managers)
> > -ytm 2048 (2 GB each)
> > - Operator parallelism is also 10.
> >
> > While trying to run savepoint on this job, it runs for ~10mins and then
> throws following error. Looks like checkpoint default timeout of 10mins is
> causing this. What is recommended way to run savepoint for such job? Should
> we increase checkpoint default timeout of 10mins? Also currently our state
> size is 100GB but it is expected to grow unto 1TB. Is flink good for
> usecases with that much of size? Also how much time savepoint is expected
> to take with such state size and parallelism on Yarn? Any other
> recommendation would be of great help.
> >
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 434398968e635a49329f59a019b41b6f failed.
> >   at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> >   at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> >   at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> >   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> >   at java.security.AccessController.doPrivileged(Native Method)
> >   at javax.security.auth.Subject.doAs(Subject.java:422)
> >   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >   at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >   at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> > Caused by: java.util.concurrent.CompletionExc

Re: Savepoint failed with error "Checkpoint expired before completing"

2018-10-31 Thread Gagan Agrawal
Hi Henry,
Thanks for your response. However, we don't face this issue during a normal
run as we have incremental checkpoints. Only when we try to take a savepoint
(which tries to save the entire state in one go) do we face this problem.

Gagan

On Wed, Oct 31, 2018 at 11:41 AM 徐涛  wrote:

> Hi Gagan,
> I have met with the error the checkpoint timeout too.
> In my case, it is not due to big checkpoint size,  but due to slow
> sink then cause high backpressure to the upper operator. Then the barrier
> may take a long time to arrive to sink.
> Please check if it is the case you have met.
>
> Best
> Henry
>
> > 在 2018年10月30日,下午6:07,Gagan Agrawal  写道:
> >
> > Hi,
> > We have a flink job (flink version 1.6.1) which unions 2 streams to pass
> through custom KeyedProcessFunction with RocksDB state store which final
> creates another stream into Kafka. Current size of checkpoint is around
> ~100GB and checkpoints are saved to s3 with 5 mins interval and incremental
> checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are
> running this job on yarn with following parameters
> >
> > -yn 10  (10 task managers)
> > -ytm 2048 (2 GB each)
> > - Operator parallelism is also 10.
> >
> > While trying to run savepoint on this job, it runs for ~10mins and then
> throws following error. Looks like checkpoint default timeout of 10mins is
> causing this. What is recommended way to run savepoint for such job? Should
> we increase checkpoint default timeout of 10mins? Also currently our state
> size is 100GB but it is expected to grow unto 1TB. Is flink good for
> usecases with that much of size? Also how much time savepoint is expected
> to take with such state size and parallelism on Yarn? Any other
> recommendation would be of great help.
> >
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 434398968e635a49329f59a019b41b6f failed.
> >   at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> >   at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> >   at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> >   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> >   at java.security.AccessController.doPrivileged(Native Method)
> >   at javax.security.auth.Subject.doAs(Subject.java:422)
> >   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >   at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >   at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> > Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
> expired before completing
> >   at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
> >   at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >   at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> >   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >   at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >   at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
> >   at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
> >   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.util.concurrent.CompletionException:
> java.lang.Exception: Checkpoint expired before completing
> >   at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >   at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >   at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> >   at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>
>


Savepoint failed with error "Checkpoint expired before completing"

2018-10-30 Thread Gagan Agrawal
Hi,
We have a Flink job (Flink version 1.6.1) which unions 2 streams, passes them
through a custom KeyedProcessFunction with a RocksDB state store, and finally
creates another stream into Kafka. The current checkpoint size is around
~100GB; checkpoints are saved to S3 with a 5 minute interval and incremental
checkpointing enabled. Checkpoints mostly finish in less than 1 min. We are
running this job on YARN with the following parameters

-yn 10  (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.

While trying to run a savepoint on this job, it runs for ~10 mins and then
throws the following error. It looks like the default checkpoint timeout of 10
mins is causing this. What is the recommended way to run a savepoint for such
a job? Should we increase the default checkpoint timeout of 10 mins? Also, our
state size is currently 100GB but it is expected to grow up to 1TB. Is Flink
good for use cases with that much state? Also, how much time is a savepoint
expected to take with such a state size and parallelism on YARN? Any other
recommendation would be of great help.

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
434398968e635a49329f59a019b41b6f failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
expired before completing
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint expired before completing
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)