Re: Intermittent No FileSystem found exception

2019-10-25 Thread Koprivica,Preston Blake
I misspoke on the temporary workaround.  Should use the #withIgnoreWindowing()
option on FileIO.
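
For instance (a minimal sketch, assuming an existing FileIO.Write named
`write`; the option is available on FileIO.Write in Beam 2.x, though marked
deprecated):

    // Corrected workaround: tell FileIO to ignore the windowing of the
    // input collection when writing files.
    write = write.withIgnoreWindowing();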

On 10/25/19, 11:33 AM, "Maximilian Michels"  wrote:

Hi Maulik,

Thanks for reporting. As Preston already pointed out, this is fixed in
the upcoming 2.17.0 release.

Thanks,
Max


Re: Intermittent No FileSystem found exception

2019-10-24 Thread Koprivica,Preston Blake
Hi Maulik,

I believe you may be witnessing this issue: 
https://issues.apache.org/jira/browse/BEAM-8303.  We ran into this using 
beam-2.15.0 on flink-1.8 over S3.  It looks like it’ll be fixed in 2.17.0.

As a temporary workaround, you can set the #withNoSpilling() option if you're
using the FileIO API.  If not, it should be relatively easy to move to it.
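
For instance (a minimal sketch; `write` stands for an existing FileIO.Write,
as in the code further down this thread):

    // Temporary workaround: avoid the spilling code path whose coder
    // performs the failing filesystem lookup.
    write = write.withNoSpilling();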

From: Maulik Soneji 
Reply-To: "dev@beam.apache.org" 
Date: Thursday, October 24, 2019 at 7:05 AM
To: "dev@beam.apache.org" 
Subject: Intermittent No FileSystem found exception

Hi everyone,

We are running a batch job on Flink that reads data from GCS and does some
aggregation on this data.
We are intermittently getting the error: `No filesystem found for scheme gs`

We are running Beam version 2.15.0 with the FlinkRunner, on Flink version 1.6.4

While remotely debugging the task managers, we found that in a few task
managers the GcsFileSystemRegistrar is not added to the list of FileSystem
schemes. In these task managers, we get this error.

The SCHEME_TO_FILESYSTEM collection in the org.apache.beam.sdk.io.FileSystems
class is modified only by the setDefaultPipelineOptions function. On these task
managers that function is never called, so the GcsFileSystemRegistrar is never
added to SCHEME_TO_FILESYSTEM.
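
For what it's worth, this should be reproducible by hand on an affected JVM (a
hedged sketch; the path below is illustrative):

    import org.apache.beam.sdk.io.FileSystems;

    // On a task manager where setDefaultPipelineOptions never ran, the "gs"
    // scheme is missing from SCHEME_TO_FILESYSTEM and this throws
    // java.lang.IllegalArgumentException: No filesystem found for scheme gs
    FileSystems.matchNewResource("gs://some-bucket/some/path", false);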

Detailed stacktrace:


java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
In order to resolve this issue, we tried calling the following in the
PTransform's expand function:

FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

This call is meant to ensure that the GcsFileSystemRegistrar is added to the
registry, but it hasn't solved the issue.
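
(For contrast, a sketch with illustrative names of what a worker-side hook
would look like: expand() runs on the client while the pipeline graph is being
built, so a registration there never executes on the task managers, whereas a
DoFn's @Setup runs on each worker. Even that would not cover the decode in the
stack trace above, which happens inside Flink's network stack before any user
code runs.)

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative only: registers filesystems on whichever worker executes
    // this DoFn, rather than on the client that constructs the graph.
    class RegisterFileSystemsFn<T> extends DoFn<T, T> {
      @Setup
      public void setup() {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        ctx.output(ctx.element());
      }
    }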

Can someone please help us check why this might be happening and what can be
done to resolve this issue?

Thanks and Regards,
Maulik




Re: FileIO and windowed writes

2019-10-23 Thread Koprivica,Preston Blake
I currently only have quick access to test the DirectRunner and the 
FlinkRunner.  This only manifests in the FlinkRunner.

From: Reuven Lax 
Reply-To: "dev@beam.apache.org" 
Date: Wednesday, October 23, 2019 at 4:14 PM
To: dev 
Subject: Re: FileIO and windowed writes

Is this only in the Flink runner?


Re: FileIO and windowed writes

2019-10-23 Thread Koprivica,Preston Blake
I’ve tried different windowing functions and all result in the same behavior.  
The one in the previous email used the global window and a processing time 
based repeated trigger.  The filename policy used real system time to timestamp 
the outgoing files.

Here are a couple of other window+trigger combos I've tried (with window-based
filename strategies):

Window.into(FixedWindows.of(windowDur))
    .withAllowedLateness(Duration.standardHours(24))
    .discardingFiredPanes()
    .triggering(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))
i.e. a fixed window with a processing-time trigger and ~infinite allowed lateness


pipeline
    .apply(
        format("ReadSQS(%s)", options.getQueueUrl()),
        SqsIO.read().withQueueUrl(options.getQueueUrl()))
    .apply(WithTimestamps.of((Message m) -> Instant.now()))
    .apply(
        format("Window(%s)", options.getWindowDuration()),
        Window.into(FixedWindows.of(windowDur)))
i.e. map event time to processing time and use the default trigger (i.e. close
of window, no lateness)

These all resulted in the same behavior – data gets hung up in a temp file 
somewhere and the finalize file logic never seems to run.

Thanks,
-Preston

From: Reuven Lax 
Reply-To: "dev@beam.apache.org" 
Date: Wednesday, October 23, 2019 at 2:35 PM
To: dev 
Subject: Re: FileIO and windowed writes

What WindowFn are you using?



FileIO and windowed writes

2019-10-23 Thread Koprivica,Preston Blake
Hi guys,

I’m currently working on a simple system where the intention is to ingest data
from a realtime stream – in this case Amazon SQS – and write the output in an
incremental fashion to a durable filesystem (i.e. S3).  It’s easy to think of
this as a low-fi journaling system.  We need to make sure that data that’s
written to the source queue eventually makes it to S3.  We are utilizing the
FileIO windowed writes with a custom naming policy to partition the files by
their event time.  Because SQS can’t guarantee order, we do have to allow late
messages.  Moreover, we need a further guarantee that a message be written in a
timely manner – we’re thinking some constant multiple of the windowing
duration.  As a first pass, we were thinking of a processing-time trigger that
fires on some regular interval.  For context, here’s an example of the
pipeline:

ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write 
to Avro

  pipeline
      .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
      .apply(
          Window.configure()
              .discardingFiredPanes()
              .triggering(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(windowDur))))
      .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
      .setCoder(AvroCoder.of(recordClass))
      .apply(
          AvroIO.write(recordClass)
              .withWindowedWrites()
              .withTempDirectory(options.getTempDir())
              .withNumShards(options.getShards())
              .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward.  I have not yet observed lost data with 
this pipeline, but I am seeing an issue with timeliness.  Things seem to get 
hung up on finalizing file output, but I have yet to truly pinpoint the issue.  
To really highlight the issue, I can set up a test where I send a single message
to the source queue.  If nothing else happens, the data never makes it to its 
final output using the FlinkRunner (beam-2.15.0, flink-1.8).  Has anyone seen 
this behavior before?  Is the expectation of eventual consistency wrong?

Thanks,
-Preston






Re: No filesystem found for scheme s3 using FileIO.write()

2019-09-26 Thread Koprivica,Preston Blake
Hi Max,

I was able to test that change you suggested.  I went ahead and commented on
the Jira:
https://issues.apache.org/jira/browse/BEAM-8303?focusedCommentId=16939002&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16939002.
Unless others object, I'm going to move any further discussion to the Jira to
keep a consistent record there.

-Preston

On 9/25/19, 5:27 PM, "Koprivica,Preston Blake"  wrote:

Hi Max,

I assume you were asking me.  I'm working on building the project from
source and will then inject the code that you suggested.  I'll let you know if
I have any success.

On 9/25/19, 4:05 PM, "Maximilian Michels"  wrote:

I wrote the same reply as you, but then deleted it again before sending
;) Given Preston's full description and a bit of Flink context, it is
pretty much impossible for this to have anything to do with service files.

The issue comes from a coder FileIO uses. The coder depends on the S3
file system (really any custom file system). If we use a native Flink
transformation, e.g. Union, we never get to load the FileSystems code in
the current classloader. However, the coder depends on the FileSystems
initialization code (which by the way has to be run "manually" because
it depends on the pipeline options), so it will error.

Note that FileIO usually consists of a cascade of different transforms
for which parts may execute on different machines. That's why we see
this error during `decode` on a remote host which had not been
initialized yet by one of the other instances of the initialization
code. Probably that is the receiving side of a Reshuffle.

Just to prove this theory, do you mind building Beam and testing your
pipeline with the following line added before line 75?

https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L75

FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

We can later pass in the pipeline options to initialize this properly.

If this is too much, I'll probably give it a try tomorrow using your
example code.

Cheers,
Max

On 25.09.19 11:15, Kenneth Knowles wrote:
> Are you building a fat jar? I don't know if this is your issue. I 
don't
> know Flink's operation in any close detail, and I'm not sure how it
> relates to what Max has described. But it is a common cause of this 
kind
> of error.
>
> The registration of the filesystem is here:
> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java#L32.
> This results in the built jar for 
beam-sdks-java-io-amazon-web-services
> to have a file
> META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar with the
> line org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar
>
> The file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar
> exists in many dependencies, including the core SDK. The default for
> many fat jar tools (maven shade and gradle shadow) is that they
> nondeterministically clobber each other, and you have to add a line 
like
> "mergeServiceFiles" to your configuration to keep all the 
registrations.
>
> Kenn
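
A quick way to check which registrars actually made it into a fat jar (a
hedged sketch of the ServiceLoader mechanism Kenn describes; the class name is
made up for illustration):

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.io.FileSystemRegistrar;

    public class ListFileSystemRegistrars {
      public static void main(String[] args) {
        // Prints one line per entry surviving in
        // META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar.
        // If the S3 registrar line was clobbered during shading, it will
        // not appear here.
        for (FileSystemRegistrar registrar :
            ServiceLoader.load(FileSystemRegistrar.class)) {
          System.out.println(registrar.getClass().getName());
        }
      }
    }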
>
> On Wed, Sep 25, 2019 at 8:36 AM Maximilian Michels <m...@apache.org> wrote:
>
> Hey Preston,
>
> I just wrote a reply on the user mailing list. Copying the reply 
here
> just in case:
>
> 
>
> Your observation seems to be correct. There is an issue with the 
file
> system registration.
>
> The two types of errors you are seeing, as well as the successful 
run,
> are just due to the different structure of the genera

Re: No filesystem found for scheme s3 using FileIO.write()

2019-09-25 Thread Koprivica,Preston Blake
> However, there we do not have the pipeline options available, which
> prevents any configuration. The problem is that the error occurs in 
the
> coder used in a native Flink operation which does not even run user
> code.
>
> I believe the only way to fix this is to ship the FileSystems
> initialization code in CoderTypeSerializer where we are sure to 
execute
> it in time for any coders which depend on it.
>
> Could you file an issue? I'd be happy to fix this then.
>
> Thanks,
> Max
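
A rough sketch of that direction (illustrative only, not the actual patch; the
class name here is hypothetical):

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CoderTypeSerializerInitSketch {
      // Runs once per classloader, before any coder invoked from this
      // serialization path can attempt a filesystem lookup. As noted above,
      // only default options are available here, not the real pipeline
      // options.
      static {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
      }
    }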

No filesystem found for scheme s3 using FileIO.write()

2019-09-23 Thread Koprivica,Preston Blake
Hello everyone. This is a cross-post from the users list.  It didn't get much
traction, so I thought I'd move over to the dev group, since this seems like it
might be an issue with initialization in the FlinkRunner (apologies in advance
if I missed something silly).

I'm getting the following error when attempting to use the FileIO APIs
(beam-2.15.0) and integrating with AWS S3.  I have set up the PipelineOptions
with all the relevant AWS options, so the filesystem registry *should* be
properly seeded by the time the graph is compiled and executed:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

FileIO.Write<Void, GenericRecord> write =
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
        .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);

The issue does not appear to be related to ParquetIO.sink().  I am able to 
reliably reproduce the issue using JSON formatted records and TextIO.sink(), as 
well.
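
For reference, a sketch of that TextIO.sink() variant (assuming the records
were first formatted as JSON strings; names are illustrative):

    FileIO.Write<Void, String> jsonWrite =
        FileIO.<String>write()
            .via(TextIO.sink())
            .to(options.getOutputDir())
            .withSuffix(".json");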

Just trying some different knobs, I went ahead and set the following option:

write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled up 
the data set size.  The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at