Hi Beam Team,
I have a data pipeline (Beam on Flink, running on YARN on AWS EMR) that
reads some data, applies a simple filter, and writes the results to S3.
*Components and Versions:*
- Beam: 2.16.0 (branch: release-2.16.0)
- Flink: 1.8
- YARN on AWS EMR: emr-5.26.0
Below is a snippet of the code:
PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());

PCollection<SomeType> someTypesOutput =
    someTypes.apply(
        Filter.by(
            someTypeElement -> {
              if (some condition) {
                return false;
              }
              return true;
            }));

someTypesOutput
    .apply(
        AvroIO.write(SomeType.class)
            .to(options.getDestination().get())
            .withOutputFilenames())
    .getPerDestinationOutputFilenames();
Below is the exception I see on the Flink job:
java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)
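For what it's worth, the "scheme" in the error is just the URI scheme of
the destination path; Beam's FileSystems registry is keyed by it, so any
s3:// path needs a registrar that claimed "s3". A small stdlib
illustration (the bucket and path here are made up):

```java
import java.net.URI;

public class SchemeDemo {
  public static void main(String[] args) {
    // FileSystems resolves a path to a FileSystem by its URI scheme, so
    // "No filesystem found for scheme s3" means nothing registered "s3"
    // on the worker that tried to decode the path.
    URI destination = URI.create("s3://my-bucket/output/part-001.avro");
    System.out.println(destination.getScheme()); // prints "s3"
  }
}
```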
My Beam pipeline is failing randomly with the exception listed above. I
saw that the fix was made available on the release-2.16.0 branch
(comment on JIRA: https://issues.apache.org/jira/browse/BEAM-8303),
which I checked out locally. I did a custom build of apache/beam
(branch: release-2.16.0) and made sure the artifacts were published to
my local Maven repository so that I could use them in my project. I
imported the Beam artifacts (2.16.0-MG-SNAPSHOT) and built my project.
Once I had the compiled JAR, I kicked off Flink jobs on an AWS EMR
cluster. The Flink job fails or succeeds randomly
(non-deterministically), without any change in how I run the command or
in how I build my artifact.
I have noticed that if I run the Flink job with a parallelism of 1 it
does not fail, but if I run the same job with a parallelism of 5 it can
fail or succeed.
If anyone can help me out or suggest directions I could try, it would be
greatly appreciated.
Thanks.
- Maulik
On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake
<[email protected]> wrote:
Not a problem! Thanks for looking into this. In reading through the
source associated with the stacktrace, I also noticed that there's
neither user code nor beam-to-flink lifecycle code available for
initialization. As far as I could tell, it was pure flink down to the
coders. Nothing new here, but maybe it bolsters confidence in your
diagnosis. I went ahead and logged an issue here:
https://issues.apache.org/jira/browse/BEAM-8303.
Let me know what I can do to help - I'm happy to test/verify any
fixes you want to try and review any code (bearing in mind I'm a
total newb in the beam space).
Thanks again,
Preston
On 9/25/19, 10:34 AM, "Maximilian Michels" <[email protected]> wrote:
Hi Preston,
Sorry about the name mixup, of course I meant to write Preston not
Magnus :) See my reply below.
cheers,
Max
On 25.09.19 08:31, Maximilian Michels wrote:
> Hi Magnus,
>
> Your observation seems to be correct. There is an issue with the
> file system registration.
>
> The two types of errors you are seeing, as well as the successful
> run, are just due to the different structure of the generated
> transforms. The Flink scheduler will distribute them differently,
> which results in some pipelines being placed on task managers which
> happen to execute the FileSystems initialization code and others
> not.
>
> There is a quick fix to at least initialize the file system in case
> it has not been initialized, by adding the loading code here:
> https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
>
> However, there we do not have the pipeline options available, which
> prevents any configuration. The problem is that the error occurs in
> the coder used in a native Flink operation which does not even run
> user code.
>
> I believe the only way to fix this is to ship the FileSystems
> initialization code in CoderTypeSerializer, where we are sure to
> execute it in time for any coders which depend on it.
>
> Could you file an issue? I'd be happy to fix this then.
>
> Thanks,
> Max
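The change Max describes might look roughly like the following
pseudocode sketch (not the actual Beam patch; the static guard, the
options field, and the SerializablePipelineOptions wiring are
assumptions for illustration only):

```java
// Pseudocode sketch: populate the FileSystems registry from inside the
// Flink-side serializer, before any wrapped Beam coder decodes a path.
public class CoderTypeSerializer<T> extends TypeSerializer<T> {
  private static final AtomicBoolean FILE_SYSTEMS_INITIALIZED = new AtomicBoolean();
  private final SerializablePipelineOptions pipelineOptions; // shipped to task managers

  @Override
  public T deserialize(DataInputView source) throws IOException {
    if (FILE_SYSTEMS_INITIALIZED.compareAndSet(false, true)) {
      // Registers all FileSystemRegistrars (s3://, gs://, ...) with the options.
      FileSystems.setDefaultPipelineOptions(pipelineOptions.get());
    }
    // ... decode with the wrapped Beam coder as before ...
  }
}
```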
>
> On 24.09.19 09:54, Chamikara Jayalath wrote:
>> As Magnus mentioned, FileSystems are picked up from the class path
>> and registered here:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
>>
>> Seems like Flink is invoking this method at the following locations:
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
>>
>> I'm not too familiar with Flink, so I'm not sure why S3 is not
>> properly being registered when running the Flink job. Ccing some
>> folks who are more familiar with Flink.
>>
>> +Ankur Goenka <[email protected]> +Maximilian Michels <[email protected]>
>>
>> Thanks,
>> Cham
>>
>>
>> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
>> <[email protected]> wrote:
>>
>> Thanks for the reply Magnus.
>>
>> I'm sorry it wasn't clearer in the original message. I have added
>> the aws dependencies and set up the pipeline options with the aws
>> options. For the case where I set the write to ignore windowing,
>> everything works. But that option is deprecated and the comments
>> warn against its usage.
>>
>> I'm wondering whether, in the case where no options are set and I
>> see the error, that is a case of improperly initialized filesystems
>> in the flink runner. Or maybe someone has some different ideas
>> about the culprit.
>>
>>
>>
>>
------------------------------------------------------------------------
>> *From:* Magnus Runesson <[email protected]>
>> *Sent:* Saturday, September 21, 2019 9:06:03 AM
>> *To:* [email protected]
>> *Subject:* Re: No filesystem found for scheme s3 using FileIO
>>
>> Hi!
>>
>>
>> You are probably missing the S3 filesystem from your classpath.
>>
>> If I remember correctly, you must include the
>> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>> package in your classpath/fat-jar.
>>
>> /Magnus
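For reference, the Maven coordinates for that module look like this
(match the version to your Beam version; 2.15.0 shown to match the
report below):

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
  <version>2.15.0</version>
</dependency>
```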
>>
>> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>>>
>>> Hello everyone. I'm getting the following error when attempting to
>>> use the FileIO apis (beam-2.15.0) and integrating with a 3rd party
>>> filesystem, in this case AWS S3:
>>>
>>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>>> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>>> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>>> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>>> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>>> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> For reference, the write code resembles this:
>>>
>>> FileIO.Write<?, GenericRecord> write =
>>>     FileIO.<GenericRecord>write()
>>>         .via(ParquetIO.sink(schema))
>>>         .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>>>         .withSuffix(".parquet");
>>>
>>> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
>>>
>>> I have set up the PipelineOptions with all the relevant AWS
>>> options, and the issue does not appear to be related to
>>> ParquetIO.sink() directly. I am able to reliably reproduce the
>>> issue using JSON formatted records and TextIO.sink(), as well.
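For anyone reproducing this, the client-side options setup resembles
the sketch below (the region value is a placeholder, and it assumes
beam-sdks-java-io-amazon-web-services is on the classpath; this is
illustrative, not the exact code from the report):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SetupSketch {
  public static void main(String[] args) {
    AwsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(AwsOptions.class);
    options.setAwsRegion("us-east-1"); // placeholder region
    // Seeds the FileSystems registry (including s3://) on the submitting
    // client; per this thread, task managers may still decode before any
    // equivalent initialization runs there.
    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the FileIO/ParquetIO write and run ...
  }
}
```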
>>>
>>> Just trying some different knobs, I went ahead and set the
>>> following option:
>>>
>>> write = write.withNoSpilling();
>>>
>>> This actually seemed to fix the issue, only to have it reemerge
>>> as I scaled up the data set size. The stack trace, while very
>>> similar, reads:
>>>
>>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>>> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>>> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>>> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>>> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>>> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>>> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> I’ll be interested to hear some theories on the
>>> differences/similarities in the stacks. And lastly, I
tried
>>> adding the following deprecated option (with and
without the
>>> withNoSpilling() option):____
>>>
>>> __ __
>>>
>>> write = write.withIgnoreWindowing();____
>>>
>>> __ __
>>>
>>> This seemed to fix the issue altogether, but aside from having to
>>> rely on a deprecated feature, there is the bigger issue of why?
>>>
>>> In reading through some of the source, it seems a common pattern
>>> to have to manually register the pipeline options to seed the
>>> filesystem registry during the setup part of the operator
>>> lifecycle, e.g.:
>>> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
>>>
>>> Is it possible that I have hit upon a couple scenarios where that
>>> has not taken place? Unfortunately, I'm not yet in a position to
>>> suggest a fix, but I'm guessing there's some missing
>>> initialization code in one or more of the batch operators. If
>>> this is indeed a legitimate issue, I'll be happy to log an issue,
>>> but I'll hold off until the community gets a chance to look at it.
>>>
>>> Thanks,
>>> Preston
>>>
>>> CONFIDENTIALITY NOTICE This message and any included attachments
>>> are from Cerner Corporation and are intended only for the
>>> addressee. The information contained in this message is
>>> confidential and may constitute inside or non-public information
>>> under international, federal, or state securities laws.
>>> Unauthorized forwarding, printing, copying, distribution, or use
>>> of such information is strictly prohibited and may be unlawful. If
>>> you are not the addressee, please promptly delete this message and
>>> notify the sender of the delivery error by e-mail or you may call
>>> Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>> (816)221-1024.
>>>