I think that's independent of the serializer registration. What's important is registering the types at the execution environment.
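To make the per-record cost of unregistered types concrete, here is a toy model in plain Scala (standard library only). It compares tagging every record with the fully qualified class name against tagging it with an integer id. This is an assumption-laden sketch, NOT Kryo's actual wire format: Kryo uses variable-length ints and its own string encoding, and the id value 10, the one-byte marker and the 4-byte payload are made up for illustration. In a Flink job the registration itself would be done on the execution environment, for example with env.registerType(classOf[MyType]).

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Toy model of the per-record type-tag overhead (NOT Kryo's real wire format:
// Kryo uses variable-length ints and its own string encoding).
object TypeTagOverhead {
  // Illustrative values: the class name from the thread and a made-up id.
  val ClassName: String = "com.acme.datatype.MyType"
  val RegisteredId: Int = 10

  // Writes one record: a 1-byte marker, then either the id or the class name,
  // then a 4-byte payload standing in for the record's actual fields.
  def writeRecord(out: DataOutputStream, registered: Boolean, payload: Int): Unit = {
    if (registered) {
      out.writeByte(1)
      out.writeInt(RegisteredId) // 4 bytes
    } else {
      out.writeByte(0)
      out.writeUTF(ClassName)    // 2-byte length + 24 bytes of ASCII
    }
    out.writeInt(payload)
  }

  // Total number of bytes needed to serialize `records` records.
  def bytesFor(registered: Boolean, records: Int): Int = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    (1 to records).foreach(i => writeRecord(out, registered, i))
    out.flush()
    buf.size()
  }

  def main(args: Array[String]): Unit = {
    println(s"unregistered: ${bytesFor(registered = false, records = 1000)} bytes") // 31000
    println(s"registered:   ${bytesFor(registered = true, records = 1000)} bytes")  // 9000
  }
}
```

For 1,000 records this prints 31000 bytes unregistered versus 9000 registered. The payload here is tiny, so the tag dominates; with records of several KB the relative saving from registration would be much smaller.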
On Fri, Feb 24, 2017 at 7:06 PM, Dmitry Golubets <dgolub...@gmail.com> wrote:
> Hi Robert,
>
> The bottleneck operator works with state (basically many hash maps),
> and its algorithm is not parallelizable.
> We took the approach of preloading all required data from external systems,
> so that operators don't have to do any network communication while
> processing a data record (because even a 1 ms delay adds up to days at
> these volumes).
>
> The network should not be a problem yet.
> I was doing a test run on one big server (32 cores, 128 GB RAM), so apart
> from the software network stack, no real (cross-machine) network should be
> involved.
>
> Does Flink write a type string or an id when I register my own Kryo
> serializer?
>
>
> Best regards,
> Dmitry
>
> On Thu, Feb 23, 2017 at 8:59 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Dmitry,
>>
>> Cool! Looks like you've taken the right approach to analyzing the
>> performance issues!
>> Often the deserialization of the input is already a performance killer :)
>>
>> What is the one operator that is the bottleneck doing?
>> Does it have a lot of state? Is it CPU-intensive, or talking to an
>> external system?
>>
>> What is your network situation? How many shuffles are you doing, what's
>> the size of each record, and how much bandwidth do you have between
>> the machines?
>>
>> One thing you can do to further optimize performance is to make sure
>> that all types (including subtypes) that are serialized with Kryo are
>> registered.
>> Everything that has a GenericTypeInfo at the API level goes
>> through Kryo.
>> If you have a "com.acme.datatype.MyType" and the type is not registered,
>> Kryo will write the string "com.acme.datatype.MyType" every time it
>> serializes data of that type.
>> If you register the type, only an integer id is serialized, so the
>> amount of data being transferred goes down drastically.
>>
>> The disableAutoTypeRegistration flag is ignored in the DataStream API at
>> the moment.
>>
>> On Thu, Feb 23, 2017 at 7:00 PM, Dmitry Golubets <dgolub...@gmail.com>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> In the dev environment I load data from zipped CSV files on S3.
>>> The data is parsed into case classes.
>>> It's quite fast: I get ~80k records/sec with only the source and a
>>> "/dev/null" sink.
>>> Checkpointing is enabled with a 1-hour interval.
>>>
>>> Yes, one of the operators is a bottleneck and it backpressures.
>>> Reading the data and passing it through just that operator drops the rate
>>> to 30k/sec.
>>>
>>> After all the other components are added to the stream, it goes down
>>> to 15k/sec.
>>> No other component causes backpressure.
>>>
>>> I understand that it's not possible to keep the rate the same when
>>> adding more components, due to communication overhead.
>>> I'm just trying to reduce it.
>>>
>>>
>>> Best regards,
>>> Dmitry
>>>
>>> On Thu, Feb 23, 2017 at 4:17 PM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Dmitry,
>>>>
>>>> Sorry for the late response.
>>>> Where are you reading the data from?
>>>> Did you check whether one operator is causing backpressure?
>>>>
>>>> Are you using checkpointing?
>>>>
>>>> Serialization is often the cause of slow processing. However, it's very
>>>> hard to diagnose other potential causes without any details on your job.
>>>>
>>>> Are you deserializing data from Kafka into case classes? If so, what
>>>> are you using to do that?
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>> On Fri, Feb 17, 2017 at 9:17 PM, Dmitry Golubets <dgolub...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Daniel,
>>>>>
>>>>> I've implemented a macro that generates MessagePack serializers in
>>>>> our codebase.
>>>>> The resulting code is basically a series of writes/reads, like in
>>>>> hand-written structured serialization.
>>>>>
>>>>> E.g. given
>>>>>
>>>>> case class Data1(str: String, subdata: Data2)
>>>>> case class Data2(num: Int)
>>>>>
>>>>> the serialization code for Data1 will be like:
>>>>>
>>>>> packer.packString(str)
>>>>> packer.packInt(subdata.num)
>>>>>
>>>>> The data structures in our project are quite big (2-4 KB in JSON) and
>>>>> contain nested classes with many fields.
>>>>> So custom serialization helps us avoid reflection and reduces the amount
>>>>> of data sent over the network.
>>>>>
>>>>> However, it's worth mentioning that on small case classes I see Flink's
>>>>> default serialization work faster.
>>>>>
>>>>>
>>>>> Best regards,
>>>>> Dmitry
>>>>>
>>>>> On Fri, Feb 17, 2017 at 6:01 PM, Daniel Santos <dsan...@cryptolab.net>
>>>>> wrote:
>>>>>
>>>>>> Hello Dmitry,
>>>>>>
>>>>>> Could you please elaborate on your tuning of
>>>>>> environment.addDefaultKryoSerializer(..)?
>>>>>>
>>>>>> I'm interested in knowing what you did there to get a boost of
>>>>>> about 50%.
>>>>>>
>>>>>> A small or simple example would be very nice.
>>>>>>
>>>>>> Thank you very much in advance.
>>>>>>
>>>>>> Kind Regards,
>>>>>>
>>>>>> Daniel Santos
>>>>>>
>>>>>> On 02/17/2017 12:43 PM, Dmitry Golubets wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Unfortunately, my streaming job cannot benefit much from
>>>>>> parallelization.
>>>>>> So I'm looking for things I can tune in Flink to make it process a
>>>>>> sequential stream faster.
>>>>>>
>>>>>> So far, our current engine based on Akka Streams (not distributed,
>>>>>> of course) handles 20k msg/sec.
>>>>>> Ported to Flink, I'm getting 14k so far.
>>>>>>
>>>>>> My observations are as follows:
>>>>>>
>>>>>> - If I chain operations together, they all execute in sequence, so
>>>>>>   I basically sum up the time required to process one data item across
>>>>>>   all my stream operators. Not good.
>>>>>> - If I split chains, they execute asynchronously to each other,
>>>>>>   but there is serialization and network overhead.
>>>>>>
>>>>>> The second approach gives me better results, considering that I have a
>>>>>> server with more than enough memory and cores to do all the side work
>>>>>> for serialization. But I want to reduce this serialization/data transfer
>>>>>> overhead to a minimum.
>>>>>>
>>>>>> So what I have now:
>>>>>>
>>>>>> environment.getConfig.enableObjectReuse() // since it's Scala, we don't need unnecessary serialization
>>>>>> environment.getConfig.disableAutoTypeRegistration() // it works faster with it, I'm not sure why
>>>>>> environment.addDefaultKryoSerializer(..) // custom MessagePack serialization for all message types, gives about a 50% boost
>>>>>>
>>>>>> But that's it; I don't know what else to do.
>>>>>> I didn't find any interesting network/buffer settings in the docs.
>>>>>>
>>>>>> Best regards,
>>>>>> Dmitry
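The custom serialization discussed in this thread for Data1/Data2 (a serializer that is just a fixed sequence of field writes and matching reads, with nested classes flattened inline) can be sketched in plain Scala. This is an illustration under stated assumptions, not the macro output from the thread: java.io.DataOutputStream and DataInputStream stand in for a MessagePack packer and unpacker, and the Data1Codec and Data1CodecDemo objects are hypothetical. In a Flink job, a codec like this would still have to be wrapped in a Kryo serializer and registered, e.g. via addDefaultKryoSerializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

final case class Data2(num: Int)
final case class Data1(str: String, subdata: Data2)

// Hypothetical hand-written codec, mirroring what a serializer-generating
// macro could emit: no reflection, no type tags, fields in a fixed order.
object Data1Codec {
  def write(out: DataOutputStream, d: Data1): Unit = {
    // writeUTF rejects null and strings longer than 65535 encoded bytes;
    // a production codec would need to handle both cases.
    out.writeUTF(d.str)
    out.writeInt(d.subdata.num) // nested Data2 flattened inline
  }

  // Reads the fields back in exactly the order they were written.
  def read(in: DataInputStream): Data1 = {
    val str = in.readUTF()
    val num = in.readInt()
    Data1(str, Data2(num))
  }

  def toBytes(d: Data1): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    write(out, d)
    out.flush()
    buf.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): Data1 =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}

object Data1CodecDemo {
  def main(args: Array[String]): Unit = {
    val original = Data1("abc", Data2(42))
    val bytes = Data1Codec.toBytes(original)
    println(s"${bytes.length} bytes, round trip ok: ${Data1Codec.fromBytes(bytes) == original}")
  }
}
```

Running Data1CodecDemo prints "9 bytes, round trip ok: true" (2-byte length plus 3 bytes for "abc", plus 4 bytes for the int). Nothing is looked up reflectively at runtime and the bytes carry no class or field names, which is why this style pays off on large nested records; as noted in the thread, on small case classes Flink's default serializers can still be faster.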