[ https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551989#comment-17551989 ]

Martijn Visser commented on FLINK-27963:
----------------------------------------

I don't think this is a bug on Flink's end. Kafka is specifically returning an 
exception because the message is larger than max.request.size. Flink is 
unaware of why Kafka won't produce the record, so it will retry indefinitely. The fix 
is also not on Flink's end, but on the Kafka configuration: you'll need to increase 
max.request.size.
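
For context, a minimal sketch of what raising the producer-side limit could look like on the sink. The 5 MB value and the use of setProperty on the 1.15 KafkaSinkBuilder are illustrative assumptions, not part of the report; depending on the setup, the broker/topic limits (message.max.bytes / max.message.bytes) may need to be raised as well.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("test.output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        // Illustrative: raise the producer-side request limit above the largest expected record.
        // 5242880 bytes (5 MB) is an example value, not a recommendation from this issue.
        .setProperty("max.request.size", "5242880")
        .build();
{code}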

> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
>                 Key: FLINK-27963
>                 URL: https://issues.apache.org/jira/browse/FLINK-27963
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: Dmytro
>            Priority: Major
>              Labels: FlinkRuntimeException, KafkaSink
>
> If a FlinkRuntimeException occurs in the 
> [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink],
> then the Flink job tries to re-send the failed data again and gets into an endless 
> "exception -> send again" loop.
> *Code sample which throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>                 new DataGeneratorSource<>(
>                         RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                         rowsPerSecond,
>                         (long) numberOfRows),
>                 TypeInformation.of(String.class))
>         .setParallelism(1)
>         .name("string-generator");
>
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder()
>                         .setTopic("test.output")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build());
>
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer");
> {code}
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
