Hi Jeff,

In TextIO, the path "/test/kinglear.txt" is interpreted as a local file URL, the same as "file:///test/kinglear.txt".
To use a file from HDFS, you will want to try the contrib HadoopFileSource [1]. Note that it may be moved at any moment by PR #96 [2], but it will work exactly the same.

Kenn

[1] https://github.com/apache/incubator-beam/blob/master/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
[2] https://github.com/apache/incubator-beam/pull/96

On Tue, Apr 12, 2016 at 4:49 AM, Jianfeng Qian <[email protected]> wrote:
> Hi,
>
> I also tried the Spark cluster mode.
>
> I uploaded the file to HDFS:
>
> jeff@T:~/git/incubator-beam/runners/spark$ hadoop fs -ls /test
> Found 2 items
> drwxr-xr-x   - jeff supergroup      0 2016-04-12 11:22 /test/beam
> -rw-r--r--   1 jeff supergroup 185965 2016-04-12 15:58 /test/kinglear.txt
>
> When I try to run in cluster mode, it has a problem similar to the one in
> local mode.
>
> And when I set the input to /kinglear.txt, it does not quit until I press
> Ctrl+Z.
>
> The log is as follows:
>
> jeff@T:~/git/incubator-beam/runners/spark$ spark-submit \
>     --class com.google.cloud.dataflow.examples.WordCount \
>     --master yarn-client \
>     target/spark-runner-*-spark-app.jar \
>     --inputFile=/test/kinglear.txt --output=/test/out \
>     --runner=SparkPipelineRunner --sparkMaster=yarn-client
> Exception in thread "main" java.lang.IllegalStateException: Failed to validate /test/kinglear.txt
>     at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound.apply(TextIO.java:305)
>     at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound.apply(TextIO.java:202)
>     at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75)
>     at org.apache.beam.runners.spark.SparkPipelineRunner.apply(SparkPipelineRunner.java:122)
>     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368)
>     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275)
>     at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:48)
>     at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:157)
>     at com.google.cloud.dataflow.examples.WordCount.main(WordCount.java:200)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.IOException: Unable to find parent directory of /test/kinglear.txt
>     at com.google.cloud.dataflow.sdk.util.FileIOChannelFactory.match(FileIOChannelFactory.java:60)
>     at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound.apply(TextIO.java:300)
>     ... 17 more
>
> jeff@T:~/git/incubator-beam/runners/spark$ spark-submit --class \
>     com.google.cloud.dataflow.examples.WordCount --master yarn-client \
>     target/spark-runner-*-spark-app.jar --inputFile=kinglear.txt \
>     --output=/test/out --runner=SparkPipelineRunner --sparkMaster=yarn-client
> Exception in thread "main" java.lang.IllegalStateException: Unable to find any files matching kinglear.txt
>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
>     at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound.apply(TextIO.java:299)
>     at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound.apply(TextIO.java:202)
>     at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75)
>     at org.apache.beam.runners.spark.SparkPipelineRunner.apply(SparkPipelineRunner.java:122)
>     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368)
>     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275)
>     at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:48)
>     at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:157)
>     at com.google.cloud.dataflow.examples.WordCount.main(WordCount.java:200)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> jeff@T:~/git/incubator-beam/runners/spark$ spark-submit --class \
>     com.google.cloud.dataflow.examples.WordCount --master yarn-client \
>     target/spark-runner-*-spark-app.jar --inputFile=/kinglear.txt \
>     --output=/test/out --runner=SparkPipelineRunner --sparkMaster=yarn-client
> ^Z
> [3]+  Stopped                 spark-submit --class com.google.cloud.dataflow.examples.WordCount --master yarn-client target/spark-runner-*-spark-app.jar --inputFile=/kinglear.txt --output=/test/out --runner=SparkPipelineRunner --sparkMaster=yarn-client
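To make the HadoopFileSource suggestion concrete, wiring it into a WordCount-style pipeline might look roughly like the sketch below. This is untested: the `HadoopFileSource.from(...)` signature follows the contrib source linked in [1], but the namenode host and port (`namenode:8020`) are placeholders, and the `HdfsWordCountInput` class name is invented for illustration.

```java
import com.google.cloud.dataflow.contrib.hadoop.HadoopFileSource;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HdfsWordCountInput {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read KV<LongWritable, Text> records (byte offset, line) from HDFS
    // instead of TextIO, which only resolves local paths here.
    p.apply(Read.from(HadoopFileSource.from(
            "hdfs://namenode:8020/test/kinglear.txt",  // host:port is a placeholder
            TextInputFormat.class, LongWritable.class, Text.class)))
     // Keep only the line text, so the rest of WordCount sees plain Strings.
     .apply(ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().getValue().toString());
        }
      }));
    // ... counting and output transforms from WordCount would follow here ...
    p.run();
  }
}
```

The fully qualified `hdfs://` URI also lets the source resolve the namenode without relying on a Hadoop configuration being on the classpath of the Spark driver.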
