Jason Kania created FLINK-33486:
-----------------------------------
Summary: Pulsar Client Send Timeout Terminates TaskManager
Key: FLINK-33486
URL: https://issues.apache.org/jira/browse/FLINK-33486
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania
Currently, when the Pulsar producer encounters a timeout while attempting to
send data, it raises a TimeoutException that is not handled; the PulsarWriter
rethrows it as a FlinkRuntimeException. This is not a reasonable way to handle a
timeout. The situation should be handled gracefully, either through additional
parameters that leave the choice of action to the user or through a callback
mechanism that lets the user supply their own handling code. Unfortunately,
right now, the exception causes the TaskManager to terminate, which then leads
to other issues.
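As a minimal sketch of the callback option, assuming a hypothetical SendFailureHandler interface (it is not part of flink-connector-pulsar or the Pulsar client), a user-supplied policy could decide per failure whether to fail the task or drop the record. To keep the example runnable with the JDK alone, java.util.concurrent.TimeoutException stands in for PulsarClientException$TimeoutException, and the sketch waits synchronously with join(); a real sink would invoke the handler from its asynchronous completion path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class SendFailurePolicyDemo {

    // Hypothetical outcome of a send failure; not an existing Flink or Pulsar type.
    public enum Action { FAIL, DROP }

    // Hypothetical user callback consulted when a send does not succeed.
    public interface SendFailureHandler {
        Action onSendFailure(String topic, Throwable cause);
    }

    // Example policy: drop (and count) records whose send timed out, fail on anything else.
    public static final class DropOnTimeout implements SendFailureHandler {
        public final AtomicLong dropped = new AtomicLong();

        @Override
        public Action onSendFailure(String topic, Throwable cause) {
            if (cause instanceof TimeoutException) {
                dropped.incrementAndGet();
                return Action.DROP;
            }
            return Action.FAIL;
        }
    }

    // Waits for the send result; rethrows only when the handler chooses FAIL.
    public static void awaitSend(String topic, CompletableFuture<?> send, SendFailureHandler handler) {
        try {
            send.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            if (handler.onSendFailure(topic, cause) == Action.FAIL) {
                throw new RuntimeException("Failed to send data to Pulsar: " + topic, cause);
            }
        }
    }

    public static void main(String[] args) {
        DropOnTimeout handler = new DropOnTimeout();
        CompletableFuture<Void> timedOut = new CompletableFuture<>();
        // Stand-in for the producer reporting PulsarClientException$TimeoutException.
        timedOut.completeExceptionally(new TimeoutException("send timeout"));
        awaitSend("persistent://public/default/myproducer-partition-0", timedOut, handler);
        System.out.println("dropped after timeout: " + handler.dropped.get());
    }
}
```

With this shape the user, not the connector, decides whether a timed-out record is fatal; a configuration-only variant could map an option value onto one of a few built-in policies.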
Increasing the timeout period to avoid the issue is not really an option,
because it does not ensure proper handling if the situation does occur.
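The point that a longer timeout does not solve the problem can be illustrated with a small JDK-only model: a send whose acknowledgement never arrives (for example, because the broker is unreachable) fails with a TimeoutException whatever timeout value is chosen, and a larger value only delays the failure. In this sketch CompletableFuture.orTimeout (Java 9+) models the producer's send timeout (configured in the Pulsar client via ProducerBuilder#sendTimeout), and sendToUnreachableBroker is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutOnlyDelaysFailure {

    // Hypothetical helper: a send whose acknowledgement never arrives.
    static CompletableFuture<Void> sendToUnreachableBroker() {
        return new CompletableFuture<>();
    }

    // Returns true if the send failed with a timeout after waiting timeoutMs.
    public static boolean timesOut(long timeoutMs) {
        CompletableFuture<Void> send =
                sendToUnreachableBroker().orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
        try {
            send.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        // Larger timeouts only postpone the same exception.
        for (long timeoutMs : new long[] {50, 200, 1000}) {
            System.out.println("timeout " + timeoutMs + " ms -> timed out: " + timesOut(timeoutMs));
        }
    }
}
```

Every configured value ends in the same exception; only the wait before it changes, so the caller still needs a defined way to react to it.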
The exception is as follows:
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0
	at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
	at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout
	at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2]
	... 1 more
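Reading only the frame names in the trace, the TimeoutException is created on the Pulsar client's HashedWheelTimer thread (ProducerImpl.run) and then rethrown on the task thread by a mailbox action (PulsarWriter.lambda$write$0 calling throwSendingException, run from MailboxProcessor.runMail). The following is a simplified model of that hand-off, not Flink's actual code: MailboxRethrowModel, its ConcurrentLinkedQueue mailbox, and the write/processMail methods are all hypothetical, and java.util.concurrent.TimeoutException again stands in for the Pulsar exception.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

public class MailboxRethrowModel {

    // Hypothetical stand-in for a task mailbox: any thread may enqueue, the task thread drains.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();

    // Called on the task thread per record; the callback runs on whichever thread completes the future.
    public void write(String topic, CompletableFuture<Void> send) {
        send.whenComplete((ok, error) -> {
            if (error != null) {
                // Hand the failure back to the task thread instead of throwing on the client's timer thread.
                mailbox.add(() -> {
                    throw new RuntimeException("Failed to send data to Pulsar: " + topic, error);
                });
            }
        });
    }

    // Task-thread loop: an exception thrown by any mail escapes here and fails the task.
    public void processMail() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    public static void main(String[] args) {
        MailboxRethrowModel task = new MailboxRethrowModel();
        CompletableFuture<Void> send = new CompletableFuture<>();
        task.write("persistent://public/default/myproducer-partition-0", send);
        // Simulate the producer's timer thread expiring the pending send.
        send.completeExceptionally(new TimeoutException("send timeout"));
        try {
            task.processMail();
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " caused by " + e.getCause());
        }
    }
}
```

In this model the enqueued mail is the natural place for the handling the issue asks for: it could consult a user-supplied policy instead of unconditionally throwing.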
--
This message was sent by Atlassian Jira
(v8.20.10#820010)