Thanks so much, Xintong, for guiding me through this. I looked at the Flink logs to see the errors. I had to change `taskmanager.network.memory.max: 4gb` and `akka.ask.timeout: 240s` to increase the number of tasks. Now I am able to increase the number of tasks, aka task vertices.
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.max: 4gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s

On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong...@gmail.com> wrote:

> Could you also explain how you set the parallelism when getting this
> execution plan? I'm asking because this json file itself only shows the
> resulting execution plan. It is not clear to me what is not working as
> expected in your case. E.g., you set the parallelism for an operator to
> 10 but the execution plan only shows 5.
>
> Thank you~
>
> Xintong Song
>
> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi Xintong,
>> Thanks for the excellent clarification for tasks.
>>
>> I attached a sample screenshot above, but it didn't reflect the slots
>> used and the task limit I was running into.
>>
>> I am attaching my execution plan here. Please let me know how I can
>> increase the number of tasks, aka parallelism. As I increase the
>> parallelism, I run into this bottleneck with the tasks.
>>
>> BTW - https://flink.apache.org/visualizer/ is a great way to see this.
>>
>> TIA,
>>
>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>> increase tasks slightly.
>>>
>>> That's weird. I don't think the number of network memory buffers has
>>> anything to do with the number of tasks.
>>>
>>> Let me try to clarify a few things.
>>>
>>> Please be aware that how many tasks a Flink job has and how many
>>> slots a Flink cluster has are two different things.
>>> - The number of tasks is decided by your job's parallelism and
>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>> parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>> tasks in total.
>>> - The number of slots is decided by the number of TMs and slots-per-TM.
>>> - For streaming jobs, you have to make sure the number of slots is
>>> enough for executing all your tasks. The number of slots needed for
>>> executing your job is, by default, the max parallelism of your job
>>> graph vertices. Take the above example: you would need 4 slots,
>>> because that is the max among all the vertices' parallelisms (2, 3, 4).
>>>
>>> In your case, the screenshot shows that your job has 9621 tasks in
>>> total (not around 18000; the dark box shows total tasks while the
>>> green box shows running tasks), and 600 slots are in use (658 - 58),
>>> suggesting that the max parallelism of your job graph vertices is 600.
>>>
>>> If you want to increase the number of tasks, you should increase your
>>> job parallelism. There are several ways to do that.
>>>
>>> - In your job code (assuming you are using the DataStream API):
>>>   - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>     parallelism for all operators.
>>>   - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>     parallelism for a specific operator. (Only supported for
>>>     subclasses of `SingleOutputStreamOperator`.)
>>> - When submitting your job, use `-p <parallelism>` as an argument
>>>   for the `flink run` command, to set parallelism for all operators.
>>> - Set `parallelism.default` in your `flink-conf.yaml` to set a
>>>   default parallelism for your jobs. This will be used for jobs that
>>>   have not set parallelism with any of the above methods.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Hi Xintong,
>>>> Thx for your reply. Increasing network memory buffers (fraction,
>>>> min, max) seems to increase tasks slightly.
>>>>
>>>> Streaming job
>>>> Standalone
>>>>
>>>> Vijay
>>>>
>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vijay,
>>>>>
>>>>> I don't think your problem is related to the number of open files.
>>>>> The parallelism of your job is decided before Flink actually tries
>>>>> to open the files. And if the OS limit on open files were reached,
>>>>> you would see a job execution failure, instead of a successful
>>>>> execution with a lower parallelism.
>>>>>
>>>>> Could you share some more information about your use case?
>>>>>
>>>>> - What kind of job are you executing? Is it a streaming or batch
>>>>>   processing job?
>>>>> - Which Flink deployment do you use? Standalone? Yarn?
>>>>> - It would be helpful if you could share the Flink logs.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <bvija...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I have increased the number of slots available, but the job is not
>>>>>> using all the slots and runs into this approximate 18000-task
>>>>>> limit. Looking into the source code, it seems to be opening a file:
>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>> Ubuntu OS level to increase the number of tasks available? What I
>>>>>> am confused about is that the ulimit is per machine, but the
>>>>>> ExecutionGraph spans many machines. Please pardon my ignorance
>>>>>> here. Does the number of tasks equate to the number of open files?
>>>>>> I am using 15 slots per TaskManager on AWS m5.4xlarge, which has
>>>>>> 16 vCPUs.
>>>>>>
>>>>>> TIA.
>>>>>>
>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <bvija...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> The Flink Dashboard UI seems to show a hard limit of around 18000
>>>>>>> in the Tasks column on an Ubuntu Linux box.
>>>>>>> I kept increasing the number of slots per task manager to 15, and
>>>>>>> the number of slots increased to 705, but the tasks stayed at
>>>>>>> around 18000.
>>>>>>> Below 18000 tasks, the Flink job is able to start up.
>>>>>>> Even though I increased the number of slots, it still works when
>>>>>>> only 312 slots are being used.
>>>>>>>
>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>
>>>>>>> What knob can I tune to increase the number of tasks?
>>>>>>>
>>>>>>> Please find attached the Flink Dashboard UI.
>>>>>>>
>>>>>>> TIA,
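[Editor's note] The tasks-vs-slots arithmetic Xintong describes in the thread (total tasks = sum of all vertex parallelisms; slots needed for a streaming job = max vertex parallelism, by default) can be sketched as a tiny standalone calculation. This is plain Java with no Flink dependency; the class name and the vertex parallelisms 2, 3, 4 come from the hypothetical A/B/C example in the thread, not from any real job:

```java
import java.util.Arrays;

public class TaskSlotMath {
    public static void main(String[] args) {
        // Hypothetical job graph from the thread: vertices A, B, C
        // with parallelisms 2, 3 and 4 respectively.
        int[] vertexParallelisms = {2, 3, 4};

        // Total number of tasks = sum of all vertex parallelisms.
        int totalTasks = Arrays.stream(vertexParallelisms).sum();

        // Default number of slots needed for a streaming job = the max
        // vertex parallelism (slot sharing lets one subtask from each
        // vertex run in the same slot).
        int slotsNeeded = Arrays.stream(vertexParallelisms).max().orElse(0);

        System.out.println("total tasks  = " + totalTasks);  // 9
        System.out.println("slots needed = " + slotsNeeded); // 4
    }
}
```

This also matches the numbers Xintong reads off the screenshot: 9621 tasks in total, while only 600 slots are occupied, because 600 is the max parallelism across the job's vertices.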