[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write() on FlinkRunner

2019-10-04 Thread Kenneth Knowles (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16944708#comment-16944708 ]

Kenneth Knowles commented on BEAM-8303:
---

Is the pain mild, given the presence of a workaround?

> Filesystems not properly registered using FileIO.write() on FlinkRunner
> ---
>
> Key: BEAM-8303
> URL: https://issues.apache.org/jira/browse/BEAM-8303
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink, sdk-java-core
>Affects Versions: 2.15.0
>Reporter: Preston Koprivica
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 2.17.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I’m getting the following error when attempting to use the FileIO APIs 
> (beam-2.15.0) and integrating with AWS S3.  I have set up the PipelineOptions 
> with all the relevant AWS options, so the filesystem registry **should** be 
> properly seeded by the time the graph is compiled and executed:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> For reference, the write code resembles this:
> {code:java}
> FileIO.Write<Void, GenericRecord> write = FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(schema))
>     .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>     .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);{code}
> The issue does not appear to be related to ParquetIO.sink().  I am able to 
> reliably reproduce the issue using JSON-formatted records and TextIO.sink() as 
> well.  Moreover, AvroIO is affected if the withWindowedWrites() option is 
> added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only to have it reemerge as I scaled 
> up the data set size.  The stack trace, while very similar, reads:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     ...
> {code}
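
For context, seeding the registry with "all the relevant AWS options" typically looks something like the following minimal sketch (assuming the beam-sdks-java-io-amazon-web-services module; the region value and class name are placeholders):
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class S3OptionsSketch {
  public static void main(String[] args) {
    // S3Options carries the AWS settings used to register the s3:// filesystem.
    // The registry is seeded from these options on the client, which is why it
    // "should" be populated by the time the graph is compiled and executed.
    S3Options options = PipelineOptionsFactory.fromArgs(args).as(S3Options.class);
    options.setAwsRegion("us-east-1"); // placeholder region
    Pipeline pipeline = Pipeline.create(options);
    // ... apply the FileIO.write() transform shown above and run ...
  }
}
{code}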

[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-10-02 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943039#comment-16943039 ]

Maximilian Michels commented on BEAM-8303:
--

Backported to {{release-2.16.0}}. I'll close this once it is clear whether it 
can be included in the next RC.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-10-02 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16942693#comment-16942693 ]

Maximilian Michels commented on BEAM-8303:
--

AFAIK this issue has been present since the introduction of 
{{FileResultCoder}}, quite a while ago. I'll merge this to the release branch 
and we can see what we do with it from there.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Mark Liu (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16941407#comment-16941407 ]

Mark Liu commented on BEAM-8303:


Looks like this issue happened in 2.15.0, thus it's not a regression for 
2.16.0. According to the policy in 
https://beam.apache.org/contribute/release-guide/#4-triage-release-blocking-issues-in-jira,
 it's not necessarily a blocker for 2.16.0. I'll move this issue to 2.17.0, but 
if possible we can still backport it to 2.16.0.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Preston Koprivica (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16941273#comment-16941273 ]

Preston Koprivica commented on BEAM-8303:
-

[~mxm] [~markflyhigh] I was able to test the fix by pulling Max's branch.  I 
can verify that with the fix I'm no longer seeing the original error.  Thanks 
so much for diagnosing and thanks for the quick turnaround.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Mark Liu (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16941177#comment-16941177 ]

Mark Liu commented on BEAM-8303:


Do you have an ETA for the fix? I'm ready to cut an RC.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940771#comment-16940771 ]

Maximilian Michels commented on BEAM-8303:
--

I've verified this fixes the issue. In the pipeline the problem was the lack of 
initialization in {{FlinkMultiOutputPruningFunction}}, but I went ahead and 
fixed initialization in all operators.
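
For illustration, a minimal sketch of that kind of per-operator initialization (the class and field names are illustrative, not the actual code from the PR):
{code:java}
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch: each Flink function seeds the FileSystems registry in its own
// classloader before any records (and hence any coders) are processed.
class InitializingFunction extends RichFlatMapFunction<String, String> {
  private final SerializablePipelineOptions serializedOptions;

  InitializingFunction(SerializablePipelineOptions serializedOptions) {
    this.serializedOptions = serializedOptions;
  }

  @Override
  public void open(Configuration parameters) {
    // Registers s3://, gs://, etc. for this operator's classloader.
    FileSystems.setDefaultPipelineOptions(serializedOptions.get());
  }

  @Override
  public void flatMap(String value, Collector<String> out) {
    out.collect(value);
  }
}
{code}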


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940770#comment-16940770 ]

Maximilian Michels commented on BEAM-8303:
--

PR: https://github.com/apache/beam/pull/9688

If still possible, this should be backported to release-2.16.0.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-30 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940742#comment-16940742 ]

Maximilian Michels commented on BEAM-8303:
--

I managed to reproduce the problem on a single-node (1 TM, 1 JM) Flink cluster 
with a custom file system:
{noformat}
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

PCollection<String> input = pipeline.apply(Create.of("hello", "world"));

FileIO.Write<Void, String> write = FileIO.<String>write()
    .via(TextIO.sink())
    .to("max://this/does/not/exist")
    .withSuffix(".txt");
input.apply(write);

pipeline.run();{noformat}

This gives me:
{noformat}
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme max
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{noformat}

Note that this one does not fail:

{noformat}
input.apply(TextIO.write().to("max://this/does/not/exist").withSuffix(".txt"));
{noformat}

The cause of this is what I suspected before. The coder ({{FileResultCoder}}) 
accesses the custom file system before the operator that receives the decoded 
data has initialized it. We had previously assumed that coders would not access 
file systems; coders should operate at a lower level than the user code.
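
To make that concrete, here is a simplified sketch of the decode path implied by the stack trace (not the actual {{FileBasedSink}} source; the wire format shown is illustrative):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

final class DecodeSketch {
  // Decoding turns a serialized file spec back into a ResourceId, which
  // requires the spec's scheme (s3, max, ...) to be registered in this
  // classloader's FileSystems registry.
  static ResourceId decodeTempFile(InputStream in) throws IOException {
    String spec = StringUtf8Coder.of().decode(in);
    // Throws "No filesystem found for scheme ..." if
    // FileSystems.setDefaultPipelineOptions(...) never ran here.
    return FileSystems.matchNewResource(spec, /* isDirectory= */ false);
  }
}
{code}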

In Flink, all operators are pipelined, i.e., a parallel instance of each 
operator runs in the same task slot. The operators do not share a class loader: 
each operator, unless chained (aka fused), has its own classloader. If we want 
to access the FileSystems code, we need to initialize it for each of those 
operators.

The "easy" solution would be to not let the coder use the file system and defer 
file resolution until later, but I suppose it is a fair assumption that the 
file system code is always initialized when Beam code runs.
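
A hypothetical sketch of that deferral, just to illustrate the idea (this is not what Beam does today):
{code:java}
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

// Hypothetical: the coder would decode only the raw string, and resolution to
// a ResourceId would happen later, once FileSystems is guaranteed initialized.
final class LazyFileResult {
  private final String resourceSpec; // what the coder actually decodes

  LazyFileResult(String resourceSpec) {
    this.resourceSpec = resourceSpec;
  }

  ResourceId resolve() {
    return FileSystems.matchNewResource(resourceSpec, /* isDirectory= */ false);
  }
}
{code}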

Alternatively, from the Flink Runner side, we can make sure to always 
initialize the FileSystems even if we do not run user code in the operator.





[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-27 Thread Preston Koprivica (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939669#comment-16939669 ]

Preston Koprivica commented on BEAM-8303:
-

Yup.  I can give it a shot.  I'm currently running these tests where the issue 
was originally discovered: on an AWS EMR cluster with 5 nodes and 10 task 
managers (1 slot per task manager).


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-27 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939511#comment-16939511 ]

Maximilian Michels commented on BEAM-8303:
--

The code change I suggested does not work because it is executed during job 
compilation, not during cluster runtime. Could you instead try to insert the 
following static block here? 
[https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L34]
 
 Again, this is just to verify that the problem goes away when this code is 
present:
{noformat}
static {
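    // Note: FileSystems is org.apache.beam.sdk.io.FileSystems and
    // PipelineOptionsFactory is org.apache.beam.sdk.options.PipelineOptionsFactory.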
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
}
{noformat}
Also, could you share your execution environment? Do you execute this from your 
IDE or on a cluster? If on a cluster, how many task managers/task slots do you use?


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-27 Thread Preston Koprivica (Jira)


[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939447#comment-16939447 ]

Preston Koprivica commented on BEAM-8303:
-

{quote}
It was to prove the theory that FileSystems is not initialized when the File 
coders, which rely on FileSystems being initialized, are first used.
{quote}

I may need a little more detail on the experiment, what observations should be 
made, and how to interpret them.  (Sorry... I'm still trying to catch up.)


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-27 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16939207#comment-16939207
 ] 

Maximilian Michels commented on BEAM-8303:
--

[~preston] The suggested experiment on the mailing list was not meant as a 
proper fix. It was to prove the theory that {{FileSystems}} is not initialized 
when the file coders, which rely on {{FileSystems}} being initialized, are 
first used. I was assuming you didn't have any S3 configuration in the pipeline 
options, hence the passing of the default pipeline options. If you do, you may 
have to copy your options in there for testing.
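
For reference, a minimal standalone sketch of the two variants (this is not the 
actual patch location in {{CoderTypeInformation}}; the wrapper class and 
argument handling here are illustrative only):

{code:java}
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FileSystemsInitExperiment {
  public static void main(String[] args) {
    // Variant 1 (the mailing-list suggestion): seed the registry with
    // default options. This registers the local filesystem, but an S3
    // registrar would see no AWS configuration.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // Variant 2: copy the real pipeline options in instead, so registrars
    // that need configuration (e.g. S3 region/credentials) are seeded too.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    FileSystems.setDefaultPipelineOptions(options);
  }
}
{code}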


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-26 Thread Preston Koprivica (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16939002#comment-16939002
 ] 

Preston Koprivica commented on BEAM-8303:
-

[~mxm] Sorry for the delay.  I was struggling with my local build and had to 
track down some issues.  Totally unrelated, but if you have some time, I think 
I may have found an issue related to some recent build changes [1].  In any 
case, I was finally able to get the local build working and pulled it into my 
test project.

{quote}
Just to prove this theory, do you mind building Beam and testing your pipeline 
with the following line added before line 75?
https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L75
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
{quote}

This change did not impact the behavior at all.  And I guess the question is: 
would we have expected it to, given that the default PipelineOptions presumably 
wouldn't include the S3 options?  (See the sketch below.)

[1] https://issues.apache.org/jira/browse/BEAM-8021
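
To make that distinction concrete, a hedged sketch of seeding the registry with 
options that actually carry the S3 configuration (the region value is a 
placeholder, and this assumes beam-sdks-java-io-amazon-web-services is on the 
classpath):

{code:java}
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SeedS3FileSystem {
  public static void main(String[] args) {
    // Default options carry no AWS region or credentials, so the S3
    // registrar cannot produce a usable S3 filesystem from them.
    // Build options that do carry the S3 configuration instead:
    S3Options options = PipelineOptionsFactory.as(S3Options.class);
    options.setAwsRegion("us-east-1"); // placeholder region
    FileSystems.setDefaultPipelineOptions(options);
  }
}
{code}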


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Preston Koprivica (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16938088#comment-16938088
 ] 

Preston Koprivica commented on BEAM-8303:
-

In hindsight this test seems irrelevant, but I'll post it for posterity.  
Here's the error after having specified #withTempDirectory(... a local 
filesystem ...):

{code}
Caused by: java.lang.IllegalArgumentException: Expect srcResourceIds and 
destResourceIds have the same scheme, but received file, s3.
    at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:421)
    at org.apache.beam.sdk.io.FileSystems.rename(FileSystems.java:307)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:755)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:850)
{code}

Looks like one of the final steps in the processing graph is to rename files 
from the temp dir to the final output dir, and it makes sense that those should 
be on the same filesystem.  I'm not sure whether this informs anything about 
why the coders need access to the filesystem, other than that they share the 
same temporary filesystem with the broader processing graph.
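
A minimal sketch of keeping both directories on the same scheme (paths and the 
wrapper class are placeholders, not the actual pipeline code):

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

public class SameSchemeWrite {
  // The finalize step renames temp files into the output directory, and
  // FileSystems.rename() requires src and dest to share a scheme, so the
  // temp directory must live on the same filesystem as the output.
  static FileIO.Write<Void, GenericRecord> write(Schema schema) {
    return FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("s3://my-bucket/output/")              // placeholder path
        .withTempDirectory("s3://my-bucket/tmp/")  // same scheme as .to()
        .withSuffix(".parquet");
  }
}
{code}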


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Preston Koprivica (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16938079#comment-16938079
 ] 

Preston Koprivica commented on BEAM-8303:
-

[~mxm] I was able to reproduce this issue in a couple of different contexts, 
and each of them somehow involved temporary directories.  I tried to decouple 
the target directory and the temporary directory filesystems using 
#withTempDirectory(...), but something enforces that they be on the same 
filesystem.  I'm trying to hunt down the source now and will reply with any 
findings.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16938049#comment-16938049
 ] 

Maximilian Michels commented on BEAM-8303:
--

[~kenn] Thanks for the help, but the service files are not an issue here. I 
wrote the same reply as you, but then deleted it again before sending ;) Given 
Preston's full description and a bit of Flink context, it is pretty much 
impossible for this to have anything to do with service files. Please see my 
explanation on the mailing list. The issue comes from a coder that FileIO uses, 
which depends on the S3 file system. If we use a native Flink transformation, 
e.g. Union, on a TaskManager, we never load the FileSystems code in the current 
classloader. However, the coder depends on the file systems, so it will error.

Maybe the question is: why does the coder need access to the file system in the 
first place? That is probably the more important fix to consider: 
https://github.com/apache/beam/blob/04dc3c3b14ab780e9736d5f769c6bf2a27a390bb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L1152
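
A simplified sketch of that dependency (not the actual Beam source; names are 
abbreviated): decoding a {{FileResult}} turns a serialized path string back 
into a {{ResourceId}}, which requires the filesystem registry to already know 
the path's scheme:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

public class FileResultDecodeSketch {
  // Roughly what FileResultCoder.decode does with the temp-file path: the
  // string-to-ResourceId conversion goes through the FileSystems registry,
  // so an unregistered scheme throws "No filesystem found for scheme s3"
  // right here.
  static ResourceId decodeTempPath(InputStream in) throws IOException {
    String tempFilename = StringUtf8Coder.of().decode(in);
    return FileSystems.matchNewResource(tempFilename, false /* isDirectory */);
  }
}
{code}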


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Preston Koprivica (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937975#comment-16937975
 ] 

Preston Koprivica commented on BEAM-8303:
-

Yes, we are building a fat jar using the maven-shade-plugin.  Here are the 
contents of that services file: 

{code}
$ tar -xvf target/.jar
x META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar
$ cat META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar
org.apache.beam.sdk.io.LocalFileSystemRegistrar
org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar
{code}

I'm sure you guys have seen this error quite a bit, so bear with me.  The issue 
here does not seem to be the classpath or improperly initialized options.  Just 
to reiterate: if I specify write#withIgnoreWindowing(), everything works fine.


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Kenneth Knowles (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937963#comment-16937963
 ] 

Kenneth Knowles commented on BEAM-8303:
---

Copying my reply from the mailing list here so it is more discoverable:

Are you building a fat jar? I don't know if this is your issue. I don't know 
Flink's operation in close detail, and I'm not sure how it relates to what Max 
has described. But it is a common cause of this kind of error.

The registration of the filesystem is here: 
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java#L32.
 This results in the built jar for beam-sdks-java-io-amazon-web-services 
having a file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar with 
the line org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar.

The file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar exists in 
many dependencies, including the core SDK. With the defaults of many fat-jar 
tools (Maven Shade and Gradle Shadow), these copies nondeterministically 
clobber each other, and you have to add something like "mergeServiceFiles" (or 
Maven Shade's ServicesResourceTransformer) to your configuration to keep all 
the registrations.
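
A quick way to check whether the registrations survived the merge is a 
ServiceLoader scan run against the fat jar (the registrar interface is the 
real Beam one; the wrapper class below is illustrative):

{code:java}
import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystemRegistrar;

public class ListFileSystemRegistrars {
  public static void main(String[] args) {
    // Prints every registrar visible via META-INF/services on the current
    // classpath; S3FileSystemRegistrar should appear if merging worked.
    for (FileSystemRegistrar registrar :
        ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println(registrar.getClass().getName());
    }
  }
}
{code}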



[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937955#comment-16937955
 ] 

Akshay Iyangar commented on BEAM-8303:
--

Can you check which security provider your pipeline is using?  We faced this 
issue and found that it was using the SUN provider.

Changing it to Conscrypt or Bouncy Castle should help you resolve the issue.
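
For what it's worth, a minimal sketch of inspecting the active security 
providers and promoting Bouncy Castle (assumes the bcprov artifact is on the 
classpath; the wrapper class is illustrative):

{code:java}
import java.security.Provider;
import java.security.Security;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class ShowSecurityProviders {
  public static void main(String[] args) {
    // List the JCE providers in priority order; "SUN" is typically first.
    for (Provider p : Security.getProviders()) {
      System.out.println(p.getName());
    }
    // Optionally promote Bouncy Castle to the highest priority.
    Security.insertProviderAt(new BouncyCastleProvider(), 1);
  }
}
{code}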


[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-23 Thread Preston Koprivica (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936130#comment-16936130
 ] 

Preston Koprivica commented on BEAM-8303:
-

I'll defer to the experts on the priority of this issue.  Currently, I am able 
to work around it by setting FileIO.write().withIgnoreWindowing(), which is 
also the default for AvroIO 
([https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L516]), 
and I suspect for other FileBasedSink apis as well.
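
For reference, a sketch of that workaround applied to the original write (paths 
and the wrapper class are placeholders; withIgnoreWindowing() is deprecated in 
the FileIO javadoc, so this is a stopgap rather than a fix):

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

public class IgnoreWindowingWorkaround {
  static FileIO.Write<Void, GenericRecord> write(Schema schema) {
    return FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("s3://my-bucket/output/")  // placeholder path
        .withSuffix(".parquet")
        // Workaround: skip the windowed-write path whose FileResultCoder
        // needs the filesystem registry on the TaskManager.
        .withIgnoreWindowing();
  }
}
{code}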
