In the second link for the BucketingSink, you can set your own Bucketer using the setBucketer method. You do not have to implement your own sink from scratch.
Kostas > On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote: > > or rather > https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html > > <https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> > > >> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com >> <mailto:k.klou...@data-artisans.com>> wrote: >> >> Hi Ant, >> >> I think you can do it by implementing your own Bucketer. >> >> Cheers, >> Kostas >> >> . >>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com >>> <mailto:apburto...@gmail.com>> wrote: >>> >>> Hello, >>> >>> Given >>> >>> // Set StreamExecutionEnvironment >>> final StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> // Set checkpoints in ms >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> >>> // Add source (input stream) >>> DataStream<String> dataStream = StreamUtil.getDataStream(env, params); >>> >>> How can I construct the s3_filename from the content of the an event, it >>> seems that whenever I attempt this I either have access to an event or >>> access to .addSink but not both. >>> >>> dataStream.addSink(new BucketingSink<String>("s3a://flink/ >>> <s3a://flink/>" + s3_filename)); >>> >>> >>> Thanks, >>> >>> >>> >>> >> >