Hi Magnus,
Your observation seems to be correct. There is an issue with the file
system registration.
The two types of errors you are seeing, as well as the successful run,
are just due to the different structure of the generated transforms. The
Flink scheduler will distribute them differently, which results in some
pipelines being placed on task managers which happen to execute the
FileSystems initialization code and others not.
There is a quick fix to at least initialize the file system in case it
has not been initialized, by adding the loading code here:
https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
However, the pipeline options are not available there, which
prevents any configuration. The problem is that the error occurs in a
coder used by a native Flink operation which does not even run user code.
I believe the only way to fix this is to ship the FileSystems
initialization code in CoderTypeSerializer, where we can be sure it
executes in time for any coders which depend on it.
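A minimal sketch of what such a fix could look like (hypothetical names, not the actual Beam code; the real fix would also need access to the serialized pipeline options): an idempotent guard the serializer runs before decoding, so registration executes at most once per JVM even when many serializer instances are created concurrently.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of an idempotent initialization guard, the kind of
// check a CoderTypeSerializer fix could run before any coder touches
// FileSystems. Thread-safe: the registration runs at most once per JVM.
class FileSystemsInitGuard {
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    // Runs the given registration exactly once across all callers.
    static void ensureInitialized(Runnable registration) {
        if (INITIALIZED.compareAndSet(false, true)) {
            registration.run();
        }
    }

    static boolean isInitialized() {
        return INITIALIZED.get();
    }
}
```

The compare-and-set makes repeated calls from different task slots on the same task manager harmless, which is exactly the property the scattered initialization sites currently lack.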
Could you file an issue? I'd be happy to fix this then.
Thanks,
Max
On 24.09.19 09:54, Chamikara Jayalath wrote:
As Magnus mentioned, FileSystems are picked up from the class path and
registered here.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
Flink seems to invoke this method at the following locations:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
I'm not too familiar with Flink, so I'm not sure why S3 is not being
properly registered when running the Flink job. CCing some folks who are
more familiar with Flink.
+Ankur Goenka <mailto:goe...@google.com> +Maximilian Michels
<mailto:m...@apache.org>
Thanks,
Cham
On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
<preston.b.kopriv...@cerner.com <mailto:preston.b.kopriv...@cerner.com>>
wrote:
Thanks for the reply Magnus.
I'm sorry it wasn't more clear in the original message. I have
added the aws dependencies and set up the pipeline options with the
aws options. For the case where I set the write to ignore
windowing, everything works. But the option is deprecated and the
comments warn against its usage.
I'm wondering whether the error I see when no options are set is a case
of improperly initialized filesystems in the Flink runner. Or maybe
someone has different ideas about the culprit.
------------------------------------------------------------------------
*From:* Magnus Runesson <ma...@linuxalert.org
<mailto:ma...@linuxalert.org>>
*Sent:* Saturday, September 21, 2019 9:06:03 AM
*To:* user@beam.apache.org <mailto:user@beam.apache.org>
<user@beam.apache.org <mailto:user@beam.apache.org>>
*Subject:* Re: No filesystem found for scheme s3 using FileIO
Hi!
You probably miss the S3 filesystem in your classpath.
If I remember correctly you must include this
https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
package in your classpath/fat-jar.
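For a Maven build, the dependency coordinates would look something like this (use the version matching your Beam release; 2.15.0 in this thread):

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
  <version>2.15.0</version>
</dependency>
```

This module contains the S3FileSystem registrar that Beam discovers from the classpath at startup.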
/Magnus
On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
Hello everyone. I'm getting the following error when attempting to
use the FileIO apis (beam-2.15.0) and integrating with a 3rd party
filesystem, in this case AWS S3:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
For reference, the write code resembles this:

FileIO.Write<?, GenericRecord> write =
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
        .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);
I have set up the PipelineOptions with all the relevant AWS options,
and the issue does not appear to be related to ParquetIO.sink()
directly. I am able to reliably reproduce the issue using
JSON-formatted records and TextIO.sink() as well.

Just trying some different knobs, I went ahead and set the
following option:

write = write.withNoSpilling();
This actually seemed to fix the issue, only to have it reemerge as
I scaled up the data set size. The stack trace, while very
similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
I'll be interested to hear some theories on the
differences/similarities in the stacks. Lastly, I tried adding the
following deprecated option (with and without the withNoSpilling()
option):

write = write.withIgnoreWindowing();

This seemed to fix the issue altogether, but aside from having to
rely on a deprecated feature, there is the bigger question of why it works.
In reading through some of the source, it seems a common pattern
to have to manually register the pipeline options to seed the
filesystem registry during the setup part of the operator
lifecycle, e.g.:
https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313

Is it possible that I have hit upon a couple of scenarios where that
has not taken place? Unfortunately, I'm not yet in a position to
suggest a fix, but I'm guessing there's some missing initialization
code in one or more of the batch operators. If this is indeed a
legitimate issue, I'll be happy to log one, but I'll hold off until
the community gets a chance to look at it.
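The failure mode described above can be illustrated with a toy scheme-keyed registry (hypothetical names, not Beam's actual API): any JVM that never ran the registration step fails lookups with exactly this kind of IllegalArgumentException, while JVMs that did run it succeed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of a scheme-keyed filesystem registry (hypothetical names,
// not Beam's actual classes). Lookup fails with the same kind of error
// seen in the stack traces whenever register() never ran in this JVM.
class SchemeRegistry {
    private static final Map<String, String> FILESYSTEMS = new ConcurrentHashMap<>();

    // Analogous to seeding the registry from pipeline options
    // during operator setup.
    static void register(String scheme, String fileSystem) {
        FILESYSTEMS.put(scheme, fileSystem);
    }

    static String lookup(String scheme) {
        String fs = FILESYSTEMS.get(scheme);
        if (fs == null) {
            throw new IllegalArgumentException(
                "No filesystem found for scheme " + scheme);
        }
        return fs;
    }
}
```

This is why the error appears or disappears depending on where the Flink scheduler places each pipeline stage: only task managers that happened to execute the setup path have a populated registry.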
Thanks,
Preston