Hi Vijay,

From the information you provided (the configurations, error message and
screenshot), I'm not able to determine what the problem is or how to
resolve it.

The error message comes from a healthy task manager, which discovered that
another task manager is not responding. We would need to look into the *log
of the task manager that is not responding* to understand what's wrong with
it.

Thank you~

Xintong Song



On Fri, Jun 5, 2020 at 6:06 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Thx a ton, Xintong.
> I am using this configuration now:
>     taskmanager.numberOfTaskSlots: 14
>     rest.server.max-content-length: 314572800
>     taskmanager.network.memory.fraction: 0.45
>     taskmanager.network.memory.max: 24gb
>     taskmanager.network.memory.min: 500mb
>     akka.ask.timeout: 240s
>     cluster.evenly-spread-out-slots: true
>     akka.tcp.timeout: 240s
>     taskmanager.network.request-backoff.initial: 5000
>     taskmanager.network.request-backoff.max: 30000
>     web.timeout: 1000000
>
> I still get an error on startup when loading the Flink jar. It resolves
> itself after failing on the first few tries. This is
> where taskmanager.network.request-backoff.initial: 5000 helped a little
> bit. I would like to get this job starting successfully on the first try.
> Also attaching a screenshot of the error on job failure.
> Exception:
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> Connection for partition
> ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not
> reachable.
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> ...
> Caused by: java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.128.49.96:43060' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> ... 7 more
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + '/10.128.49.96:43060' has failed.
> This might indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> ...
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection timed out: /10.128.49.96:43060
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
> ... 6 more
> Caused by: java.net.ConnectException: Connection timed out
> ... 10 more
>
> TIA,
>
>
>
> On Sun, May 31, 2020 at 8:08 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>> Hi Vijay,
>>
>> The error message suggests that another task manager (10.127.106.54) is
>> not responding. This could happen when the remote task manager has failed
>> or is under severe GC pressure. You would need to find the log of the
>> remote task manager to understand what is happening.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>> The job takes forever to start up and is now failing every time at
>>> startup.
>>> Physical Memory:62.1 GB
>>> JVM Heap Size:15.0 GB
>>> Flink Managed Memory:10.5 GB
>>> Attached a TM screenshot.
>>>
>>> Tried increasing the following:
>>>
>>> taskmanager.numberOfTaskSlots: 10
>>> parallelism.default: 1
>>> rest.server.max-content-length: 314572800
>>> taskmanager.network.memory.fraction: 0.45
>>> taskmanager.network.memory.max: 24gb
>>> taskmanager.network.memory.min: 500mb
>>> akka.ask.timeout: 240s
>>> cluster.evenly-spread-out-slots: true
>>> taskmanager.network.netty.client.connectTimeoutSec: 240
>>> taskmanager.network.detailed-metrics: true
>>> taskmanager.network.memory.floating-buffers-per-gate: 16
>>> akka.tcp.timeout: 30s
>>>
>>> There are more than enough slots. The issue seems to be communication
>>> over TCP with remote task managers?
>>>
>>> Getting this exception on a TaskManager:
>>>
>>> 2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                     - Window(TumblingEventTimeWindows(5000),
>>> EventTimeTrigger, MGroupingWindowAggregate,
>>> MGroupingAggregateWindowProcessing) (36/440)
>>> (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>>> Connection for partition
>>> faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not
>>> reachable.
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>> at
>>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>> at
>>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Connecting the channel failed:
>>> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
>>> This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>> ... 7 more
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>> ... 7 more
>>> Caused by:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
>>> This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>> ... 1 more
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>> ... 1 more
>>> Caused by:
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> Connection timed out: /10.127.106.54:33564
>>>
>>>
>>> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Thanks, Xintong, for the detailed explanation of the memory fraction. I
>>>> have increased the fraction now.
>>>>
>>>> As I increase the defaultParallelism, I keep getting this error:
>>>>
>>>> org.apache.flink.runtime.io.network.partition.consumer.
>>>> PartitionConnectionException: Connection for partition
>>>> e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not
>>>> reachable.
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> SingleInputGate.setup(SingleInputGate.java:215)
>>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(
>>>> InputGateWithMetrics.java:65)
>>>>     at org.apache.flink.runtime.taskmanager.Task
>>>> .setupPartitionsAndGates(Task.java:866)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Connecting the channel failed:
>>>> Connecting to remote task manager + '/10.9.239.218:45544' has failed.
>>>> This might indicate that the remote task manager has been lost.
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.waitForChannel(
>>>> PartitionRequestClientFactory.java:197)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.access$000(
>>>> PartitionRequestClientFactory.java:134)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory.createPartitionRequestClient(
>>>> PartitionRequestClientFactory.java:70)
>>>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager
>>>> .createPartitionRequestClient(NettyConnectionManager.java:68)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>>>     ... 7 more
>>>> Caused by: org.apache.flink.runtime.io.network.netty.exception.
>>>> RemoteTransportException: Connecting to remote task manager + '/
>>>> 10.9.239.218:45544' has failed. This might indicate that the remote
>>>> task manager has been lost.
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>>> PartitionRequestClientFactory.java:220)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>>> PartitionRequestClientFactory.java:134)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.tryFailure(DefaultPromise.java:121)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(
>>>> AbstractNioChannel.java:327)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>>> .java:343)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKey(NioEventLoop.java:644)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKeysOptimized(NioEventLoop.java:591)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKeys(NioEventLoop.java:508)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .run(NioEventLoop.java:470)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>>     ... 1 more
>>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.
>>>> 239.218:45544
>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>>>> .java:714)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.
>>>> NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>>> .java:340)
>>>>     ... 6 more
>>>> Caused by: java.net.ConnectException: Connection timed out
>>>>     ... 10 more
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> Ah, I guess I had misunderstood what you meant.
>>>>>
>>>>> Below 18000 tasks, the Flink Job is able to start up.
>>>>>> Even though I increased the number of slots, it still works when 312
>>>>>> slots are being used.
>>>>>>
>>>>> When you said "it still works", I thought you meant that even though you
>>>>> increased the parallelism, the job was still executed as if the
>>>>> parallelism had not been increased.
>>>>> From your latest reply, it seems the job's parallelism is indeed
>>>>> increased, but the job then runs into failures.
>>>>>
>>>>> The reason you run into the "Insufficient number of network buffers"
>>>>> exception is that with more tasks in your job, more inter-task data
>>>>> transmission channels are needed, and thus more memory for network buffers.
>>>>>
>>>>> To increase the network memory size, the following configuration
>>>>> options, as you already found, are related.
>>>>>
>>>>>    - taskmanager.network.memory.fraction
>>>>>    - taskmanager.network.memory.max
>>>>>    - taskmanager.network.memory.min
>>>>>
>>>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not
>>>>> related to network memory, and is only available in Flink 1.10 and above
>>>>> while you're using 1.9.1 as suggested by the screenshots.
>>>>>
>>>>> The network memory size is calculated as `min(max(some_total_value *
>>>>> network_fraction, network_min), network_max)`. According to the error
>>>>> message, your current network memory size is `85922 buffers *
>>>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means
>>>>> increasing the "max" does not help in your case. It is the "fraction" that
>>>>> you need to increase.
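>>>>>
>>>>> For illustration only, here is a rough sketch of that formula in plain
>>>>> Java (this is not Flink's actual code; the 4gb max is the value mentioned
>>>>> above, the 0.15 fraction and 500mb min come from the configuration quoted
>>>>> elsewhere in this thread, and `totalBytes` is just an assumed placeholder
>>>>> for the base value Flink derives from the TM memory setup):
>>>>>
>>>>> public class NetworkMemoryEstimate {
>>>>>     public static void main(String[] args) {
>>>>>         // Assumption: placeholder for the base value the fraction is applied to.
>>>>>         long totalBytes = 18L * 1024 * 1024 * 1024;
>>>>>         double fraction = 0.15;      // taskmanager.network.memory.fraction
>>>>>         long minBytes = 500L << 20;  // taskmanager.network.memory.min (500mb)
>>>>>         long maxBytes = 4L << 30;    // taskmanager.network.memory.max (4gb)
>>>>>
>>>>>         // min(max(total * fraction, min), max), as described above.
>>>>>         long networkBytes =
>>>>>             Math.min(Math.max((long) (totalBytes * fraction), minBytes), maxBytes);
>>>>>         long buffers = networkBytes / (32 * 1024); // 32KB per network buffer
>>>>>
>>>>>         // With these assumed numbers: ~88000 buffers (~2764 MB), the same
>>>>>         // ballpark as the 85922 buffers (~2685 MB) in the error message and
>>>>>         // below the 4gb max, so it is the fraction that binds, not the max.
>>>>>         System.out.println(buffers + " buffers, " + (networkBytes >> 20) + " MB");
>>>>>     }
>>>>> }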
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bvija...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Xintong,
>>>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots
>>>>>> of the memory consumption of 1 of the TaskManagers.
>>>>>>
>>>>>> To increase the used-up direct memory off heap, do I change this:
>>>>>>  taskmanager.memory.task.off-heap.size: 5gb
>>>>>>
>>>>>> I had increased taskmanager.network.memory.max: 24gb,
>>>>>> which seems excessive.
>>>>>>
>>>>>> 1 of the errors I saw in the Flink logs:
>>>>>>
>>>>>> java.io.IOException: Insufficient number of network buffers: required
>>>>>> 1, but only 0 available. The total number of network buffers is currently
>>>>>> set to 85922 of 32768 bytes each. You can increase this number by setting
>>>>>> the configuration keys 'taskmanager.network.memory.fraction',
>>>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>>>>>
>>>>>> TIA,
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <
>>>>>> bvija...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks so much, Xintong, for guiding me through this. I looked at the
>>>>>>> Flink logs to see the errors.
>>>>>>> I had to change taskmanager.network.memory.max: 4gb
>>>>>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>>>>>> Now I am able to increase the number of tasks (aka task vertices).
>>>>>>>
>>>>>>> taskmanager.network.memory.fraction: 0.15
>>>>>>> taskmanager.network.memory.max: 4gb
>>>>>>> taskmanager.network.memory.min: 500mb
>>>>>>> akka.ask.timeout: 240s
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Could you also explain how you set the parallelism when getting
>>>>>>>> this execution plan?
>>>>>>>> I'm asking because this json file itself only shows the resulting
>>>>>>>> execution plan. It is not clear to me what is not working as expected in
>>>>>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>>>>>> execution plan only shows 5.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <
>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xintong,
>>>>>>>>> Thanks for the excellent clarification of tasks.
>>>>>>>>>
>>>>>>>>> I attached a sample screenshot above, but it didn't reflect the slots
>>>>>>>>> used and the task limit I was running into in that pic.
>>>>>>>>>
>>>>>>>>> I am attaching my execution plan here. Please let me know how I
>>>>>>>>> can increase the number of tasks, aka parallelism. As I increase the
>>>>>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>>>>>
>>>>>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start
>>>>>>>>> to see this.
>>>>>>>>> TIA,
>>>>>>>>>
>>>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <
>>>>>>>>> tonysong...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>>>>>> increase tasks slightly.
>>>>>>>>>>
>>>>>>>>>> That's weird. I don't think the number of network memory buffers
>>>>>>>>>> has anything to do with the number of tasks.
>>>>>>>>>>
>>>>>>>>>> Let me try to clarify a few things.
>>>>>>>>>>
>>>>>>>>>> Please be aware that how many tasks a Flink job has and how
>>>>>>>>>> many slots a Flink cluster has are two different things.
>>>>>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>>>>>> parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>>>>>>>>> tasks in total.
>>>>>>>>>> - The number of slots is decided by the number of TMs and
>>>>>>>>>> slots-per-TM.
>>>>>>>>>> - For streaming jobs, you have to make sure the number of slots
>>>>>>>>>> is enough for executing all your tasks. The number of slots needed for
>>>>>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>>>>>> vertices. Taking the above example, you would need 4 slots, because
>>>>>>>>>> that is the max among all the vertices' parallelisms (2, 3, 4).
>>>>>>>>>>
>>>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>>>>>> total (not around 18000; the dark box shows total tasks while the green box
>>>>>>>>>> shows running tasks), and 600 slots are in use (658 - 58), suggesting that
>>>>>>>>>> the max parallelism of your job graph vertices is 600.
>>>>>>>>>>
>>>>>>>>>> If you want to increase the number of tasks, you should increase
>>>>>>>>>> your job parallelism. There are several ways to do that.
>>>>>>>>>>
>>>>>>>>>>    - In your job code (assuming you are using the DataStream API;
>>>>>>>>>>    a minimal sketch follows after this list)
>>>>>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>>>>>       parallelism for all operators.
>>>>>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>>>>>       parallelism for a specific operator. (Only supported for 
>>>>>>>>>> subclasses of
>>>>>>>>>>       `SingleOutputStreamOperator`.)
>>>>>>>>>>    - When submitting your job, use `-p <parallelism>` as an
>>>>>>>>>>    argument for the `flink run` command, to set parallelism for all 
>>>>>>>>>> operators.
>>>>>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml`, to set
>>>>>>>>>>    a default parallelism for your jobs. This will be used for jobs that have
>>>>>>>>>>    not set parallelism with any of the above methods.
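>>>>>>>>>>
>>>>>>>>>> For the in-code options, here is a minimal, self-contained DataStream
>>>>>>>>>> sketch (the pipeline and class name are made up purely for illustration,
>>>>>>>>>> loosely following the A/B/C vertices with parallelism 2, 3 and 4 above;
>>>>>>>>>> only the `setParallelism` calls are the point):
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>
>>>>>>>>>> public class ParallelismExample {
>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>         StreamExecutionEnvironment env =
>>>>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>         // Default parallelism for all operators of this job.
>>>>>>>>>>         env.setParallelism(3);
>>>>>>>>>>
>>>>>>>>>>         env.fromElements(1, 2, 3)              // non-parallel example source
>>>>>>>>>>            .map(x -> x * 2).setParallelism(4)  // per-operator override
>>>>>>>>>>            .print().setParallelism(2);         // per-operator override on the sink
>>>>>>>>>>
>>>>>>>>>>         // Alternatively: `flink run -p <parallelism> ...`, or set
>>>>>>>>>>         // `parallelism.default` in flink-conf.yaml, as listed above.
>>>>>>>>>>         env.execute("parallelism-example");
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> With this sketch, the max vertex parallelism is 4, so (per the earlier
>>>>>>>>>> explanation) 4 slots would be needed to run it.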
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Xintong,
>>>>>>>>>>> Thx for your reply.  Increasing network memory buffers
>>>>>>>>>>> (fraction, min, max) seems to increase tasks slightly.
>>>>>>>>>>>
>>>>>>>>>>> Streaming job
>>>>>>>>>>> Standalone
>>>>>>>>>>>
>>>>>>>>>>> Vijay
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <
>>>>>>>>>>> tonysong...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vijay,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't think your problem is related to the number of open
>>>>>>>>>>>> files. The parallelism of your job is decided before Flink actually tries
>>>>>>>>>>>> to open the files. And if the OS limit for open files is reached, you
>>>>>>>>>>>> should see a job execution failure instead of a successful execution
>>>>>>>>>>>> with a lower parallelism.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you share some more information about your use case?
>>>>>>>>>>>>
>>>>>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>>>>>    batch processing job?
>>>>>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>>>>>    - It would be helpful if you can share the Flink logs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>
>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
>>>>>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> I have increased the number of slots available, but the Job is
>>>>>>>>>>>>> not using all the slots and instead runs into this approximate 18000-task
>>>>>>>>>>>>> limit. Looking into the source code, it seems to be opening a file -
>>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I am
>>>>>>>>>>>>> confused about is that the ulimit is per machine, but the ExecutionGraph
>>>>>>>>>>>>> is across many machines? Please pardon my ignorance here. Does the number
>>>>>>>>>>>>> of tasks equate to the number of open files? I am using 15 slots per
>>>>>>>>>>>>> TaskManager on AWS m5.4xlarge, which has 16 vCPUs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TIA.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <
>>>>>>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit of around 18000
>>>>>>>>>>>>>> in the Tasks column on an Ubuntu Linux box.
>>>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15,
>>>>>>>>>>>>>> and the number of slots increased to 705, but the tasks
>>>>>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is
>>>>>>>>>>>>>> able to start up.
>>>>>>>>>>>>>> Even though I increased the number of slots, it still works
>>>>>>>>>>>>>> when 312 slots are being used.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What knob can I tune to increase the number of tasks?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pls find attached the Flink Dashboard UI.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TIA,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
