Hi all,

Super newb question here - I'm just getting started playing with Beam, and wanted to check out its capabilities to run on Apex. So I tried to follow the directions here:
https://beam.apache.org/documentation/runners/apex/

The directions were a little vague about using a file on HDFS: "(example project needs to be modified to include HDFS file provider)".
So, in the WordCount.java example class, I removed this line:

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))

and replaced it with these lines (with corresponding pom changes to pull in the requisite dependencies):

    HDFSFileSource<String, LongWritable, Text> source =
        HDFSFileSource.fromText(options.getInputFile());
    p.apply(Read.from(source))
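
For reference, the pom change I mean was adding something along these lines (the artifact name is my best guess from the Beam source tree; the version should match whatever Beam version the example project already uses):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hdfs</artifactId>
      <version>${beam.version}</version>
    </dependency>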

Running that, I ended up with:
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Unable to find any files matching /tmp/input/pom.xml
        at org.apache.beam.sdks.java.io.hdfs.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:346)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:339)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:339)
        at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:104)
        at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:89)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:488)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:402)
        at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
        at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:161)
        at org.apache.beam.examples.WordCount.main(WordCount.java:186)
        ... 6 more


My assumption is that this is because it was looking locally (rather than in HDFS) for my pom file, so I changed my input to point explicitly at HDFS, like:

    --inputFile=hdfs:///user/sean.story/pom.xml

which got me this error instead:

java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:353)
        at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:104)
        at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:89)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:488)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:402)
        at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
        at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:161)
        at org.apache.beam.examples.WordCount.main(WordCount.java:186)
        ... 6 more
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:343)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:339)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:339)
        ... 13 more

Is it immediately obvious to anyone what I'm doing wrong? I looked in the HDFSFileSource tests for an example, but they're all just pulling local files (duh, because the tests run on the local FS). Pointers towards other examples or docs would be greatly appreciated.

Thanks,

Sean Story

