Oh you have multiple different output formats, missed that.

For the Batch API you are i believe correct, using a custom output-format is the best solution.

In the Streaming API the code below should be equally fast, if the filtered sets don't overlap.

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditonB).output(formatB)

That is because all filters would be chained; hell all sources might be as well (not to sure on this one).

On 01.05.2017 17:05, Newport, Billy wrote:

There is likely a bug then, the ENUM,Record stream to a filter to a set of outputformats per filter was slower than the BITMASK,Record to single OutputFormat which demux’s the data to each file internally

Are you saying do a custom writer inside a map rather than either of the 2 above approaches?

*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 10:41 AM
*To:* user@flink.apache.org
*Subject:* Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record, based on the bitmask, in a map function before the sink? This shouldn't incur any serialization overhead if the sink is chained to the map. The emitted Tuple could also share the
GenericRecord; meaning you don't even have to copy it.

On 01.05.2017 14:52, Newport, Billy wrote:

    We’ve done that but it’s very expensive from a serialization point
    of view when writing the same record multiple times, each in a
    different tuple.

    For example, we started with this:

    .collect(new Tuple<Short, GenericRecord)).

    The record would be written with short = 0 and again with short =
    1. This results in the GenericRecord being serialized twice. You
    also prolly need filters on the output dataset which is expensive
    also.

    We switched instead to a bitmask. Now, we write the record once
    and set bits in the short for each file the record needs to be
    written to. Our next step is to write records to a file based on
    the short. We wrote a new outputrecordformat which checks the bits
    in the short and writes the GenericRecord to each file for the
    corresponding bit. This means no filter to split the records for
    each file and this is much faster.

    We’re finding a need to do this kind of optimization pretty
    frequently with flink.

    *From:*Gaurav Khandelwal [mailto:gaurav671...@gmail.com]
    *Sent:* Saturday, April 29, 2017 4:32 AM
    *To:* user@flink.apache.org <mailto:user@flink.apache.org>
    *Subject:* Collector.collect

    Hello

    I am working on RichProcessFunction and I want to emit multiple
    records at a time. To achieve this, I am currently doing :

    while(condition)

    {

     Collector.collect(new Tuple<>...);

    }

    I was wondering, is this the correct way or there is any other
    alternative.


Reply via email to