Question regarding rescale api

2018-12-09 Thread
Hi All,

I see that the rescale API lets us redistribute elements locally, but is it
possible to make the upstream operator's subtasks distributed evenly across task
managers? For example, I have 10 task managers with 10 slots each. The
application reads data from a Kafka topic with 20 partitions and then rescales
it to full parallelism. It seems to me that the 20 slots needed to read from
Kafka won't be distributed evenly over the 10 task managers, which means a
further rescale still has to shuffle data over the network.
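
For reference, a minimal sketch of our setup (the topic name, broker address, group id,
and the 0.11 consumer class below are just placeholders; the parallelisms are the real ones):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RescaleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);                                 // 10 task managers * 10 slots

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");    // placeholder address
        props.setProperty("group.id", "rescale-sketch");         // placeholder group id

        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props))
                .setParallelism(20);                             // one source subtask per partition

        events
                .rescale()                                       // local round-robin fan-out to a
                                                                 // subset of downstream subtasks
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toLowerCase();              // stand-in for the real processing
                    }
                })
                .print();

        env.execute("rescale-sketch");
    }
}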


Best,
Mingliang


Re: Flink on kubernetes

2018-09-03 Thread
Hi Lasse,

Is there a JIRA ticket I can follow?

Best,
Mingliang

> On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard  wrote:
>
> Hi.
>
> We have documented the same on Flink 1.4.2/1.6 running on YARN and Mesos.
> If you correlate the non-heap memory with job restarts, you will see that
> non-heap memory increases with every restart until you get an OOM.
>
> I'll let you know if/when I know how to handle the problem.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
>> On 3 Sep 2018, at 10:08 AM, 祁明良  wrote:
>>
>> Hi All,
>>
>> We are running Flink (version 1.5.2) on k8s with the RocksDB backend.
>> Each time the job is cancelled and restarted, we face an OOMKilled problem
>> from the container.
>> In our case, we assign only 15% of the container memory to the JVM and leave
>> the rest to RocksDB.
>> To us, it looks like the memory used by RocksDB is not released after the job
>> is cancelled. Can anyone give some suggestions?
>> Currently our temporary fix is to restart the TM pod after each job
>> cancellation, but it has to be done manually.
>>
>> Regards,
>> Mingliang
>>


Flink on kubernetes

2018-09-03 Thread
Hi All,

We are running Flink (version 1.5.2) on k8s with the RocksDB backend.
Each time the job is cancelled and restarted, we face an OOMKilled problem
from the container.
In our case, we assign only 15% of the container memory to the JVM and leave
the rest to RocksDB.
To us, it looks like the memory used by RocksDB is not released after the job
is cancelled. Can anyone give some suggestions?
Currently our temporary fix is to restart the TM pod after each job
cancellation, but it has to be done manually.

Regards,
Mingliang


Batch expired in FlinkKafkaProducer09

2018-08-22 Thread
Hi All,

When using FlinkKafkaProducer09 (Flink version 1.4.2), I'm facing a Kafka batch
expired error when a checkpoint starts. The error log is attached below.

Here is what I have investigated:
1. The error only and always occurs when a checkpoint starts.
2. The error does not seem to be related to the flushOnCheckpoint config, since
it is detected before the flush check.
3. There is a checkErroneous call at the beginning of both
FlinkKafkaProducerBase.invoke and FlinkKafkaProducerBase.snapshotState; I don't
understand why the invoke method works fine (see the sketch below).
4. There is no problem when the same code writes to another Kafka cluster. (We
just got a new Kafka server to migrate to :)
5. The Kafka server is actually version 0.11; in this job we need to consume
from 0.9 and write to 0.11, so we used the 09 connector version.
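
For context, a rough sketch of the error-propagation pattern I mean in point 3. This is
not the actual FlinkKafkaProducerBase source, just an illustration of why a send failure
such as "Batch Expired" tends to surface in snapshotState(): the Kafka producer reports
the failure asynchronously via the send callback, and the stored exception is only
rethrown the next time checkErroneous() runs.

import org.apache.kafka.clients.producer.Callback;

public class AsyncErrorSketch {

    // Failure reported by the asynchronous Kafka send, if any.
    private volatile Exception asyncException;

    // Callback handed to KafkaProducer#send(record, callback).
    public Callback sendCallback() {
        return (metadata, exception) -> {
            if (exception != null && asyncException == null) {
                asyncException = exception;        // remember the first failure
            }
        };
    }

    // Called at the beginning of invoke() and snapshotState() in the pattern above;
    // whichever runs first after the failure is the one that throws it.
    public void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }
}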

Best,
Mingliang

java.lang.Exception: Error while triggering checkpoint 3 for Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
    ... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
    ... 7 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:350)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
    ... 12 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired


Stream collector serialization performance

2018-08-15 Thread
Hi all,

I'm currently using a keyed process function, and I see that serialization
happens when I collect the object or update the object in RocksDB. For me,
serialization performance seems to be the bottleneck.
By default the POJO serializer is used, and the time cost of collect vs. update
to RocksDB is roughly 1:1. Then I switched to Kryo by calling
getConfig().enableForceKryo(). Now the time cost of the update to RocksDB
decreases significantly, to roughly 0.3, but the collect method does not seem
to improve. Can someone help explain this?

My object looks roughly like this:

class A {
    String f1;   // ~20 String fields like this
    List<B> f2;  // ~20 list-of-POJO fields like this
    int f3;      // ~20 int fields like this
}

class B {
    String f;    // ~5 String fields like this
}
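
For what it's worth, a minimal sketch of the serializer switch (the registerKryoType
calls are optional and not something we currently do; A and B are the classes sketched
above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Force Kryo for all types instead of Flink's POJO serializer
        // (the getConfig().enableForceKryo() switch mentioned above).
        env.getConfig().enableForceKryo();

        // Optional: pre-register the POJO classes so Kryo writes small registration IDs
        // instead of full class names.
        env.getConfig().registerKryoType(A.class);
        env.getConfig().registerKryoType(B.class);
    }
}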

Best,
Mingliang


Re: Tuning checkpoint

2018-08-13 Thread
Thank you for this great answer, Fabian.

Regarding the YARN JVM heap size, I tried changing
containerized.heap-cutoff-ratio: 0.25
and it somehow seems to work, but the actual memory needed for RocksDB still
looks like a black box to me. I see there is already a JIRA ticket about this
problem [1], created last year and still open. All I can do is keep enlarging
this value until YARN no longer kills my TaskManager for its memory usage :)

By the way, my rough calculation of RocksDB memory on each TM is something like
num of slots per TM * num of stateful operators (including source and sink?) *
(block cache size + write buffer size)

I bet it's not correct, though.
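
Just to make that back-of-the-envelope arithmetic concrete (the formula is only my rough
guess and all the numbers below are made up; actual usage presumably also depends on how
many states each operator registers):

public class RocksDbMemoryGuess {
    public static void main(String[] args) {
        int slotsPerTm        = 2;                   // from the job setup in this thread
        int statefulOperators = 3;                   // e.g. source, process function, sink
        long blockCacheBytes  = 64L << 20;           // 64 MB block cache (made-up value)
        long writeBufferBytes = 4 * (16L << 20);     // 4 write buffers * 16 MB (made-up value)

        long roughBytes = (long) slotsPerTm * statefulOperators
                * (blockCacheBytes + writeBufferBytes);

        System.out.println(roughBytes / (1 << 20) + " MB");   // 768 MB for these numbers
    }
}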

Best,
Mingliang

[1] https://issues.apache.org/jira/browse/FLINK-7289

On 13 Aug 2018, at 11:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Mingliang,

let me answer your second question first:

> Another question is about the alignment buffer: I thought it was only used
> in multiple-input-stream cases. For a keyed process function, what is
> actually aligned?

When a task sends records to multiple downstream tasks (tasks, not operators!)
due to a broadcast or partition/keyBy/shuffle connection, the task broadcasts
each checkpoint barrier to all of its receiving tasks.
Therefore, each task that receives records from multiple tasks will receive
multiple checkpoint barriers. (Checkpoint barriers behave similarly to
watermarks in this regard.)
In order to provide exactly-once state consistency, a task must buffer records
from every input connection that has already forwarded a barrier, until the
barriers from all input connections have been received and the state checkpoint
has been initiated.
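
A rough sketch of that alignment logic, purely as illustration (this is not Flink's
actual barrier-handling code; the channel IDs, record type, and the hooks at the bottom
are placeholders):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AlignmentSketch {
    private final int numInputChannels;
    private final Set<Integer> barrierReceived = new HashSet<>();
    private final Map<Integer, List<Object>> heldBack = new HashMap<>();

    AlignmentSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    void onRecord(int channel, Object record) {
        if (barrierReceived.contains(channel)) {
            // Alignment: records from channels whose barrier has already arrived are
            // buffered, not processed, until the remaining channels catch up.
            heldBack.computeIfAbsent(channel, c -> new ArrayList<>()).add(record);
        } else {
            process(record);
        }
    }

    void onBarrier(int channel, long checkpointId) {
        barrierReceived.add(channel);
        if (barrierReceived.size() == numInputChannels) {
            snapshotState(checkpointId);    // all barriers in: take the state snapshot
            forwardBarrier(checkpointId);   // and send the barrier downstream
            barrierReceived.clear();
            for (List<Object> records : heldBack.values()) {
                records.forEach(this::process);   // resume with the held-back records
            }
            heldBack.clear();
        }
    }

    void process(Object record)            { /* normal record processing */ }
    void snapshotState(long checkpointId)  { /* state backend snapshot */ }
    void forwardBarrier(long checkpointId) { /* emit barrier to all outputs */ }
}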

What does this mean for the long checkpoint alignment that you observe?
Checkpoint alignment starts when the first barrier is received and ends when 
the last barrier is received.
Hence, it seems as if one task manager receives some barrier(s) later than the
other nodes, probably because it is more heavily loaded.
The fact that all affected tasks run on the same TM, and that you mentioned
backpressure, points in that direction, because TMs multiplex the connections
of all their tasks.

Regarding the memory configuration question, I am not sure if there is a way to 
override the JVM heap size on YARN. Maybe others can answer this question.

Best,
Fabian

2018-08-12 18:36 GMT+02:00 祁明良 <m...@xiaohongshu.com>:

Hi all,

I have several questions regarding checkpointing. The background: I'm using a
ProcessFunction keyed by user_id, which works roughly like the following:

inputStream
  .keyBy(x => getUserKey(x))
  .process(...)

It runs on YARN with 40 TMs * 2 slots each. When I look at the checkpoint
metrics, only a small number of subtasks have a large "alignment
buffered/duration", and it looks like the 2 slots on the same TM are either
both high or both low. What could cause this?

  1.  Maybe data skew, but I see the amount of data is almost the same.
  2.  Or the network?
  3.  The system is under backpressure, but I don't understand why only about 4
out of 80 subtasks behave like this.

Another question is about the alignment buffer: I thought it was only used in
multiple-input-stream cases. For a keyed process function, what is actually
aligned?

The last question is about tuning RocksDB. I want to assign some memory to the
write buffer and block cache, and the docs say to do this "typically by
decreasing the JVM heap size of the TaskManagers by the same amount", while the
TaskManager heap size is, "On YARN setups, this value is automatically
configured to the size of the TaskManager's YARN container, minus a certain
tolerance value." This looks like I should decrease the TaskManager heap, yet
that value is set automatically by YARN, so what should I do?

Best,

Mingliang


flink on kubernetes

2018-08-12 Thread
Hi all,


We are trying to build our Flink cluster on k8s, but there doesn't seem to be
much material covering the details.

First, we learned how to build a standalone cluster on k8s and run all the jobs
inside it.

The question is: should we build one cluster for all the jobs, or one cluster
per job?

To us, one cluster for all the jobs means poor isolation, and some jobs may
require different configurations. But we can't find any material on how to set
up one cluster per job on k8s, similar to running a single Flink job on YARN.
Do we have to build this ourselves, or is there something we missed?


Best,

Mingliang


Window state with rocksdb backend

2018-08-09 Thread
Hi all,


This is Mingliang. I have a problem with the RocksDB backend.


I'm currently using a 15min SessionWindow which also fires every 10s, there's 
no pre-aggregation, so the input of WindowFunction would be the whole Iterator 
of input object.

For window operator, I assume this collection is also a state that maintained 
by Flink.

Then, in each 10s fire, the window function will take the objects out from 
iterator and do some update, and in next fire, I assume I would get the updated 
value of that object.

With File system backend it was successful but eats a lot of memory and finally 
I got GC overhead limit, then I switch to rocksdb backend and the problem is 
the object in the next fire round is not updated by the previous fire round.

Do I have to do some additional staff with rocksdb backend in this case?
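
In case it helps frame the question, a rough sketch of what I understand would make the
per-fire updates explicit: keeping them in per-window state instead of mutating the
buffered elements (which RocksDB apparently does not write back, since it hands out
freshly deserialized copies on every read). The Event and Result types and the "count
only the new elements" logic are made up, and with merging session windows the
per-window state would need its own cleanup care:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Placeholder POJOs for the sketch.
class Event { }

class Result {
    public final String key;
    public final long newElements;
    public Result(String key, long newElements) {
        this.key = key;
        this.newElements = newElements;
    }
}

public class FiringWindowFunction
        extends ProcessWindowFunction<Event, Result, String, TimeWindow> {

    private final ValueStateDescriptor<Long> processedSoFar =
            new ValueStateDescriptor<>("processed-so-far", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Event> elements, Collector<Result> out)
            throws Exception {
        ValueState<Long> state = ctx.windowState().getState(processedSoFar);
        long alreadySeen = state.value() == null ? 0L : state.value();

        long total = 0;
        for (Event ignored : elements) {
            total++;                             // every fire sees the full window contents
        }

        out.collect(new Result(key, total - alreadySeen));   // emit only this fire's delta
        state.update(total);                                 // carried over to the next fire
    }

    @Override
    public void clear(Context ctx) throws Exception {
        ctx.windowState().getState(processedSoFar).clear();  // clean up when the window is purged
    }
}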


Thanks in advance

Mingliang

