It seems that the Hadoop configuration file is only evaluated when the scheme is null. With a non-null scheme, all information is parsed from the URI itself (which here has no host and port defined, hence the error about "null" and "-1").
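To illustrate where the "null" and "-1" come from, here is a minimal sketch that uses only java.net.URI and java.net.InetSocketAddress from the JDK. It is not the Flink or XtreemFS code path; the class name UriSchemeDemo, its helper methods, and the host and port in the second URI are made up for this example.

```java
import java.net.InetSocketAddress;
import java.net.URI;

// Hypothetical demo class, not part of Flink or XtreemFS.
public class UriSchemeDemo {

    // Summarizes what java.net.URI extracts from a file URI.
    static String describe(String fileUri) {
        URI uri = URI.create(fileUri);
        return "scheme=" + uri.getScheme()
                + " host=" + uri.getHost()
                + " port=" + uri.getPort();
    }

    // Returns true if no valid socket address can be built from the
    // host and port found in the URI. createUnresolved only validates
    // its arguments, so no DNS lookup takes place.
    static boolean addressFails(String fileUri) {
        URI uri = URI.create(fileUri);
        try {
            InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // URI without an authority part: host is null, port is -1.
        System.out.println(describe("xtreemfs:///result.txt"));
        System.out.println(addressFails("xtreemfs:///result.txt"));

        // URI with an (assumed) host and port: both are picked up.
        System.out.println(describe("xtreemfs://localhost:32638/result.txt"));
        System.out.println(addressFails("xtreemfs://localhost:32638/result.txt"));
    }
}
```

For xtreemfs:///result.txt this prints scheme=xtreemfs host=null port=-1, so any code that builds a socket address from those values alone is bound to fail unless it falls back to a configured default.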

On Wed, Jan 7, 2015 at 3:11 PM, Robert Metzger <[email protected]> wrote:

> Does one of the Hadoop configuration files contain an entry with the key
> "fs.xtreemfs.impl"?
>
> On Wed, Jan 7, 2015 at 3:05 PM, Lukas Kairies <
> [email protected]> wrote:
>
>> Unfortunately, it does not work with XtreemFS. I set
>> "fs.hdfs.hadoopconf" to the Hadoop configuration directory and tried to run
>> the word count example:
>>
>> bin/flink run -v examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar
>> xtreemfs:///test.txt xtreemfs:///result.txt
>>
>> The following error occurred:
>>
>> Error: The main method caused an error.
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:449)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:350)
>>     at org.apache.flink.client.program.Client.run(Client.java:242)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:349)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:336)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:976)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1000)
>> Caused by: java.io.IOException: The given file URI
>> (xtreemfs:///result.txt) points to the HDFS NameNode at null, but the File
>> System could not be initialized with that address: port out of range:-1
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:325)
>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:244)
>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:299)
>>     at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:267)
>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.yarn.YarnJobManager$$anonfun$receiveYarnMessages$1.applyOrElse(YarnJobManager.scala:68)
>>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.IllegalArgumentException: port out of range:-1
>>     at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>>     at java.net.InetSocketAddress.<init>(InetSocketAddress.java:224)
>>     at org.xtreemfs.common.libxtreemfs.Helper.stringToInetSocketAddress(Helper.java:82)
>>     at org.xtreemfs.common.libxtreemfs.RPCCaller.getInetSocketAddressFromAddress(RPCCaller.java:301)
>>     at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:88)
>>     at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:49)
>>     at org.xtreemfs.common.libxtreemfs.ClientImplementation.listVolumeNames(ClientImplementation.java:529)
>>     at org.xtreemfs.common.clients.hadoop.XtreemFSFileSystem.initialize(XtreemFSFileSystem.java:164)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:311)
>>     ... 31 more
>>
>> On 07.01.2015 at 14:04, Stephan Ewen wrote:
>>
>> Hi!
>>
>> You can reference a Hadoop configuration with a defaultFS entry via
>> "fs.hdfs.hadoopconf".
>>
>> Have a look at the configuration reference for details:
>> http://flink.incubator.apache.org/docs/0.7-incubating/config.html
>>
>> Let us know if it works for XtreemFS...
>>
>> Greetings,
>> Stephan
>>
>> On 07.01.2015 at 13:51, "Lukas Kairies" <
>> [email protected]> wrote:
>>
>>> Thanks, now it works :) Is it possible to set a default file system in
>>> Flink like in Hadoop (with fs.default.name)? Currently I always have to
>>> set the complete file URI like xtreemfs://<host>:<port>/file
>>>
>>> Best,
>>> Lukas
>>>
>>> On 07.01.2015 at 12:22, Robert Metzger wrote:
>>>
>>> Hi Lukas,
>>>
>>> I see that there is an XtreemFS Hadoop client (
>>> http://www.xtreemfs.org/download.php?t=source). With this pending pull
>>> request https://github.com/apache/flink/pull/268 you can use all file
>>> systems supported by Hadoop with Flink (we support the
>>> org.apache.hadoop.fs.FileSystem interface).
>>>
>>> The pull request has not been merged yet because of a failing test,
>>> but that should not affect you.
>>> If you want, you can check out the branch of my pull request:
>>>
>>> git clone https://github.com/rmetzger/flink.git
>>> cd flink
>>> git checkout flink1266
>>> mvn clean install -DskipTests
>>>
>>> The finished build is in the
>>> "flink-dist/target/flink-XXX/flink-yarn-XXX/" directory.
>>>
>>> Let me know if you need more help or information.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Wed, Jan 7, 2015 at 12:00 PM, Lukas Kairies <
>>> [email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I would like to test Flink on YARN with the alternative file system
>>>> XtreemFS. For that I have to add a jar file to Flink, but I found no
>>>> way to do so. How can I do this? Hadoop works fine with XtreemFS.
>>>>
>>>> Thanks
>>>>
>>>> Lukas
>>>>
