Hi Robert,

The bottleneck operator works with state (many hash maps, basically)
and its algorithm is not parallelizable.
We took the approach of preloading all required data from external systems,
so that operators don't have to do any network communication while
processing a data record (since even a 1 ms delay adds up to days at our
volumes).
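
A sketch of the preloading pattern, for reference (Event, Meta, Enriched
and loadFromExternalSystem() are illustrative placeholders, not our actual
code):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Load reference data once per operator instance in open(), so the
// per-record path in map() never touches the network.
class EnrichFunction extends RichMapFunction[Event, Enriched] {
  private var lookup: Map[String, Meta] = _

  override def open(parameters: Configuration): Unit = {
    lookup = loadFromExternalSystem() // placeholder for the real preload
  }

  override def map(e: Event): Enriched =
    Enriched(e, lookup.getOrElse(e.key, Meta.empty))
}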

The network should not be a problem yet.
I was doing a test run on one big server (32 cores, 128 GB RAM), so apart
from the software network stack, no real (cross-machine) network should be
involved.

Does Flink write a type string or an id when I register my own Kryo serializer?


Best regards,
Dmitry

On Thu, Feb 23, 2017 at 8:59 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Dmitry,
>
> Cool! Looks like you've taken the right approach to analyze the
> performance issues!
> Often the deserialization of the input is already a performance killer :)
>
> What is this one operator that is the bottleneck doing?
> Does it have a lot of state? Is it CPU intensive, or talking to an
> external system?
>
> What is your network situation? How many shuffles are you doing, what's
> the size of each record, and how much bandwidth do you have between the
> machines?
>
> One thing you can do to further optimize performance is to make sure
> that all types (including subtypes) that are serialized with Kryo are
> registered.
> Everything that has a GenericTypeInformation at the API level goes
> through Kryo.
> If you have a "com.acme.datatype.MyType" and the type is not registered,
> Kryo will write the string "com.acme.datatype.MyType" every time it
> serializes a record of that type.
> If you register the type, Kryo will just serialize an integer id, so the
> amount of data being transferred goes down drastically.
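>
> A minimal sketch of such a registration (assuming a Scala job;
> com.acme.datatype.MyType stands in for your own class):
>
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // Once registered, Kryo writes a compact integer id instead of the
> // fully-qualified class name for every serialized record.
> env.getConfig.registerKryoType(classOf[com.acme.datatype.MyType])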
>
> The disableAutoTypeRegistration flag is ignored in the DataStream API at
> the moment.
>
> On Thu, Feb 23, 2017 at 7:00 PM, Dmitry Golubets <dgolub...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> In the dev environment I load data from zipped CSV files on S3.
>> The data is parsed into case classes.
>> It's quite fast: I'm able to get ~80k/sec with only the source and a
>> "dev/null" sink.
>> Checkpointing is enabled with 1-hour intervals.
>>
>> Yes, one of the operators is a bottleneck and it backpressures.
>> Reading data and passing it through just that operator drops the rate
>> down to 30k/sec.
>>
>> But then, after all the other components are added to the stream, it
>> goes down to 15k/sec.
>> No other component causes backpressure.
>>
>> I understand that it's not possible to keep the rate the same when adding
>> more components due to communication overhead.
>> I'm just trying to reduce it.
>>
>>
>> Best regards,
>> Dmitry
>>
>> On Thu, Feb 23, 2017 at 4:17 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Dmitry,
>>>
>>> sorry for the late response.
>>> Where are you reading the data from?
>>> Did you check if one operator is causing backpressure?
>>>
>>> Are you using checkpointing?
>>>
>>> Serialization is often the cause of slow processing. However, it's very
>>> hard to diagnose other potential causes without any details about your
>>> job.
>>>
>>> Are you deserializing data from Kafka into case classes? If so, what
>>> are you using to do that?
>>>
>>> Regards,
>>> Robert
>>>
>>> On Fri, Feb 17, 2017 at 9:17 PM, Dmitry Golubets <dgolub...@gmail.com>
>>> wrote:
>>>
>>>> Hi Daniel,
>>>>
>>>> I've implemented a macro that generates MessagePack serializers in our
>>>> codebase.
>>>> The resulting code is basically a series of writes/reads, like in
>>>> hand-written structured serialization.
>>>>
>>>> E.g. given
>>>> case class Data1(str: String, subdata: Data2)
>>>> case class Data2(num: Int)
>>>>
>>>> the serialization code for Data1 will be something like:
>>>> packer.packString(str)
>>>> packer.packInt(num) // the nested Data2 is inlined
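>>>>
>>>> For illustration, a hand-written equivalent of what the macro
>>>> generates, wired into Kryo (a sketch assuming the msgpack-core
>>>> library; Data1Serializer is an illustrative name):
>>>>
>>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>>> import org.msgpack.core.MessagePack
>>>>
>>>> class Data1Serializer extends Serializer[Data1] {
>>>>   override def write(kryo: Kryo, output: Output, data: Data1): Unit = {
>>>>     val packer = MessagePack.newDefaultBufferPacker()
>>>>     packer.packString(data.str)
>>>>     packer.packInt(data.subdata.num) // nested Data2 is inlined
>>>>     val bytes = packer.toByteArray
>>>>     // length-prefix the MessagePack payload inside the Kryo stream
>>>>     output.writeInt(bytes.length)
>>>>     output.writeBytes(bytes)
>>>>   }
>>>>
>>>>   override def read(kryo: Kryo, input: Input, tpe: Class[Data1]): Data1 = {
>>>>     val bytes = input.readBytes(input.readInt())
>>>>     val unpacker = MessagePack.newDefaultUnpacker(bytes)
>>>>     Data1(unpacker.unpackString(), Data2(unpacker.unpackInt()))
>>>>   }
>>>> }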
>>>>
>>>> The data structures in our project are quite big (2-4 KB in JSON) and
>>>> contain nested classes with many fields.
>>>> So custom serialization helps us avoid reflection and reduces the
>>>> amount of data sent over the network.
>>>>
>>>> However, it's worth mentioning that on small case classes Flink's
>>>> default serialization works faster.
>>>>
>>>>
>>>> Best regards,
>>>> Dmitry
>>>>
>>>> On Fri, Feb 17, 2017 at 6:01 PM, Daniel Santos <dsan...@cryptolab.net>
>>>> wrote:
>>>>
>>>>> Hello Dimitry,
>>>>>
>>>>> Could you please elaborate on your tuning of
>>>>> environment.addDefaultKryoSerializer(..)?
>>>>>
>>>>> I'm interested in knowing what you have done there to get a boost of
>>>>> about 50%.
>>>>>
>>>>> A small or simple example would be very nice.
>>>>>
>>>>> Thank you very much in advance.
>>>>>
>>>>> Kind Regards,
>>>>>
>>>>> Daniel Santos
>>>>>
>>>>> On 02/17/2017 12:43 PM, Dmitry Golubets wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> My streaming job unfortunately cannot benefit much from
>>>>> parallelization.
>>>>> So I'm looking for things I can tune in Flink to make it process a
>>>>> sequential stream faster.
>>>>>
>>>>> So far, our current engine based on Akka Streams (non-distributed, of
>>>>> course) does 20k msg/sec.
>>>>> Ported to Flink, I'm getting 14k so far.
>>>>>
>>>>> My observations are the following:
>>>>>
>>>>>    - if I chain operations together, they all execute in sequence, so
>>>>>    I basically sum up the time required to process one data item
>>>>>    across all my stream operators - not good
>>>>>    - if I split chains, they execute asynchronously to each other,
>>>>>    but there is serialization and network overhead
>>>>>
>>>>> The second approach gives me better results, since I have a server
>>>>> with more than enough memory and cores to do all the side work for
>>>>> serialization. But I want to reduce this serialization/data-transfer
>>>>> overhead to a minimum.
>>>>>
>>>>> So what I have now:
>>>>>
>>>>> environment.getConfig.enableObjectReuse() // since it's Scala, we
>>>>> don't need the unnecessary serialization
>>>>> environment.getConfig.disableAutoTypeRegistration() // it works
>>>>> faster with this, I'm not sure why
>>>>> environment.addDefaultKryoSerializer(..) // custom MessagePack
>>>>> serialization for all message types, gives about a 50% boost
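>>>>>
>>>>> For context, the registration is wired up roughly like this (a
>>>>> sketch; Data1 and Data1Serializer stand in for our message types and
>>>>> their macro-generated serializers):
>>>>>
>>>>> // Tell Flink to use the custom Kryo serializer as the default for
>>>>> // this type, instead of Kryo's reflection-based FieldSerializer.
>>>>> environment.addDefaultKryoSerializer(classOf[Data1], classOf[Data1Serializer])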
>>>>>
>>>>> But that's it, I don't know what else to do.
>>>>> I didn't find any interesting network/buffer settings in the docs.
>>>>>
>>>>> Best regards,
>>>>> Dmitry
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
