Hi Folks,
I have a strange situation while running Beam 2.7.0 with the FlinkRunner.
My setup consists of an HA Flink cluster (1.5.4) that writes its
checkpoints to HDFS. Flink correctly writes its checkpoints and savepoints to
HDFS without any problems.
However, my pipeline has to write to HDFS as well, and it fails with "Caused
by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
(stack trace at the bottom).
On the host where the pipeline is running:
1. The environment variable HADOOP_CONF_DIR is set.
2. During pipeline construction I explicitly call
FileSystems.setDefaultPipelineOptions(_options); to trigger the
ServiceLoader to find all filesystem registrars on the classpath.
3. If I inspect the field SCHEME_TO_FILESYSTEM of the class FileSystems
in my main method using reflection, I can see that at launch time it
contains:
{file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
Any idea what I am doing wrong with the HDFS integration?
{snippet}
// Force FileSystems to load every registrar found on the classpath.
FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());

// Read the private static scheme registry via reflection.
Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
f.setAccessible(true);
@SuppressWarnings("unchecked")
AtomicReference<Map<String, FileSystem>> value =
    (AtomicReference<Map<String, FileSystem>>) f.get(null);
System.out.println("===========================");
System.out.println(value);
{snippet}
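The reflection dump above only describes the JVM that launches the pipeline, while the exception is raised inside WriteFiles at execution time. As a complementary check, here is a minimal JDK-only sketch that lists which FileSystemRegistrar service entries a given class loader can see. The class name RegistrarProbe and the method listProviders are hypothetical helpers, and using the thread context class loader is an assumption about which loader matters here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic helper; not part of Beam or Flink.
class RegistrarProbe {
    // Service file that java.util.ServiceLoader reads for Beam filesystem registrars.
    static final String SERVICE =
        "META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar";

    // Returns every provider class name declared in any copy of the given
    // service resource that is visible to the class loader.
    static List<String> listProviders(ClassLoader loader, String resource) {
        List<String> providers = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources(resource);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // Service files allow '#' comments; strip them.
                        int hash = line.indexOf('#');
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            providers.add(line + " (from " + url + ")");
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        // Assumption: the context class loader is the one used for service lookup.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        List<String> providers = listProviders(loader, SERVICE);
        if (providers.isEmpty()) {
            System.out.println("No FileSystemRegistrar entries visible");
        }
        for (String provider : providers) {
            System.out.println(provider);
        }
    }
}
```

Calling listProviders both from the launcher and from code that runs on the workers (for example a DoFn @Setup method) and comparing the two outputs might show whether the HDFS registrar is missing from the classpath on the worker side.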
{stacktrace}
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
    at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
    at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
{stacktrace}
--
JC