Am I on the right path with the following?

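import org.apache.flink.streaming.api.functions.sink.SinkFunction;

class S3SinkFunc implements SinkFunction<String> {
    @Override
    public void invoke(String element) throws Exception {
        // invoke() is called once per element, so the event is available here
        System.out.println(element);
        // but I don't have access to dataStream here to call .addSink() :-(
    }
}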

Thanks,
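
For reference, the Bucketer suggestion quoted below could be sketched roughly as follows. This is only an illustration, not a tested Flink job: the class name EventBucketPaths, its methods, and the "key,payload" event layout are hypothetical, and the Flink wiring is left in comments on the assumption that the Bucketer interface in flink-connector-filesystem exposes getBucketPath(Clock, Path, T) and that BucketingSink offers setBucketer(...). The point is that the sink is still attached once with a fixed base path, while the per-event part of the path is computed inside the bucketer.

```java
// Sketch only: derives a bucket sub-path from the content of an event.
// The "key,payload" event layout and all names here are hypothetical.
public class EventBucketPaths {

    /** Returns the sub-directory for an element, taken from its first comma-separated field. */
    public static String bucketFor(String element) {
        int comma = element.indexOf(',');
        String key = (comma < 0 ? element : element.substring(0, comma)).trim();
        // Replace anything outside a conservative character set with '_'.
        String safe = key.replaceAll("[^A-Za-z0-9_-]", "_");
        // Fall back to a catch-all bucket when the event carries no usable key.
        return safe.isEmpty() ? "unknown" : safe;
    }

    /** Joins the base path and the bucket without doubling the slash. */
    public static String bucketPath(String basePath, String element) {
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        return base + bucketFor(element);
    }

    // Assumed Flink wiring (not compiled here, check against your Flink version):
    //
    //   class ContentBucketer implements Bucketer<String> {
    //       @Override
    //       public Path getBucketPath(Clock clock, Path basePath, String element) {
    //           return new Path(basePath, EventBucketPaths.bucketFor(element));
    //       }
    //   }
    //
    //   dataStream.addSink(
    //       new BucketingSink<String>("s3a://flink/").setBucketer(new ContentBucketer()));

    public static void main(String[] args) {
        System.out.println(bucketPath("s3a://flink/", "orders,{\"id\":1}"));
        System.out.println(bucketPath("s3a://flink", "  "));
    }
}
```

With this shape there is no need to call .addSink() from inside a SinkFunction; the bucketer receives each element and decides where it goes.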

> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> 
> Hi Ant,
> 
> I think you can do it by implementing your own Bucketer.
> 
> Cheers,
> Kostas
> 
>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>> 
>> Hello,
>> 
>> Given 
>> 
>>       // Set StreamExecutionEnvironment
>>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>       // Use event time as the stream time characteristic
>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>       // Add source (input stream)
>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>> 
>> How can I construct the s3_filename from the content of an event? It 
>> seems that whenever I attempt this I have access either to an event or 
>> to .addSink, but not both.
>> 
>>      dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>> 
>> 
>> Thanks,