Hi team,

I'm trying to run a flink-statefun application (version 3.2.0) on my local
machine. The application is a pipeline of multiple services that
communicate with each other by sending HTTP requests served by aiohttp. I am
using a single job manager and a single task manager. When I run the
application, I see these warnings multiple times in the worker logs:

2022-03-18 17:35:43,315 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(analytics-transformer, dispatch, 77ce0dcb-347c-4c03-bc32-f7ebb734b930), batchSize=1, totalSizeInBytes=1323, numberOfStates=0)
org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException: Disconnected

18:25:27,594 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(web, statefun, 82936819-b3d9-4a24-b4eb-81a189d6306c), batchSize=1, totalSizeInBytes=1434, numberOfStates=0)
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

2022-03-18 18:06:44,848 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(web, statefun, f004409f-77be-433c-8ab1-ae5f9dad605c), batchSize=1, totalSizeInBytes=1172, numberOfStates=0)
java.lang.IllegalStateException: FixedChannelPool was closed

Then, after some time, I see that the master has crashed due to a request
timeout:

org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(analytics-transformer, dispatch).
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failure forwarding a message to a remote function Address(analytics-transformer, dispatch, 77d07eb3-f499-4265-a456-b0f75d738830)
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) ~[statefun-flink-core.jar:3.2.0]
    ... 16 more
Caused by: org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException

Could someone explain why these warnings occur and how to fix them? I'm
assuming this is a load-related issue caused by the number of incoming
requests. If so, how should I go about handling it? Thanks.
