[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940742#comment-16940742
 ] 

Maximilian Michels commented on BEAM-8303:
------------------------------------------

I managed to reproduce the problem on a single node (1 TM , 1 JM) Flink cluster 
with a custom File system:
{noformat}
FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

PCollection<String> input = pipeline.apply(Create.of("hello", "world"));

FileIO.Write<?, String> write = FileIO.<String>write()
    .via(TextIO.sink())
    .to("max://this/does/not/exist")
    .withSuffix(".txt");
input.apply(write);

pipeline.run();{noformat}

This gives me:
{noformat}
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme 
max
        at 
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
        at 
org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
        at 
org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
        at 
org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at 
org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
        at 
org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
        at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
        at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
        at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
        at 
org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
        at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

Note that this one does not fail:

{noformat}
    
input.apply(TextIO.write().to("max://this/does/not/exist").withSuffix(".txt"));
{noformat}

The cause of this is what I suspected before. The coder ({{FileResultCoder}}) 
accesses the custom file system before the operator, which receives the data 
from the coder, initializes it. Now, we had previously assumed that coders 
would not access file systems. Coders should be operating on a lower level than 
the user code.

In Flink, all operators are pipelined, i.e. a parallel instance of each 
operator runs in the same task slot. They do not share the same class loader, 
but each operator, unless chained (aka fused), has its own classloader. If we 
want to access the FileSystems code, we need to initialize it for each of those 
operators.

The "easy" solution would be to not let the coder use the file system and defer 
file resolution until later, but I suppose it is a fair assumption that the 
file system code is always initialized when Beam code runs.

Alternatively, from the Flink Runner side, we can make sure to always 
initialize the FileSystems even if we do not run user code in the operator.




> Filesystems not properly registered using FileIO.write()
> --------------------------------------------------------
>
>                 Key: BEAM-8303
>                 URL: https://issues.apache.org/jira/browse/BEAM-8303
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.15.0
>            Reporter: Preston Koprivica
>            Assignee: Maximilian Michels
>            Priority: Major
>
> I’m getting the following error when attempting to use the FileIO apis 
> (beam-2.15.0) and integrating with AWS S3.  I have setup the PipelineOptions 
> with all the relevant AWS options, so the filesystem registry **should** be 
> properly seeded by the time the graph is compiled and executed:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at 
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at 
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at 
> org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at 
> org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
> For reference, the write code resembles this:
> {code:java}
>  FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(schema))
>                 .to(options.getOutputDir()). // will be something like: 
> s3://<bucket>/<path>
>                 .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), 
> write);{code}
> The issue does not appear to be related to ParquetIO.sink().  I am able to 
> reliably reproduce the issue using JSON formatted records and TextIO.sink(), 
> as well.  Moreover, AvroIO is affected if withWindowedWrites() option is 
> added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only to have it reemerge as I scaled 
> up the data set size.  The stack trace, while very similar, reads:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at 
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at 
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748){code}
>  
> And lastly, I tried adding the following deprecated option (with and without 
> the withNoSpilling() option):
> {code:java}
>  write = write.withIgnoreWindowing(); {code}
> This seemed to fix the issue altogether but aside from having to rely on a 
> deprecated feature, there is the bigger issue of why?
>  
> In reading through some of the source, it seems a common pattern to have to 
> manually register the pipeline options to seed the filesystem registry during 
> the setup part of the operator lifecycle, e.g.: 
> [https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313|https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Frelease-2.15.0%2Frunners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2Ftranslation%2Fwrappers%2Fstreaming%2FDoFnOperator.java%23L304-L313&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7C024bc6b438914e7351c008d74037641d%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637048478964357677&sdata=iGNAsktzEA9T2hlKQ4e3oscwL8xLQFuCZ6hsGHQb1So%3D&reserved=0]
>    
>  
> Is it possible that I have hit upon a couple scenarios where that has not 
> taken place?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to