Hi guys,

I've been working on this feature, as I needed something similar. Have a look at my issue here: https://issues.apache.org/jira/browse/FLINK-4190 and my changes here: https://github.com/joshfg/flink/tree/flink-4190 The changes follow Kostas's suggestion in this thread.
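For anyone skimming, the gist of that suggestion, very roughly sketched (the class and method names below are made up for illustration and are not the actual code in my branch or Flink's API): keep a cache of open buckets keyed by a per-tuple path, and flush a bucket only when it exceeds a size limit, instead of on every path change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class BucketingSketch {

    // A simple event carrying the fields used for partitioning.
    static class Event {
        final String topic;
        final String date; // already formatted as yyyyMMdd
        final String payload;
        Event(String topic, String date, String payload) {
            this.topic = topic; this.date = date; this.payload = payload;
        }
    }

    // The "partitioning function" idea: map each tuple to a bucket path.
    static final Function<Event, String> bucketer =
        e -> "s3://bucket/path/topic=" + e.topic + "/date=" + e.date + "/";

    // Cache of open buckets: path -> buffered data. A bucket is flushed
    // only when it grows past the size limit, so a random mix of topics
    // does not force a flush on every tuple.
    static final Map<String, StringBuilder> open = new HashMap<>();
    static final int MAX_BUCKET_SIZE = 1024;

    static String invoke(Event e) {
        String path = bucketer.apply(e);
        StringBuilder buf = open.computeIfAbsent(path, p -> new StringBuilder());
        buf.append(e.payload).append('\n');
        if (buf.length() >= MAX_BUCKET_SIZE) {
            open.remove(path); // here: write buf out to path on S3/HDFS
        }
        return path;
    }
}
```

A real implementation also has to participate in checkpointing and add a time-based flush criterion, which is where most of the actual work in the branch went.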
Thanks,
Josh

On Thu, May 26, 2016 at 3:27 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> while I think it would be possible to do it by creating a "meta sink" that
> contains several RollingSinks, I think the approach of integrating it into
> the current RollingSink is better.
>
> I think it's mostly a question of style and architectural purity, but also
> of resource consumption and maintainability. If there are several
> RollingSinks nested in one outer sink instead of just one RollingSink, then
> we duplicate all of the internal structures of RollingSink. For
> maintainability, we would have to be very careful when interacting with the
> nested sinks to ensure that they really can behave as proper sinks
> (watermarks, checkpoints, closing/disposing come to mind now, but this
> might grow in the future).
>
> Cheers,
> Aljoscha
>
> On Wed, 25 May 2016 at 11:35 Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Juho,
>>
>> To be more aligned with the semantics in Flink, I would suggest a
>> solution with a single modified RollingSink that caches
>> multiple buckets (from the Bucketer) and flushes (some of) them to disk
>> whenever certain time or space criteria are met.
>>
>> I would say that it is worth modifying the rolling sink so that it can
>> support such use cases (different flushing policies).
>> Aljoscha, as the writer of the original RollingSink, what do you think?
>>
>> Kostas
>>
>> On May 25, 2016, at 8:21 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks, indeed the desired behavior is to flush if the bucket size
>> exceeds a limit, but also if the bucket has been open long enough.
>> Contrary to the current RollingSink, we don't want to flush every time
>> the bucket changes, but to have multiple buckets "open" as needed.
>>
>> In our case the date to use for partitioning comes from an event field,
>> but needs to be formatted, too.
>> The partitioning feature should be generic,
>> allowing the caller to pass a function that formats the bucket path for
>> each tuple.
>>
>> Does it seem like a valid plan to create a sink that internally caches
>> multiple rolling sinks?
>>
>> On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi Juho,
>>>
>>> If I understand correctly, you want a custom RollingSink that caches
>>> some buckets, one for each topic/date key, and whenever the volume of
>>> buffered data exceeds a limit, it flushes to disk, right?
>>>
>>> If this is the case, then you are right that this is not currently
>>> supported out-of-the-box, but it would be interesting to update the
>>> RollingSink to support such scenarios.
>>>
>>> One clarification: when you say that you want to partition by date,
>>> you mean the date of the event, right? Not the processing time.
>>>
>>> Kostas
>>>
>>> > On May 24, 2016, at 1:22 PM, Juho Autio <juho.au...@rovio.com> wrote:
>>> >
>>> > Could you suggest how to dynamically partition data with Flink
>>> > streaming?
>>> >
>>> > We've looked at RollingSink, which takes care of writing batches to
>>> > S3, but it doesn't allow defining the partition dynamically based on
>>> > the tuple fields.
>>> >
>>> > Our data is coming from Kafka and essentially has the Kafka topic and
>>> > a date, among other fields.
>>> >
>>> > We'd like to consume all topics (and automatically subscribe to new
>>> > ones) and write to S3 partitioned by topic and date, for example:
>>> >
>>> > s3://bucket/path/topic=topic2/date=20160522/
>>> > s3://bucket/path/topic=topic2/date=20160523/
>>> > s3://bucket/path/topic=topic1/date=20160522/
>>> > s3://bucket/path/topic=topic1/date=20160523/
>>> >
>>> > There are two problems with RollingSink as it is now:
>>> > - It only allows partitioning by date
>>> > - It flushes the batch every time the path changes.
>>> > In our case the
>>> > stream can, for example, have a random mix of different topics, and
>>> > that would mean that RollingSink isn't able to respect the max flush
>>> > size but keeps flushing the files pretty much on every tuple.
>>> >
>>> > We've thought that we could implement a sink that internally creates
>>> > and handles multiple RollingSink instances as needed for the
>>> > partitions. But it would be great to first hear any suggestions that
>>> > you might have.
>>> >
>>> > If we have to extend RollingSink, it would be nice to make it take a
>>> > partitioning function as a parameter. The function would be called
>>> > for each tuple to create the output path.
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
>>> > Sent from the Apache Flink User Mailing List archive at Nabble.com.
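The "partitioning function as a parameter" idea from Juho's original message could look roughly like this. This is only a sketch: the interface name, its signature, and the event fields are invented for illustration and are not Flink's actual API. It also shows the point Juho raised about the date coming from an event field but still needing to be formatted.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionerSketch {

    // Hypothetical user-supplied partitioning function, called once per
    // tuple to build the output path.
    interface PathPartitioner<T> {
        String getBucketPath(T element);
    }

    // Example event: topic plus an epoch-millis timestamp field that
    // still needs formatting, as described in the thread.
    static class Event {
        final String topic;
        final long timestampMillis;
        Event(String topic, long timestampMillis) {
            this.topic = topic;
            this.timestampMillis = timestampMillis;
        }
    }

    // Format the event-time date (not processing time) as yyyyMMdd.
    static final DateTimeFormatter DATE_FMT =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Builds the topic=.../date=.../ layout from the original message.
    static final PathPartitioner<Event> partitioner = e ->
        "topic=" + e.topic
            + "/date=" + DATE_FMT.format(Instant.ofEpochMilli(e.timestampMillis))
            + "/";
}
```

The sink would prepend its base path (e.g. s3://bucket/path/) to whatever the function returns, so the same sink works for any partitioning scheme.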