Re: Adding a part suffix setter to the BucketingSink
Sorry, I meant "I don't see a way of doing this apart from setting a part file *suffix* with the required file extension."

On 2018/03/29 14:55:43, l...@lyft.com wrote:
> Currently the BucketingSink allows addition of a part prefix, pending
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP-compressed
> output to S3 using the BucketingSink, and I would want the uploaded files to
> have a ".gz" or ".zip" extension (if the files do not have such an
> extension, they are written as garbled bytes and don't get rendered
> correctly for reading). I don't see a way of doing this apart from setting a
> part file prefix with the required file extension.
>
> Thanks
> Lakshmi
Adding a part suffix setter to the BucketingSink
Currently the BucketingSink allows addition of a part prefix, pending prefix/suffix and in-progress prefix/suffix via setter methods. Can we also support setting part suffixes? An instance where this may be useful: I am currently writing GZIP-compressed output to S3 using the BucketingSink, and I would want the uploaded files to have a ".gz" or ".zip" extension (if the files do not have such an extension, they are written as garbled bytes and don't get rendered correctly for reading). I don't see a way of doing this apart from setting a part file prefix with the required file extension.

Thanks
Lakshmi
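To illustrate why a suffix setter matters for the file extension: the sketch below builds a part-file name in the `prefix-subtaskIndex-counter` shape the BucketingSink uses, with an optional suffix appended. The `partFileName` helper is hypothetical, written only for this example; it is not Flink API.

```java
// Hypothetical helper, not Flink API: builds a part-file name in the
// BucketingSink's "prefix-subtaskIndex-counter" shape, plus an optional
// suffix such as ".gz" so downstream readers recognize the compression.
public class PartFileNaming {
    static String partFileName(String prefix, int subtaskIndex, int counter, String suffix) {
        String base = prefix + "-" + subtaskIndex + "-" + counter;
        return suffix == null ? base : base + suffix;
    }

    public static void main(String[] args) {
        // With only a prefix setter, the extension can at best lead the name;
        // a suffix setter would put it at the end, where tools expect it.
        System.out.println(partFileName("part", 0, 3, null));   // part-0-3
        System.out.println(partFileName("part", 0, 3, ".gz"));  // part-0-3.gz
    }
}
```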
Re: Compressing files with the Bucketing Sink
Thanks a lot for the suggestion Till! I ended up using your suggestion of extending StreamWriterBase and wrapping the FSDataOutputStream with GZIPOutputStream.

On 2018/03/28 09:44:26, Till Rohrmann wrote:
> Hi,
>
> the SequenceFileWriter and the AvroKeyValueSinkWriter both support
> compressed outputs. Apart from that, I'm not aware of any other Writers
> which support compression. Maybe you could use these two Writers as a
> guiding example. Alternatively, you could try to extend
> StreamWriterBase and wrap the outStream in a GZIPOutputStream.
>
> Cheers,
> Till
>
> On Wed, Mar 28, 2018 at 1:59 AM, l...@lyft.com wrote:
> > I want to upload a compressed file (gzip preferably) using the Bucketing
> > Sink. What is the best way to do this? Would I have to implement my own
> > Writer that does the compression? Has anyone done something similar?
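The stream-wrapping idea described above can be sketched with plain JDK classes. This example uses only `java.util.zip` and byte-array streams so it is self-contained; in the actual Writer, the wrapped stream would be the `FSDataOutputStream` that the custom `StreamWriterBase` subclass opens, not a `ByteArrayOutputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress: wrap the sink's raw output stream in a GZIPOutputStream and
    // write records through the wrapper. Closing the wrapper finishes the
    // gzip trailer, which is what a Writer's close() must guarantee.
    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(plain);
        }
        return bos.toByteArray();
    }

    // Decompress, to verify the written bytes are a valid gzip stream.
    static byte[] gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "line1\nline2\n".getBytes("UTF-8");
        byte[] roundTripped = gunzip(gzip(data));
        System.out.println(java.util.Arrays.equals(data, roundTripped)); // true
    }
}
```

The key design point is that an unfinished `GZIPOutputStream` produces an unreadable file; a Writer built this way must close or flush the wrapper before the part file is moved out of the in-progress state.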
Compressing files with the Bucketing Sink
I want to upload a compressed file (gzip preferably) using the Bucketing Sink. What is the best way to do this? Would I have to implement my own Writer that does the compression? Has anyone done something similar?
Re: Correct way to reference Hadoop dependencies in Flink 1.4.0
Hi, I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR:

aws-java-sdk 1.7.4
flink-hadoop-fs 1.4.0
flink-shaded-hadoop2 1.4.0
flink-connector-filesystem_2.11 1.4.0
hadoop-common 2.7.4
hadoop-aws 2.7.4

I have also tried the following conf:

classloader.resolve-order: parent-first
fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop

But no luck. Anything else I could be missing?

On 2018/03/14 18:57:47, Francesco Ciuci wrote:
> Hi,
>
> You do not just need the hadoop dependencies in the jar; you also need to
> have the hadoop file system running in your machine/cluster.
>
> Regards
>
> On 14 March 2018 at 18:38, l...@lyft.com wrote:
> > I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >
> > I have the Hadoop dependencies I need packaged in my user application jar.
> > However, on running the job I get the following error (from the
> > taskmanager):
> >
> > java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> >     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
> >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
> >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> >     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
> >     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> >     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> >     ... 9 common frames omitted
> > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
> >     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> >     ... 13 common frames omitted
> >
> > What's the right way to do this?
Correct way to reference Hadoop dependencies in Flink 1.4.0
I'm trying to use a BucketingSink to write files to S3 in my Flink job. I have the Hadoop dependencies I need packaged in my user application jar. However, on running the job I get the following error (from the taskmanager):

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    ... 13 common frames omitted

What's the right way to do this?
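For reference, the lowest `Caused by` says Flink itself could not load any Hadoop file system, so the Hadoop jars must first be visible to Flink (e.g. `flink-shaded-hadoop2-uber` plus `hadoop-aws` and the AWS SDK in `lib/`, as in the setup discussed later in this thread). Once they are, the `s3a` scheme is mapped to its implementation class in the Hadoop configuration that `fs.hdfs.hadoopconf` points at. A minimal core-site.xml sketch, assuming hadoop-aws 2.7.x (with IAM instance roles, no access keys need to be configured here):

```xml
<!-- core-site.xml sketch: maps the s3a:// scheme to the hadoop-aws
     implementation. fs.s3a.impl is a standard hadoop-aws 2.7.x property. -->
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
</configuration>
```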
Re: Using the BucketingSink with Flink 1.4.0
Thank you for your responses Stephan and Piotrek! It's great to know that the Hadoop-free Bucketing Sink might be available as early as 1.5.x! In the meantime, I have been trying workarounds but I am currently facing issues making them work. I tried including my Hadoop dependencies only in my user jar, but that didn't quite work (it threw the classpath error I pasted earlier).

Currently my setup is:

flink-conf.yaml (additional params):
fs.hdfs.hadoopconf: /srv/hadoop-2.7.5/etc/hadoop
classloader.resolve-order: parent-first

Libs in /srv/flink/lib:
total 181864
-rw-r--r-- 1 root root 86370565 Dec  6 12:10 flink-dist_2.11-1.4.0.jar
-rw-r--r-- 1 root root  5177639 Mar  9 23:29 streamingplatform-core-1.0.4-20180228.035408-8.jar
-rw-r--r-- 1 root root 38244416 Mar  9 23:29 flink-s3-fs-presto-1.4.0.jar
-rw-r--r-- 1 root root 39662811 Mar  9 23:43 flink-shaded-hadoop2-uber-1.4.0.jar
-rw-r--r-- 1 root root   126287 Mar  9 23:43 hadoop-aws-2.7.3.jar
-rw-r--r-- 1 root root 11948376 Mar  9 23:43 aws-java-sdk-1.7.4.jar
-rw-r--r-- 1 root root   849398 Mar  9 23:44 aws-java-sdk-core-1.11.183.jar
-rw-r--r-- 1 root root   403994 Mar  9 23:44 aws-java-sdk-kms-1.11.183.jar
-rw-r--r-- 1 root root   258919 Mar  9 23:44 jackson-core-2.6.7.jar
-rw-r--r-- 1 root root    46986 Mar  9 23:44 jackson-annotations-2.6.7.jar
-rw-r--r-- 1 root root  1170668 Mar  9 23:45 jackson-databind-2.6.7.jar
-rw-r--r-- 1 root root   621931 Mar  9 23:45 joda-time-2.8.1.jar
-rw-r--r-- 1 root root   747794 Mar  9 23:46 httpclient-4.5.3.jar
-rw-r--r-- 1 root root   326724 Mar  9 23:46 httpcore-4.4.4.jar

core-site.xml:
fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3.buffer.dir: /tmp

Errors:
00:56:52.494 INFO o.a.f.r.t.Task - Source: source -> Sink: S3-Sink-Ugly-Lib (1/1) (b70868c8543e8ea28813f6b745bbb85b) switched from RUNNING to FAILED.
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 5224A5007E58235E, AWS Error Code: AccessDenied, AWS Error Message: Access Denied
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
    at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
    at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I am using IAM roles for granting access to S3 and the S3a filesystem. I am able to write to the bucket outside of the job (via the command line). Any pointers on how to work around this will be helpful!

Thanks much,
Lakshmi

On 2018/03/09 11:13:28, Stephan Ewen wrote:
> Hi!
>
> Yes, the bucketing sink is unfortunately still tied to some specific Hadoop
> file systems, due to a special way of using truncate() and append().
>
> This is very high up our list post the 1.5 release, possibly even
> backportable to 1.5.x.
>
> The plan is to create a new Bucketing Sink based on Flink's file systems,
> meaning it can also work with Hadoop-free Flink when using file://, s3://, or so.
>
> Best,
> Stephan
>
> On Fri, Mar 9, 2018 at 9:43 AM, Piotr Nowojski wrote:
> > Hi,
> >
> > There is a quite old ticket about this issue. Feel free to bump it in the
> > comments to raise its priority:
> >
> > https://issues.apache.org/jira/browse/FLINK-5789
> >
> > Regarding a workaround, maybe someone else will know more. There was a
> > similar discussion on this topic:
> >
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoop-free-hdfs-config-td17693.html
> >
> > Piotrek
> >
> > > On 9 Mar 2018, at 02:11, l...@lyft.com wrote:
> > >
> > > I want to use the BucketingSink in the hadoop-free Flink system (i.e.
> > > 1.4.0) but currently I am kind of blocked because of its dependency on
> > > the Hadoop file system.
> > > 1. Is this something that's going to be fixed in the next version of Flink?
> > > 2. In the meantime, to unblock myself, what is the best way forward?
Using the BucketingSink with Flink 1.4.0
I want to use the BucketingSink in the hadoop-free Flink system (i.e. 1.4.0) but currently I am kind of blocked because of its dependency on the Hadoop file system.
1. Is this something that's going to be fixed in the next version of Flink?
2. In the meantime, to unblock myself, what is the best way forward? I have tried packaging the hadoop dependencies I need in my user jar but I run into problems while running the job. Stacktrace below:
```
21:26:09.654 INFO o.a.f.r.t.Task - Source: source -> Sink: S3-Sink (1/1) (9ac2cb1fc2b913c3b9d75aace08bcd37) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    ... 12 common frames omitted
```