Hi,

No, they don't. Only the job gets restarted after that, without any
luck. The exception I provided shows up in the job's own exceptions list.

On Mon, Aug 23, 2021 at 4:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This might be because some task managers cannot reach the job manager
> in time. Have any of the task manager instances restarted after this failure?
> If so, what do the logs (Flink and Kubernetes) of the failed task
> manager say?
>
> Zbyszko Papierski <zpapier...@wikimedia.org> wrote on Fri, Aug 20, 2021 at 11:07 PM:
>
>> Hi!
>>
>> We're trying to deploy our application to our Kubernetes
>> cluster and we seem to have hit a snag. Long story short: any
>> deployment that involves a cluster of more than one TM seems to fail our job
>> almost immediately with this exception:
>>
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> Sending the partition request to 'null' failed.
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>> Caused by: java.nio.channels.ClosedChannelException
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>>> ... 11 more
>>
>>
>> I don't have enough experience to judge from the rest of the logs what
>> the reason might be, but I'm including the debug logs I could get
>> from kubectl, for both the JM and the TM.
>> We use a session cluster deployed on Kubernetes (not native Kubernetes;
>> there are still some issues that prevent us from using it) and we deploy
>> our app to that cluster. We have confirmed that everything works when
>> there's a single Task Manager, but we'd rather not continue with that
>> limitation. The way we define the cluster itself on k8s is shown here [1]
>> and the chart for the deployment itself can be found here [2]. The app
>> we're deploying is available
>> here [3]. We're running Flink 1.12.1 on openjdk-jre 11.
>>
>> Since I slept through the k8s revolution a bit and am somewhat new to it,
>> I'm not sure which information to provide to make the situation clearer,
>> but any help is greatly appreciated!
>>
>>
>> [1]
>> https://github.com/wikimedia/operations-deployment-charts/tree/master/charts/flink-session-cluster
>> [2]
>> https://github.com/wikimedia/operations-deployment-charts/tree/master/helmfile.d/services/rdf-streaming-updater
>> [3]
>> https://github.com/wikimedia/wikidata-query-rdf/tree/master/streaming-updater-producer
>> --
>>
>> Zbyszko Papierski (He/Him)
>>
>> Senior Software Engineer
>>
>> Wikimedia Foundation <https://wikimediafoundation.org/>
>>
>

