Re: Documentation Error
Hey Maximilian Alber, I don't know if you are interested in contributing to Flink, but if you would like to, these small fixes to the documentation are really helpful for us! It's actually quite easy to work with the documentation locally. It is located in the docs/ directory of the Flink source. There is also a build.sh script that will run a local webserver presenting the docs at localhost:4000. So you can read the Flink docs on your machine and make these small typo fixes as you spot them. In the end you commit the changes to your GitHub fork and open a pull request for us ;)

On Thu, Jun 25, 2015 at 2:45 PM, Maximilian Alber alber.maximil...@gmail.com wrote:

Another one: in the streaming guide, under "Connecting to the outside world" > "Sources", I guess one "by" should be a "be". http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html

On Thu, Jun 25, 2015 at 2:42 PM, Maximilian Michels m...@apache.org wrote:

Thanks Max. I think the documentation has grown a lot and needs an overhaul. We should remove the unnecessary non-Flink-related stuff (e.g. configuring SSH keys in the setup guide). I like your idea of having an essential guide that just covers the basics for people already familiar with other big data projects. It would be great if somebody could spare the time to work on that.

On Thu, Jun 25, 2015 at 2:31 PM, Maximilian Alber alber.maximil...@gmail.com wrote:

Something different: I just read through the Spark documentation and yours. While the Spark one is quite unstructured and easy to understand, yours is structured and really detailed. It's great that you have that in-depth documentation, but I would recommend making a boiled-down page with just the basic stuff, which would ease the life of beginners. Cheers, Max

On Thu, Jun 25, 2015 at 2:20 PM, Chiwan Park chiwanp...@apache.org wrote:

The "How to contribute" and coding guidelines pages are also duplicated between the web site and the documentation. I think this duplication is not needed; we should merge them. Regards, Chiwan Park

On Jun 25, 2015, at 9:01 PM, Maximilian Michels m...@apache.org wrote:

Thanks. Fixed. Actually, that one is not linked anywhere, right? Just realized the FAQ page is duplicated on the web site and in the Flink documentation. So there is http://ci.apache.org/projects/flink/flink-docs-master/faq.html and http://flink.apache.org/faq.html. I'm guessing somebody wanted a FAQ independent of the documentation version. However, I don't see how we will maintain multiple FAQs. The two have already diverged quite a bit, and merging them is not trivial.

On Thu, Jun 25, 2015 at 11:40 AM, Maximilian Alber alber.maximil...@gmail.com wrote:

Another one: on http://ci.apache.org/projects/flink/flink-docs-master/faq.html, in the "What is parallelism? How do I set it?" section, the links are broken. Cheers, Max

On Wed, Jun 24, 2015 at 9:52 AM, Maximilian Michels m...@apache.org wrote:

Hi Max, thanks for noticing! Fixed on the master and for the 0.9.1 release. Cheers, Max

On Tue, Jun 23, 2015 at 5:09 PM, Maximilian Alber alber.maximil...@gmail.com wrote:

Hi Flinksters, just something minor: on http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html, the second code sample should be

    ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar

instead of:

    ./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar

Cheers, Max
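For anyone who wants to try the local docs workflow described above, a rough sketch (the fork URL is a placeholder; the thread only names the docs/ directory, the build.sh script, and the localhost:4000 address):

    # clone your fork of Flink (URL is a placeholder)
    git clone https://github.com/<your-user>/flink.git
    cd flink/docs

    # build the docs and serve them via a local webserver
    ./build.sh

    # browse http://localhost:4000, edit the pages under docs/,
    # then commit the fix and open a pull request against apache/flink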
Re: Documentation Error
On 25 Jun 2015, at 14:31, Maximilian Michels m...@apache.org wrote:

Thanks for noticing, Chiwan. I have the feeling this problem arose when the website was updated. The problem with linking documentation pages from the main website is that it is currently hard to get back to the main web site from the documentation (the navigation and URL change). However, now we are suffering from fragmentation. I would suggest moving the FAQ and "How To Contribute" to the Flink web site and deleting them from the Flink repository.

+1
Re: writeAsCsv on HDFS
You could also just fully qualify the HDFS URL, if that is simpler (put the host and port of the namenode in there):

    hdfs://myhost:40010/path/to/file

On Thu, Jun 25, 2015 at 3:20 PM, Robert Metzger rmetz...@apache.org wrote:

You have to put it on all machines.

On Thu, Jun 25, 2015 at 3:17 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Do I have to put the Hadoop conf files on each task manager, or just on the job manager?

On Thu, Jun 25, 2015 at 3:12 PM, Chiwan Park chiwanp...@apache.org wrote:

It represents the folder containing the Hadoop config files. :) Regards, Chiwan Park

On Jun 25, 2015, at 10:07 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Does fs.hdfs.hadoopconf represent the folder containing the Hadoop config files (*-site.xml), or just one specific Hadoop config file (e.g. core-site.xml or hdfs-site.xml)?

On Thu, Jun 25, 2015 at 3:04 PM, Robert Metzger rmetz...@apache.org wrote:

Hi Flavio, there is a file called conf/flink-conf.yaml. Add a new line to the file with the following contents:

    fs.hdfs.hadoopconf: /path/to/your/hadoop/config

This should fix the problem. Flink cannot load the configuration file from the jar containing the user code, because the file system is initialized independently of the job. So there is (currently) no way of initializing the file system using the user-code classloader. What you can do is make the configuration file available to Flink's system classloader, for example by putting your user jar into the lib/ folder of Flink. You can also add the path to the Hadoop configuration files to Flink's CLASSPATH (but you need to do that on all machines). I think the easiest approach is using Flink's configuration file.

On Thu, Jun 25, 2015 at 2:59 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Could you describe it better with an example, please? Why doesn't Flink automatically load the properties of the Hadoop conf files within the jar?

On Thu, Jun 25, 2015 at 2:55 PM, Robert Metzger rmetz...@apache.org wrote:

Hi, Flink does not load the Hadoop configuration from the classloader. You have to specify the path to the Hadoop configuration with the fs.hdfs.hadoopconf key in the Flink configuration.

On Thu, Jun 25, 2015 at 2:50 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Hi to all, I'm experiencing some problems writing a file as CSV on HDFS with Flink 0.9.0. The code I use is:

    myDataset.writeAsCsv(new Path("hdfs:///tmp", "myFile.csv").toString());

If I run the job from Eclipse everything works fine, but when I deploy the job on the cluster (Cloudera 5.1.3) I get the following exception:

    Caused by: java.io.IOException: The given HDFS file URI (hdfs:///tmp/myFile.csv) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:291)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:258)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
        at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:273)
        at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
        ... 25 more

The core-site.xml is present in the fat jar and contains the property:

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://myServerX:8020</value>
    </property>

I compiled Flink with the following command:

    mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

How can I fix that? Best, Flavio
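Putting the suggested fix together, a minimal Java sketch of writing with a fully qualified HDFS URI (the namenode host/port, output path, and sample dataset are placeholders, not taken from Flavio's job):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class WriteCsvToHdfs {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // placeholder dataset standing in for the real job's data
            DataSet<Tuple2<String, Integer>> myDataset =
                    env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

            // Fully qualified URI: no external Hadoop configuration is needed,
            // because the namenode authority is part of the path itself.
            myDataset.writeAsCsv("hdfs://myServerX:8020/tmp/myFile.csv");

            env.execute("write CSV to HDFS");
        }
    }

If you prefer the unqualified hdfs:///tmp/... form instead, the fs.hdfs.hadoopconf entry in conf/flink-conf.yaml shown above must point at the directory holding core-site.xml/hdfs-site.xml on every machine.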
Re: Connecting the channel failed: Connection refused
That makes perfect sense, thanks!

On 25.06.2015 21:39, Aaron Jackson ajack...@pobox.com wrote:

So the JobManager was running on host1. This also explains why I didn't see the problem until I asked for a sizeable degree of parallelism, since it probably never assigned a task to host3. Thanks for your help.

On Thu, Jun 25, 2015 at 3:34 AM, Stephan Ewen se...@apache.org wrote:

Nice! TaskManagers need to announce where they listen for connections. We do not yet block localhost as an acceptable address, so as not to prohibit local test setups. There are some routines that try to select an interface that can communicate with the outside world. Is host3 running on the same machine as the JobManager? Or did you experience a long delay until TaskManager 3 was registered? Thanks for helping us debug this, Stephan

On Wed, Jun 24, 2015 at 11:58 PM, Aaron Jackson ajack...@pobox.com wrote:

That was it. host3 was showing localhost; looking a little further, it was missing an entry in /etc/hosts. Thanks for looking into this. Aaron

On Wed, Jun 24, 2015 at 2:13 PM, Stephan Ewen se...@apache.org wrote:

Aaron, can you check how the TaskManagers register at the JobManager? When you look at the 'TaskManagers' section in the JobManager's web interface (at port 8081), what does it say as the TaskManager host names? Does it list host1, host2, host3...? Thanks, Stephan

On 24.06.2015 20:31, Ufuk Celebi u...@apache.org wrote:

On 24 Jun 2015, at 16:22, Aaron Jackson ajack...@pobox.com wrote:

Thanks. My setup is actually 3 task managers x 4 slots. I played with the parallelism and found that at low values, the error did not occur. I can only conclude that there is some form of data shuffling occurring that is sensitive to the data source.

Yes, seems a little odd to me as well. OOC, did you load the file into HDFS or use it from a local file system (e.g. file:///tmp/data.csv)? My results have shown that, so far, HDFS does not appear to be sensitive to this issue.

I updated the example to include my configuration and slaves, but for brevity, I'll include the configurable bits here:

    jobmanager.rpc.address: host01
    jobmanager.rpc.port: 6123
    jobmanager.heap.mb: 512
    taskmanager.heap.mb: 2048
    taskmanager.numberOfTaskSlots: 4
    parallelization.degree.default: 1
    jobmanager.web.port: 8081
    webclient.port: 8080
    taskmanager.network.numberOfBuffers: 8192
    taskmanager.tmp.dirs: /datassd/flink/tmp

And the slaves:

    host01
    host02
    host03

I did notice an extra empty line at the end of the slaves file. And while I highly doubt it makes ANY difference, I'm still going to re-run with it removed. Thanks for looking into it.

Thank you for being so helpful. I've tried it with the local filesystem.

On 23 Jun 2015, at 07:11, Aaron Jackson ajack...@pobox.com wrote:

I have 12 task managers across 3 machines, so it's a small setup. Sorry for my misunderstanding. I've tried it with both 12 task managers and 3 as well now. What's odd is that the stack trace shows that it is trying to connect to localhost for the remote channel, although localhost is not configured anywhere.

Let me think about that. ;) – Ufuk
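Since the fix turned out to be a missing /etc/hosts entry, here is a sketch of the kind of mapping each cluster node needs so that a TaskManager announces a resolvable hostname instead of localhost (the IP addresses are placeholders; the hostnames come from the slaves file above):

    # /etc/hosts on every node (IPs are examples)
    192.168.1.101   host01
    192.168.1.102   host02
    192.168.1.103   host03

With this in place, the JobManager's web interface at port 8081 should list host01, host02, and host03 as the registered TaskManagers.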
ArrayIndexOutOfBoundsException when running job from JAR
Hi, I get an ArrayIndexOutOfBoundsException when I run my job from a JAR in the CLI. This doesn't occur in the IDE. I've built the JAR using the maven-shade-plugin and the pom.xml configuration Robert has provided here: https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs

I specify the entry point using the -c option. The array the exception refers to is actually initialized when a vertices dataset is read from the file system. Any ideas on what could cause this issue? Best, Mihail

P.S.: the stack trace:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
        at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
        at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
        at org.apache.flink.graph.Graph$2.map(Graph.java:389)
        at org.apache.flink.graph.Graph$2.map(Graph.java:387)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
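The thread does not reach a diagnosis, but one pattern that matches the "works in the IDE, fails from a JAR" symptom is state populated in the client JVM (for example, a static array filled in main()) and then read inside a map function: on a cluster, the function runs in a different JVM where that static field was never initialized, and an empty array yields exactly an ArrayIndexOutOfBoundsException at index 0. This is only an assumption about Mihail's code; a sketch of the safe pattern, with all names hypothetical:

    import org.apache.flink.api.common.functions.MapFunction;

    // Data needed inside the function is made an instance field, so it is
    // serialized with the function and shipped to the TaskManagers, instead
    // of living in a static field that only exists in the client JVM.
    public class InitVerticesMapper implements MapFunction<Long, Long> {

        private final long[] seedDistances;  // travels with the serialized function

        public InitVerticesMapper(long[] seedDistances) {
            this.seedDistances = seedDistances;
        }

        @Override
        public Long map(Long vertexId) {
            // Safe on the cluster: the array was captured at job-construction time.
            return seedDistances[0] + vertexId;
        }
    }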
Re: Cannot instantiate Mysql connection
Good to hear it works. Libraries, class-loading, and initialization seem to be among the things that remain tricky once one switches to distributed processing.

On Thu, Jun 25, 2015 at 10:58 AM, Flavio Pompermaier pomperma...@okkam.it wrote:

Sorry for the late response, but I was on vacation for the last 2 weeks. Calling Class.forName("com.mysql.jdbc.Driver") in the main() of my class made things work! Thanks for the support, Flavio

On Fri, Jun 5, 2015 at 11:13 PM, Stephan Ewen se...@apache.org wrote:

Can you manually load the driver class with Class.forName(...), or does that yield a ClassNotFoundException?

On Fri, Jun 5, 2015 at 11:10 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

In the fat jar.

On 5 Jun 2015 19:28, Stephan Ewen se...@apache.org wrote:

In which way is the driver in the classpath?
- fat jar?
- in the nested /out folder in the slim jar?

On Fri, Jun 5, 2015 at 7:23 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Actually I just need to load it in the main method (job manager) before calling any Flink operation. I retrieve the records in a MySQL table because they contain the paths of the files I'll need to read. Nothing more, nothing less.

On 5 Jun 2015 19:06, Robert Metzger rmetz...@apache.org wrote:

Sure. The DriverManager has a static variable called registeredDrivers. When DriverManager.getConnection() is called, the method looks up whether a registered driver for that connection (in this case MySQL) is available. For drivers to be in that list, they have to register themselves using the DriverManager.registerDriver() method. Drivers can register themselves with a static initializer (which is executed when Java loads the class):

    public class PoolingDriver implements Driver {
        /** Register myself with the {@link DriverManager}. */
        static {
            try {
                DriverManager.registerDriver(new PoolingDriver());
            } catch (Exception e) {
            }
        }
        // ...
    }

To execute that driver registration, you need to do:

    Class.forName("org.datanucleus.store.rdbms.datasource.dbcp.PoolingDriver");

because then Java loads the class and executes the static initializer, which registers the driver at the DriverManager. When executing Flink locally, you are using only one JVM. By loading the MySQL driver manually in the main() method of your Flink job, you are registering the MySQL driver at the DriverManager of that JVM. However, when you run Flink on a distributed cluster, the MySQL driver is not loaded at the DriverManager in the TaskManager JVMs. Therefore, you have to make sure that

    Class.forName("org.datanucleus.store.rdbms.datasource.dbcp.PoolingDriver");

(this is not the correct class for the MySQL driver) has been called. One approach to do that is to call Class.forName() in the open() method of your function. Best, Robert

On Fri, Jun 5, 2015 at 6:54 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Hi Robert, in the main method I connect to a MySQL table that acts as a data-source repository, which I use to know which dataset I need to load. All MySQL classes are present in the shaded jar. Could you explain the solution to this problem in a little more detail, please? Sorry, but I didn't understand it :( Thanks, Flavio

On 5 Jun 2015 18:33, Robert Metzger rmetz...@apache.org wrote:

Hi Stefano, I doubt that there are conflicting dependencies, because Flink does not contain MySQL dependencies. Are you using Flink's JDBCInputFormat or custom code? For drivers to register at java.sql's DriverManager, their classes need to be loaded first. To load a class, you need to call Class.forName(classname). Maybe you are loading the class in the application's main() method (that's why it is working from Eclipse) but not on the cluster instances which are supposed to read the data.

On Fri, Jun 5, 2015 at 5:16 PM, Stefano Bortoli s.bort...@gmail.com wrote:

Hi Robert, I answer on behalf of Flavio. He told me the driver jar was included. Smells like a class-loading issue due to 'conflicting' dependencies. Is it possible? Regards, Stefano

2015-06-05 16:24 GMT+02:00 Robert Metzger rmetz...@apache.org:

Hi, is the MySQL driver part of the jar file that you've built?

On Fri, Jun 5, 2015 at 4:11 PM, Flavio Pompermaier pomperma...@okkam.it wrote:

Hi to all, I'm using a fresh build of flink-0.9-SNAPSHOT, and in my Flink job I set up a MySQL connection. When I run the job from Eclipse everything is fine, while when running the job from the Web UI I get the following exception:

    java.sql.SQLException: No suitable driver found for jdbc:mysql:/localhost:3306/mydb?autoReconnect=true
        at java.sql.DriverManager.getConnection(DriverManager.java:596)
        at java.sql.DriverManager.getConnection(DriverManager.java:215)

How can I fix that? Best, Flavio
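Robert's suggestion of calling Class.forName() in the open() method looks roughly like this. A sketch only: the function types, connection URL, and credentials are placeholders, while the driver class is the one Flavio confirmed above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class MySqlEnrichedMap extends RichMapFunction<String, String> {

        private transient Connection conn;

        @Override
        public void open(Configuration parameters) throws Exception {
            // open() runs once per parallel task in the TaskManager JVM, so the
            // driver's static initializer registers it exactly where it is needed.
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/mydb?autoReconnect=true",
                    "user", "password");  // credentials are placeholders
        }

        @Override
        public String map(String value) throws Exception {
            // ... look up something for 'value' via conn ...
            return value;
        }

        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }
    }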