Re: KeyBy distribution across taskslots

2019-02-28 Thread Congxian Qiu
Hi, maybe you could add a prefix to each key, so that the hot keys can be distributed across many tasks. Best, Congxian On Feb 28, 2019, 21:16 +0800, Yun Tang wrote: > Hi, > > If you notice that some key groups are hot and under high load, you could try > to increase the total number of key groups (by
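
The key-prefix idea above is often called key salting. The following is a minimal plain-Java sketch of it, not Flink API code: the class name SaltedKeys, the "#" separator, and the bucket count are all assumptions made for illustration. In a real job the salt would probably need to be attached to the record in a step before keyBy (so that the key itself stays deterministic), and the partial results per salted key would need a second aggregation on the original key.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: spreads one hot key over several "salted" keys.
final class SaltedKeys {
    private SaltedKeys() {}

    // Prefixes the key with a bucket number, e.g. bucket 3 and key "hot" give "3#hot".
    static String salt(String key, int bucket) {
        return bucket + "#" + key;
    }

    // Picks a random bucket in [0, buckets) and salts the key with it.
    static String saltRandomly(String key, int buckets) {
        return salt(key, ThreadLocalRandom.current().nextInt(buckets));
    }

    // Recovers the original key by dropping everything up to the first '#'.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(saltedKey.indexOf('#') + 1);
    }

    public static void main(String[] args) {
        // Each call may print a different bucket prefix for the same hot key.
        for (int i = 0; i < 5; i++) {
            String salted = saltRandomly("hotKey", 4);
            System.out.println(salted + " -> " + unsalt(salted));
        }
    }
}
```

The trade-off is that a per-key result now arrives as several partial results, one per bucket, which have to be merged downstream.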

Re: How do I compute the average and keep track of a state over a window in DataStream?

2019-02-28 Thread Congxian Qiu
Hi Felipe, maybe you could use a process function [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html Best, Congxian On Feb 28, 2019, 22:47 +0800, Felipe Gutierrez wrote: > Hi all, > > I want to compute the average of two stream data

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud, Thanks for the further feedback! For option 1: 40 min still does not make sense, which indicates that it might take more time to finish a checkpoint in your case. I have also seen scenarios where catching up on data took several hours to finish one checkpoint. If the current

Re: Job Manager not able to fetch job info when restarted

2019-02-28 Thread sen
If I want the job to be restarted after the JobManager restarts, does HA mode have to use ZooKeeper? high-availability: zookeeper

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul, Thanks for your feedback. If the at-least-once mode still causes the problem, we can confirm it is not caused by the blocking behavior in exactly-once mode mentioned before. In at-least-once mode, the task continues processing the buffers that follow the barriers during alignment. But

Flink Kinesis Consumer

2019-02-28 Thread Steven Nelson
Hello! Does anyone know if the Flink Kinesis Consumer supports stopping rather than cancelling? I don't see that it implements StoppableFunction, but I might be wrong. -Steve

RE: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Update: Option 1 does not work. It still fails at the end of the timeout, no matter its value. Should I implement a “bandwidth” management system by using artificial Thread.sleep calls in the source, depending on the back pressure? From: LINZ, Arnaud Sent: Thursday, 28 February 2019 15:47 To:

Re: Flink Standalone cluster - production settings

2019-02-28 Thread Padarn Wilson
Are you able to give some detail on the cases in which you might be better off setting a higher (or lower) parallelism for an operator? On Thu, Feb 21, 2019 at 9:54 PM Hung wrote: > / Each job has 3 asynch operators > with Executors with thread counts of 20,20,100/ > > Flink handles parallelisms for

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-28 Thread Austin Cawley-Edwards
Hi Gary, No, I am running a YARN session (which I start with: flink-yarn-session --slots 4 --taskManagerMemory 16GB --jobManagerMemory 3GB --detached) and submit jobs through the REST interface. Thank you for the tips - I will probably shade it on my side. Is there an official location that the

Re: Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Kumar Bolar, Harshith
Thanks a lot. Looking into the logs sounds like a much cleaner approach :-) From: Till Rohrmann Date: Thursday, 28 February 2019 at 8:14 PM To: Harshith Kumar Bolar Cc: user Subject: [External] Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory? Yes this is

RE: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Hi Zhijiang, Thanks for your feedback. * I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and will let you know. Setting it higher than 40 min does not make much sense since after 40 min the pending output is already quite large. * Option 3 won’t work; I already

How do I compute the average and keep track of a state over a window in DataStream?

2019-02-28 Thread Felipe Gutierrez
Hi all, I want to compute the average of two stream data sources and also keep track of a ValueState variable which is a CountMinSketch class that I implemented. For this, I tried to use RichAggregateFunction however it throws an exception saying that - Exception in thread "main"

Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Till Rohrmann
Yes this is one way. Another way could be to look into the logs of the running TaskManagers. They should contain the path of the blob store directory. Cheers, Till On Thu, Feb 28, 2019 at 12:04 PM Kumar Bolar, Harshith wrote: > Is there any way to figure out which one is being run on the

Re: KeyBy distribution across taskslots

2019-02-28 Thread Yun Tang
Hi, If you notice that some key groups are hot and under high load, you could try to increase the total number of key groups (by increasing the max parallelism), but note that previous checkpoints can then no longer be restored. With the help of this, we might let the hot key groups share

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-28 Thread Gary Yao
Hi Austin, Are you running your job detached in a per-job cluster? In that case inverted class loading does not work. This is because we add the user jar to the system class path, and there is no dynamic class loading involved at the moment [1]. You can try the YARN session mode, or – as Chesnay

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread Paul Lam
Hi Zhijiang, Thanks a lot for your reasoning! I tried to set the checkpoint mode to at-least-once as you suggested, but unfortunately the problem remains the same :( IMHO, if it’s caused by barrier alignment, the state size (mainly buffers during alignment) would be big, right? But actually it’s

Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Kumar Bolar, Harshith
Is there any way to figure out which one is being run on the TaskManager? Would it be safe to assume that it is the latest directory created? Regards, Harshith From: Till Rohrmann Date: Thursday, 28 February 2019 at 3:28 PM To: Harshith Kumar Bolar Cc: user Subject: [External] Re: Re: What

Re: submit job failed on Yarn HA

2019-02-28 Thread Gary Yao
Hi Sen, I took a look at the CLI code again, and found out that -m is ignored if high-availability: ZOOKEEPER is configured in your flink-conf.yaml. This does not seem right and should be at least documented [1]. Judging from the client logs that you provided, I think the problem is that the

Re: KeyBy distribution across taskslots

2019-02-28 Thread Fabian Hueske
Hi, The answer is in fact no. Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group. AFAIK, there are no plans to change this behavior. Stefan (in CC) might be able to give more details on this. Something that might
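
The two-step mapping described above (key to key group, key group to task) can be sketched in plain Java. This is a simplified illustration, not Flink's code: the class and method names are hypothetical, the key-group step uses the raw hashCode where Flink is understood to apply an additional hash first, and the task-index arithmetic is an assumption based on how contiguous key-group ranges are usually assigned.

```java
// Hypothetical sketch of key -> key group -> parallel task index.
final class KeyGroupSketch {
    private KeyGroupSketch() {}

    // Maps a key to one of maxParallelism key groups (simplified: no extra hashing).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to a task index so that each task owns a contiguous range of groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // total number of key groups (assumed value)
        int parallelism = 4;      // number of parallel tasks (assumed value)
        for (String key : new String[] {"a", "b", "c"}) {
            int group = keyGroupFor(key, maxParallelism);
            int task = operatorIndexFor(group, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + group + " -> task " + task);
        }
    }
}
```

With few distinct keys, several of them can land in key groups owned by the same task while other tasks stay idle, which is the uneven distribution the thread is about.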

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud, I think there are two key points. First, the checkpoint barrier might be emitted late from the source under high back pressure because of the synchronization lock. Second, the barrier has to queue behind the in-flight data buffers, so the downstream task has to process all the buffers ahead of the barrier to

Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Till Rohrmann
Yes, at the moment this does not happen automatically. When deleting the directories you have to be careful not to delete the directory of a running TaskManager. Cheers, Till On Wed, Feb 27, 2019 at 6:29 PM Kumar Bolar, Harshith wrote: > Thanks Till, > > > > It appears to occur when a task

Setting source vs sink vs window parallelism with data increase

2019-02-28 Thread Padarn Wilson
Hi all, I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified it is something like: Data: (key1, count, time) Source -> Map(x -> (x, newKeyList(x.key1)) -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time)) ->

Re: Collapsing watermarks after keyby

2019-02-28 Thread Padarn Wilson
I created a small test to see if I could replicate this... but I couldn't :-) Below is my code that provides a counter example. It is not very clean, but perhaps it is useful for someone else in the future: class SessionWindowTest extends FunSuite with Matchers { test("Should advance

Re: In Flink WordCount, when and where is sum called?

2019-02-28 Thread  
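I understand now, thanks everyone. In RecordWriter.emit(), the data has already passed through flatMap and map, and the function sends the records one at a time: (a,1),(a,1),(a,1). In WindowOperator.processElement(), once a record is received, windowState.add(element.value) is called, which actually invokes HeapReducingState.add(). The state value is stored as (key, value) in WindowOperator.windowState.stateTable.primaryTable.state.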

Re: In Flink WordCount, when and where is sum called?

2019-02-28 Thread  
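On Thursday, 28 February 2019, 5:32:00 pm GMT+8, thinktothi...@yahoo.com.INVALID wrote: I understand now, thanks everyone. In RecordWriter.emit(), the data has already passed through flatMap and map, and the function sends the records one at a time: (a,1),(a,1),(a,1). In WindowOperator.processElement(), once a record is received, windowState.add(element.value) is called, which actually invokes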

Re: In Flink WordCount, when and where is sum called?

2019-02-28 Thread  
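I understand now, thanks everyone. In RecordWriter.emit(), the data has already passed through flatMap and map, and the function sends the records one at a time: (a,1),(a,1),(a,1). In WindowOperator.processElement(), once a record is received, windowState.add(element.value) is called, which actually invokes HeapReducingState.add(). The state value is stored as (key, value) in WindowOperator.windowState.stateTable.primaryTable.state.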

Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Hello, I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is “exactly once”. Everything is fine in the “normal” case, as the sink is faster than the source; but when we stop the application for a

Re: Breakage in Flink CLI in 1.5.0

2019-02-28 Thread sen
Hi Till, So how can we get the right REST address and port when using HA mode on YARN? I get it from the REST API "/jars". But when I submit a job using flink run -m, it fails. org.apache.flink.client.program.ProgramInvocationException: Could not retrieve

Two Kubernetes clusters and one Flink cluster?

2019-02-28 Thread Thomas Eckestad
Hi, I'm working with two separate Kubernetes clusters located in different regions (hosted in proprietary data centers), the distance between the regions introduces a pretty high (~50ms) latency between the clusters, so communication should not go cross-site unless necessary. I would like to

Re: In Flink WordCount, when and where is sum called?

2019-02-28 Thread mayangyang02
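@thinktothings The sum is actually maintained in the state of the WindowOperator. If you look at WindowedStream#sum(), you will see that it eventually calls WindowedStream#reduce(). For reduce, the state implementation class is HeapReducingState, and calling its add method performs the aggregation (for sum, that is addition). When the window emits, the maintained state is sent downstream. Original message From: Yaoting gongfall.for.you@gmail.com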
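
The behaviour described above, where every add call folds the new value into the value already stored for the key, can be imitated in a few lines of plain Java. This is only a sketch of the idea, not the HeapReducingState implementation: the class ReducingStateSketch and its methods are hypothetical, and a HashMap stands in for the state table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a keyed reducing state: one reduced value per key.
final class ReducingStateSketch<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;

    ReducingStateSketch(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Stores the value if the key is new, otherwise combines it with the stored value.
    void add(K key, V value) {
        state.merge(key, value, reduceFunction);
    }

    // Returns the current reduced value for the key, or null if nothing was added.
    V get(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        // For sum, the reduce function is plain addition.
        ReducingStateSketch<String, Integer> counts = new ReducingStateSketch<>(Integer::sum);
        counts.add("a", 1);
        counts.add("a", 1);
        counts.add("a", 1);
        System.out.println("a -> " + counts.get("a"));
    }
}
```

Because only the running result is kept per key, the state stays small no matter how many records arrive before the window emits.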

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul, I am not sure whether the task thread is involved in some work while snapshotting state for the FsStateBackend. But I have another experience which might also explain your problem. From your description below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent`, which means the