One observation here is that you're only reading from one file. This will mean that you won't get any parallelism. Everything is executed on just one task/thread.
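To make the single-file point concrete: reading in parallel requires the input to be divided among readers, either as several input files or as byte ranges of one file. The sketch below shows only the byte-range arithmetic. `ByteRangeSplitter` and `split` are hypothetical names, not a Flink or Beam API. A real line-oriented reader would also have to move each range boundary to the next line break so that no tuple is cut in half.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink or Beam API): computes how a single input
// file could be divided into contiguous byte ranges, one per parallel reader.
final class ByteRangeSplitter {

    /** Half-open byte range [start, end) assigned to one reader. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long length() {
            return end - start;
        }
    }

    /**
     * Splits fileSize bytes into numReaders ranges whose lengths differ by at
     * most one byte. NOTE: a real text reader must also shift each boundary to
     * the next line break so that no record is split across readers.
     */
    static List<Range> split(long fileSize, int numReaders) {
        if (fileSize < 0 || numReaders <= 0) {
            throw new IllegalArgumentException("need fileSize >= 0 and numReaders > 0");
        }
        List<Range> ranges = new ArrayList<>(numReaders);
        long base = fileSize / numReaders;  // bytes every reader gets
        long extra = fileSize % numReaders; // the first `extra` readers get one more byte
        long start = 0;
        for (int i = 0; i < numReaders; i++) {
            long len = base + (i < extra ? 1 : 0);
            ranges.add(new Range(start, start + len));
            start += len;
        }
        return ranges;
    }

    public static void main(String[] args) {
        long fileSize = 1_600_000_000L; // roughly the 1.6 GB input described in the thread
        for (int readers : new int[] {1, 64}) {
            List<Range> ranges = split(fileSize, readers);
            System.out.println(readers + " reader(s): " + ranges.get(0).length() + " bytes each");
        }
    }
}
```

With one reader, a single task handles all 1,600,000,000 bytes. With 64 readers (one per slot), each handles 25,000,000 bytes. Pre-splitting the input into several files achieves the same division without any custom reader logic.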
Cheers,
Aljoscha

On Thu, 15 Sep 2016 at 01:24 amir bahmanyari <amirto...@yahoo.com> wrote:

> Hi Aljoscha,
> Experimenting on a relatively small file, with everything fixed except
> KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink
> cluster when reading tuples with TextIO().
> I understand the network involvement in reading from a Kafka topic etc., but
> 50% is significant.
> Also, I experimented with 64 vs. 400 partitions in the Kafka topic. I get
> exactly the same performance; increasing the topic partitions doesn't
> improve anything.
> I thought some of the 64 slots might get multiple partitions each
> (over-parallelism), really pushing them to their limit. But with #slots=64,
> 64 Kafka topic partitions and 400 Kafka topic partitions perform the same.
>
> It's still slow for a relatively large file, though.
> Please advise if there is something I can try to improve the cluster
> performance.
> Thanks + regards
>
> ------------------------------
> *From:* Aljoscha Krettek <aljos...@apache.org>
> *To:* user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
> *Sent:* Wednesday, September 14, 2016 1:48 AM
> *Subject:* Re: Fw: Flink Cluster Load Distribution Question
>
> Hi,
> this is a different job from the Kafka job that you have running, right?
>
> Could you maybe post the code for that as well?
>
> Cheers,
> Aljoscha
>
> On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:
>
> Hi Robert,
> Sure, I am forwarding it to the user list. Sorry about that; I followed the
> "robot's" instructions :))
> Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB each). Let's call
> them node1, 2, 3, 4.
> The Flink cluster has node1 running the JM and a TM. Three more TMs run on
> node2, 3, and 4 respectively.
> I have a Beam app running with the Flink runner underneath.
> The input data is read by Beam TextIO() from 1.6 GB of data containing
> roughly 22 million tuples.
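The 64-vs-400 partition result in the Sep 15 message is what one would expect if the source parallelism is fixed at 64: extra partitions do not add readers, they only give each reader more partitions. The following is a simplified model, assuming round-robin (modulo) assignment of partitions to source subtasks. The actual KafkaIO or Flink Kafka connector assignment logic may differ, and `PartitionAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not the actual KafkaIO or Flink connector code):
// partition p is assigned to source subtask p % parallelism.
final class PartitionAssignment {

    /** Returns, for each subtask index, the list of partitions it reads. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>(parallelism);
        for (int s = 0; s < parallelism; s++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    /** Number of subtasks that receive at least one partition. */
    static int busySubtasks(int numPartitions, int parallelism) {
        int busy = 0;
        for (List<Integer> partitions : assign(numPartitions, parallelism)) {
            if (!partitions.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        int slots = 64; // 4 TaskManagers x 16 slots, parallelism.default: 64
        for (int partitions : new int[] {64, 400}) {
            List<List<Integer>> a = assign(partitions, slots);
            int max = a.stream().mapToInt(List::size).max().orElse(0);
            System.out.printf("%d partitions -> %d busy readers, up to %d partitions each%n",
                    partitions, busySubtasks(partitions, slots), max);
        }
    }
}
```

Under this model, 64 partitions give 64 busy readers with one partition each, and 400 partitions still give 64 busy readers (16 of them with 7 partitions, the other 48 with 6). The read parallelism stays at min(partitions, parallelism) = 64 either way.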
> *All nodes have identical flink-conf.yaml, masters & slaves contents* as
> follows:
>
> *flink-conf.yaml:*
> jobmanager.rpc.address: node1
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 1024
> taskmanager.heap.mb: 102400
> taskmanager.numberOfTaskSlots: 16
> taskmanager.memory.preallocate: false
> parallelism.default: 64
> jobmanager.web.port: 8081
> taskmanager.network.numberOfBuffers: 4096
>
> *masters*:
> node1:8081
>
> *slaves*:
> node1
> node2
> node3
> node4
>
> Everything looks normal at ./start-cluster.sh, and all daemons start on all
> nodes.
> JM and TM log files get generated on all nodes.
> The Dashboard shows all slots being used.
> I deploy the Beam app to the cluster where the JM is running, at node1.
> A *.out file gets generated as data is being processed. There is no *.out
> on the other nodes, just on node1, where I deployed the fat jar.
> I tail -f the *.out log on node1 (master). It starts fine, but slowly
> degrades and becomes extremely slow.
> As we speak, the Beam app I started 13 hours ago is still running.
> How can I prove that ALL NODES are involved in processing the data at the
> same time, i.e., clustered?
> Do the above configurations look OK for reasonable performance?
> Given the above parameters, how can I improve the performance of this
> cluster?
> What other information and/or dashboard screenshots are needed to clarify
> this issue?
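As a sanity check on the quoted flink-conf.yaml, the sketch below redoes the sizing arithmetic. 4 TaskManagers with 16 slots each give 64 slots, matching `parallelism.default: 64`. For network buffers it uses the rule of thumb from the Flink 1.x configuration docs, slots-per-TM² × #TMs × 4, which is worth re-checking against the docs for the version actually in use. `ClusterSizing` is a hypothetical name.

```java
// Hypothetical back-of-the-envelope helper for the flink-conf.yaml values above.
final class ClusterSizing {

    /** Total task slots in the cluster. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /**
     * Rule of thumb for taskmanager.network.numberOfBuffers:
     * slotsPerTaskManager^2 * taskManagers * 4, where 4 approximates the number
     * of shuffle/broadcast steps expected to be active at the same time.
     */
    static int suggestedNetworkBuffers(int taskManagers, int slotsPerTaskManager) {
        return slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4;
    }

    public static void main(String[] args) {
        int taskManagers = 4;         // one TM on each of node1..node4
        int slotsPerTaskManager = 16; // taskmanager.numberOfTaskSlots: 16
        System.out.println("total slots: " + totalSlots(taskManagers, slotsPerTaskManager));
        System.out.println("suggested numberOfBuffers: "
                + suggestedNetworkBuffers(taskManagers, slotsPerTaskManager));
    }
}
```

Both results (64 slots and 4096 buffers) match the configured values, so neither setting looks undersized. That is consistent with Aljoscha's point that the single-file source, rather than these settings, limits parallelism.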
> I used these websites to do the configuration:
> Apache Flink: Cluster Setup
> <https://ci.apache.org/projects/flink/flink-docs-release-0.8/cluster_setup.html>
> Apache Flink: Configuration
> <https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html>
>
> In the second link, there is a config recommendation for the following
> parameter, but it is not in the configuration file out of the box:
>
> - taskmanager.network.bufferSizeInBytes
>
> Should I include it manually? Does it make any difference if the default
> value, i.e. 32 KB, doesn't get picked up?
> Sorry, too many questions.
> Please let me know.
> I appreciate your help.
> Cheers,
> Amir-
>
> ----- Forwarded Message -----
> *From:* Robert Metzger <rmetz...@apache.org>
> *To:* "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari
> <amirto...@yahoo.com>
> *Sent:* Tuesday, September 13, 2016 1:15 AM
> *Subject:* Re: Flink Cluster Load Distribution Question
>
> Hi Amir,
>
> I would recommend posting such questions to the user@flink mailing list in
> the future. This list is meant for development-related topics.
>
> I think we need more details to understand why your application is not
> running properly. Can you quickly describe what your topology is doing?
> Are you setting the parallelism to a value >= 1 ?
>
> Regards,
> Robert
>
>
> On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari
> <amirto...@yahoo.com.invalid> wrote:
>
> > Hi Colleagues,
> > Just joined this forum. I have done everything possible to get a 4-node
> > Flink cluster to work properly and run a Beam app. It always generates
> > system-output logs (*.out) on only one node.
> > It's sooooooooo slow given that there are 4 nodes. It seems like the load
> > is not distributed amongst all 4 nodes but goes to only one node, most of
> > the time the one where the JM runs. I ran/tested it on a single node, and
> > it ran the same load even faster. Not sure what's not configured right.
> > 1- Why am I getting the SystemOut .out log on only one server? All nodes
> > get their TaskManager log files updated, though.
> > 2- Why don't I see the load being distributed amongst all 4 nodes, but
> > only on one, all the time?
> > 3- Why does the Dashboard show a 0 (zero) for the Send/Receive numbers of
> > all TaskManagers?
> > The Dashboard shows all the right stuff. Top shows not many resources
> > being stressed on any of the nodes. I can share its contents if it helps
> > diagnose the issue.
> > Thanks + I appreciate your valuable time, response & help.
> > Amir-
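On the `taskmanager.network.bufferSizeInBytes` question in the thread: the memory set aside for network buffers on each TaskManager is numberOfBuffers × bufferSizeInBytes. Assuming the 32 KB default mentioned in the thread applies when the key is omitted, the configured 4096 buffers come to 128 MiB per TaskManager, which is small next to `taskmanager.heap.mb: 102400`. `NetworkBufferMemory` is a hypothetical name.

```java
// Hypothetical helper: memory used by network buffers on one TaskManager.
final class NetworkBufferMemory {

    // Default buffer size mentioned in the thread (32 KB = 32768 bytes);
    // assumed to apply when taskmanager.network.bufferSizeInBytes is not set.
    static final long DEFAULT_BUFFER_SIZE_BYTES = 32L * 1024;

    /** Total bytes = numberOfBuffers * bufferSizeInBytes. */
    static long totalBytes(long numberOfBuffers, long bufferSizeInBytes) {
        return numberOfBuffers * bufferSizeInBytes;
    }

    public static void main(String[] args) {
        long numberOfBuffers = 4096; // taskmanager.network.numberOfBuffers: 4096
        long bytes = totalBytes(numberOfBuffers, DEFAULT_BUFFER_SIZE_BYTES);
        long mib = bytes / (1024L * 1024L);
        System.out.println(bytes + " bytes (" + mib + " MiB) per TaskManager");
    }
}
```

Running `main` prints `134217728 bytes (128 MiB) per TaskManager`. Setting the key explicitly to 32768 would give the same result as leaving it out, under the assumption above.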