Hi guys,

I've been working on this feature as I needed something similar. Have a
look at my issue here https://issues.apache.org/jira/browse/FLINK-4190 and
changes here https://github.com/joshfg/flink/tree/flink-4190
The changes follow Kostas's suggestion in this thread.
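
For anyone skimming the thread, here is a minimal, self-contained sketch of
the idea being discussed: a per-record function builds the bucket path, and
several buckets stay open at once, each flushed when it gets too large or has
been open too long. It is illustrative only. It is not the code in the branch
above and it does not use the Flink API. All names (Event, MultiBucketBuffer,
bucketPath, flushExpired) are hypothetical, the size limit is counted in
records rather than bytes, and a "flush" just records the path in a list
instead of writing a file.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical demo: partitions events by topic and date, as in the thread.
class PartitionedSinkDemo {
    // Partitioning function: formats the bucket path from the event's fields.
    static String bucketPath(Event e) {
        return "s3://bucket/path/topic=" + e.topic
                + "/date=" + e.date.format(DateTimeFormatter.BASIC_ISO_DATE) + "/";
    }

    public static void main(String[] args) {
        MultiBucketBuffer<Event> sink =
                new MultiBucketBuffer<>(PartitionedSinkDemo::bucketPath, 2, 60_000L);
        sink.write(new Event("topic1", LocalDate.of(2016, 5, 22)), 0L);
        sink.write(new Event("topic2", LocalDate.of(2016, 5, 22)), 1L);
        sink.write(new Event("topic1", LocalDate.of(2016, 5, 22)), 2L); // size flush
        sink.flushExpired(60_001L);                                     // age flush
        sink.flushed.forEach(System.out::println);
    }
}

// Hypothetical event type: a Kafka topic plus an event date.
class Event {
    final String topic;
    final LocalDate date;
    Event(String topic, LocalDate date) { this.topic = topic; this.date = date; }
}

// Hypothetical stand-in for a modified sink that keeps several buckets open.
class MultiBucketBuffer<T> {
    private static final class Bucket<T> {
        final List<T> records = new ArrayList<>();
        final long openedAt;
        Bucket(long openedAt) { this.openedAt = openedAt; }
    }

    private final Function<T, String> pathFn; // maps a record to its bucket path
    private final int maxRecords;             // size limit per bucket (in records)
    private final long maxOpenMillis;         // maximum time a bucket may stay open
    private final Map<String, Bucket<T>> open = new HashMap<>();
    final List<String> flushed = new ArrayList<>(); // stand-in for written files

    MultiBucketBuffer(Function<T, String> pathFn, int maxRecords, long maxOpenMillis) {
        this.pathFn = pathFn;
        this.maxRecords = maxRecords;
        this.maxOpenMillis = maxOpenMillis;
    }

    // Adds the record to its bucket; flushes only that bucket if it is full.
    void write(T record, long now) {
        String path = pathFn.apply(record);
        Bucket<T> bucket = open.computeIfAbsent(path, p -> new Bucket<>(now));
        bucket.records.add(record);
        if (bucket.records.size() >= maxRecords) {
            flush(path, bucket);
            open.remove(path);
        }
    }

    // Flushes every bucket that has been open for at least maxOpenMillis.
    void flushExpired(long now) {
        Iterator<Map.Entry<String, Bucket<T>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Bucket<T>> entry = it.next();
            if (now - entry.getValue().openedAt >= maxOpenMillis) {
                flush(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    private void flush(String path, Bucket<T> bucket) {
        flushed.add(path + " (" + bucket.records.size() + " records)");
    }

    int openBuckets() { return open.size(); }
}
```

The point of the sketch is that a record for a different path no longer forces
a flush: only the bucket that hits its own size or age limit is closed.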

Thanks,
Josh


On Thu, May 26, 2016 at 3:27 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> while I think it would be possible to do it by creating a "meta sink" that
> contains several RollingSinks, I think the approach of integrating it into
> the current RollingSink is better.
>
> I think it's mostly a question of style and architectural purity but also
> of resource consumption and maintainability. If there are several
> RollingSinks in one other sink instead of just one RollingSink then we
> duplicate all of the internal structures of RollingSink. For
> maintainability, we would have to be very careful when interacting with the
> nested sinks to ensure that they really can behave as proper sinks
> (watermarks, checkpoints, closing/disposing come to mind now but this might
> grow in the future).
>
> Cheers,
> Aljoscha
>
> On Wed, 25 May 2016 at 11:35 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Juho,
>>
>> To be more aligned with the semantics in Flink, I would suggest a
>> solution with a single modified RollingSink that caches
>> multiple buckets (from the Bucketer) and flushes (some of) them to disk
>> whenever certain time or space criteria are met.
>>
>> I would say that it is worth modifying the rolling sink so that it can
>> support such use cases (different flushing policies).
>> Aljoscha, as the writer of the original Rolling Sink, what do you think?
>>
>> Kostas
>>
>> On May 25, 2016, at 8:21 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks, indeed the desired behavior is to flush if the bucket size exceeds
>> a limit but also if the bucket has been open long enough. Unlike the
>> current RollingSink, we don't want to flush every time the bucket changes,
>> but rather keep multiple buckets "open" as needed.
>>
>> In our case the date to use for partitioning comes from an event field,
>> but needs to be formatted, too. The partitioning feature should be generic,
>> allowing the user to pass a function that formats the bucket path for each
>> tuple.
>>
>> Does it seem like a valid plan to create a sink that internally caches
>> multiple rolling sinks?
>>
>> On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Juho,
>>>
>>> If I understand correctly, you want a custom RollingSink that caches some
>>> buckets, one for each topic/date key, and whenever the volume of data
>>> buffered exceeds a limit, then it flushes to disk, right?
>>>
>>> If this is the case, then you are right that this is not currently
>>> supported out-of-the-box, but it would be interesting to update the
>>> RollingSink to support such scenarios.
>>>
>>> One clarification: when you say that you want to partition by date,
>>> you mean the date of the event, right? Not the processing time.
>>>
>>> Kostas
>>>
>>> > On May 24, 2016, at 1:22 PM, Juho Autio <juho.au...@rovio.com> wrote:
>>> >
>>> > Could you suggest how to dynamically partition data with Flink
>>> > streaming?
>>> >
>>> > We've looked at RollingSink, which takes care of writing batches to
>>> > S3, but it doesn't allow defining the partition dynamically based on
>>> > the tuple fields.
>>> >
>>> > Our data is coming from Kafka and essentially has the Kafka topic and a
>>> > date, among other fields.
>>> >
>>> > We'd like to consume all topics (also automatically subscribe to new
>>> > ones) and write to S3 partitioned by topic and date, for example:
>>> >
>>> > s3://bucket/path/topic=topic2/date=20160522/
>>> > s3://bucket/path/topic=topic2/date=20160523/
>>> > s3://bucket/path/topic=topic1/date=20160522/
>>> > s3://bucket/path/topic=topic1/date=20160523/
>>> >
>>> > There are two problems with RollingSink as it is now:
>>> > - Only allows partitioning by date
>>> > - Flushes the batch every time the path changes. In our case the
>>> >   stream can, for example, have a random mix of different topics, and
>>> >   that would mean that RollingSink isn't able to respect the max flush
>>> >   size but keeps flushing the files pretty much on every tuple.
>>> >
>>> > We've thought that we could implement a sink that internally creates
>>> > and handles multiple RollingSink instances as needed for partitions.
>>> > But it would be great to first hear any suggestions that you might
>>> > have.
>>> >
>>> > If we have to extend RollingSink, it would be nice to make it take a
>>> > partitioning function as a parameter. The function would be called for
>>> > each tuple to create the output path.
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
>>> > Sent from the Apache Flink User Mailing List archive at Nabble.com
>>> > <http://nabble.com>.
>>>
>>
>>
