I think that's independent of the serializer registration.

What's important is registering the types with the execution environment.
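
To make the effect of registration concrete, here is a rough, Flink-free sketch. It does not use Kryo and does not reproduce Kryo's wire format. `writeUnregistered` and `writeRegistered` are hypothetical helpers that only mimic the behaviour described further down in this thread: a class name string in front of every record versus a small integer id. The type id 10 and the payload 42 are made-up values.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object TypeTagOverhead {
  // Hypothetical stand-in for an unregistered type: the fully qualified
  // class name is written in front of every record.
  def writeUnregistered(className: String, payload: Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(className) // 2-byte length prefix followed by the name
    out.writeInt(payload)   // the record itself, here just one Int
    out.flush()
    bytes.toByteArray
  }

  // Hypothetical stand-in for a registered type: only an integer id is
  // written in front of every record.
  def writeRegistered(typeId: Int, payload: Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(typeId)  // fixed 4-byte tag instead of the class name
    out.writeInt(payload)
    out.flush()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val unregistered = writeUnregistered("com.acme.datatype.MyType", 42)
    val registered = writeRegistered(10, 42)
    println(s"unregistered: ${unregistered.length} bytes, " +
      s"registered: ${registered.length} bytes")
  }
}
```

The per-record tag is pure overhead, so the smaller the records are, the more the class name dominates what gets shipped between operators.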

On Fri, Feb 24, 2017 at 7:06 PM, Dmitry Golubets <dgolub...@gmail.com>
wrote:

> Hi Robert,
>
> The bottleneck operator works with state (basically many hash maps),
> and its algorithm is not parallelizable.
> We took the approach of preloading all required data from external systems,
> so that operators don't have to do any network communication while
> processing a data record (because even a 1 ms delay adds up to days over
> large record counts).
>
> The network should not be a problem yet.
> I was doing a test run on one big server (32 cores, 128 GB RAM), so apart
> from the software network stack, no real (cross-machine) network should be
> involved.
>
> Does Flink write a type string or id when I register my own Kryo
> serializer?
>
>
> Best regards,
> Dmitry
>
> On Thu, Feb 23, 2017 at 8:59 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Dmitry,
>>
>> Cool! Looks like you've taken the right approach to analyze the
>> performance issues!
>> Often the deserialization of the input is already a performance killer :)
>>
>> What is this one operator that is the bottleneck doing?
>> Does it have a lot of state? Is it CPU intensive, or talking to an
>> external system?
>>
>> What is your network situation? How many shuffles are you doing, what's
>> the size of each record, and how much bandwidth do you have between
>> the machines?
>>
>> One thing you can do to further optimize performance is to make sure
>> that all types (including subtypes) that are serialized with Kryo are
>> registered.
>> Everything that has a GenericTypeInfo at the API level goes
>> through Kryo.
>> If you have a "com.acme.datatype.MyType" and the type is not registered,
>> Kryo will write the string "com.acme.datatype.MyType" every time it
>> serializes data of that type.
>> Once the type is registered, only an integer id is serialized, so the
>> amount of data being transferred goes down drastically.
>>
>> The disableAutoTypeRegistration flag is ignored in the DataStream API at
>> the moment.
>>
>> On Thu, Feb 23, 2017 at 7:00 PM, Dmitry Golubets <dgolub...@gmail.com>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> In the dev environment I load data from zipped CSV files on S3.
>>> The data is parsed into case classes.
>>> It's quite fast: I'm able to get ~80k/sec with only a source and a
>>> "dev/null" sink.
>>> Checkpointing is enabled with 1-hour intervals.
>>>
>>> Yes, one of the operators is a bottleneck and it causes backpressure.
>>> Reading data and passing it through just that operator drops the rate
>>> down to 30k/sec.
>>>
>>> But then after all other components are added to the stream it goes down
>>> to 15k/sec.
>>> No other component causes backpressure.
>>>
>>> I understand that it's not possible to keep the rate the same when
>>> adding more components due to communication overhead.
>>> I'm just trying to reduce it.
>>>
>>>
>>> Best regards,
>>> Dmitry
>>>
>>> On Thu, Feb 23, 2017 at 4:17 PM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Dmitry,
>>>>
>>>> sorry for the late response.
>>>> Where are you reading the data from?
>>>> Did you check if one operator is causing backpressure?
>>>>
>>>> Are you using checkpointing?
>>>>
>>>> Serialization is often the cause of slow processing. However, it's very
>>>> hard to diagnose other potential causes without any details on your job.
>>>>
>>>> Are you deserializing data from Kafka into case classes? If so, what
>>>> are you using to do that?
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>> On Fri, Feb 17, 2017 at 9:17 PM, Dmitry Golubets <dgolub...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Daniel,
>>>>>
>>>>> I've implemented a macro that generates MessagePack serializers in
>>>>> our codebase.
>>>>> The resulting code is basically a series of writes/reads, like in
>>>>> hand-written structured serialization.
>>>>>
>>>>> E.g. given
>>>>> case class Data1(str: String, subdata: Data2)
>>>>> case class Data2(num: Int)
>>>>>
>>>>> serialization code for Data1 will be like:
>>>>> packer.packString(str)
>>>>> packer.packInt(num)
>>>>>
>>>>> The data structures in our project are quite big (2-4 KB as JSON) and
>>>>> contain nested classes with many fields.
>>>>> So custom serialization helps us avoid reflection and reduces the
>>>>> amount of data sent over the network.
>>>>>
>>>>> However, it's worth mentioning that on small case classes Flink's
>>>>> default serialization is faster.
>>>>>
>>>>>
>>>>> Best regards,
>>>>> Dmitry
>>>>>
>>>>> On Fri, Feb 17, 2017 at 6:01 PM, Daniel Santos <dsan...@cryptolab.net>
>>>>> wrote:
>>>>>
>>>>>> Hello Dmitry,
>>>>>>
>>>>>> Could you please elaborate on how you tuned
>>>>>> environment.addDefaultKryoSerializer(..)?
>>>>>>
>>>>>> I'm interested in knowing what you did there to get a boost of
>>>>>> about 50%.
>>>>>>
>>>>>> Some small or simple example would be very nice.
>>>>>>
>>>>>> Thank you very much in advance.
>>>>>>
>>>>>> Kind Regards,
>>>>>>
>>>>>> Daniel Santos
>>>>>>
>>>>>> On 02/17/2017 12:43 PM, Dmitry Golubets wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Unfortunately, my streaming job cannot benefit much from
>>>>>> parallelization.
>>>>>> So I'm looking for things I can tune in Flink to make it process
>>>>>> a sequential stream faster.
>>>>>>
>>>>>> So far, our current engine based on Akka Streams (non-distributed,
>>>>>> of course) handles 20k msg/sec.
>>>>>> Ported to Flink, I'm getting 14k so far.
>>>>>>
>>>>>> My observations are as follows:
>>>>>>
>>>>>>    - if I chain operations together they execute all in sequence, so
>>>>>>    I basically sum up the time required to process one data item across
>>>>>>    all my stream operators, not good
>>>>>>    - if I split chains, they execute asynchronously to each other,
>>>>>>    but there is serialization and network overhead
>>>>>>
>>>>>> The second approach gives me better results, considering that I have a
>>>>>> server with more than enough memory and cores to do all the side work
>>>>>> for serialization. But I want to reduce this serialization/data
>>>>>> transfer overhead to a minimum.
>>>>>>
>>>>>> So what I have now:
>>>>>>
>>>>>> // because it's Scala, we don't need unnecessary serialization
>>>>>> environment.getConfig.enableObjectReuse()
>>>>>> // it works faster with this, I'm not sure why
>>>>>> environment.getConfig.disableAutoTypeRegistration()
>>>>>> // custom MessagePack serialization for all message types,
>>>>>> // gives about a 50% boost
>>>>>> environment.addDefaultKryoSerializer(..)
>>>>>>
>>>>>> But that's it, I don't know what else to do.
>>>>>> I didn't find any interesting network/buffer settings in the docs.
>>>>>>
>>>>>> Best regards,
>>>>>> Dmitry
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
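
For readers who asked for a small example of the hand-written structured serialization described above for Data1 and Data2, here is a minimal sketch. It is not the output of the macro mentioned in the thread, and it swaps MessagePack for the JDK's `DataOutputStream` and `DataInputStream` so that it runs without extra dependencies. `Data1Codec` is a hypothetical name. The idea is the same: fields are written in a fixed order, with no reflection and no field or class names on the wire.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

case class Data2(num: Int)
case class Data1(str: String, subdata: Data2)

object Data1Codec {
  // Write the fields in a fixed order; the nested Data2 is flattened
  // into the same stream, so no per-object type information is needed.
  def write(value: Data1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(value.str)         // corresponds to packer.packString(str)
    out.writeInt(value.subdata.num) // corresponds to packer.packInt(num)
    out.flush()
    bytes.toByteArray
  }

  // Read the fields back in exactly the order they were written.
  def read(data: Array[Byte]): Data1 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val str = in.readUTF()
    val num = in.readInt()
    Data1(str, Data2(num))
  }
}

object Data1CodecDemo {
  def main(args: Array[String]): Unit = {
    val original = Data1("hello", Data2(7))
    val restored = Data1Codec.read(Data1Codec.write(original))
    println(restored == original)
  }
}
```

Note that `writeUTF` only handles strings whose encoded form is at most 65535 bytes, so a real serializer for large records would need a different string encoding. To use such a codec from Flink it would still have to be wrapped in a Kryo serializer and passed to environment.addDefaultKryoSerializer(..), which is not shown here.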
