Hi Matthias,

Can you make sure that node-1 and node-2 can talk to each other? It looks
to me like node-2 fails to open a connection to the other TaskManager.
Maybe the logs give some more insights. You can change the log level to
DEBUG to gather more information.
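Here is a minimal sketch of such a connectivity check. It assumes that plain TCP reachability is what matters, because TaskManager-to-TaskManager data exchange goes over Netty. `ReachabilityCheck` and `canConnect` are hypothetical helpers and are not part of Flink. Run the check on node-2 against node-1, then the other way around. The port to test is the one the remote TaskManager's data server listens on. As far as I know, that port is random unless you pin it with `taskmanager.data.port` (default 0). Pinning it also makes it easier to open in a firewall.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

/** Hypothetical helper: can this machine open a TCP connection to host:port? */
public class ReachabilityCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused connection, timeout, unreachable or unresolvable host
            return false;
        }
    }

    public static void main(String[] args) throws UnknownHostException {
        if (args.length != 2) {
            System.err.println("usage: java ReachabilityCheck <host> <port>");
            System.exit(2);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // Show what the host name resolves to on *this* machine
        System.out.println(host + " resolves to "
                + InetAddress.getByName(host).getHostAddress());
        System.out.println("TCP connect to " + host + ":" + port + ": "
                + (canConnect(host, port, 3000) ? "OK" : "FAILED"));
    }
}
```

With Java 11 you can run the file directly, e.g. `java ReachabilityCheck.java node-1 <port>` on node-2. The resolved address matters as much as the connect result, because each machine may resolve the host names differently.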

Cheers,
Till

On Tue, Feb 16, 2021 at 10:57 AM Matthias Seiler <
matthias.sei...@campus.tu-berlin.de> wrote:

> Hi Everyone,
>
> I'm trying to set up a Flink cluster in standalone mode with two
> machines. However, running a job throws the following exception:
> `org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Sending the partition request to 'null' failed`
>
> Here is some background:
>
> Machines:
> - node-1: JobManager, TaskManager
> - node-2: TaskManager
>
> flink-conf.yaml looks like this:
> ```
> jobmanager.rpc.address: node-1
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 2
> cluster.evenly-spread-out-slots: true
> ```
>
> Deploying the cluster works: I can see both TaskManagers in the WebUI.
>
> I ran the streaming WordCount example: `flink run
> flink-1.12.1/examples/streaming/WordCount.jar --input lorem-ipsum.txt`
> - the job was submitted
> - the job failed (with the above exception)
> - the log of node-2 also shows the exception; the other logs are
> fine (graceful stop)
>
> I played around with the config and observed that:
> - if parallelism is set to 1, node-1 gets all the slots and node-2 none
> - if parallelism is set to 2, each TaskManager occupies 1 TaskSlot (but
> the job fails because of node-2)
>
> I suspect that the problem lies in the communication between
> TaskManagers:
> - the job runs successfully if
>     - node-1 is the **only** node, with x TaskManagers (tested with x=1 and x=2)
>     - node-2 is the **only** node, with x TaskManagers (tested with x=1 and x=2)
> - the job fails if
>     - node-1 **and** node-2 each have one TaskManager
>
> The full exception is:
> ```
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 547d4d29b3650883aa403cdb5eb1ba5c)
> // ... Job failed, Recovery is suppressed by NoRestartBackoffTimeStrategy, ...
> Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.nio.channels.ClosedChannelException
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>     ... 11 more
> ```
>
> Thanks in advance,
> Matthias
>
>
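The innermost `Caused by` in the quoted trace is a plain `java.nio.channels.ClosedChannelException`. The sketch below reproduces that exception type using only the JDK: it writes to a socket channel after the channel has been closed. `ClosedChannelDemo` and `writeAfterCloseThrows` are illustrative names, not Flink code. The sketch does not follow Flink's code path. In the trace, Netty's `AbstractChannel$AbstractUnsafe.write` creates the exception itself because the channel is no longer open. That only shows the connection to the other TaskManager was already closed when the partition request was written. It does not show why.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Illustrative only: writing to an already-closed channel fails with ClosedChannelException. */
public class ClosedChannelDemo {

    /** Connects over loopback, closes the client channel, then tries to write to it. */
    static boolean writeAfterCloseThrows() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Listen on an ephemeral loopback port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            client.close(); // the connection is gone before the "request" is sent
            try {
                client.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                return false; // not expected: the write should have been rejected
            } catch (ClosedChannelException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("write after close throws ClosedChannelException: "
                + writeAfterCloseThrows());
    }
}
```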
