Hi
Maybe you could add a prefix to each key, so that the hot keys can be
distributed across many tasks.
Best, Congxian
On Feb 28, 2019, 21:16 +0800, Yun Tang wrote:
> Hi,
>
> If you noticed that some key groups are hot and in high load, you could try
> to increase the total key groups number (by
Hi Felipe
Maybe you could use process function[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html
Best, Congxian
On Feb 28, 2019, 22:47 +0800, Felipe Gutierrez wrote:
> Hi all,
>
> I want to compute the average of two stream data
Hi Arnaud,
Thanks for the further feedbacks!
For option 1: 40 min still failing suggests it may take even more time to
finish a checkpoint in your case. I have also experienced scenarios where
catching up on data caused a single checkpoint to take several hours. If the
current
If I want the job to be restarted after the JobManager restarts, is ZooKeeper
required for HA mode?
high-availability: zookeeper
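For context: as far as I know, in Flink releases of that era ZooKeeper is the only built-in HA backend besides NONE, so surviving a JobManager restart does require the `high-availability: zookeeper` setting plus a quorum and a state storage directory. A minimal flink-conf.yaml sketch, with placeholder host names and paths:

```yaml
# Sketch only; zk hosts and the HDFS path are placeholders to adapt.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
```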
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Paul,
Thanks for your feedback. If the at-least-once mode still causes the problem,
we can confirm it is not caused by the blocking behavior in exactly-once mode
mentioned before.
For at-least-once, the task continues processing the buffers that follow the
barriers during alignment. But
Hello!
Does anyone know if the Flink Kinesis Consumer supports stopping rather
than cancelling? I don't see that it implements StoppableFunction, but I
might be wrong.
-Steve
Update :
Option 1 does not work. It still fails at the end of the timeout, no matter
its value.
Should I implement a “bandwidth” management system by adding artificial
Thread.sleep calls in the source, depending on the backpressure?
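A sketch of that idea in plain Java: a pacing helper the source could call before each emit. The class and parameter names here are invented for illustration, and note that Flink 1.7 does not expose the backpressure signal to user sources, so the rate below is a fixed target rather than a feedback loop.

```java
// Hypothetical "bandwidth management" sketch: block before each emit so
// the source never exceeds a target rate. Not a Flink API.
public class ThrottledEmitter {
    private final long nanosPerRecord;
    private long nextEmitTime;

    ThrottledEmitter(double maxRecordsPerSecond) {
        this.nanosPerRecord = (long) (1_000_000_000L / maxRecordsPerSecond);
        this.nextEmitTime = System.nanoTime();
    }

    // Blocks until the next record may be emitted.
    void acquire() {
        long now = System.nanoTime();
        if (now < nextEmitTime) {
            long waitNanos = nextEmitTime - now;
            try {
                Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        nextEmitTime = Math.max(nextEmitTime, now) + nanosPerRecord;
    }

    public static void main(String[] args) {
        ThrottledEmitter limiter = new ThrottledEmitter(100); // target: 100 records/s
        long start = System.nanoTime();
        for (int i = 0; i < 20; i++) limiter.acquire(); // call before each emit
        double elapsedSeconds = (System.nanoTime() - start) / 1e9;
        System.out.printf("emitted 20 records in %.2fs%n", elapsedSeconds);
    }
}
```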
From: LINZ, Arnaud
Sent: Thursday, 28 February 2019 15:47
To:
Are you able to give some detail on in which cases you might be better off
setting higher (or lower) parallelism for an operator?
On Thu, Feb 21, 2019 at 9:54 PM Hung wrote:
> Each job has 3 async operators
> with Executors with thread counts of 20, 20, 100
>
> Flink handles parallelisms for
Hi Gary,
No, I am running a YARN session (which I start with: flink-yarn-session
--slots 4 --taskManagerMemory 16GB --jobManagerMemory 3GB --detached) and
submit jobs through the REST interface. Thank you for the tips - I will
probably shade it on my side. Is there an official location that the
Thanks a lot. Looking into the logs sounds like a much cleaner approach :-)
From: Till Rohrmann
Date: Thursday, 28 February 2019 at 8:14 PM
To: Harshith Kumar Bolar
Cc: user
Subject: [External] Re: Re: Re: What are blobstore files and why do they keep
filling up /tmp directory?
Yes this is
Hi Zhihiang,
Thanks for your feedback.
* I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and
will let you know. Setting it higher than 40 min does not make much sense, since
after 40 min the pending output is already quite large.
* Option 3 won’t work; I already
Hi all,
I want to compute the average of two stream data sources and also keep
track of a ValueState variable, which is a CountMinSketch class that I
implemented. For this I tried to use RichAggregateFunction, however it
throws an exception saying - Exception in thread "main"
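For what it is worth, Flink rejects Rich variants of AggregateFunction in window aggregations (the accumulator must carry all of the function's state), which is the usual cause of that exception; per-key extras like the CountMinSketch typically move into a ProcessWindowFunction or KeyedProcessFunction instead. Below is a plain-Java sketch, with no Flink dependency, of how an average fits the accumulator contract (createAccumulator/add/getResult/merge):

```java
// Mirrors the shape of Flink's AggregateFunction<IN, ACC, OUT> contract
// without depending on Flink: all state lives in the accumulator.
public class AverageAccumulator {
    long sum;
    long count;

    static AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

    static AverageAccumulator add(long value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count += 1;
        return acc;
    }

    static double getResult(AverageAccumulator acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    static AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    public static void main(String[] args) {
        AverageAccumulator acc = createAccumulator();
        for (long v : new long[] {1, 2, 3, 4}) acc = add(v, acc);
        System.out.println(getResult(acc)); // 2.5
    }
}
```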
Yes this is one way. Another way could be to look into the logs of the
running TaskManagers. They should contain the path of the blob store
directory.
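A minimal sketch of that log search; the sample log line below is fabricated for illustration, and in a real setup you would grep the actual taskmanager*.log files in the TaskManager's log directory:

```shell
# Fabricated sample line standing in for a real TaskManager log entry.
printf 'INFO  BlobServer - Created BLOB server storage directory /tmp/blobStore-1234\n' > taskmanager.log
# Grepping for "blob" surfaces the storage directory path.
grep -i "blob" taskmanager.log
```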
Cheers,
Till
On Thu, Feb 28, 2019 at 12:04 PM Kumar Bolar, Harshith
wrote:
> Is there any way to figure out which one is being run on the
Hi,
If you notice that some key groups are hot and under high load, you could try
to increase the total number of key groups (by increasing the max parallelism),
but note that previous checkpoints can then no longer be restored. With the
help of this, we might let the hot key groups share
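To illustrate why raising the max parallelism spreads load: a key hashes to a key group modulo maxParallelism, and a contiguous range of key groups maps onto each subtask. Flink's real assignment applies a murmur hash (see KeyGroupRangeAssignment); the simplified sketch below uses plain hashCode only to stay self-contained, so it is an illustration, not Flink's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupSketch {
    // Simplified stand-in for Flink's key-group assignment; Flink really
    // murmur-hashes key.hashCode() first.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int maxParallelism : new int[] {128, 4096}) {
            Map<Integer, Integer> load = new HashMap<>();
            for (int k = 0; k < 10_000; k++) {
                int kg = keyGroup("key-" + k, maxParallelism);
                load.merge(subtask(kg, maxParallelism, parallelism), 1, Integer::sum);
            }
            System.out.println("maxParallelism=" + maxParallelism
                    + " load per subtask=" + load);
        }
    }
}
```

With more key groups, a single hot group represents a smaller share of one subtask's load, which is the effect being suggested above.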
Hi Austin,
Are you running your job detached in a per-job cluster? In that case inverted
class loading does not work. This is because we add the user jar to the system
class path, and there is no dynamic class loading involved at the moment [1].
You can try the YARN session mode, or – as Chesnay
Hi Zhijiang,
Thanks a lot for your reasoning!
I tried to set the checkpoint mode to at-least-once as you suggested, but
unluckily the problem remains the same :(
IMHO, if it were caused by barrier alignment, the state size (mainly buffers
during alignment) would be big, right? But actually it’s
Is there any way to figure out which one is being run on the TaskManager? Would
it be safe to assume that it is the latest directory created?
Regards,
Harshith
From: Till Rohrmann
Date: Thursday, 28 February 2019 at 3:28 PM
To: Harshith Kumar Bolar
Cc: user
Subject: [External] Re: Re: What
Hi Sen,
I took a look at the CLI code again, and found out that -m is ignored if
high-availability: ZOOKEEPER is configured in your flink-conf.yaml. This does
not seem right and should at least be documented [1].
Judging from the client logs that you provided, I think the problem is that the
Hi,
The answer is in fact no.
Flink hash-partitions keys into Key Groups [1] which are uniformly assigned
to tasks, i.e., a task can process more than one key group.
AFAIK, there are no plans to change this behavior.
Stefan (in CC) might be able to give more details on this.
Something that might
Hi Arnaud,
I think there are two key points. First, the checkpoint barrier might be
emitted with a delay from the source under high backpressure, due to the
synchronization lock. Second, the barrier has to be queued behind the in-flight
data buffers, so the downstream task has to process all the buffers before the
barrier to
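The second point can be shown with a toy queue model: the barrier travels in-band behind the in-flight records, so the downstream task reaches it only after draining everything queued ahead of it. All names below are invented for illustration; this is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: a checkpoint barrier queued behind a backpressure backlog.
public class BarrierQueueDemo {
    static final Object BARRIER = new Object();

    // Returns how many records had to be processed before the barrier
    // was reached.
    static int recordsBeforeBarrier(Queue<Object> channel) {
        int processed = 0;
        while (!channel.isEmpty()) {
            Object next = channel.poll();
            if (next == BARRIER) return processed;
            processed++; // simulate processing one in-flight record
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<Object> channel = new ArrayDeque<>();
        for (int i = 0; i < 1000; i++) channel.add("record-" + i); // backlog
        channel.add(BARRIER);
        System.out.println(recordsBeforeBarrier(channel)); // 1000
    }
}
```

The larger the backlog ahead of the barrier, the later the checkpoint can complete, matching the behavior described in the thread.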
Yes, at the moment this does not happen automatically. When deleting the
directories you have to be careful not to delete the directory of a running
TaskManager.
Cheers,
Till
On Wed, Feb 27, 2019 at 6:29 PM Kumar Bolar, Harshith
wrote:
> Thanks Till,
>
>
>
> It appears to occur when a task
Hi all,
I'm trying to process many records, and I have an expensive operation I'm
trying to optimize. Simplified it is something like:
Data: (key1, count, time)
Source -> Map(x -> (x, newKeyList(x.key1)))
-> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
->
I created a small test to see if I could replicate this... but I couldn't
:-) Below is my code that provides a counter example. It is not very clean,
but perhaps it is useful for someone else in the future:
class SessionWindowTest extends FunSuite with Matchers {
test("Should advance
I understand now. Thanks, everyone.
In RecordWriter.emit(), the data at that point is already the value produced by
flatMap/map, and the function sends the records one by one: (a,1),(a,1),(a,1).
In WindowOperator.processElement(), after receiving a record, it calls
windowState.add(element.value), which actually invokes HeapReducingState.add().
This state value is stored as (key, value) in
WindowOperator.windowState.stateTable.primaryTable.state.
On Thursday, 28 February 2019, 5:32:00 pm GMT+8,
thinktothi...@yahoo.com.INVALID wrote:
I understand now. Thanks, everyone. In RecordWriter.emit(), the data at that
point is already the value produced by flatMap/map, and the function sends the
records one by one: (a,1),(a,1),(a,1). In WindowOperator.processElement(),
after receiving a record, it calls windowState.add(element.value), which
actually invokes HeapReducingState.add(). This state value is stored as
(key, value) in WindowOperator.windowState.stateTable.primaryTable.state.
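The flow described here (windowState.add() delegating to HeapReducingState.add(), with one value kept per key in the state table) can be sketched in plain Java; the class below is an illustration of the idea, not Flink's implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Conceptual sketch of HeapReducingState.add(): the state table keeps one
// value per key, and add() folds each new element into it with the reduce
// function, so (a,1),(a,1),(a,1) collapses to (a,3) before the window fires.
public class ReducingStateSketch<K, V> {
    private final Map<K, V> stateTable = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;

    ReducingStateSketch(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void add(K key, V value) {
        stateTable.merge(key, value, reduceFunction);
    }

    V get(K key) { return stateTable.get(key); }

    public static void main(String[] args) {
        // For sum, the reduce function is addition.
        ReducingStateSketch<String, Integer> state =
                new ReducingStateSketch<>(Integer::sum);
        state.add("a", 1);
        state.add("a", 1);
        state.add("a", 1);
        System.out.println(state.get("a")); // 3
    }
}
```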
Hello,
I have a simple streaming app that gets data from a source and stores it to
HDFS using a sink similar to the bucketing file sink. Checkpointing mode is
“exactly once”.
Everything is fine during a “normal” run, as the sink is faster than the
source; but when we stop the application for a
Hi Till:
So how can we get the right REST address and port when using HA mode
on YARN? I get it from the REST API "/jars". But when I submit a job using
flink run -m, it failed.
org.apache.flink.client.program.ProgramInvocationException: Could
not retrieve
Hi,
I'm working with two separate Kubernetes clusters located in different regions
(hosted in proprietary data centers), the distance between the regions
introduces a pretty high (~50ms) latency between the clusters, so communication
should not go cross-site unless necessary. I would like to
@thinktothings
The sum is actually maintained in the WindowOperator’s state.
If you look at WindowedStream#sum(), you will find that it eventually calls
WindowedStream#reduce(). For reduce, the state implementation class is
HeapReducingState; calling its add() method performs the aggregation (for sum,
this is addition). When the window emits, this maintained state is sent out.
Original mail
From: Yaoting Gong fall.for.you@gmail.com
Hi Paul,
I am not sure whether the task thread is involved in some work during
snapshotting states for the FsStateBackend. But I have another experience
which might also cause your problem.
From your description below, the last task is blocked in
`SingleInputGate.getNextBufferOrEvent`, which means the