[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15224633#comment-15224633 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

I use the default values.

> Exception while using Kafka 0.9 connector
> -----------------------------------------
>
>                 Key: FLINK-3343
>                 URL: https://issues.apache.org/jira/browse/FLINK-3343
>             Project: Flink
>          Issue Type: Improvement
>          Components: flink-contrib, Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Farouk Salem
>
> While running a job without fault tolerance that produces data to Kafka, the
> job failed with a "Batch Expired" exception. I tried to increase
> "request.timeout.ms" and "max.block.ms" to 6 instead of 3, but the problem
> remained. The only way I found to get around it was to enable snapshotting.
>
> 09:58:11,036 WARN  org.apache.kafka.clients.producer.internals.Sender
>     - Got error produce response with correlation id 48106 on topic-partition
>       flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 09:58:11,036 WARN  org.apache.kafka.clients.producer.internals.Sender
>     - Got error produce response with correlation id 48105 on topic-partition
>       flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 09:58:11,036 WARN  org.apache.kafka.clients.producer.internals.Sender
>     - Got error produce response with correlation id 48104 on topic-partition
>       flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> 09:58:11,068 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
>     - Caught exception while processing timer.
> java.lang.RuntimeException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>         at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>         at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.evaluateWindow(AggregatingKeyedTimePanes.java:59)
>         at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.computeWindow(AbstractAlignedProcessingTimeWindowOperator.java:242)
>         at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.trigger(AbstractAlignedProcessingTimeWindowOperator.java:223)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:606)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>         at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>         at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>         ... 15 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>         at ...
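The timeouts discussed in the report are ordinary Kafka producer settings; the Flink Kafka 0.9 connector forwards the Properties it is constructed with straight to the underlying producer. A minimal sketch of overriding them follows — the broker address and the 120000 ms values are illustrative assumptions, not values from this ticket:

```java
import java.util.Properties;

public class KafkaProducerProps {

    // Build producer properties that raise the two timeouts beyond the
    // Kafka 0.9 defaults (request.timeout.ms: 30000, max.block.ms: 60000).
    public static Properties timeoutTunedProps(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // How long the producer waits for a broker response before a
        // batch fails with timeout / "Batch Expired" style errors.
        props.setProperty("request.timeout.ms", "120000");
        // How long send() may block, e.g. when buffer memory is full.
        props.setProperty("max.block.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeoutTunedProps("localhost:9092");
        // The same Properties object would then be handed to the connector,
        // roughly (hypothetical topic and schema):
        //   new FlinkKafkaProducer09<>("topic", new SimpleStringSchema(), props);
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```

Only the Properties construction is shown runnable here; wiring it into a job requires the flink-connector-kafka-0.9 dependency.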
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15224606#comment-15224606 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

Yeah, it works most of the time, but sometimes it throws a "GC overhead limit exceeded" exception.
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208659#comment-15208659 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

Hi Robert, I still have this problem, but it happens less often with Flink 1.0.0. I tried to send an email to u...@kafka.apache.org as you suggested, but I didn't get any reply.
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136945#comment-15136945 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

Thanks. I will send an email right now :)
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135756#comment-15135756 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

Sorry, I had checked an old version of the Kafka documentation (Kafka 0.7). When I set the batch size to zero, a "GC overhead limit exceeded" exception is thrown. I tried setting it to 100, but I still get the same "Batch Expired" problem. It works fine with Kafka 0.8.2.
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134481#comment-15134481 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

Thanks for your replies. I decreased the batch size to 6000 instead of 16000; values greater than 6000 still threw the error. I checked the default value for the batch size in the Kafka configuration and it is 200, but it is around 16000 when using Flink. Do you know why? Is it a design decision by the Flink team, and if so, what is it based on? Thanks so much.
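The gap between the 200 in the documentation and the ~16000 seen at runtime is most likely a difference in producer generations rather than a Flink override: the old producer's batching option (batch.size in the 0.7 docs the reporter checked) counted messages, while the new Java producer used by the 0.9 connector measures batch.size in bytes, defaulting to 16384. Setting it to 0 disables batching entirely, which sends every record in its own request and may explain the GC pressure mentioned earlier in the thread. A hedged sketch of setting it explicitly (the broker address is a placeholder):

```java
import java.util.Properties;

public class BatchSizeProps {

    // batch.size in the new (0.9+) Java producer is a per-partition buffer
    // size in BYTES, default 16384; it is not a message count.
    public static Properties withBatchSize(int batchSizeBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // 0 would disable batching: each record travels in its own request,
        // multiplying small allocations and request overhead.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        // e.g. the 6000-byte value the reporter found workable
        System.out.println(withBatchSize(6000).getProperty("batch.size"));
    }
}
```

Under this reading, ~16000 is simply Kafka's own default, not a value chosen by the Flink team.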
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134111#comment-15134111 ]

Farouk Salem commented on FLINK-3343:
-------------------------------------

By the way, I opened this as an "improvement" because I think a suitable "batch.size" value could be determined automatically by the Flink engine.
[jira] [Created] (FLINK-3343) Exception while using Kafka 0.9 connector
Farouk Salem created FLINK-3343:
---
Summary: Exception while using Kafka 0.9 connector
Key: FLINK-3343
URL: https://issues.apache.org/jira/browse/FLINK-3343
Project: Flink
Issue Type: Improvement
Components: flink-contrib, Kafka Connector
Affects Versions: 1.0.0
Reporter: Farouk Salem

While running a job without fault tolerance that produces data to Kafka, the job failed with a "Batch Expired" exception. I tried to increase "request.timeout.ms" and "max.block.ms" to 6 instead of 3, but the problem remained. The only way to work around this problem is to enable snapshotting.

09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 48106 on topic-partition flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 48105 on topic-partition flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 48104 on topic-partition flinkWordCountNoFaultToleranceSmall-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
09:58:11,068 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
	at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
	at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.evaluateWindow(AggregatingKeyedTimePanes.java:59)
	at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.computeWindow(AbstractAlignedProcessingTimeWindowOperator.java:242)
	at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.trigger(AbstractAlignedProcessingTimeWindowOperator.java:223)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:606)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
	at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
	at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
	... 15 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
	... 20 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
09:58:11,084 INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer
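The timeouts the reporter mentions are ordinary Kafka producer settings, passed to the Flink Kafka producer as `java.util.Properties`. A minimal sketch of how such settings are assembled (the broker address and timeout values here are illustrative, not the ones from the report):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds producer settings for the Kafka 0.9 connector. These Properties
    // would be handed to a FlinkKafkaProducer09 constructor together with the
    // topic name and a serialization schema (not shown here).
    public static Properties buildProducerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        // How long the producer waits for a broker response before failing the request.
        props.setProperty("request.timeout.ms", "60000");
        // How long send() may block when buffers are full or metadata is unavailable.
        props.setProperty("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProducerProps().getProperty("request.timeout.ms"));
    }
}
```

Raising these values only widens the window before a "Batch Expired" failure; it does not remove the underlying network errors shown in the WARN logs above.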
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15134108#comment-15134108 ] Farouk Salem commented on FLINK-3343:
-
Thanks a lot for your reply and sorry for misunderstanding the usage of JIRA. Next time I will :)
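The reporter's workaround is enabling snapshotting, i.e. Flink checkpointing. A minimal job skeleton showing where that switch lives (the 5-second interval is illustrative; the actual source, window, and Kafka sink wiring from the reporter's job are not shown):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Snapshot operator state every 5 seconds. With checkpointing on, a failed
        // Kafka write leads to recovery from the last snapshot instead of job failure.
        env.enableCheckpointing(5000);
        // ... source -> windowed aggregation -> FlinkKafkaProducer09 sink would be wired here ...
        env.execute("checkpointed-job-sketch");
    }
}
```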
[jira] [Created] (FLINK-3326) Kafka Consumer doesn't retrieve messages after killing flink job which was using the same topic
Farouk Salem created FLINK-3326:
---
Summary: Kafka Consumer doesn't retrieve messages after killing flink job which was using the same topic
Key: FLINK-3326
URL: https://issues.apache.org/jira/browse/FLINK-3326
Project: Flink
Issue Type: Bug
Components: flink-contrib
Affects Versions: 0.10.1
Environment: Java
Reporter: Farouk Salem
Priority: Blocker

If a streaming job retrieves data from a Kafka topic (from the smallest offset) and the job is killed manually, another job (or the same job) started after the killed one will not consume data from that topic starting from the smallest offset. I reproduced this behavior more than 10 times, and it failed every time.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3326) Kafka Consumer doesn't retrieve messages after killing flink job which was using the same topic
[ https://issues.apache.org/jira/browse/FLINK-3326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130590#comment-15130590 ] Farouk Salem commented on FLINK-3326:
-
Thanks for your super fast replies. Regarding checkpointing, it doesn't matter: I tried both scenarios and the same problem happened. Yes, I use the same group id for both jobs! Now I realize that this could be the problem.
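The behavior discussed in this comment follows from reusing the same Kafka consumer group: once offsets have been committed for a group, they take precedence over the "start from smallest offset" setting on restart. A hedged sketch of consumer properties that use a fresh group id per run (names and the broker address are illustrative):

```java
import java.util.Properties;
import java.util.UUID;

public class ConsumerConfigSketch {
    // Builds consumer settings for the Flink Kafka connector. These Properties
    // would be passed to a FlinkKafkaConsumer constructor together with the
    // topic name and a deserialization schema (not shown here).
    public static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        // A fresh group id per run means no previously committed offsets apply,
        // so auto.offset.reset decides where consumption starts.
        props.setProperty("group.id", "wordcount-" + UUID.randomUUID());
        // "smallest" = start from the earliest available offset (Kafka 0.8 naming;
        // the Kafka 0.9 new consumer calls the same setting "earliest").
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumerProps().getProperty("auto.offset.reset"));
    }
}
```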
[jira] [Closed] (FLINK-3326) Kafka Consumer doesn't retrieve messages after killing flink job which was using the same topic
[ https://issues.apache.org/jira/browse/FLINK-3326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Farouk Salem closed FLINK-3326.
---
Resolution: Invalid