In the second link for the BucketingSink, you can set your 
own Bucketer using the setBucketer method. You do not have to 
implement your own sink from scratch.
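
A minimal sketch of the per-event part, under stated assumptions rather than a definitive implementation. The helper EventBuckets.bucketFor is a hypothetical name, and it assumes (the thread does not say) that each event is a comma-separated String whose first field identifies where the event should be written.

```java
import java.util.Locale;

// Hypothetical helper, not part of Flink: derives a bucket sub-directory
// name from a single event.
// Assumption: the event is a comma-separated line and its first field
// is the key that decides the destination.
final class EventBuckets {

    private EventBuckets() {}

    static String bucketFor(String event) {
        if (event == null) {
            return "unknown";
        }
        // Take everything before the first comma, or the whole event if none.
        int comma = event.indexOf(',');
        String key = (comma >= 0 ? event.substring(0, comma) : event).trim();
        // Lower-case the key and replace anything that is not safe
        // in a single path segment.
        String safe = key.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_-]", "_");
        return safe.isEmpty() ? "unknown" : safe;
    }
}
```

Inside a Bucketer<String>, getBucketPath receives each element together with the sink's base path, so it could return new Path(basePath, EventBuckets.bucketFor(element)). That Bucketer is then handed to the sink with setBucketer, which gives you the event and the sink path in one place.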

Kostas

> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
> 
> or rather 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
> 
> 
>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>> Hi Ant,
>> 
>> I think you can do it by implementing your own Bucketer.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>> 
>>> Hello,
>>> 
>>> Given 
>>> 
>>>       // Get the StreamExecutionEnvironment
>>>       final StreamExecutionEnvironment env =
>>>           StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>       // Use event time as the stream time characteristic
>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>>       // Add source (input stream)
>>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>> 
>>> How can I construct the s3_filename from the content of an event? It 
>>> seems that whenever I attempt this I either have access to an event or 
>>> access to .addSink, but not both.
>>> 
>>>     dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>> 
>>> 
>>> Thanks,
>>> 
