Question regarding rescale api
Hi All, I see the rescale api allow us to somehow redistribute element locally, but is it possible to make the upstream operator distributed evenly on task managers? For example I have 10 task managers each with 10 slots. The application reads data from Kafka topic with 20 partitions, then rescale it to full parallelism. To me it seems that the 20 slots needed to read from Kafka won’t distributed evenly on 10 task managers, which means further rescale still needs to shuffle data over network. Best, Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Re: Flink on kubernetes
Hi Lasse, Is there JIRA ticket I can follow? Best, Mingliang > On 3 Sep 2018, at 5:42 PM, Lasse Nedergaard wrote: > > Hi. > > We have documented the same on Flink 1.4.2/1.6 running on Yarn and Mesos. > If you correlate the none heap memory together with job restart you will see > none heap increases for every restart until you get an OOM. > > I let you know if/when I know how to handle the problem. > > Med venlig hilsen / Best regards > Lasse Nedergaard > > >> Den 3. sep. 2018 kl. 10.08 skrev 祁明良 : >> >> Hi All, >> >> We are running flink(version 1.5.2) on k8s with rocksdb backend. >> Each time when the job is cancelled and restarted, we face OOMKilled problem >> from the container. >> In our case, we only assign 15% of container memory to JVM and leave others >> to rocksdb. >> To us, it looks like memory used by rocksdb is not released after job >> cancelling. Anyone can gives some suggestions? >> Currently our tmp fix is to restart the TM pod for each job cancelling, but >> it has to be manually. >> >> Regards, >> Mingliang >> >> 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! >> This communication may contain privileged or other confidential information >> of Red. If you have received it in error, please advise the sender by reply >> e-mail and immediately delete the message and any attachments without >> copying or disclosing the contents. Thank you. 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Flink on kubernetes
Hi All, We are running flink(version 1.5.2) on k8s with rocksdb backend. Each time when the job is cancelled and restarted, we face OOMKilled problem from the container. In our case, we only assign 15% of container memory to JVM and leave others to rocksdb. To us, it looks like memory used by rocksdb is not released after job cancelling. Anyone can gives some suggestions? Currently our tmp fix is to restart the TM pod for each job cancelling, but it has to be manually. Regards, Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Batch expired in FlinkKafkaProducer09
Hi All, When using FlinkKafkaProducer09 (Flink version 1.4.2), I’m facing an Kafka batch expired error when checkpoint starts. The error log is attached below. Here is what I have investigated: 1. The error only and always occurs when checkpoint starts. 2. The error seems not related to flushOnCheckpoint config, since it is detected before flush check. 3. There is checkErroneous in the beginning of FlinkKafkaProducerBase.invoke and FlinkKafkaProducerBase.snapshotState, I don’t know why the invoke method works fine. 4. There is no problem when having the same code writing to another Kafka cluster. (We just got a new Kafka server to migrate:) 5. The Kafka server is actually of version 0.11, in this job we need to consume from 0.9, and write to 0.11, so we used 09 version. Best, Mingliang java.lang.Exception: Error while triggering checkpoint 3 for Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199) ... 5 more Caused by: java.lang.Exception: Could not complete snapshot 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2 -> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226). at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538) ... 7 more Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:350) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356) ... 12 more Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Stream collector serialization performance
Hi all, I’m currently using the keyed process function, I see there’s serialization happening when I collect the object / update the object to rocksdb. For me the performance of serialization seems to be the bottleneck. By default, POJO serializer is used, and the timecost of collect / update to rocksdb is roughly 1:1, Then I switch to kryo by setting getConfig.enableForceKryo(). Now the timecost of update to rocksdb decreases significantly to roughly 0.3, but the collect method seems not improving. Can someone help to explain this? My Object looks somehow like this: Class A { String f1 // 20 * string fields List f2. // 20 * list of another POJO object Int f3 // 20 * ints fields } Class B { String f // 5 * string fields } Best, Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Re: Tuning checkpoint
Thank you for this great answer, Fabian. Regarding the yarn JVM heap size, I tried to change containerized.heap-cutoff-ratio: 0.25 And it somehow looks like working, but the actually memory needed for rocksdb still looks like a blackbox to me. I see there’s already a JIRA ticket talking about this problem[1], created last year and still open yet. What I can do is just keep enlarging this value until YARN don’t kill my TaskManager because of memory usage:) By the way, my rough calculation of rocksdb memory on each TM is like num of slots per task * num of stateful operators(including source and sink?) * (block cache size + write buffer size) I bet it’s not correct.. Best, Mingliang [1] https://issues.apache.org/jira/browse/FLINK-7289 On 13 Aug 2018, at 11:05 PM, Fabian Hueske mailto:fhue...@gmail.com>> wrote: Hi Mingliang, let me answer your second question first: > Another question is about the alignment buffer, I thought it was only used > for multiple input stream cases. But for keyed process function , what is > actually aligned? When a task sends records to multiple downstream tasks (task not operators!) due to a broadcast or partition/keyBy/shuffle connection, the task broadcasts each checkpoint barrier to all of its receiving tasks. Therefore, each task that receives records from multiple tasks will receive multiple checkpoint barriers. (Checkpoint barriers behave similar to watermarks in this regard) In order to provide exactly-once state consistency, a task must buffer records from input connection that forwarded a barrier until barriers from all input connections have been received and the state checkpoint was initiated. What does this mean for the long checkpoint alignment that you observe? Checkpoint alignment starts when the first barrier is received and ends when the last barrier is received. Hence, it seems as if one task manager receives some barrier(s) later than the other nodes, probably because it is more heavily loaded. The fact that all affected tasks run on the same TM and that you mentioned backpressure is a hint for that because TMs multiplex the connection of all tasks. Regarding the memory configuration question, I am not sure if there is a way to override the JVM heap size on YARN. Maybe others can answer this question. Best, Fabian 2018-08-12 18:36 GMT+02:00 祁明良 mailto:m...@xiaohongshu.com>>: Hi all, I have several questions regarding the checkpoint. The background is I'm using a ProcessFunction keyed by user_id somehow works like following: inputStream .keyBy(x => getUserKey(x)) .process(...) It runs on yarn with 40 TMs * 2 slots each, when I look at the checkpoint metrics, only a small number of subtasks have a large "alignment buffered/duration", and looks like either all the 2 slots on the same TM are both high or both low. What may probably cause this? 1. maybe data skew, but I see the amount of data is almost same 2. or network? 3. The system is under back pressure, but I don't understand why only like 4 out of 80 subtasks perform like this. Another question is about the alignment buffer, I thought it was only used for multiple input stream cases. But for keyed process function , what is actually aligned? The last question is about tuning rocksdb, I try to assign some memory to writebuffer and block cache, and the doc says "typically by decreasing the JVM heap size of the TaskManagers by the same amount" , and taskmanager heap size is "On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value." This looks like I should decrease the taskmanager heap and the value is set by YARN automatically, so what should I do? Best, Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you. 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
flink on kubernetes
Hi all, We are trying to build our flink cluster on k8s, but there seems to be not enough materials about details. Firstly, we learned how to build a standalone cluster on k8s and run all the jobs inside it. The question is like are we building a cluster for all the jobs or one job per cluster? To us, a cluster for all the jobs leads to bad isolation and some job may require different configurations. But we don't find any material talking about how to build like one job per cluster on k8s, something like running one flink job on yarn. Do we have to wrap the things by ourself or there's something we missed? Best, Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.
Window state with rocksdb backend
Hi all, This is mingliang, I got a problem with rocksdb backend. I'm currently using a 15min SessionWindow which also fires every 10s, there's no pre-aggregation, so the input of WindowFunction would be the whole Iterator of input object. For window operator, I assume this collection is also a state that maintained by Flink. Then, in each 10s fire, the window function will take the objects out from iterator and do some update, and in next fire, I assume I would get the updated value of that object. With File system backend it was successful but eats a lot of memory and finally I got GC overhead limit, then I switch to rocksdb backend and the problem is the object in the next fire round is not updated by the previous fire round. Do I have to do some additional staff with rocksdb backend in this case? Thanks in advance Mingliang 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件! This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.