Re: Access to datastream from BucketSink - RESOLVED

2017-08-16 Thread ant burton
I have resolved my issue, thank you for your help. The following code gives me access to an element to determine a bucket directory name.

    import org.apache.hadoop.fs.Path;
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
    import org.apache.flink.streaming.connectors.fs.Clock;
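The code in this message is cut off in the archive, so what follows is only a minimal sketch of the idea, not the poster's actual solution. To compile without Flink or Hadoop on the classpath, it uses a hypothetical stand-in interface, `ElementBucketer`, that mirrors the shape of the `Bucketer` imported above but takes plain `long` and `String` arguments in place of `Clock` and `Path`. The `Event` type, its `category` field, and the `sanitize` helper are likewise assumptions made for illustration.

```java
import java.io.Serializable;

// Hypothetical stand-in for the bucketing Bucketer interface (assumption:
// the real one receives a Clock, a base Path and the element, and returns a Path).
interface ElementBucketer<T> extends Serializable {
    String getBucketPath(long currentTimeMillis, String basePath, T element);
}

// Hypothetical element type; any record with a usable property would do.
final class Event {
    final String category;
    final String payload;

    Event(String category, String payload) {
        this.category = category;
        this.payload = payload;
    }
}

// Derives the bucket directory from a property of the element itself.
final class CategoryBucketer implements ElementBucketer<Event> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getBucketPath(long currentTimeMillis, String basePath, Event element) {
        // Drop a trailing slash so the joined path has exactly one separator.
        String base = basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath;
        return base + "/" + sanitize(element.category);
    }

    // Keep directory names safe: fall back to "unknown" for missing values
    // and replace anything other than letters, digits, '_' or '-'.
    static String sanitize(String raw) {
        if (raw == null || raw.isEmpty()) {
            return "unknown";
        }
        return raw.replaceAll("[^A-Za-z0-9_-]", "_");
    }
}

class BucketerDemo {
    public static void main(String[] args) {
        CategoryBucketer bucketer = new CategoryBucketer();
        Event event = new Event("clicks", "some payload");
        System.out.println(
                bucketer.getBucketPath(System.currentTimeMillis(), "s3://bucket/out", event));
    }
}
```

With the real Flink classes, the same logic would presumably live in an implementation of `Bucketer` that returns `new Path(basePath, name)`, registered on the sink through the `setBucketer` method mentioned elsewhere in this thread.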

Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
Thank you for your help; it’s greatly appreciated. My aim is to be able to “use a property of the element to determine the bucket directory”. With your suggestions, this is what I have so far. It’s obviously wrong, but I hope I’m getting closer. Is it correct to still implement Bucketer, just change

Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant, I think you are implementing the wrong Bucketer. This seems to be the one for the RollingSink, which is deprecated. Is this correct? You should implement the BucketingSink one, which is in the package org.apache.flink.streaming.connectors.fs.bucketing. That one requires the

Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
Thanks Kostas, I’m narrowing in on a solution: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html says "You can also specify a custom bucketer by

Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
In the second link for the BucketingSink, you can set your own Bucketer using the setBucketer method. You do not have to implement your own sink from scratch.
Kostas

> On Aug 16, 2017, at 1:39 PM, ant burton wrote:
>
> or rather
>

Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
or rather https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

Re: Access to datastream from BucketSink

2017-08-16 Thread ant burton
Am I on the right path with the following?

    class S3SinkFunc implements SinkFunction<String> {
        public void invoke(String element) {
            System.out.println(element);
            // don't have access to dataStream to call .addSink() :-(
        }
    }

Thanks,
> On 16 Aug 2017, at 12:24, Kostas Kloudas

Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant, I think you can do it by implementing your own Bucketer.
Cheers,
Kostas

> On Aug 16, 2017, at 1:09 PM, ant burton wrote:
>
> Hello,
>
> Given
>
>    // Set StreamExecutionEnvironment
>    final StreamExecutionEnvironment env =

Access to datastream from BucketSink

2017-08-16 Thread ant burton
Hello,

Given

    // Set StreamExecutionEnvironment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set checkpoints in ms
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Add source (input