Re: Adding a part suffix setter to the BucketingSink

2018-03-29 Thread lrao
Sorry, I meant "I don't see a way of doing this apart from setting a part file 
*suffix* with the required file extension. "


On 2018/03/29 14:55:43, l...@lyft.com  wrote: 
> Currently the BucketingSink allows addition of part prefix, pending 
> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
> support setting part suffixes?
> An instance where this may be useful: I am currently writing GZIP-compressed 
> output to S3 using the BucketingSink and I would want the uploaded files to 
> have a ".gz" or ".zip" extension (if the files do not have such an 
> extension, they are written as garbled bytes and don't get rendered 
> correctly for reading). I don't see a way of doing this apart from setting a 
> part file prefix with the required file extension.
> 
> Thanks
> Lakshmi
> 


Adding a part suffix setter to the BucketingSink

2018-03-29 Thread lrao
Currently the BucketingSink allows addition of part prefix, pending 
prefix/suffix and in-progress prefix/suffix via setter methods. Can we also 
support setting part suffixes?
An instance where this may be useful: I am currently writing GZIP-compressed 
output to S3 using the BucketingSink and I would want the uploaded files to 
have a ".gz" or ".zip" extension (if the files do not have such an 
extension, they are written as garbled bytes and don't get rendered 
correctly for reading). I don't see a way of doing this apart from setting a 
part file prefix with the required file extension.

Thanks
Lakshmi
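
For illustration, a minimal usage sketch: the prefix/pending setters below exist on the BucketingSink today, while setPartSuffix is the requested (hypothetical) method, and the bucket path is a placeholder.

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

class PartSuffixSketch {
    // Existing setters plus the requested (hypothetical) setPartSuffix.
    static BucketingSink<String> buildSink() {
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output"); // placeholder path
        sink.setPartPrefix("part");        // exists today
        sink.setPendingSuffix(".pending"); // exists today
        sink.setPartSuffix(".gz");         // requested: part files would end in ".gz"
        return sink;
    }
}
```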


Re: Compressing files with the Bucketing Sink

2018-03-29 Thread lrao
Thanks a lot for the suggestion Till!

I ended up using your suggestion of extending StreamWriterBase and wrapping the 
FSDataOutputStream with GZIPOutputStream.
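
For anyone finding this thread later, here is a minimal sketch of that approach against the Flink 1.4 APIs. The class name is made up, and it assumes the 1.4 StreamWriterBase, where open() creates the FSDataOutputStream and getStream() exposes it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: extends StreamWriterBase and wraps the underlying
// FSDataOutputStream in a GZIPOutputStream, as suggested by Till.
public class GzipStringWriter<T> extends StreamWriterBase<T> {

    private transient GZIPOutputStream gzipStream;

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path); // opens the FSDataOutputStream
        gzipStream = new GZIPOutputStream(getStream());
    }

    @Override
    public void write(T element) throws IOException {
        // Compress each record's string form, one record per line.
        gzipStream.write(element.toString().getBytes(StandardCharsets.UTF_8));
        gzipStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (gzipStream != null) {
            gzipStream.finish(); // write the gzip trailer before the stream closes
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new GzipStringWriter<>();
    }
}
```

One caveat: StreamWriterBase's getPos()/flush() track the underlying stream while GZIPOutputStream buffers internally, so exact truncation semantics on restore would need extra care.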


On 2018/03/28 09:44:26, Till Rohrmann  wrote: 
> Hi,
> 
> the SequenceFileWriter and the AvroKeyValueSinkWriter both support
> compressed outputs. Apart from that, I'm not aware of any other Writers
> which support compression. Maybe you could use these two Writers as a
> guiding example. Alternatively, you could try to extend the
> StreamWriterBase and wrap the outStream in a GZIPOutputStream.
> 
> Cheers,
> Till
> 
> On Wed, Mar 28, 2018 at 1:59 AM, l...@lyft.com  wrote:
> 
> > I want to upload a compressed file (gzip preferably) using the Bucketing
> > Sink. What is the best way to do this? Would I have to implement my own
> > Writer that does the compression? Has anyone done something similar?
> >
> 


Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

2018-03-14 Thread lrao
Hi,

I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the 
following dependencies packaged in my user application JAR:

aws-java-sdk 1.7.4
flink-hadoop-fs 1.4.0
flink-shaded-hadoop2 1.4.0
flink-connector-filesystem_2.11 1.4.0
hadoop-common 2.7.4
hadoop-aws 2.7.4

I have also tried the following conf:
classloader.resolve-order: parent-first
fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop

But no luck. Anything else I could be missing?

On 2018/03/14 18:57:47, Francesco Ciuci  wrote: 
> Hi,
> 
> You do not just need the Hadoop dependencies in the jar; you also need the
> Hadoop file system running on your machine/cluster.
> 
> Regards
> 
> On 14 March 2018 at 18:38, l...@lyft.com  wrote:
> 
> > I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >
> > I have the Hadoop dependencies I need packaged in my user application jar.
> > However, on running the job I get the following error (from the
> > taskmanager):
> >
> > java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> > at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> > at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
> > at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> > at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
> > at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> > at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> > ... 9 common frames omitted
> > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
> > at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> > at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> > ... 13 common frames omitted
> >
> > What's the right way to do this?
> >
> 


Correct way to reference Hadoop dependencies in Flink 1.4.0

2018-03-14 Thread lrao
I'm trying to use a BucketingSink to write files to S3 in my Flink job.

I have the Hadoop dependencies I need packaged in my user application jar. 
However, on running the job I get the following error (from the taskmanager):

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 13 common frames omitted

What's the right way to do this?
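
For context, a minimal sketch of the kind of job setup that hits this error (bucket name, bucketer format, and class/method names are placeholders); the s3a:// scheme in the base path is what Flink tries, and fails, to resolve:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

class S3SinkSketch {
    // Attach a BucketingSink that writes to S3 via the s3a scheme.
    static void attachS3Sink(DataStream<String> events) {
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/flink-out");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        events.addSink(sink);
    }
}
```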


Re: Using the BucketingSink with Flink 1.4.0

2018-03-09 Thread lrao
Thank you for your responses Stephan and Piotrek!
It's great to know that the hadoop-free Bucketing Sink might be available as 
early as 1.5.x!

In the meantime, I have been trying workarounds but am currently facing 
issues making them work.
I tried including my Hadoop dependencies only in my user jar, but that didn't 
quite work (it threw the classpath error I pasted earlier).

Currently my set up is: 

flink-conf.yaml (Additional params)
fs.hdfs.hadoopconf: /srv/hadoop-2.7.5/etc/hadoop
classloader.resolve-order: parent-first

Libs in /srv/flink/lib:

total 181864
-rw-r--r-- 1 root root 86370565 Dec 6 12:10 flink-dist_2.11-1.4.0.jar
-rw-r--r-- 1 root root 5177639 Mar 9 23:29 
streamingplatform-core-1.0.4-20180228.035408-8.jar
-rw-r--r-- 1 root root 38244416 Mar 9 23:29 flink-s3-fs-presto-1.4.0.jar
-rw-r--r-- 1 root root 39662811 Mar 9 23:43 flink-shaded-hadoop2-uber-1.4.0.jar
-rw-r--r-- 1 root root 126287 Mar 9 23:43 hadoop-aws-2.7.3.jar
-rw-r--r-- 1 root root 11948376 Mar 9 23:43 aws-java-sdk-1.7.4.jar
-rw-r--r-- 1 root root 849398 Mar 9 23:44 aws-java-sdk-core-1.11.183.jar
-rw-r--r-- 1 root root 403994 Mar 9 23:44 aws-java-sdk-kms-1.11.183.jar
-rw-r--r-- 1 root root 258919 Mar 9 23:44 jackson-core-2.6.7.jar
-rw-r--r-- 1 root root 46986 Mar 9 23:44 jackson-annotations-2.6.7.jar
-rw-r--r-- 1 root root 1170668 Mar 9 23:45 jackson-databind-2.6.7.jar
-rw-r--r-- 1 root root 621931 Mar 9 23:45 joda-time-2.8.1.jar
-rw-r--r-- 1 root root 747794 Mar 9 23:46 httpclient-4.5.3.jar
-rw-r--r-- 1 root root 326724 Mar 9 23:46 httpcore-4.4.4.jar
 
core-site.xml

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

Errors:

00:56:52.494 INFO o.a.f.r.t.Task - Source: source -> Sink: S3-Sink-Ugly-Lib (1/1) (b70868c8543e8ea28813f6b745bbb85b) switched from RUNNING to FAILED.
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 5224A5007E58235E, AWS Error Code: AccessDenied, AWS Error Message: Access Denied
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I am using IAM roles for granting access to S3 and the S3a filesystem. I am 
able to write to the bucket outside of the job (via command line). Any pointers 
on how to work around this would be helpful!

Thanks much,
Lakshmi

On 2018/03/09 11:13:28, Stephan Ewen  wrote: 
> Hi!
> 
> Yes, the bucketing sink is unfortunately still tied to some specific Hadoop
> file systems, due to a special way of using truncate() and append().
> 
> This is very high up our list post the 1.5 release, possibly even
> backportable to 1.5.x.
> 
> The plan is to create a new Bucketing Sink based on Flink's file systems,
> meaning it can also work with Hadoop-free Flink when using file://, s3://, or
> so.
> 
> Best,
> Stephan
> 
> 
> On Fri, Mar 9, 2018 at 9:43 AM, Piotr Nowojski 
> wrote:
> 
> > Hi,
> >
> > There is a quite old ticket about this issue. Feel free to bump it in the
> > comments to raise its priority.
> >
> > https://issues.apache.org/jira/browse/FLINK-5789
> >
> > Regarding a workaround, maybe someone else will know more. There was a
> > similar discussion on this topic:
> >
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoop-free-hdfs-config-td17693.html
> >
> > Piotrek
> >
> > > On 9 Mar 2018, at 02:11, l...@lyft.com wrote:
> > >
> > > I want to use the BucketingSink in the hadoop-free Flink system (i.e.
> > > 1.4.0) but currently I am kind of blocked because of its dependency on
> > > the Hadoop file system.
> > > 1. Is 

Using the BucketingSink with Flink 1.4.0

2018-03-08 Thread lrao
I want to use the BucketingSink in the hadoop-free Flink system (i.e. 1.4.0) 
but currently I am kind of blocked because of its dependency on the Hadoop file 
system. 
1. Is this something that's going to be fixed in the next version of Flink? 
2. In the meantime, to unblock myself, what is the best way forward? I have 
tried packaging the hadoop dependencies I need in my user jar but I run into 
problems while running the job. Stacktrace below:
```
21:26:09.654 INFO  o.a.f.r.t.Task - Source: source -> Sink: S3-Sink (1/1) (9ac2cb1fc2b913c3b9d75aace08bcd37) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 12 common frames omitted
```