[ https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-27963:
-----------------------------------
      Labels: FlinkRuntimeException KafkaSink auto-deprioritized-major auto-deprioritized-minor  (was: FlinkRuntimeException KafkaSink auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
>                 Key: FLINK-27963
>                 URL: https://issues.apache.org/jira/browse/FLINK-27963
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4, 1.15.0
>            Reporter: Dmytro
>            Priority: Not a Priority
>              Labels: FlinkRuntimeException, KafkaSink, auto-deprioritized-major, auto-deprioritized-minor
>
> If FlinkRuntimeException occurs in the [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink] then the Flink job tries to re-send failed data again and gets into endless loop "exception->send again"
>
> *Code sample which throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>                 new DataGeneratorSource<>(
>                         RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                         rowsPerSecond,
>                         (long) numberOfRows),
>                 TypeInformation.of(String.class))
>         .setParallelism(1)
>         .name("string-generator");
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder().setTopic("test.output")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build());
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer");
> {code}
>
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)