[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:10 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.

was (Author: tzulitai):
+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.

> OOM during reading big messages from Kafka
> ------------------------------------------
>
>                 Key: FLINK-6613
>                 URL: https://issues.apache.org/jira/browse/FLINK-6613
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0
>            Reporter: Andrey
>
> Steps to reproduce:
> 1) Setup Task manager with 2G heap size
> 2) Setup job that reads messages from Kafka 10 (i.e. FlinkKafkaConsumer010)
> 3) Send 3300 messages each 635Kb. So total size is ~2G
> 4) OOM in task manager.
> According to heap dump:
> 1) KafkaConsumerThread read messages with total size ~1G.
> 2) Pass them to the next operator using org.apache.flink.streaming.connectors.kafka.internal.Handover
> 3) Then began to read another batch of messages.
> 4) Task manager was able to read next batch of ~500Mb messages until OOM.
> Expected:
> 1) Either have constraint like "number of messages in-flight" OR
> 2) Read next batch of messages only when previous batch processed OR
> 3) Any other option which will solve OOM.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
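For scale, the numbers in the report work out roughly as follows, assuming binary units (1 KB = 1024 bytes): the whole test topic is about the size of the 2G TaskManager heap, and the two batches seen in the heap dump (~1G plus ~500Mb) alone account for about three quarters of it, before counting anything else on the heap.

```latex
\begin{aligned}
3300 \times 635\,\text{KB} &= 2\,095\,500\,\text{KB} \approx 2046\,\text{MB} \approx 2.0\,\text{GB} \\
1\,\text{GB} + 0.5\,\text{GB} &\approx 1.5\,\text{GB} = 0.75 \times 2\,\text{GB (heap)}
\end{aligned}
```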
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:12 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:13 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another in the {{KafkaConsumerThread}} waiting to be added to the {{Handover}}. No excessive objects should be created beyond that.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.
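To illustrate why at most 2 batches are in flight, here is a stripped-down single-slot handover in Java. It is a hypothetical simplification loosely modeled on the roles described above (a fetch thread calling `produce`, the source thread calling `pollNext`), not the actual `org.apache.flink.streaming.connectors.kafka.internal.Handover`, which also deals with error reporting and shutdown. The producer blocks while the slot is full, so one batch can sit in the slot and at most one more can be held by the fetch thread while it waits.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-slot handover (a sketch, NOT Flink's Handover class).
 * At most one batch is stored here; a second one may be held by a blocked producer.
 */
public class SingleSlotHandover<T> {
    private final Object lock = new Object();
    private T next; // the single buffered batch, or null when the slot is empty

    /** Called by the fetch thread; blocks until the previous batch has been taken. */
    public void produce(T batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Called by the processing thread; blocks until a batch is available, then empties the slot. */
    public T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            T batch = next;
            next = null;
            lock.notifyAll(); // wake a producer waiting for the slot to free up
            return batch;
        }
    }

    public static void main(String[] args) throws Exception {
        SingleSlotHandover<List<Integer>> handover = new SingleSlotHandover<>();
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    List<Integer> batch = new ArrayList<>();
                    batch.add(i);
                    handover.produce(batch); // blocks while the previous batch is unconsumed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += handover.pollNext().get(0);
        }
        fetcher.join();
        System.out.println("processed sum = " + sum);
    }
}
```

Because the buffer can never grow past one slot, shrinking it further would not change the picture much; the memory that matters is the size of each batch, which is set by the Kafka client configuration.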
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:11 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:12 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022285#comment-16022285 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6613 at 5/24/17 4:12 AM:
---------------------------------------------------------------------

+1 to what [~rmetzger] mentioned. The Kafka consumer (0.9+) holds at most 2 {{ConsumerRecords}} batches at any time: one in the {{Handover}} waiting to be processed, and another in the {{KafkaConsumerThread}} waiting to be added to the {{Handover}}.

Regarding what you expect:
"2) Read next batch of messages only when previous batch processed" --> this is already what happens, since the {{Handover}} is only a buffer of size 1. Also, I don't think this would address the root cause of your OOM, even if we removed buffering completely.
"1) KafkaConsumerThread read messages with total size ~1G." --> as Robert mentioned, you should be able to configure the Kafka client directly to limit that, which will likely solve your problem.
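A minimal sketch of what "configure the Kafka client directly" could look like, assuming the Kafka 0.10.x consumer properties `max.poll.records` (added in Kafka 0.10.0) and `max.partition.fetch.bytes`. The class `KafkaFetchLimits`, the broker address, the group id, the topic name and all numeric values are hypothetical and illustrative, not tuned recommendations. The 2-batch estimate follows the "at most 2 {{ConsumerRecords}}" reasoning above and deliberately ignores the Kafka client's own fetch buffers, so real memory use will be higher.

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of Flink or Kafka): builds consumer properties that cap
 * how much data a single poll() returns, plus a rough estimate of what the source buffers.
 */
public class KafkaFetchLimits {

    /** Kafka consumer properties; all values passed in are illustrative, not recommendations. */
    static Properties consumerProperties(String bootstrapServers, String groupId,
                                         int maxPollRecords, int maxPartitionFetchBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Maximum number of records returned by one poll() (available since Kafka 0.10.0).
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // Maximum bytes fetched per partition per request. On clients older than 0.10.1 this
        // must be at least the largest message size, or the consumer cannot make progress.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxPartitionFetchBytes));
        return props;
    }

    /**
     * Rough estimate of record payload referenced by the Flink source: one batch waiting in the
     * Handover plus one batch held by the KafkaConsumerThread. Ignores the Kafka client's own
     * fetch buffers and any deserialized copies, so real usage will be higher.
     */
    static long estimatedBufferedBytes(int maxPollRecords, long maxRecordBytes) {
        return 2L * maxPollRecords * maxRecordBytes;
    }

    public static void main(String[] args) {
        long recordBytes = 635L * 1024; // 635 KB per message, as in the report
        Properties props = consumerProperties("localhost:9092", "flink-6613-test", 100, 1024 * 1024);
        int maxPollRecords = Integer.parseInt(props.getProperty("max.poll.records"));
        long estimate = estimatedBufferedBytes(maxPollRecords, recordBytes);
        System.out.println("max.poll.records=" + maxPollRecords
                + " -> roughly " + estimate / (1024 * 1024) + " MB of payload in the source");
        // With Flink on the classpath, the properties would then be handed to the consumer, e.g.:
        // new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    }
}
```

With these illustrative values the estimate is about 124 MB (2 × 100 × 635 KB), compared with the ~1.5G of messages seen in the heap dump; the right limits depend on message size, partition count and heap size.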
[jira] [Comment Edited] (FLINK-6613) OOM during reading big messages from Kafka
[ https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015569#comment-16015569 ]

Dmytro Shkvyra edited comment on FLINK-6613 at 5/18/17 11:08 AM:
----------------------------------------------------------------

Hi [~dernasherbrezon], first of all, the root cause of this issue is the use of ParallelGC. An OOM is normal behavior for a JVM running ParallelGC when the application creates too many objects (please look into how the ParallelGC algorithm works). -XX:-UseGCOverheadLimit just hides the underlying lack of memory.
{quote}
3) If you recommend G1, then default startup scripts should be changed.
{quote}
There is no need to change the startup scripts. You can {{export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"}}, and you can pass other JVM options the same way (except memory size options). The JobManager and TaskManager both use the options from {{JVM_ARGS}}.
BTW, if you use the {{-XX:-UseParNewGC -XX:+UseConcMarkSweepGC}} options (CMS with a serial young-generation collector), Flink will not read too many messages from Kafka, because Flink's JVM will be suspended during "stop the world" pauses.

was (Author: dshkvyra):
Hi [~dernasherbrezon], first of all, the root cause of this issue is the use of ParallelGC. An OOM is normal behavior for a JVM running ParallelGC when the application creates too many objects (please look into how the ParallelGC algorithm works). -XX:-UseGCOverheadLimit just hides the underlying lack of memory.
{quote}
3) If you recommend G1, then default startup scripts should be changed.
{quote}
There is no need to change the startup scripts. You can {{export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"}}, and you can pass other JVM options the same way (except memory size options). The JobManager and TaskManager both use the options from {{JVM_ARGS}}.
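To confirm that a flag passed through {{JVM_ARGS}} actually took effect, a JVM can list its active collectors through the standard `java.lang.management` API. This is a small sketch; the class `ActiveGcCheck` is hypothetical. The prefix check relies on HotSpot's bean naming (for example "G1 Young Generation"/"G1 Old Generation" under G1, "PS Scavenge"/"PS MarkSweep" under ParallelGC), which is an implementation detail rather than a specified contract.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check: reports which garbage collectors the running JVM uses. */
public class ActiveGcCheck {

    /** Names of all garbage collector MXBeans registered in this JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    /** True if any collector name looks like a G1 collector (HotSpot naming assumed). */
    static boolean usesG1(List<String> names) {
        for (String name : names) {
            if (name.startsWith("G1")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> names = collectorNames();
        System.out.println("Collectors: " + names + ", G1 active: " + usesG1(names));
    }
}
```

Running it once with the default options and once with `-XX:+UseG1GC` should show the collector names change accordingly on a HotSpot JVM.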