Re: Flink Memory Usage

2018-05-09 Thread
Hi Pedro,
since you are using the RocksDB backend, RocksDB will consume some extra 
native memory, and sometimes the amount can be very large: by default, RocksDB 
keeps a `BloomFilter` in memory for every opened sst file, and the number of 
opened sst files is not limited by default, so in theory the native memory 
consumed by the `BloomFilter`s grows with the number of sst files. Besides the 
memory consumed by `BloomFilter`s, some other parts of RocksDB also consume 
native memory, e.g. the write buffers (their size and maximum number), but in 
general these are not the main consumers. You can find more information about 
the memory allocation of RocksDB here: 
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB. There is 
also an existing issue related to your question where you may find some useful 
information: https://issues.apache.org/jira/browse/FLINK-7289.
Concerning your separate questions:
1. Shouldn't the sum of the job manager memory and the task manager memory
account for all the memory allocated by Flink?  Am I missing any
configuration?


No, those options only define the size of the memory controlled by the JVM. 
There is extra native memory consumed by RocksDB if you're using the 
RocksDBStateBackend.


2. How can I maintain the server working in this scenario?


Since you are using the RocksDB backend, the size of the native memory 
consumed by RocksDB is pretty hard to control. In the safest case, you can 
turn off the filter cache (in general this is the main memory consumer, but 
turning it off will hurt your performance), and also reduce the size and the 
maximum number of the write buffers.
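

For reference, these knobs can be set through the backend's OptionsFactory. A 
minimal sketch (the concrete sizes are illustrative assumptions, and 
`rocksDBBackend` stands for an already-created RocksDBStateBackend):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

rocksDBBackend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setWriteBufferSize(32 * 1024 * 1024) // smaller write buffers (32 MB)
                .setMaxWriteBufferNumber(2)           // keep fewer write buffers in memory
                .setTableFormatConfig(new BlockBasedTableConfig()
                        .setBlockCacheSize(64 * 1024 * 1024)   // bound the block cache
                        .setCacheIndexAndFilterBlocks(true));  // index/filter blocks count
                                                               // against that bound
    }
});

With setCacheIndexAndFilterBlocks(true), the filter blocks compete with data 
blocks for the bounded block cache instead of growing without limit.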


3. I thought that RocksDB would do the job, but it didn't happen. 


The memory consumed by RocksDB cannot be precisely limited yet. You can 
change the options to control it in a coarse-grained way.
 
4. In the past, I have seen Flink taking a checkpoint of 3GB, but
allocating initially 35GB of RAM. Where does this extra memory come from?


I think the extra memory is the native memory consumed by RocksDB, and most 
of it is used for filter caching.


Since this type of email should generally go to the `user` mailing list, I 
have redirected it there. I think maybe Stefan (cc'd) could tell you more 
about your question; please correct me if I'm wrong.


Best,
Sihua






On 05/10/2018 03:12,Pedro Elias wrote:
Hi,

I have Flink running on 2 docker images, one for the job manager, and one
for the task manager, with the configuration below.

64GB RAM machine
200 GB SSD used only by RocksDB

Flink's memory configuration file is like that:

jobmanager.heap.mb: 3072
taskmanager.heap.mb: 53248
taskmanager.memory.fraction: 0.7

I have a very large and heavy job running in this server. The problem is
that the task manager is trying to take more memory than defined on the
configuration and eventually crashes the server, although the heap never
reaches the maximum memory. The last memory log before crashing shows:

Memory usage stats: [HEAP: 44432/53248/53248 MB, NON HEAP: 157/160/-1 MB
(used/committed/max)]

But the memory used by the task manager container is near 64GB


I have some doubts regarding memory usage of Flink.


1. Shouldn't the sum of the job manager memory and the task manager memory
account for all the memory allocated by Flink?  Am I missing any
configuration?

2. How can I maintain the server working in this scenario?

3. I thought that RocksDB would do the job, but it didn't happen.

4. In the past, I have seen Flink taking a checkpoint of 3GB, but
allocating initially 35GB of RAM. Where does this extra memory come from?


Can anyone help me, please?

Thanks in advance.

Pedro Luis


Re: Externalized checkpoints and metadata

2018-04-26 Thread


Hi Juan,


I think you are right, and there may be more than 3 companies implementing 
different solutions for this... I created a ticket to address it here: 
https://issues.apache.org/jira/browse/FLINK-9260. Hope this helps to reduce 
others' redundant efforts on this... (if it is finally accepted by the 
community)


Best Regards,
Sihua Zhou


On 04/26/2018 16:35,Juan Gentile wrote:

Hello all,

 

Thank you all for your responses, I’ll take a look at your code Hao, and 
probably implement something similar.

I’d like to ask though, so as to know what we could expect from Flink in the 
future, if this issue will be addressed somehow, considering that we have 
already 3 different companies implementing different (but similar) solutions to 
solve the same problem.

Maybe we could think of adding this issue here: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints
 ?

 

Thank you,

Juan G.

 

From: hao gao 
Date: Wednesday, 25 April 2018 at 20:25
To: Juan Gentile 
Cc: "user@flink.apache.org" , Oleksandr Nitavskyi 

Subject: Re: Externalized checkpoints and metadata

 

Hi Juan,

 

We modified the flink code a little bit to change the flink checkpoint 
structure so we can easily identify which is which

you can read my note or the PR

https://medium.com/hadoop-noob/flink-externalized-checkpoint-eb86e693cfed

https://github.com/BranchMetrics/flink/pull/6/files

Hope it helps

 

Thanks

Hao

 

2018-04-25 6:07 GMT-07:00 Juan Gentile :

Hello,

 

We are trying to use externalized checkpoints, using RocksDB on Hadoop hdfs.

We would like to know what is the proper way to resume from a saved checkpoint 
as we are currently running many jobs in the same flink cluster.

 

The problem is that when we want to restart the jobs and pass the metadata file 
(or directory) there is 1 file per job but they are not easily identifiable 
based on the name:

Example

/checkpoints/checkpoint_metadata-69053704a5ca

/checkpoints/checkpoint_metadata-c7c016909607

 

We are not using savepoints, and reading the documentation I see there are 2 
ways to resume: one is passing the metadata file (not possible as we have many 
jobs) and the other is passing the directory,

but by default it looks for a _metadata file which doesn’t exist.

 

Thank you,

Juan G.





 

--

Thanks

 - Hao 

Re: Why assignTimestampsAndWatermarks parallelism same as map,it will not fired?

2018-04-25 Thread
Hi 潘,
could you please check the number of Kafka partitions? I think if the number 
of Kafka partitions < the parallelism of the source node, then there can be 
some idle parallel instances which won't receive any data...
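

As a minimal sketch of the fix (the consumer class, topic name and partition 
count are assumptions for illustration, with an existing 
StreamExecutionEnvironment env and consumer Properties props), give the source 
the same parallelism as the topic has partitions, so that no source subtask 
stays idle with a watermark stuck at Long.MIN_VALUE:

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env
        .addSource(consumer)
        .setParallelism(4); // the topic has 4 partitions in this sketch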


Best Regards,
Sihua Zhou






On 04/26/2018 10:44,TechnoMage wrote:
If you are using keyed messages in Kafka, or keyed streams in Flink, then 
only partitions that get hashed to the proper value will get data. If the 
messages are not keyed, then yes, they should all get data.


Michael


On Apr 25, 2018, at 8:25 PM, 潘 功森  wrote:


Events are arriving all the time and in order; I don't know why one of the 
partitions does not receive data unless I change the parallelism?



From: Fabian Hueske 
Sent: 25 April 2018 10:56
To: Timo Walther
Cc: user
Subject: Re: Why assignTimestampsAndWatermarks parallelism same as map,it will 
not fired?
 
Hi,


This sounds like one of the partitions does not receive data. Watermark 
generation is data driven, i.e., the watermark can only advance if the 
TimestampAndWatermarkAssigner sees events.

By changing the parallelism between the map and the assigner, the events are 
shuffled across and hence there is no "empty" partition anymore.


I would check if one instance of your sources does not emit events.


Best, Fabian



2018-04-25 10:43 GMT+02:00 Timo Walther :
Hi,

did you set your time characteristics to event-time?

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Regards,
Timo

On 25.04.18 at 05:15, 潘 功森 wrote:

Hi all,


I use the same parallelism for the map and assignTimestampsAndWatermarks, and 
the window did not fire. I saw that extractTimestamp and generateWatermark 
were all working fine, but the watermark never changed and stayed at the 
minimum long value.

Then I changed the parallelism so that it differed from the map's, and the 
windows fired.

I used Flink 1.3.2.

Is it a Flink bug? Or can someone explain why it did not fire? It troubled me 
the whole day.


Best regards,

September



Re: keyBy and parallelism

2018-04-12 Thread
Hi Christophe,
I think what you want to do is a "stream join", and I'm a bit confused: if 
you know there are only 8 keys, then why would you still like to use a 
parallelism of 16? 8 of the parallel instances will be idle (a waste of CPU). 
In a KeyedStream, the tuples with the same key will be sent to the same 
parallel instance.


And I'm also a bit confused about the pseudo code: it looks like you assume 
that the tuple with a given key in stream A will always arrive before the 
tuples in stream B? I think that can't be guaranteed... you may need to store 
the tuples from stream B in state, in case a tuple from stream B arrives 
before the one from A, and do the "analysis logic" in both flatMap1() and 
flatMap2().


Regards,
Sihua Zhou


On 04/12/2018 15:44,Christophe Jolif wrote:
Thanks Chesnay (and others).


That's what I was figuring out. Now let's go onto the follow up with my exact 
use-case.


I have two streams A and B. A basically receives "rules" that the processing 
of B should observe.


There is a "key" that allows me to know that a rule x coming in A is for events 
with the same key coming in B.


I was planning to do (pseudo code):


A.connect(B).keyBy("thekey").flatMap(
   flatMap1()
  -> store in a ValueState the rule 
   flatMap2()
  -> use the state to get the rule, transform the element according to the 
rule, collect it
)




I think it should work, right, because the ValueState will be "per key" and 
contain the rule for this key and so on?


Now, what I really care about is not having all the elements of key1 in the 
same parallel instance; I just want to make sure key1 and key2 are isolated so 
I can use the keyed state to store the corresponding rule, and key2's rules 
are not used for key1 and conversely.


So ideally, instead of using a parallelism of 8, in order to use the full 
power of my system, even with 8 keys I would like to use a parallelism of 16, 
as I don't care about all elements of key1 being in the same parallel 
instance. All I care about is that the state contains the rule corresponding 
to this key.


What would be the recommended approach here?


Thanks again for your help,
--
Christophe




On Thu, Apr 12, 2018 at 9:31 AM, Chesnay Schepler  wrote:

You will get 16 parallel executions since you specify a parallelism of 16, 
however 8 of these will not get any data.


On 11.04.2018 23:29, Hao Sun wrote:

From what I learnt, you have to control parallelism yourself. You can set 
parallelism on an operator or set the default one through flink-conf.yaml.
I might be wrong.


On Wed, Apr 11, 2018 at 2:16 PM Christophe Jolif  wrote:

Hi all,


Imagine I have a default parallelism of 16 and I do something like


stream.keyBy("something").flatMap()


Now let's imagine I have less than 16 keys, maybe 8.


How many parallel executions of the flatMap function will I get? 8 because I 
have 8 keys, or 16 because I have default parallelism at 16?


(and I will have follow up questions depending on the answer I suspect ;))


Thanks,
--

Christophe




Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread
Hi Lasse,


I've met that before. I think the non-heap memory trend in the graph you 
attached may be the "expected" result... Because rocksdb will keep a "filter 
(bloom filter)" in memory for every opened sst file by default, and the number 
of sst files increases over time, it looks like a leak. There is an 
issue (https://issues.apache.org/jira/browse/FLINK-7289) Stefan created to 
track this, and the 
page (https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) from 
RocksDB's wiki could give you a better understanding of the memory used by 
RocksDB. Stefan, please correct me if I brought any wrong information above.


Best Regards,
Sihua Zhou
On 04/11/2018 09:55,Ted Yu wrote:
Please see the last comment on this issue:


https://github.com/facebook/rocksdb/issues/3216



FYI


On Tue, Apr 10, 2018 at 12:25 AM, Lasse Nedergaard  
wrote:


This graph shows non-heap memory. If the same pattern exists, it makes sense 
that it will try to allocate more memory and then exceed the limit; I can see 
the same trend for all other containers that have been killed. So my question 
is now: what is using the non-heap memory?
From 
http://mail-archives.apache.org/mod_mbox/flink-user/201707.mbox/%3ccanc1h_u0dqqvbysdaollbemewaxiimtmfjjcribpfpo0idl...@mail.gmail.com%3E
it looks like RocksDB could be the culprit.


I have jobs using incremental checkpointing and some without, some optimised 
for FLASH_SSD; all show the same pattern.


Lasse 






2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :

Hi.


I found the exception attached below, for our simple job. It states that our 
task manager was killed due to exceeding the memory limit of 2.7GB. But when I 
look at the Flink metrics just 30 sec before, it used 1.3 GB heap and 712 MB 
non-heap, around 2 GB in total.
So something else is also using memory inside the container; any idea how to 
figure out what?
As a side note, we use the RocksDBStateBackend with this configuration:


env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
        (long) (config.checkPointInterval * 0.75));
env.enableCheckpointing(config.checkPointInterval, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));

Where checkpointDataUri points to S3.


Lasse Nedergaard



2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
- Diagnostics for container container_1522921976871_0001_01_79 
in state COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container 
[pid=30118,containerID=container_1522921976871_0001_01_79] is running 
beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB physical memory 
used; 4.9 GB of 13.4 GB virtual memory used. Killing container.

Dump of the process-tree for container_1522921976871_0001_01_79 :

|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

|- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m 
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 

|- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m  
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 1> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.out
 2> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.err
 




2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter 
- 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.Heap.Used=1398739496




2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter 
- 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.NonHeap.Used=746869520







 


2018-04-09 23:52 GMT+02:00 Ken Krugler :

Hi Chesnay,


Don’t know if this helps, but I’d run into this as well, though I haven’t 
hooked up YourKit to analyze exactly what’s causing the memory problem.


E.g. after about 3.5 hours running locally, it failed with memory issues.



In the TaskManager logs, I start seeing exceptions in my code….


java.lang.OutOfMemoryError: GC overhead limit exceeded


And then eventually...


2018-04-07 21:55:25,686 WARN  
org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to 
serialize accumulators for task.
java.lang.OutOfMemoryError: GC overhead limit exceeded


Immedia

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread


Hi Tony,


Regarding your question: an average end-to-end checkpoint latency of less 
than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, timeout 
is determined by the max end-to-end latency (the slowest task); a checkpoint 
truly completes only after all tasks' checkpoints have completed.


Regarding the problem: after a second look at the info you provided, we can 
see from the checkpoint detail picture that there is one task which took 
4m20s to transfer its snapshot (about 482M) to s3, and there are 4 other tasks 
that didn't complete the checkpoint yet. And comparing bad_tm_pic.png with 
good_tm_pic.png, we can see that on the "bad tm" the network performance is 
far lower than on the "good tm" (-15 MB vs -50MB). So I guess the network is a 
problem; sometimes it fails to send 500M of data to s3 within 10 minutes. 
(Maybe you need to check whether the network environment is stable.)


Regarding the solution: I think incremental checkpoints can help you a lot; 
they only send the new data for each checkpoint. But you are right: if the 
incremental state size is larger than 500M, it might cause the timeout problem 
again (because of the bad network performance).
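

For reference, a minimal sketch of enabling it (the checkpoint URI is an 
illustrative placeholder):

// The second constructor argument enables incremental checkpoints, so each
// checkpoint only uploads SST files that are new since the previous one.
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));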


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 13:02,Tony Wei wrote:
Hi Sihua,


Thanks for your suggestion. "Incremental checkpointing" is what I will try 
out next, and I know it will give better performance. However, it might not 
solve this issue completely because, as I said, the average end-to-end latency 
of checkpointing is currently less than 1.5 mins, which is far from my timeout 
configuration. I believe "incremental checkpointing" will reduce the latency 
and make this issue occur more rarely, but I can't promise it won't happen 
again if I have bigger state growth in the future. Am I right?


Best Regards,
Tony Wei 


2018-03-06 10:55 GMT+08:00 周思华 :

Hi Tony,


Sorry to jump in. One thing I want to point out is that, from the log you 
provided, it looks like you are using "full checkpoints". This means that the 
state data that needs to be checkpointed and transferred to s3 will grow over 
time, and even the first checkpoint performs slower than an incremental 
checkpoint (because it needs to iterate over all the records in rocksdb using 
the RocksDBMergeIterator). Maybe you can try out "incremental checkpoints"; 
they could give you better performance.


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 10:34,Tony Wei wrote:
Hi Stefan,


I see. That explains why the load on the machines grew. However, I think it 
is not the root cause that led to these consecutive checkpoint timeouts. As I 
said in my first mail, the checkpointing process usually took 1.5 mins to 
upload states, and this operator and the kafka consumer are the only two 
operators that have state in my pipeline. In the best case, I should never 
encounter a timeout problem caused only by lots of pending checkpointing 
threads that have already timed out. Am I right?


Since this logging and stack trace was taken nearly 3 hours after the first 
checkpoint timeout, I'm afraid that we couldn't actually find out the root 
cause of the first checkpoint timeout. Because we are preparing to put this 
pipeline into production, I was wondering if you could help me find out where 
the root cause lies: bad machines, or s3, or the flink-s3-presto package, or 
the flink checkpointing threads. It would be great if we could find it out 
from the information I provided, and a hypothesis based on your experience is 
welcome as well. The most important thing is that I have to decide whether I 
need to change my persistence filesystem or use another s3 filesystem package, 
because the last thing I want to see is the checkpoint timeout happening very 
often.


Thank you very much for all your advice.


Best Regards,
Tony Wei


2018-03-06 1:07 GMT+08:00 Stefan Richter :

Hi,


thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack 
trace, you can see many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. Seems like the implementation of some features like checkpoint 
timeouts and not failing tasks from checkpointing problems overlooked that we 
also require to properly communicate that checkpoint cancellation to all tasks, 
which was not needed before.


Best,
Stefan




On 05.03.2018 at 14:42, Tony Wei wrote:


Hi Stefan,


Here is my checkpointing configuration.



| Checkpointing Mode | Exactly Once |
| Interval | 20m 0s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |
| Persist Checkpoints Externally | Enabled (delete on cancellation) |
Best Regards,

Tony Wei


2018-03-05 21:30 GMT+08:00 Stefan Richter :
Hi,

quick question: what is your exact chec

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread
Hi Tony,


Sorry to jump in. One thing I want to point out is that, from the log you 
provided, it looks like you are using "full checkpoints". This means that the 
state data that needs to be checkpointed and transferred to s3 will grow over 
time, and even the first checkpoint performs slower than an incremental 
checkpoint (because it needs to iterate over all the records in rocksdb using 
the RocksDBMergeIterator). Maybe you can try out "incremental checkpoints"; 
they could give you better performance.


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 10:34,Tony Wei wrote:
Hi Stefan,


I see. That explains why the load on the machines grew. However, I think it 
is not the root cause that led to these consecutive checkpoint timeouts. As I 
said in my first mail, the checkpointing process usually took 1.5 mins to 
upload states, and this operator and the kafka consumer are the only two 
operators that have state in my pipeline. In the best case, I should never 
encounter a timeout problem caused only by lots of pending checkpointing 
threads that have already timed out. Am I right?


Since this logging and stack trace was taken nearly 3 hours after the first 
checkpoint timeout, I'm afraid that we couldn't actually find out the root 
cause of the first checkpoint timeout. Because we are preparing to put this 
pipeline into production, I was wondering if you could help me find out where 
the root cause lies: bad machines, or s3, or the flink-s3-presto package, or 
the flink checkpointing threads. It would be great if we could find it out 
from the information I provided, and a hypothesis based on your experience is 
welcome as well. The most important thing is that I have to decide whether I 
need to change my persistence filesystem or use another s3 filesystem package, 
because the last thing I want to see is the checkpoint timeout happening very 
often.


Thank you very much for all your advice.


Best Regards,
Tony Wei


2018-03-06 1:07 GMT+08:00 Stefan Richter :

Hi,


thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack 
trace, you can see many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. Seems like the implementation of some features like checkpoint 
timeouts and not failing tasks from checkpointing problems overlooked that we 
also require to properly communicate that checkpoint cancellation to all tasks, 
which was not needed before.


Best,
Stefan




On 05.03.2018 at 14:42, Tony Wei wrote:


Hi Stefan,


Here is my checkpointing configuration.



| Checkpointing Mode | Exactly Once |
| Interval | 20m 0s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |
| Persist Checkpoints Externally | Enabled (delete on cancellation) |
Best Regards,

Tony Wei


2018-03-05 21:30 GMT+08:00 Stefan Richter :
Hi,

quick question: what is your exact checkpointing configuration? In particular, 
what is your value for the maximum parallel checkpoints and the minimum time 
interval to wait between two checkpoints?

Best,
Stefan

> On 05.03.2018 at 06:34, Tony Wei wrote:
>
> Hi all,
>
> Last weekend, my flink job's checkpoints started failing because of timeouts. 
> I have no idea what happened, but I collected some information about my 
> cluster and job. Hope someone can give me advice or hints about the problem 
> that I encountered.
>
> My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each with 
> 4 cores. These machines are ec2 spot instances. The job's parallelism is set 
> to 32, using rocksdb as the state backend and s3 presto as the checkpoint 
> file system. The state's size is nearly 15gb and still grows day-by-day. 
> Normally, it takes 1.5 mins to finish the whole checkpoint process. The 
> timeout configuration is set to 10 mins.
>
> 
>
> As the picture shows, not every subtask's checkpoint broke because of the 
> timeout, but every machine has at some point broken for all its subtasks 
> during last weekend. Some machines recovered by themselves and some machines 
> recovered after I restarted them.
>
> I recorded logs, stack traces and snapshots of machine status (CPU, IO, 
> Network, etc.) for both a good and a bad machine. If there is a need for 
> more information, please let me know. Thanks in advance.
>
> Best Regards,
> Tony Wei.
> 









Re: A "per operator instance" window all ?

2018-02-18 Thread
Hi Julien,
If I am not misunderstanding, I think you can key your stream on 
`Random.nextInt() % parallelism`; this way you can "group" together alerts 
from different resources and benefit from multiple parallel instances.
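

A rough sketch of that idea (the Alert type and the window function are 
assumed placeholders, not from this thread):

int buckets = 4; // roughly the desired number of parallel query groups
alerts
    .keyBy(new KeySelector<Alert, Integer>() {
        @Override
        public Integer getKey(Alert alert) {
            // spread alerts pseudo-randomly over a small number of buckets
            return ThreadLocalRandom.current().nextInt(buckets);
        }
    })
    .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
    .apply(new BatchedLookupWindowFunction()); // one external query per window pane

One caveat: Flink expects key selectors to be deterministic, so in practice a 
stable derivation such as alert.resourceId.hashCode() % buckets (as discussed 
further down this thread) is the safer variant of the same idea.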




Sent from NetEase Mail Master


On 02/19/2018 09:08,Xingcan Cui wrote:
Hi Julien,


sorry for my misunderstanding before. For now, a window can only be defined 
on a KeyedStream, or on an ordinary DataStream but only with parallelism = 1. 
I’d like to provide three options for your scenario.


1. If your external data is static and can fit into memory, you can use 
ManagedState to cache it without considering the querying problem.
2. Or you can use a CustomPartitioner to manually distribute your alert data 
and simulate a window operation by yourself in a ProcessFunction.
3. You may also choose to use an external system such as an in-memory store, 
which can work as a cache for your queries.


Best,
Xingcan



On 19 Feb 2018, at 5:55 AM, Julien  wrote:


Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:

if I have 100 resource IDs with parallelism of 4, then each operator instance 
will handle about 25 keys




The issue I have is that I want, on a given operator instance, to group those 
25 keys together in order to do only 1 query to an external system per operator 
instance:

on a given operator instance, I will do 1 query for my 25 keys
so with the 4 operator instances, I will do 4 queries in parallel (with about 25 
keys per query)


I do not know how I can do that.

If I define a window on my keyed stream (with for example 
stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
 then my understanding is that the window is "associated" with the key. So in 
this case, on a given operator instance, I will have 25 of those windows (one 
per key), and I will do 25 queries (instead of 1).

Do you understand my point ?
Or maybe am I missing something ?

I'd like to find a way on operator instance 1 to group all the alerts received 
on those 25 resource ids and do 1 query for those 25 resource ids.
Same thing for operator instance 2, 3 and 4.


Thank you,
Regards.


On 18/02/2018 14:43, Xingcan Cui wrote:

Hi Julien,


the cardinality of your keys (e.g., resource ID) will not be restricted to the 
parallelism. For instance, if you have 100 resource IDs processed by 
KeyedStream with parallelism 4, each operator instance will handle about 25 
keys. 


Hope that helps.


Best,
Xingcan


On 18 Feb 2018, at 8:49 PM, Julien  wrote:



Hi,

I am pretty new to flink and I don't know what will be the best way to deal 
with the following use case:

as an input, I receive some alerts from a kafka topic
an alert is linked to a network resource (like router-1, router-2, switch-1, 
switch-2, ...)
so an alert has two main information (the alert id and the resource id of the 
resource on which this alert has been raised)
then I need to do a query to an external system in order to enrich the alert 
with additional information on the resource

(A "natural" candidate for the key on this stream will be the resource id)

The issue I have is that regarding the query to the external system:

I do not want to do 1 query per resource id
I want to do a small number of queries in parallel (for example 4 queries in 
parallel every 500ms), each query requesting the external system for several 
alerts linked to several resource ids
Currently, I don't know what will be the best way to deal with that:

I can key my stream on the resource id and then define a processing time window 
of 500ms and when the trigger is ok, then I do my query
by doing so, I will "group" several alerts in a single query, but they will all 
be linked to the same resource.
so I will do 1 query per resource id (which will be too much in my use case)
I can also do a windowAll on a non keyed stream
by doing so, I will "group" together alerts from different resource ids, but 
from what I've read in such a case the parallelism will always be one.
so in this case, I will only do 1 query whereas I'd like to have some 
parallelism

I am thinking that a way to deal with that will be:

define the resource id as the key of stream and put a parallelism of 4
and then having a way to do a windowAll on this keyed stream
which is that, on a given operator instance, I will "group" on the same window 
all the keys (ie all the resource ids) managed by this operator instance
with a parallelism of 4, I will do 4 queries in parallel (1 per operator 
instance, and each query will be for several alerts linked to several resource 
ids)

But after looking at the documentation, I cannot see this ability (having a 
windowAll on a keyed stream).

Am I missing something?

What will be the best way to deal with such a use case?




I've tried for example to review my key and to do something like 
"resourceId.hashCode % <parallelism>" and then to use a time 
window.

In my example above, the <parallelism> will be 4. And all my 
keys will be 0, 1, 2 or 3.

The issue with this approach is th

Re:Re: Problem with Flink restoring from checkpoints

2017-07-19 Thread
Hi Fran,


is the DateTimeBucketer acting like a memory buffer that isn't managed by 
Flink's state? If so, then I think the problem is not about Kafka, but about 
the DateTimeBucketer. Flink won't take a snapshot of the DateTimeBucketer's 
buffer if it is not kept in any state.


Best, 
Sihua Zhou



At 2017-07-20 03:02:20, "Fabian Hueske"  wrote:

Hi Fran,


did you observe actual data loss due to the problem you are describing or are 
you discussing a possible issue based on your observations?


AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes 
these in the checkpoints. In case of a recovery, it does not rely on the 
offsets which were committed back to Kafka but only on the offsets it 
checkpointed itself.
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can 
give a more detailed answer.


Best, Fabian



2017-07-19 16:55 GMT+02:00 Francisco Blaya :

Hi,


We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting 
the events to S3 through a DateTimeBucketer. We configured the bucketer to 
flush to S3 with an inactivity period of 5 mins. The rate at which events are 
written to Kafka in the first place is very low so it is easy for us to 
investigate how the Flink job would recover in respect to Kafka offsets after 
the job gets cancelled or the Yarn session killed.


What we found is that Flink acks Kafka immediately before even writing to S3. 
The consequence of this seems to be that if the job gets cancelled before the 
acked events are flushed to S3 then these are lost, they don't get written when 
the job restarts. Flink doesn't seem to keep in its checkpointed state the fact 
that it acked those events but never flushed them to S3. Checkpoints are 
created every 5 seconds in S3.

We've also tried to configure externalized checkpoints through the 
"state.checkpoints.dir" configuration key and 
"env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
 in the job so that they don't automatically get cleaned up when the job gets 
cancelled or the Yarn session killed. We can see the job uses a restored 
checkpoint upon restart but still we get missing events in S3.

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.

Regards,
Fran

hivehome.com









Is there some metric info about RocksdbBackend?

2017-06-30 Thread
Hi,
 Is there some metric info about the RocksDB backend in Flink, like sst 
compaction counts, memtable flush counts, block cache size and so on? 
Currently, when using RocksDB as the backend, its behavior is a black box for 
us and it consumes a lot of memory; I want to figure out its behavior via 
metrics.

Re:Re: Error when set RocksDBStateBackend option in Flink?

2017-06-29 Thread

I will keep the call to specify my rocksdb options later. OptionsFactory 
already extends the java.io.Serializable interface and MRocksDBFactory 
implements OptionsFactory, so MRocksDBFactory should be serializable. Why does 
this problem occur?

At 2017-06-29 17:53:07, "Ted Yu"  wrote:

Since MRocksDBFactory doesn't add any options, it seems the 
rocksDBBackEnd.setOptions() call can be skipped.


If you choose to keep the call, please take a look at (OptionsFactory extends 
java.io.Serializable):


https://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html



On Thu, Jun 29, 2017 at 2:16 AM, 周思华  wrote:

I use the following code to set the RocksDBStateBackend and its options. It 
runs correctly locally, but can't be submitted to the cluster.


Main.class:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    RocksDBStateBackend rocksDBBackEnd =
            new RocksDBStateBackend("file:///Users/zsh/tmp/rocksdb");
    rocksDBBackEnd.setPredefinedOptions(PredefinedOptions.DEFAULT);
    rocksDBBackEnd.setOptions(new MRocksDBFactory());
    env.setStateBackend(rocksDBBackEnd);
    ...
    env.execute(jobName);
}


MRocksDBFactory.class:
public class MRocksDBFactory implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
}


The exception info in jobmanager.log looks like below:


2017-06-29 16:29:27,162 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://flink@10.242.98.255:52638] has failed, address is now gated for 
[5000] ms. Reason: [gerryzhou.MRocksDBFactory]
2017-06-29 16:29:27,163 ERROR Remoting  
- gerryzhou.MRocksDBFactory
java.lang.ClassNotFoundException: gerryzhou.MRocksDBFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:192)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.De

Error when set RocksDBStateBackend option in Flink?

2017-06-29 Thread
I use the following code to set the RocksDBStateBackend and its options. It 
runs correctly locally, but can't be submitted to the cluster.


Main.class:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    RocksDBStateBackend rocksDBBackEnd =
            new RocksDBStateBackend("file:///Users/zsh/tmp/rocksdb");
    rocksDBBackEnd.setPredefinedOptions(PredefinedOptions.DEFAULT);
    rocksDBBackEnd.setOptions(new MRocksDBFactory());
    env.setStateBackend(rocksDBBackEnd);
    ...
    env.execute(jobName);
}


MRocksDBFactory.class:
public class MRocksDBFactory implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
}


The exception info in jobmanager.log looks like below:


2017-06-29 16:29:27,162 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://flink@10.242.98.255:52638] has failed, address is now gated for 
[5000] ms. Reason: [gerryzhou.MRocksDBFactory]
2017-06-29 16:29:27,163 ERROR Remoting  
- gerryzhou.MRocksDBFactory
java.lang.ClassNotFoundException: gerryzhou.MRocksDBFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:192)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkj