Re: Intermittent No FileSystem found exception
I misspoke on the temporary workaround: you should use the #withIgnoreWindowing() option on FileIO. On 10/25/19, 11:33 AM, "Maximilian Michels" wrote: Hi Maulik, Thanks for reporting. As Preston already pointed out, this is fixed in the upcoming 2.17.0 release. Thanks, Max On 24.10.19 15:20, Koprivica,Preston Blake wrote: > Hi Maulik, I believe you may be witnessing this issue: https://issues.apache.org/jira/browse/BEAM-8303. We ran into this using beam-2.15.0 on flink-1.8 over S3. It looks like it’ll be fixed in 2.17.0. As a temporary workaround, you can set the #withNoSpilling() option if you’re using the FileIO API. If not, it should be relatively easy to move to it. *From:* Maulik Soneji *Reply-To:* "dev@beam.apache.org" *Date:* Thursday, October 24, 2019 at 7:05 AM *To:* "dev@beam.apache.org" *Subject:* Intermittent No FileSystem found exception Hi everyone, We are running a batch job on Flink that reads data from GCS and does some aggregation on it. We are intermittently seeing the error `No filesystem found for scheme gs`. We are running Beam 2.15.0 with the FlinkRunner, Flink version 1.6.4. On remote-debugging the task managers, we found that in a few task managers the *GcsFileSystemRegistrar is not added to the list of FileSystem schemes*; in those task managers we get this error. The collection *SCHEME_TO_FILESYSTEM* is modified only in the *setDefaultPipelineOptions* function of org.apache.beam.sdk.io.FileSystems, and since that function is never called, the GcsFileSystemRegistrar is never added to *SCHEME_TO_FILESYSTEM*.
> *Detailed stacktrace:*
>
> java.lang.IllegalArgumentException: No filesystem found for scheme gs
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
>     at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
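[Editor's note] The two workarounds mentioned in this thread look roughly like the following against the beam-2.15.0 Java SDK. This is a hedged sketch, not a tested implementation: `schema`, `records`, and `outputDir` are placeholder names, and you should verify the method availability against your SDK version.

    // Hedged sketch (beam-2.15.0 Java SDK; schema, records, and outputDir
    // are placeholders). #withNoSpilling() keeps WriteFiles off the Flink
    // spilling code path whose coder performs the failing filesystem lookup.
    FileIO.Write<Void, GenericRecord> write =
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to(outputDir)
            .withSuffix(".parquet")
            .withNoSpilling();
    records.apply("WriteParquet", write);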
Re: Intermittent No FileSystem found exception
Hi Maulik, I believe you may be witnessing this issue: https://issues.apache.org/jira/browse/BEAM-8303. We ran into it using beam-2.15.0 on flink-1.8 over S3. It looks like it’ll be fixed in 2.17.0. As a temporary workaround, you can set the #withNoSpilling() option if you’re using the FileIO API. If not, it should be relatively easy to move to it. From: Maulik Soneji Reply-To: "dev@beam.apache.org" Date: Thursday, October 24, 2019 at 7:05 AM To: "dev@beam.apache.org" Subject: Intermittent No FileSystem found exception Hi everyone, We are running a batch job on Flink that reads data from GCS and does some aggregation on it. We are intermittently seeing the error `No filesystem found for scheme gs`. We are running Beam 2.15.0 with the FlinkRunner, Flink version 1.6.4. On remote-debugging the task managers, we found that in a few task managers the GcsFileSystemRegistrar is not added to the list of FileSystem schemes; in those task managers we get this error. The collection SCHEME_TO_FILESYSTEM is modified only in the setDefaultPipelineOptions function of org.apache.beam.sdk.io.FileSystems, and since that function is never called, the GcsFileSystemRegistrar is never added to SCHEME_TO_FILESYSTEM.
Detailed stacktrace:

java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

In order to resolve this issue, we tried calling the following in the PTransform’s expand function:

    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

This call is meant to ensure that the GcsFileSystemRegistrar is added to the list, but it hasn’t solved the issue. Can someone please help us check why this might be happening and what can be done to resolve it? Thanks and Regards, Maulik CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
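[Editor's note] To make the failure mode above concrete, here is a self-contained toy in plain Java. It is NOT Beam's actual implementation; the class and member names only mimic the real `FileSystems.SCHEME_TO_FILESYSTEM` and `FileSystems.setDefaultPipelineOptions` to show why a JVM (or classloader) on which the init call never ran throws on the very first scheme lookup:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of the registry race: the scheme map is populated only
// by an explicit init call, so a task manager that starts decoding records
// before any code path has triggered the init sees an empty registry.
public class RegistryRace {
    // Stand-in for FileSystems.SCHEME_TO_FILESYSTEM.
    static final Map<String, String> SCHEME_TO_FILESYSTEM = new HashMap<>();

    // Stand-in for FileSystems.setDefaultPipelineOptions(...), the only
    // place the registrars (e.g. GcsFileSystemRegistrar) get loaded.
    static void setDefaultPipelineOptions() {
        SCHEME_TO_FILESYSTEM.put("gs", "GcsFileSystem");
    }

    static String getFileSystem(String scheme) {
        String fs = SCHEME_TO_FILESYSTEM.get(scheme);
        if (fs == null) {
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fs;
    }

    public static void main(String[] args) {
        try {
            getFileSystem("gs"); // fresh JVM: init never ran, so this throws
        } catch (IllegalArgumentException e) {
            System.out.println("before init: " + e.getMessage());
        }
        setDefaultPipelineOptions(); // the missing call on the failing task managers
        System.out.println("after init: " + getFileSystem("gs"));
    }
}
```

The fix discussed later in this archive is exactly to force the equivalent of `setDefaultPipelineOptions` to run on every host before any coder can perform a lookup.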
Re: FileIO and windowed writes
I currently only have quick access to test the DirectRunner and the FlinkRunner. This only manifests in the FlinkRunner. From: Reuven Lax Reply-To: "dev@beam.apache.org" Date: Wednesday, October 23, 2019 at 4:14 PM To: dev Subject: Re: FileIO and windowed writes Is this only in the Flink runner? On Wed, Oct 23, 2019 at 2:12 PM Koprivica,Preston Blake <preston.b.kopriv...@cerner.com> wrote: I’ve tried different windowing functions and all result in the same behavior. The one in the previous email used the global window and a processing-time-based repeated trigger. The filename policy used real system time to timestamp the outgoing files. Here are a couple of other window+trigger combos I’ve tried (with window-based filename strategies):

    Window.into(FixedWindows.of(windowDur))
        .withAllowedLateness(Duration.standardHours(24))
        .discardingFiredPanes()
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))

i.e. a fixed window with a processing-time-based trigger + ~infinite lateness

    pipeline
        .apply(
            format("ReadSQS(%s)", options.getQueueUrl()),
            SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(WithTimestamps.of((Message m) -> Instant.now()))
        .apply(
            format("Window(%s)", options.getWindowDuration()),
            Window.into(FixedWindows.of(windowDur)))

i.e. map event time to processing time and use the default trigger (i.e. close of window, no lateness). These all resulted in the same behavior – data gets hung up in a temp file somewhere and the finalize-file logic never seems to run. Thanks, -Preston From: Reuven Lax <re...@google.com> Reply-To: "dev@beam.apache.org" Date: Wednesday, October 23, 2019 at 2:35 PM To: dev Subject: Re: FileIO and windowed writes What WindowFn are you using?
On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <preston.b.kopriv...@cerner.com> wrote: Hi guys, I’m currently working on a simple system where the intention is to ingest data from a realtime stream – in this case Amazon SQS – and write the output in an incremental fashion to a durable filesystem (i.e. S3). It’s easy to think of this as a low-fi journaling system. We need to make sure that data written to the source queue eventually makes it to S3. We are utilizing the FileIO windowed writes with a custom naming policy to partition the files by their event time. Because SQS can’t guarantee order, we do have to allow late messages. Moreover, we need a further guarantee that a message be written in a timely manner – we’re thinking some constant multiple of the windowing duration. As a first pass, we were thinking of a processing-time-based trigger that fires on some regular interval. For context, here’s an example of the pipeline: ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro

    pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(
            Window.configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward. I have not yet observed lost data with this pipeline, but I am seeing an issue with timeliness. Things seem to get hung up on finalizing file output, but I have yet to truly pinpoint the issue. To really highlight the issue, I can set up a test where I send a single message to the source queue.
If nothing else happens, the data never makes it to its final output using the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior before? Is the expectation of eventual consistency wrong? Thanks, -Preston
Re: FileIO and windowed writes
I’ve tried different windowing functions and all result in the same behavior. The one in the previous email used the global window and a processing-time-based repeated trigger. The filename policy used real system time to timestamp the outgoing files. Here are a couple of other window+trigger combos I’ve tried (with window-based filename strategies):

    Window.into(FixedWindows.of(windowDur))
        .withAllowedLateness(Duration.standardHours(24))
        .discardingFiredPanes()
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))

i.e. a fixed window with a processing-time-based trigger + ~infinite lateness

    pipeline
        .apply(
            format("ReadSQS(%s)", options.getQueueUrl()),
            SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(WithTimestamps.of((Message m) -> Instant.now()))
        .apply(
            format("Window(%s)", options.getWindowDuration()),
            Window.into(FixedWindows.of(windowDur)))

i.e. map event time to processing time and use the default trigger (i.e. close of window, no lateness). These all resulted in the same behavior – data gets hung up in a temp file somewhere and the finalize-file logic never seems to run. Thanks, -Preston From: Reuven Lax Reply-To: "dev@beam.apache.org" Date: Wednesday, October 23, 2019 at 2:35 PM To: dev Subject: Re: FileIO and windowed writes What WindowFn are you using? On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <preston.b.kopriv...@cerner.com> wrote: Hi guys, I’m currently working on a simple system where the intention is to ingest data from a realtime stream – in this case Amazon SQS – and write the output in an incremental fashion to a durable filesystem (i.e. S3). It’s easy to think of this as a low-fi journaling system. We need to make sure that data written to the source queue eventually makes it to S3. We are utilizing the FileIO windowed writes with a custom naming policy to partition the files by their event time. Because SQS can’t guarantee order, we do have to allow late messages. Moreover, we need a further guarantee that a message be written in a timely manner – we’re thinking some constant multiple of the windowing duration. As a first pass, we were thinking of a processing-time-based trigger that fires on some regular interval. For context, here’s an example of the pipeline: ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro

    pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(
            Window.configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward. I have not yet observed lost data with this pipeline, but I am seeing an issue with timeliness. Things seem to get hung up on finalizing file output, but I have yet to truly pinpoint the issue. To really highlight the issue, I can set up a test where I send a single message to the source queue. If nothing else happens, the data never makes it to its final output using the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior before? Is the expectation of eventual consistency wrong? Thanks, -Preston
FileIO and windowed writes
Hi guys, I’m currently working on a simple system where the intention is to ingest data from a realtime stream – in this case Amazon SQS – and write the output in an incremental fashion to a durable filesystem (i.e. S3). It’s easy to think of this as a low-fi journaling system. We need to make sure that data written to the source queue eventually makes it to S3. We are utilizing the FileIO windowed writes with a custom naming policy to partition the files by their event time. Because SQS can’t guarantee order, we do have to allow late messages. Moreover, we need a further guarantee that a message be written in a timely manner – we’re thinking some constant multiple of the windowing duration. As a first pass, we were thinking of a processing-time-based trigger that fires on some regular interval. For context, here’s an example of the pipeline: ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro

    pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(
            Window.configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward. I have not yet observed lost data with this pipeline, but I am seeing an issue with timeliness. Things seem to get hung up on finalizing file output, but I have yet to truly pinpoint the issue. To really highlight the issue, I can set up a test where I send a single message to the source queue. If nothing else happens, the data never makes it to its final output using the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior before?
Is the expectation of eventual consistency wrong? Thanks, -Preston
Re: No filesystem found for scheme s3 using FileIO.write()
Hi Max, I was able to test the change you suggested. I went ahead and commented on the Jira: https://issues.apache.org/jira/browse/BEAM-8303?focusedCommentId=16939002&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16939002. Unless others object, I'm going to move any further discussion to the Jira to keep a consistent record there. -Preston On 9/25/19, 5:27 PM, "Koprivica,Preston Blake" wrote: Hi Max, I assume you were asking me. I'm working on building the project from source and will then inject the code you suggested. I'll let you know if I have any success. On 9/25/19, 4:05 PM, "Maximilian Michels" wrote: I wrote the same reply as you, but then deleted it again before sending ;) Given Preston's full description and a bit of Flink context, it is pretty much impossible for this to have anything to do with service files. The issue comes from a coder FileIO uses. The coder depends on the S3 file system (really any custom file system). If we use a native Flink transformation, e.g. Union, we never get to load the FileSystems code in the current classloader. However, the coder depends on the FileSystems initialization code (which, by the way, has to be run "manually" because it depends on the pipeline options), so it will error. Note that FileIO usually consists of a cascade of different transforms of which parts may execute on different machines. That's why we see this error during `decode` on a remote host which had not yet been initialized by one of the other instances of the initialization code. Probably that is the receiving side of a Reshuffle. Just to prove this theory, do you mind building Beam and testing your pipeline with the following line added before line 75?
https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L75

    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

We can later pass in the pipeline options to initialize this properly. If this is too much, I'll probably give it a try tomorrow using your example code. Cheers, Max On 25.09.19 11:15, Kenneth Knowles wrote: > Are you building a fat jar? I don't know if this is your issue. I don't know Flink's operation in any close detail, and I'm not sure how it relates to what Max has described. But it is a common cause of this kind of error. > > The registration of the filesystem is here: https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java#L32. This results in the built jar for beam-sdks-java-io-amazon-web-services having a file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar with the line org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar > > The file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar exists in many dependencies, including the core SDK.
The default for many fat-jar tools (Maven Shade and Gradle Shadow) is that they nondeterministically clobber each other, and you have to add a line like "mergeServiceFiles" to your configuration to keep all the registrations. > > Kenn > > On Wed, Sep 25, 2019 at 8:36 AM Maximilian Michels <m...@apache.org> wrote: > Hey Preston, > > I just wrote a reply on the user mailing list. Copying the reply here just in case: > > Your observation seems to be correct. There is an issue with the file system registration. > > The two types of errors you are seeing, as well as the successful run, are just due to the different structure of the genera
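[Editor's note] For the fat-jar clobbering Kenn describes, the usual fix is to merge service files during shading. A config sketch for the Gradle Shadow plugin (assuming a `shadowJar` task is already configured in your build):

```groovy
// build.gradle: merge every META-INF/services/* file from all dependencies
// instead of letting one jar's copy clobber the others, so each
// FileSystemRegistrar entry survives into the fat jar.
shadowJar {
    mergeServiceFiles()
}
```

With maven-shade-plugin, the equivalent is adding the `ServicesResourceTransformer` to the shade plugin configuration.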
Re: No filesystem found for scheme s3 using FileIO.write()
However, there we do not have the pipeline options available, which prevents any configuration. The problem is that the error occurs in the coder used in a native Flink operation which does not even run user code. > > I believe the only way to fix this is to ship the FileSystems initialization code in CoderTypeSerializer, where we are sure to execute it in time for any coders which depend on it. > > Could you file an issue? I'd be happy to fix this then. > > Thanks, > Max > > On 23.09.19 08:04, Koprivica,Preston Blake wrote: > Hello everyone. This is a cross-post from the users list. It didn’t get much traction, so I thought I’d move over to the dev group, since this seems like it might be an issue with initialization in the FlinkRunner (apologies in advance if I missed something silly). > > I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with AWS S3.
I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry *should* be properly seeded by the time the graph is compiled and executed:
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
No filesystem found for scheme s3 using FileIO.write()
Hello everyone. This is a cross-post from the users list. It didn’t get much traction, so I thought I’d move over to the dev group, since this seems like it might be an issue with initialization in the FlinkRunner (apologies in advance if I missed something silly). I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with AWS S3. I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry *should* be properly seeded by the time the graph is compiled and executed:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

    FileIO.Write write = FileIO.write()
        .via(ParquetIO.sink(schema))
        .to(options.getOutputDir()) // will be something like: s3:///
        .withSuffix(".parquet");

    records.apply(String.format("Write(%s)", options.getOutputDir()), write);

The issue does not appear to be related to ParquetIO.sink(); I am able to reliably reproduce it using JSON-formatted records and TextIO.sink() as well. Just trying some different knobs, I went ahead and set the following option:

    write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size. The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)