Hi Vijay, From the information you provided (the configurations, error message & screenshot), I'm not able to determine what the problem is or how to resolve it.
The error message comes from a healthy task manager, which discovered that another task manager is not responding. We would need to look into the *log of the task manager that is not responding* to understand what's wrong with it. Thank you~ Xintong Song On Fri, Jun 5, 2020 at 6:06 AM Vijay Balakrishnan <bvija...@gmail.com> wrote: > Thx a ton, Xintong. > I am using this configuration now: > taskmanager.numberOfTaskSlots: 14 > rest.server.max-content-length: 314572800 > taskmanager.network.memory.fraction: 0.45 > taskmanager.network.memory.max: 24gb > taskmanager.network.memory.min: 500mb > akka.ask.timeout: 240s > cluster.evenly-spread-out-slots: true > akka.tcp.timeout: 240s > taskmanager.network.request-backoff.initial: 5000 > taskmanager.network.request-backoff.max: 30000 > web.timeout: 1000000 > > I still get an error on startup with loading the Flink jar. It resolves > itself after failing on the 1st few tries. This is > where taskmanager.network.request-backoff.initial: 5000 helped a little > bit. I would like to get this Job starting successfully on the 1st try > itself. Also attaching a screenshot of the error on Job failure. > Exception: > org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: > Connection for partition > ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not > reachable. > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168) > ... > Caused by: java.io.IOException: Connecting the channel failed: Connecting > to remote task manager + '/10.128.49.96:43060' has failed. This might > indicate that the remote task manager has been lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) > ... 
7 more > Caused by: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connecting to remote task manager + '/10.128.49.96:43060' has failed. > This might indicate that the remote task manager has been lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) > ... > Caused by: > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: > Connection timed out: /10.128.49.96:43060 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) > ... 6 more > Caused by: java.net.ConnectException: Connection timed out > ... 10 more > > TIA, > > > > On Sun, May 31, 2020 at 8:08 PM Xintong Song <tonysong...@gmail.com> > wrote: > >> Hi Vijay, >> >> The error message suggests that another task manager (10.127.106.54) is >> not responding. This could happen when the remote task manager has failed >> or is under severe GC pressure. You would need to find the log of the remote >> task manager to understand what is happening. >> >> Thank you~ >> >> Xintong Song >> >> >> >> On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bvija...@gmail.com> >> wrote: >> >>> Hi All, >>> The Job takes forever to start up and is now failing every time at >>> startup. >>> Physical Memory: 62.1 GB >>> JVM Heap Size: 15.0 GB >>> Flink Managed Memory: 10.5 GB >>> Attached a TM screenshot. 
>>> >>> Tried increasing the following: >>> >>> taskmanager.numberOfTaskSlots: 10 >>> parallelism.default: 1 >>> rest.server.max-content-length: 314572800 >>> taskmanager.network.memory.fraction: 0.45 >>> taskmanager.network.memory.max: 24gb >>> taskmanager.network.memory.min: 500mb >>> akka.ask.timeout: 240s >>> cluster.evenly-spread-out-slots: true >>> taskmanager.network.netty.client.connectTimeoutSec: 240 >>> taskmanager.network.detailed-metrics: true >>> taskmanager.network.memory.floating-buffers-per-gate: 16 >>> akka.tcp.timeout: 30s >>> >>> There are more than enough slots. The issue seems to be with communicating over >>> TCP with remote Task Managers? >>> >>> Getting this exception on a TaskManager: >>> >>> 2020-05-31 20:37:31,436 INFO org.apache.flink.runtime.taskmanager.Task >>> - Window(TumblingEventTimeWindows(5000), >>> EventTimeTrigger, MGroupingWindowAggregate, >>> MGroupingAggregateWindowProcessing) (36/440) >>> (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED. >>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: >>> Connection for partition >>> faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not >>> reachable. 
>>> at >>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215) >>> at >>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65) >>> at >>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866) >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>> at java.lang.Thread.run(Thread.java:748) >>> -- >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215) >>> at >>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65) >>> at >>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866) >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: java.io.IOException: Connecting the channel failed: >>> Connecting to remote task manager + '/10.127.106.54:33564' has failed. >>> This might indicate that the remote task manager has been lost. 
>>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70) >>> at >>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165) >>> ... 7 more >>> -- >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70) >>> at >>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68) >>> at >>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165) >>> ... 7 more >>> Caused by: >>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: >>> Connecting to remote task manager + '/10.127.106.54:33564' has failed. >>> This might indicate that the remote task manager has been lost. 
>>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) >>> ... 
1 more >>> -- >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220) >>> at >>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) >>> ... 
1 more >>> Caused by: >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: >>> Connection timed out: /10.127.106.54:33564 >>> >>> >>> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bvija...@gmail.com> >>> wrote: >>> >>>> Thx, Xintong for the detailed explanation of memory fraction. I >>>> increased the mem fraction now. >>>> >>>> As I increase the defaultParallelism, I keep getting this error: >>>> >>>> org.apache.flink.runtime.io.network.partition.consumer. >>>> PartitionConnectionException: Connection for partition >>>> e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not >>>> reachable. >>>> at org.apache.flink.runtime.io.network.partition.consumer. >>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168) >>>> at org.apache.flink.runtime.io.network.partition.consumer. >>>> SingleInputGate.requestPartitions(SingleInputGate.java:237) >>>> at org.apache.flink.runtime.io.network.partition.consumer. >>>> SingleInputGate.setup(SingleInputGate.java:215) >>>> at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup( >>>> InputGateWithMetrics.java:65) >>>> at org.apache.flink.runtime.taskmanager.Task >>>> .setupPartitionsAndGates(Task.java:866) >>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>>> at java.lang.Thread.run(Thread.java:748) >>>> Caused by: java.io.IOException: Connecting the channel failed: >>>> Connecting to remote task manager + '/10.9.239.218:45544' has failed. >>>> This might indicate that the remote task manager has been lost. >>>> at org.apache.flink.runtime.io.network.netty. >>>> PartitionRequestClientFactory$ConnectingChannel.waitForChannel( >>>> PartitionRequestClientFactory.java:197) >>>> at org.apache.flink.runtime.io.network.netty. 
>>>> PartitionRequestClientFactory$ConnectingChannel.access$000( >>>> PartitionRequestClientFactory.java:134) >>>> at org.apache.flink.runtime.io.network.netty. >>>> PartitionRequestClientFactory.createPartitionRequestClient( >>>> PartitionRequestClientFactory.java:70) >>>> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager >>>> .createPartitionRequestClient(NettyConnectionManager.java:68) >>>> at org.apache.flink.runtime.io.network.partition.consumer. >>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165) >>>> ... 7 more >>>> Caused by: org.apache.flink.runtime.io.network.netty.exception. >>>> RemoteTransportException: Connecting to remote task manager + '/ >>>> 10.9.239.218:45544' has failed. This might indicate that the remote >>>> task manager has been lost. >>>> at org.apache.flink.runtime.io.network.netty. >>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete( >>>> PartitionRequestClientFactory.java:220) >>>> at org.apache.flink.runtime.io.network.netty. >>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete( >>>> PartitionRequestClientFactory.java:134) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> DefaultPromise.notifyListener0(DefaultPromise.java:511) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> DefaultPromise.notifyListeners0(DefaultPromise.java:504) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> DefaultPromise.notifyListenersNow(DefaultPromise.java:483) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> DefaultPromise.notifyListeners(DefaultPromise.java:424) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> DefaultPromise.tryFailure(DefaultPromise.java:121) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio. >>>> AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise( >>>> AbstractNioChannel.java:327) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio. 
>>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel >>>> .java:343) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop >>>> .processSelectedKey(NioEventLoop.java:644) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop >>>> .processSelectedKeysOptimized(NioEventLoop.java:591) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop >>>> .processSelectedKeys(NioEventLoop.java:508) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop >>>> .run(NioEventLoop.java:470) >>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent. >>>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) >>>> ... 1 more >>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel. >>>> AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9. >>>> 239.218:45544 >>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) >>>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl >>>> .java:714) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio. >>>> NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) >>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio. >>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel >>>> .java:340) >>>> ... 6 more >>>> Caused by: java.net.ConnectException: Connection timed out >>>> ... 10 more >>>> >>>> >>>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <tonysong...@gmail.com> >>>> wrote: >>>> >>>>> Ah, I guess I had misunderstood what you meant. >>>>> >>>>> Below 18000 tasks, the Flink Job is able to start up. >>>>>> Even though I increased the number of slots, it still works when 312 >>>>>> slots are being used. >>>>>> >>>>> When you say "it still works", I thought that you increased the >>>>> parallelism but the job was still executed as if the parallelism had not been >>>>> increased. 
>>>>> From your latest reply, it seems the job's parallelism is indeed >>>>> increased, but then it runs into failures. >>>>> >>>>> The reason you run into the "Insufficient number of network buffers" >>>>> exception is that with more tasks in your job, more inter-task data >>>>> transmission channels are needed, and thus more memory for network buffers. >>>>> >>>>> To increase the network memory size, the following configuration >>>>> options, as you already found, are related. >>>>> >>>>> - taskmanager.network.memory.fraction >>>>> - taskmanager.network.memory.max >>>>> - taskmanager.network.memory.min >>>>> >>>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not >>>>> related to network memory, and is only available in Flink 1.10 and above, >>>>> while you're using 1.9.1 as suggested by the screenshots. >>>>> >>>>> The network memory size is calculated as `min(max(some_total_value * >>>>> network_fraction, network_min), network_max)`. According to the error >>>>> message, your current network memory size is `85922 buffers * >>>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means >>>>> increasing the "max" does not help in your case. It is the "fraction" that >>>>> you need to increase. >>>>> >>>>> Thank you~ >>>>> >>>>> Xintong Song >>>>> >>>>> >>>>> >>>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bvija...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Xintong, >>>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots >>>>>> of the memory consumption of 1 of the TaskManagers. >>>>>> >>>>>> To increase the used-up direct memory off heap, do I change this: >>>>>> taskmanager.memory.task.off-heap.size: 5gb >>>>>> >>>>>> I had increased the taskmanager.network.memory.max: 24gb, >>>>>> which seems excessive. >>>>>> >>>>>> 1 of the errors I saw in the Flink logs: >>>>>> >>>>>> java.io.IOException: Insufficient number of network buffers: required >>>>>> 1, but only 0 available. 
The total number of network buffers is currently >>>>>> set to 85922 of 32768 bytes each. You can increase this number by setting >>>>>> the configuration keys 'taskmanager.network.memory.fraction', >>>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'. >>>>>> at >>>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191) >>>>>> >>>>>> TIA, >>>>>> >>>>>> >>>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan < >>>>>> bvija...@gmail.com> wrote: >>>>>> >>>>>>> Thanks so much, Xintong, for guiding me through this. I looked at the >>>>>>> Flink logs to see the errors. >>>>>>> I had to change taskmanager.network.memory.max: 4gb >>>>>>> and akka.ask.timeout: 240s to increase the number of tasks. >>>>>>> Now, I am able to increase the number of Tasks, aka Task vertices. >>>>>>> >>>>>>> taskmanager.network.memory.fraction: 0.15 >>>>>>> taskmanager.network.memory.max: 4gb >>>>>>> taskmanager.network.memory.min: 500mb >>>>>>> akka.ask.timeout: 240s >>>>>>> >>>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Could you also explain how you set the parallelism when getting >>>>>>>> this execution plan? >>>>>>>> I'm asking because this json file itself only shows the resulting >>>>>>>> execution plan. It is not clear to me what is not working as expected >>>>>>>> in >>>>>>>> your case. E.g., you set the parallelism for an operator to 10 but the >>>>>>>> execution plan only shows 5. >>>>>>>> >>>>>>>> Thank you~ >>>>>>>> >>>>>>>> Xintong Song >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan < >>>>>>>> bvija...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi Xintong, >>>>>>>>> Thanks for the excellent clarification about tasks. 
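[Editor's note] The network-memory sizing rule quoted earlier in the thread, `min(max(some_total_value * network_fraction, network_min), network_max)`, together with the buffer arithmetic from the error message, can be sketched as follows. This is a minimal illustration in Python; the function names are made up for the sketch and are not Flink's internals.

```python
def network_memory_bytes(total_bytes, fraction, min_bytes, max_bytes):
    """The rule quoted in the thread: min(max(total * fraction, min), max)."""
    return int(min(max(total_bytes * fraction, min_bytes), max_bytes))

def num_buffers(network_bytes, segment_size=32 * 1024):
    """Network memory is carved into fixed-size segments (32 KB by default in 1.9)."""
    return network_bytes // segment_size

GB, MB = 1024 ** 3, 1024 ** 2

# The error message reports 85922 buffers of 32768 bytes each, i.e. about
# 2685 MB -- below the configured "max" of 4 GB. The pool was therefore
# capped by the fraction, not the max, which is why only raising
# taskmanager.network.memory.fraction (or the "min") grows it.
current_bytes = 85922 * 32 * 1024
print(current_bytes // MB)  # -> 2685
```

This also makes the earlier advice concrete: with a 24gb "max", the fraction term is almost always the binding limit, so tuning the fraction is what changes the result.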
>>>>>>>>> >>>>>>>>> I attached a sample screenshot above; it didn't reflect the slots >>>>>>>>> used and the tasks limit I was running into in that pic. >>>>>>>>> >>>>>>>>> I am attaching my Execution plan here. Please let me know how I >>>>>>>>> can increase the number of tasks, aka parallelism. As I increase the >>>>>>>>> parallelism, I run into this bottleneck with the tasks. >>>>>>>>> >>>>>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start >>>>>>>>> to see this. >>>>>>>>> TIA, >>>>>>>>> >>>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song < >>>>>>>>> tonysong...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to >>>>>>>>>>> increase tasks slightly. >>>>>>>>>> >>>>>>>>>> That's weird. I don't think the number of network memory buffers >>>>>>>>>> has anything to do with the task amount. >>>>>>>>>> >>>>>>>>>> Let me try to clarify a few things. >>>>>>>>>> >>>>>>>>>> Please be aware that how many tasks a Flink job has, and how >>>>>>>>>> many slots a Flink cluster has, are two different things. >>>>>>>>>> - The number of tasks is decided by your job's parallelism and >>>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with >>>>>>>>>> parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4) >>>>>>>>>> tasks in total. >>>>>>>>>> - The number of slots is decided by the number of TMs and >>>>>>>>>> slots-per-TM. >>>>>>>>>> - For streaming jobs, you have to make sure the number of slots >>>>>>>>>> is enough for executing all your tasks. The number of slots needed >>>>>>>>>> for >>>>>>>>>> executing your job is by default the max parallelism of your job >>>>>>>>>> graph >>>>>>>>>> vertices. Take the above example: you would need 4 slots, because >>>>>>>>>> it's the >>>>>>>>>> max among all the vertices' parallelisms (2, 3, 4). 
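[Editor's note] The task/slot counting rule described above can be sketched in a couple of lines. Illustrative Python, not Flink code; it assumes the default slot-sharing behavior mentioned in the thread.

```python
def total_tasks(vertex_parallelisms):
    """Total tasks = sum of the parallelisms of all job-graph vertices."""
    return sum(vertex_parallelisms)

def required_slots(vertex_parallelisms):
    """Slots needed (by default, with slot sharing) = the max vertex parallelism."""
    return max(vertex_parallelisms)

# The example from the thread: vertices A, B, C with parallelism 2, 3, 4.
p = [2, 3, 4]
print(total_tasks(p), required_slots(p))  # -> 9 4
```

This is why adding slots beyond the max vertex parallelism leaves some slots idle: the extra slots change nothing unless the job's parallelism itself is raised.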
>>>>>>>>>> >>>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in >>>>>>>>>> total (not around 18000; the dark box shows total tasks while the >>>>>>>>>> green box >>>>>>>>>> shows running tasks), and 600 slots are in use (658 - 58), suggesting >>>>>>>>>> that >>>>>>>>>> the max parallelism of your job graph vertices is 600. >>>>>>>>>> >>>>>>>>>> If you want to increase the number of tasks, you should increase >>>>>>>>>> your job parallelism. There are several ways to do that. >>>>>>>>>> >>>>>>>>>> - In your job code (assuming you are using the DataStream API) >>>>>>>>>> - Use `StreamExecutionEnvironment#setParallelism()` to set >>>>>>>>>> parallelism for all operators. >>>>>>>>>> - Use `SingleOutputStreamOperator#setParallelism()` to set >>>>>>>>>> parallelism for a specific operator. (Only supported for >>>>>>>>>> subclasses of >>>>>>>>>> `SingleOutputStreamOperator`.) >>>>>>>>>> - When submitting your job, use `-p <parallelism>` as an >>>>>>>>>> argument for the `flink run` command, to set parallelism for all >>>>>>>>>> operators. >>>>>>>>>> - Set `parallelism.default` in your `flink-conf.yaml`, to set >>>>>>>>>> a default parallelism for your jobs. This will be used for jobs >>>>>>>>>> that have >>>>>>>>>> not set parallelism with any of the above methods. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thank you~ >>>>>>>>>> >>>>>>>>>> Xintong Song >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan < >>>>>>>>>> bvija...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Xintong, >>>>>>>>>>> Thx for your reply. Increasing network memory buffers >>>>>>>>>>> (fraction, min, max) seems to increase tasks slightly. 
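[Editor's note] As a concrete sketch of the last option above (the value 600 is illustrative, chosen to match the max vertex parallelism mentioned in the thread):

```yaml
# flink-conf.yaml -- fallback parallelism, used only by jobs that set none themselves
parallelism.default: 600
```

A single submission can override this with `flink run -p 600 <job.jar>`, and the job code itself can override both via `StreamExecutionEnvironment#setParallelism()` or a per-operator `setParallelism()`, as listed above.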
>>>>>>>>>>> >>>>>>>>>>> Streaming job >>>>>>>>>>> Standalone >>>>>>>>>>> >>>>>>>>>>> Vijay >>>>>>>>>>> >>>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song < >>>>>>>>>>> tonysong...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Vijay, >>>>>>>>>>>> >>>>>>>>>>>> I don't think your problem is related to the number of open >>>>>>>>>>>> files. The parallelism of your job is decided before Flink actually tries to open >>>>>>>>>>>> the files. And if the OS limit for opening files is reached, you >>>>>>>>>>>> should see >>>>>>>>>>>> a job execution failure, instead of a successful execution with a >>>>>>>>>>>> lower >>>>>>>>>>>> parallelism. >>>>>>>>>>>> >>>>>>>>>>>> Could you share some more information about your use case? >>>>>>>>>>>> >>>>>>>>>>>> - What kind of job are you executing? Is it a streaming or >>>>>>>>>>>> batch processing job? >>>>>>>>>>>> - Which Flink deployment do you use? Standalone? Yarn? >>>>>>>>>>>> - It would be helpful if you can share the Flink logs. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Thank you~ >>>>>>>>>>>> >>>>>>>>>>>> Xintong Song >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan < >>>>>>>>>>>> bvija...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> I have increased the number of slots available, but the Job is >>>>>>>>>>>>> not using all the slots and runs into this approximate 18000 >>>>>>>>>>>>> Tasks limit. >>>>>>>>>>>>> Looking into the source code, it seems to be opening a file - >>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203 >>>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the >>>>>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I >>>>>>>>>>>>> am confused >>>>>>>>>>>>> about is that the ulimit is per machine but the ExecutionGraph is >>>>>>>>>>>>> across many >>>>>>>>>>>>> machines? Please pardon my ignorance here. 
Does the number of tasks >>>>>>>>>>>>> equate to >>>>>>>>>>>>> the number of open files? I am using 15 slots per TaskManager on AWS >>>>>>>>>>>>> m5.4xlarge, >>>>>>>>>>>>> which has 16 vCPUs. >>>>>>>>>>>>> >>>>>>>>>>>>> TIA. >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan < >>>>>>>>>>>>> bvija...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> >>>>>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit >>>>>>>>>>>>>> for the Tasks column around 18000 on a Ubuntu Linux box. >>>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15, >>>>>>>>>>>>>> and the number of slots increased to 705, but the task limit >>>>>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is >>>>>>>>>>>>>> able to start up. >>>>>>>>>>>>>> Even though I increased the number of slots, it still works >>>>>>>>>>>>>> when 312 slots are being used. >>>>>>>>>>>>>> >>>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15 >>>>>>>>>>>>>> >>>>>>>>>>>>>> What knob can I tune to increase the number of Tasks? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Pls find attached the Flink Dashboard UI. >>>>>>>>>>>>>> >>>>>>>>>>>>>> TIA, >>>>>>>>>>>>>>