I wrote the same reply as you, but then deleted it before sending ;) Given Preston's full description and a bit of Flink context, it is pretty much impossible for this to have anything to do with service files.

The issue comes from a coder that FileIO uses. The coder depends on the S3 file system (really, on any custom file system). If we use a native Flink transformation, e.g. Union, we never get to load the FileSystems code in the current classloader. However, the coder depends on the FileSystems initialization code (which, by the way, has to be run "manually" because it depends on the pipeline options), so it will error.

Note that FileIO usually consists of a cascade of different transforms, parts of which may execute on different machines. That's why we see this error during `decode` on a remote host that had not yet been initialized by one of the other instances of the initialization code. That is probably the receiving side of a Reshuffle.

Just to prove this theory, do you mind building Beam and testing your pipeline with the following line added before line 75? https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L75
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

We can later pass in the pipeline options to initialize this properly.
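
For illustration, here is a rough sketch of what passing the pipeline options into the serializer could eventually look like. The field, constructor parameter and readObject hook are just assumptions to show the idea, not the actual change, and the unchanged TypeSerializer methods are omitted:

  import java.io.IOException;
  import java.io.ObjectInputStream;
  import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.flink.api.common.typeutils.TypeSerializer;

  public class CoderTypeSerializer<T> extends TypeSerializer<T> {

    private final Coder<T> coder;
    // Assumption: the options travel with the serializer so they are available
    // on every task manager that ends up deserializing records with it.
    private final SerializablePipelineOptions pipelineOptions;

    public CoderTypeSerializer(Coder<T> coder, SerializablePipelineOptions options) {
      this.coder = coder;
      this.pipelineOptions = options;
      // Seed the FileSystems registry (s3, gs, ...) on the machine that
      // constructs the serializer.
      FileSystems.setDefaultPipelineOptions(options.get());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      // Runs when the serializer is deserialized on a task manager -- exactly
      // the place where the S3 registration is currently missing.
      FileSystems.setDefaultPipelineOptions(pipelineOptions.get());
    }

    // encode/decode and the remaining TypeSerializer methods keep delegating
    // to the wrapped Beam coder, unchanged.
  }

The constructor covers the submitting side, the readObject hook the task managers; PipelineOptionsFactory.create() would then only remain as a fallback.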

If this is too much, I'll probably give it a try tomorrow using your example code.

Cheers,
Max

On 25.09.19 11:15, Kenneth Knowles wrote:
Are you building a fat jar? I don't know if this is your issue; I don't know Flink's operation in close detail, and I'm not sure how it relates to what Max has described. But it is a common cause of this kind of error.

The registration of the filesystem is here: https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java#L32. This results in the built jar for beam-sdks-java-io-amazon-web-services having a file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar with the line org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar.

The file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar exists in many dependencies, including the core SDK. The default behavior of many fat-jar tools (Maven Shade and Gradle Shadow) is that these files nondeterministically clobber each other, so you have to add something like "mergeServiceFiles" to your configuration to keep all the registrations.
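
If it helps to rule that out, a small diagnostic along these lines (plain java.util.ServiceLoader, nothing runner-specific; the class name is made up) will show which registrars actually survived the fat-jar build:

  import java.util.ServiceLoader;
  import org.apache.beam.sdk.io.FileSystemRegistrar;

  public class ListFileSystemRegistrars {
    public static void main(String[] args) {
      // Prints every FileSystemRegistrar discoverable through
      // META-INF/services on the current classpath. If
      // org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar is missing when
      // this runs against the fat jar, the service files were clobbered.
      for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
        System.out.println(registrar.getClass().getName());
      }
    }
  }

Running it with "java -cp your-fat.jar ListFileSystemRegistrars" should list the S3 registrar once mergeServiceFiles (Gradle Shadow) or the ServicesResourceTransformer (Maven Shade) is in place.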

Kenn

On Wed, Sep 25, 2019 at 8:36 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

    Hey Preston,

    I just wrote a reply on the user mailing list. Copying the reply here
    just in case:

    ----

    Your observation seems to be correct. There is an issue with the file
    system registration.

    The two types of errors you are seeing, as well as the successful run, are
    just due to the different structure of the generated transforms. The Flink
    scheduler will distribute them differently, which results in some parts of
    the pipeline being placed on task managers which happen to execute the
    FileSystems initialization code, and others not.

    There is a quick fix to at least initialize the file system in case it
    has not been initialized, by adding the loading code here:
    https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463

    However, we do not have the pipeline options available there, which
    prevents any configuration. The problem is that the error occurs in a
    coder used by a native Flink operation which does not even run user code.

    I believe the only way to fix this is to ship the FileSystems
    initialization code in CoderTypeSerializer, where we are sure to execute
    it in time for any coders which depend on it.

    Could you file an issue? I'd be happy to fix this then.

    Thanks,
    Max

    ----

    On 23.09.19 08:04, Koprivica,Preston Blake wrote:
     > Hello everyone. This is a cross-post from the users list. It didn’t get
     > much traction, so I thought I’d move over to the dev group, since this
     > seems like it might be an issue with initialization in the FlinkRunner
     > (apologies in advance if I missed something silly).
     >
     > I’m getting the following error when attempting to use the FileIO APIs
     > (beam-2.15.0) and integrating with AWS S3. I have set up the
     > PipelineOptions with all the relevant AWS options, so the filesystem
     > registry *should* be properly seeded by the time the graph is compiled
     > and executed:
     >
     > java.lang.IllegalArgumentException: No filesystem found for scheme s3
     >      at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
     >      at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
     >      at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
     >      at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
     >      at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
     >      at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
     >      at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
     >      at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
     >      at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
     >      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
     >      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
     >      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
     >      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
     >      at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
     >      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
     >      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
     >      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
     >      at java.lang.Thread.run(Thread.java:748)
     >
     > For reference, the write code resembles this:
     >
     > FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
     >         .via(ParquetIO.sink(schema))
     >         .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
     >         .withSuffix(".parquet");
     >
     > records.apply(String.format("Write(%s)", options.getOutputDir()), write);
     >
     > The issue does not appear to be related to ParquetIO.sink(). I am able
     > to reliably reproduce the issue using JSON formatted records and
     > TextIO.sink(), as well.
     >
     > Just trying some different knobs, I went ahead and set the following
     > option:
     >
     >          write = write.withNoSpilling();
     >
     > This actually seemed to fix the issue, only to have it reemerge as I
     > scaled up the data set size. The stack trace, while very similar, reads:
     >
     > java.lang.IllegalArgumentException: No filesystem found for scheme s3
     >      at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
     >      at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
     >      at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
     >      at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
     >      at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
     >      at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
     >      at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
     >      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
     >      at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
     >      at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
     >      at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
     >      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
     >      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
     >      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
     >      at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
     >      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
     >      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
     >      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
     >      at java.lang.Thread.run(Thread.java:748)
     >
     > I’ll be interested to hear some theories on the differences/similarities
     > in the stacks. And lastly, I tried adding the following deprecated
     > option (with and without the withNoSpilling() option):
     >
     > write = write.withIgnoreWindowing();
     >
     > This seemed to fix the issue altogether, but aside from having to rely
     > on a deprecated feature, there is the bigger question of why.
     >
     > In reading through some of the source, it seems a common pattern to have
     > to manually register the pipeline options to seed the filesystem
     > registry during the setup part of the operator lifecycle, e.g.:
     > https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313

     >
     >
     > Is it possible that I have hit upon a couple of scenarios where that has
     > not taken place? Unfortunately, I’m not yet in a position to suggest a
     > fix, but I’m guessing there’s some missing initialization code in one or
     > more of the batch operators. If this is indeed a legitimate issue, I’ll
     > be happy to log an issue, but I’ll hold off until the community gets a
     > chance to look at it.
     >
     > Thanks,
     >
     >   * Preston
     >
