Hi Juan

This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277,
which we believed was fixed in 2.7.0.
Which IO are you using to write your files, and could you paste a snippet of
your code please?

On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
workaround too):

  transform.apply("Write",
      AvroIO.writeGenericRecords(schema)
          .to(FileSystems.matchNewResource(options.getTarget(), true))
          // BEAM-2277 workaround
          .withTempDirectory(
              FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
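
One thing that might be worth keeping in mind while debugging: the launcher
and each Flink task manager are separate JVMs, so a registry that looks right
in your main method is not necessarily populated in the JVM where the write
actually runs. The sketch below is a simplified stand-alone model of a
scheme-keyed lookup, not Beam's actual code. The class name, the regex and the
string stand-ins for filesystem objects are all assumptions made for
illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of a scheme-keyed filesystem registry. NOT Beam's code.
final class SchemeRegistrySketch {
  // Hypothetical pattern: a URI-style scheme followed by ":/".
  private static final Pattern SCHEME =
      Pattern.compile("^([a-zA-Z][a-zA-Z0-9+.-]*):/.*");

  // Each instance stands in for the registry held by one JVM.
  private final Map<String, String> schemeToFileSystem = new HashMap<>();

  // Returns the lower-cased scheme, or "file" when the spec has none.
  static String parseScheme(String spec) {
    Matcher m = SCHEME.matcher(spec);
    return m.matches() ? m.group(1).toLowerCase(Locale.ROOT) : "file";
  }

  void register(String scheme, String fileSystemName) {
    schemeToFileSystem.put(scheme.toLowerCase(Locale.ROOT), fileSystemName);
  }

  // Looks up the filesystem for a path spec; fails if the scheme is unknown.
  String lookup(String spec) {
    String scheme = parseScheme(spec);
    String fileSystem = schemeToFileSystem.get(scheme);
    if (fileSystem == null) {
      throw new IllegalArgumentException(
          "No filesystem found for scheme " + scheme);
    }
    return fileSystem;
  }

  public static void main(String[] args) {
    // Launcher JVM: both schemes were registered at construction time.
    SchemeRegistrySketch launcher = new SchemeRegistrySketch();
    launcher.register("file", "LocalFileSystem");
    launcher.register("hdfs", "HadoopFileSystem");

    // Worker JVM (hypothetical): only the local filesystem was registered.
    SchemeRegistrySketch worker = new SchemeRegistrySketch();
    worker.register("file", "LocalFileSystem");

    String target = "hdfs://ha-nn/tmp/beam-avro";
    System.out.println("launcher: " + launcher.lookup(target));
    try {
      worker.lookup(target);
    } catch (IllegalArgumentException e) {
      System.out.println("worker: " + e.getMessage());
    }
  }
}
```

In this model the same path resolves in the "launcher" instance and fails in
the "worker" instance with the message from your stack trace. Whether that is
what happens in your job I cannot say without seeing the code, so treat it
only as one hypothesis to rule out.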


Thanks
Tim


On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jcgarc...@gmail.com>
wrote:

> Hi Folks,
>
> I have a strange situation while running Beam 2.7.0 with the FlinkRunner.
> My setup consists of an HA Flink cluster (1.5.4) that writes its
> checkpoints to HDFS. Flink is able to write its checkpoints / savepoints
> to HDFS without any problems.
>
> However, my pipeline has to write to HDFS as well, but fails with "Caused
> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
> (stacktrace at the bottom)
>
> On the host where the pipeline is running:
> 1. The environment variable HADOOP_CONF_DIR is set.
> 2. During pipeline construction I explicitly call
> FileSystems.setDefaultPipelineOptions(_options); to trigger the
> ServiceLoader to find all options registrars on the classpath.
> 3. If I inspect the SCHEME_TO_FILESYSTEM field of the FileSystems class
> from my main method using reflection, I can see that at launch time it
> contains:
>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>     hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>
> Any idea what I am doing wrong with the HDFS integration?
>
> {snippet}
>
> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
> Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
> f.setAccessible(true);
> AtomicReference<Map<String, FileSystem>> value =
>     (AtomicReference<Map<String, FileSystem>>) f.get(null);
>
> System.out.println("===========================");
> System.out.println(value);
> {snippet}
>
> {stacktrace}
> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>         at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>         at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>
> {stacktrace}
>
> --
>
> JC
>
