Hi all,
Super newb question here: I'm just getting started playing with Beam, and
wanted to check out its ability to run on Apex. So I tried to follow the
directions here:
https://beam.apache.org/documentation/runners/apex/
The directions were a little vague about using a file on HDFS: "(example
project needs to be modified to include HDFS file provider)".
So I removed this line:
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
and replaced it with these lines:
HDFSFileSource<String, LongWritable, Text> source =
    HDFSFileSource.fromText(options.getInputFile());
p.apply(Read.from(source))
in the WordCount.java example class (with corresponding pom changes to pull in
the requisite dependencies).
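The pom change was essentially just adding the HDFS IO module; I'm quoting this
from memory, so take the exact coordinates as approximate (${beam.version} is
the version property the example pom already defines):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hdfs</artifactId>
    <version>${beam.version}</version>
</dependency>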
With that in place, I ended up running into:
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Unable to find any files matching /tmp/input/pom.xml
    at org.apache.beam.sdks.java.io.hdfs.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:346)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:339)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:339)
    at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:104)
    at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:89)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:488)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:402)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:161)
    at org.apache.beam.examples.WordCount.main(WordCount.java:186)
    ... 6 more
My assumption is that this is because it was looking locally (rather than in
HDFS) for my pom file, so I changed my input to point explicitly at hdfs:
--inputFile=hdfs:///user/sean.story/pom.xml
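For completeness, the full invocation is essentially the WordCount command from
the Apex runner page with my input swapped in (the output path is just from my
local run):

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=hdfs:///user/sean.story/pom.xml --output=counts --runner=ApexRunner" \
    -Papex-runner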
Running that got me this error instead:
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.IOException: No FileSystem for scheme: hdfs
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:353)
    at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:104)
    at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:89)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:488)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:402)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:161)
    at org.apache.beam.examples.WordCount.main(WordCount.java:186)
    ... 6 more
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:343)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource$7.run(HDFSFileSource.java:339)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.beam.sdk.io.hdfs.HDFSFileSource.validate(HDFSFileSource.java:339)
    ... 13 more
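If I had to guess, the hdfs scheme just isn't registered on the exec:java
classpath, i.e. hadoop-hdfs (which provides DistributedFileSystem for the hdfs
scheme) isn't being pulled in. Assuming that's the cause, I'd expect a minimal
Beam-free check like this (the class name is mine, just for illustration) to
fail the same way:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsSchemeCheck {
    public static void main(String[] args) throws Exception {
        // FileSystem.get() resolves the "hdfs" scheme from the jars on the
        // classpath; without hadoop-hdfs it throws the same
        // "No FileSystem for scheme: hdfs" IOException as above.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs:///user/sean.story/pom.xml"), conf);
        System.out.println("Resolved to: " + fs.getUri());
    }
}

But I haven't been able to confirm whether that's really the problem, or what
the clean fix would be.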
Is it immediately obvious to anyone what I'm doing wrong? I looked at the
HDFSFileSource tests for an example, but they all just pull local files
(duh, because the tests run on the local FS). Pointers toward other examples
or docs would be greatly appreciated.
Thanks,
Sean Story