slow checkpoints

2019-11-15 Thread yuvraj singh
Hi all,

I am facing an issue: when I have high back pressure, my checkpoints start
failing. Please let me know how to deal with this kind of situation.


Thanks
Yubraj Singh .




Re: slow checkpoints

2019-11-15 Thread Congxian Qiu
Hi

Currently, checkpoints may fail in a high back pressure scenario because
the barrier alignment can't be completed in the expected time, so you should
fix the back pressure problem first. There is a FLIP [1] that aims to fix
this issue.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
Best,
Congxian
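
For reference, a minimal sketch (not from the thread; the class name and all
values are illustrative only) of checkpoint settings that are sometimes tuned
to make checkpoints more tolerant of back pressure. The FLIP-76 unaligned
checkpoints mentioned above only became available in later Flink releases
(1.11+):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s (illustrative interval).
        env.enableCheckpointing(60_000);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Give barrier alignment more time under back pressure (illustrative value).
        cfg.setCheckpointTimeout(10 * 60_000);
        // Leave breathing room between checkpoints so the job can catch up.
        cfg.setMinPauseBetweenCheckpoints(30_000);
        // Newer Flink versions: tolerate a few failed checkpoints instead of failing the job.
        cfg.setTolerableCheckpointFailureNumber(3);
        // Flink 1.11+ only: the unaligned checkpoints described in FLIP-76.
        // cfg.enableUnalignedCheckpoints();
    }
}

None of this removes the back pressure itself; it only buys the checkpoints
more time while the root cause is being fixed.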


yuvraj singh <19yuvrajsing...@gmail.com> wrote on Fri, Nov 15, 2019 at 4:47 PM:

>
> Hi all,
>
> I am facing an issue: when I have high back pressure, my checkpoints
> start failing. Please let me know how to deal with this kind of
> situation.
>
>
> Thanks
> Yubraj Singh .
>
>
>


Re: slow checkpoints

2019-11-15 Thread yuvraj singh
@Congxian, the back pressure is due to job failures; sometimes the job can fail
and we need to catch up.

Thanks
Yubraj Singh


On Fri, Nov 15, 2019 at 2:39 PM Congxian Qiu  wrote:

> Hi
>
> Currently, checkpoints may fail in a high back pressure scenario because
> the barrier alignment can't be completed in the expected time, so you should
> fix the back pressure problem first. There is a FLIP [1] that aims to fix
> this issue.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> Best,
> Congxian
>
>
> yuvraj singh <19yuvrajsing...@gmail.com> wrote on Fri, Nov 15, 2019 at 4:47 PM:
>
>>
>> Hi all,
>>
>> I am facing an issue: when I have high back pressure, my checkpoints
>> start failing. Please let me know how to deal with this kind of
>> situation.
>>
>>
>> Thanks
>> Yubraj Singh .
>>
>>
>>
>


How to unsubscribe the Apache projects and jira issues notification

2019-11-15 Thread P. Ramanjaneya Reddy
Hi,

I want to unsubscribe from the following mailing lists; kindly guide me.

I tried what I found on Google, but I am still receiving mails.

I would also like to unsubscribe from:

j...@apache.org


user@flink.apache.org

d...@flink.apache.org

d...@beam.apache.org

u...@beamho.apache.org 


Thanks


Re: Initialization of broadcast state before processing main stream

2019-11-15 Thread Vasily Melnik
Maxim, many thanks.
We'll try buffering.

Best regards,
Vasily Melnik
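
A minimal sketch of the buffering approach discussed in Maxim's reply quoted
below (option 1, combined with an option-2-style control event). This is not
from the thread: it assumes facts and dimension records are (key, value) string
tuples and that a dimension record with key "__END__" marks the end of the
initial dimension load.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilBroadcastReady extends
        KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String> {

    // Broadcast state holding the dimension table; the "__END__" entry doubles as a ready flag.
    static final MapStateDescriptor<String, String> DIMS =
            new MapStateDescriptor<>("dims", Types.STRING, Types.STRING);
    // Keyed list state that parks facts arriving before the dimension load is complete.
    static final ListStateDescriptor<Tuple2<String, String>> BUFFER =
            new ListStateDescriptor<>("buffered-facts",
                    TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));

    private transient ListState<Tuple2<String, String>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(BUFFER);
    }

    @Override
    public void processElement(Tuple2<String, String> fact, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        if (ctx.getBroadcastState(DIMS).contains("__END__")) {
            out.collect(fact.f1 + " / " + ctx.getBroadcastState(DIMS).get(fact.f0));
        } else {
            buffer.add(fact); // dimension load not finished yet: park the fact
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> dim, Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(DIMS).put(dim.f0, dim.f1);
        if ("__END__".equals(dim.f0)) {
            // Dimension table complete: flush everything buffered so far, for all keys.
            ctx.applyToKeyedState(BUFFER, (String key, ListState<Tuple2<String, String>> state) -> {
                for (Tuple2<String, String> fact : state.get()) {
                    out.collect(fact.f1 + " / " + ctx.getBroadcastState(DIMS).get(fact.f0));
                }
                state.clear();
            });
        }
    }
}

As with option 2 below, something upstream still has to emit the "__END__"
control record once the dimension topic has been read up to its end offsets.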


On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov  wrote:

> Hi Vasily,
>
> unfortunately, this is a known issue with Flink; you can read the discussion
> under
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
> At the moment I have seen 3 solutions for this issue:
>
> 1. You buffer the fact stream in local state until the broadcast is
> completely read.
> 2. You create a custom source for the fact stream and, in its open method,
> wait until the broadcast stream is completely read.
> 3. With the latest Flink version, you can pre-populate state with the
> dimension data and start the Flink job with that existing state. You need to
> take care of setting the correct Kafka offsets for the dimension stream
> though, otherwise you will get a gap between the pre-populated state and the
> moment the job is started.
>
> The first two solutions need to know when the broadcast stream is
> "completely read". I created a workaround for this with a custom source for
> dimension events. It creates a "stop file" on a shared file system, reads
> the Kafka end offsets for the dimension topic through the admin interface,
> starts processing all messages from the beginning, and clears the "stop
> file" once the message offsets reach the end offsets for all partitions.
> Instead of a "stop file" you could use a shared lock in ZooKeeper.
>
> Hope this helps,
> Maxim.
>
> On Thu, Nov 14, 2019 at 7:42 AM vino yang  wrote:
>
>> Hi Vasily,
>>
>> Currently, Flink does not coordinate between a regular stream and a
>> broadcast stream; they are both just streams. Your scenario for using
>> broadcast state is a special one. In the more general case, the state to be
>> broadcast is itself an unbounded stream whose events may be broadcast
>> downstream at any time, so the job cannot wait for it to finish before
>> playing the regular stream events.
>>
>> For your scenario:
>>
>>
>>    - you can change the storage of your dimension table, e.g. to Redis or
>>    MySQL, and do a stream-to-dimension-table join;
>>    - you can inject a control event into your broadcast stream to mark the
>>    end of the stream and let the fact stream wait until it receives that
>>    control event. Or you can introduce a third-party coordinator, e.g.
>>    ZooKeeper, to coordinate them; however, that would make your solution
>>    more complex.
>>
>> Best,
>> Vino
>>
>>
>> Vasily Melnik  wrote on Thu, Nov 14, 2019 at 1:28 PM:
>>
>>> Hi all.
>>>
>>> In our task we have two Kafka topics:
>>> - one with fact stream (web traffic)
>>> - one with dimension
>>>
>>> We would like to put the dimension data into broadcast state and look it
>>> up with the facts. But we see that not all dimension records are put into
>>> state before the first fact record is processed, so the lookup returns no
>>> data.
>>>
>>> The question is: how could we read the fact topic with some "delay" to
>>> give the dimension stream enough time to initialize the state?
>>>
>>>
>>> Best regards,
>>> Vasily Melnik
>>>
>>


Broadcast checkpoint serialization fail

2019-11-15 Thread Vasily Melnik
Hi all.
In Flink 1.8 we have a strange exception that causes the job to fail:

2019-11-14 15:52:52,071 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- op4 (1/1)
(797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
2 for operator op4 (1/1).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator op4 (1/1).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NullPointerException
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
at
org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
... 7 more

As we can see, the exception occurs in
org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109),
but what exactly is the reason?

We configured the RocksDB state backend for the job, with local filesystem storage.


Best regards,
Vasily Melnik
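
A speculative note rather than a confirmed diagnosis: the stack trace shows the
broadcast state falling back to the KryoSerializer, with Kryo's default
TreeSetSerializer failing while writing the set's contents. One mitigation
sometimes suggested when Kryo struggles with JDK collection types is to
register Java serialization for that type up front; a minimal sketch follows
(the class name is made up, and this is only a workaround idea, not a fix
verified against this job):

import java.util.TreeSet;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Route TreeSet instances through Java serialization instead of Kryo's
        // default TreeSetSerializer. Flink's own JavaSerializer is used here,
        // as the Flink docs recommend it over Kryo's bundled one.
        env.getConfig().addDefaultKryoSerializer(TreeSet.class, JavaSerializer.class);
    }
}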


Re: slow checkpoints

2019-11-15 Thread vino yang
Hi Yubraj,

So the frequent job failures are the root cause, and that is what you need to
fix. Yes, when too many messages pile up in the messaging system and cannot be
consumed normally, there will be catch-up consumption, which puts more pressure
on your streaming system than usual.

Best,
Vino

yuvraj singh <19yuvrajsing...@gmail.com> wrote on Fri, Nov 15, 2019 at 5:25 PM:

> @Congxian, the back pressure is due to job failures; sometimes the job can
> fail and we need to catch up.
>
> Thanks
> Yubraj Singh
>
>
> On Fri, Nov 15, 2019 at 2:39 PM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> Currently, checkpoints may fail in a high back pressure scenario
>> because the barrier alignment can't be completed in the expected time, so
>> you should fix the back pressure problem first. There is a FLIP [1] that
>> aims to fix this issue.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> Best,
>> Congxian
>>
>>
>> yuvraj singh <19yuvrajsing...@gmail.com> wrote on Fri, Nov 15, 2019 at 4:47 PM:
>>
>>>
>>> Hi all,
>>>
>>> I am facing an issue: when I have high back pressure, my checkpoints
>>> start failing. Please let me know how to deal with this kind of
>>> situation.
>>>
>>>
>>> Thanks
>>> Yubraj Singh .
>>>
>>>
>>>
>>


Re: How to unsubscribe the Apache projects and jira issues notification

2019-11-15 Thread Piotr Nowojski
Hi,

Please check the first Google result for "unsubscribe user@flink.apache.org".

Piotrek

> On 15 Nov 2019, at 11:40, P. Ramanjaneya Reddy  wrote:
> 
> Hi,
>
> I want to unsubscribe from the following mailing lists; kindly guide me.
>
> I tried what I found on Google, but I am still receiving mails.
>
> I would also like to unsubscribe from:
> 
> j...@apache.org 
> 
> user@flink.apache.org 
> d...@flink.apache.org 
> d...@beam.apache.org 
> u...@beamho.apache.org 
> 
> Thanks 
> 



unsubscribe

2019-11-15 Thread Katherin Eri
-- 
With thanks, Katherin Pudikova


Re: How to unsubscribe the Apache projects and jira issues notification

2019-11-15 Thread Luke Cwik
https://apache.org/foundation/mailinglists.html#request-addresses-for-unsubscribing

If you want to subscribe to l...@apache.org then you need to send a message
to

list-subscr...@apache.org

To get off a list, send a message to

list-unsubscr...@apache.org

On Fri, Nov 15, 2019 at 2:40 AM P. Ramanjaneya Reddy 
wrote:

> Hi,
>
> I want to unsubscribe from the following mailing lists; kindly guide me.
>
> I tried what I found on Google, but I am still receiving mails.
>
> I would also like to unsubscribe from:
>
> j...@apache.org
>
>
> user@flink.apache.org
>
> d...@flink.apache.org
>
> d...@beam.apache.org
>
> u...@beamho.apache.org 
>
>
> Thanks
>


Keyed raw state - example

2019-11-15 Thread bastien dine
Hello everyone,

I would like to know if anybody has a working example of how to declare
keyed raw state (in my case in a KeyedProcessOperator) and how to use it in
my UDF (a KeyedProcessFunction)?

Basically, we have a huge problem with a ValueState on RocksDB: it gets
serialized for every element (we need to access and update it), so it's
taking a crazy amount of time. We would like to have it serialized only on
snapshots, so raw state looks like a possible solution,
but I cannot find any example of it :/

Thanks and best regards,

Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io
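
A side note rather than an answer from the thread: if the goal is only to
avoid per-element serialization, a commonly mentioned alternative to raw state
is a heap-based state backend, which keeps managed keyed state as plain
objects and serializes it only when a snapshot is taken. A minimal sketch
(the checkpoint URI is a placeholder, and heap state must fit in memory):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapStateBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based backend: ValueState lives as objects between checkpoints and
        // is only serialized when a snapshot is written to the checkpoint URI below.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}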


how to setup a ha flink cluster on k8s?

2019-11-15 Thread Rock
I'm trying to set up a Flink cluster on k8s for production use, but the setup
described here
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
is not HA: when the job manager goes down and is rescheduled, the metadata for
the running jobs is lost.

 

I tried to use the ZooKeeper HA setup from
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html
on k8s, but can't get it right.

 

Storing the job's metadata on k8s using a PVC or another external file
system should be very easy. Is there a way to achieve this?
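
For reference, a hedged sketch of the ZooKeeper HA settings from the linked
guide, as they might appear in flink-conf.yaml on k8s. The quorum address,
cluster id, and storage path are placeholders; high-availability.storageDir
must point at storage every JobManager can reach (e.g. HDFS, S3, or a shared
volume backed by a PVC):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0.zk-hs:2181,zk-1.zk-hs:2181,zk-2.zk-hs:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-flink-cluster
high-availability.storageDir: hdfs:///flink/ha/

The cluster-id should stay stable across JobManager restarts so that a
rescheduled JobManager pod can find the job metadata written by its
predecessor.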