Hi Billy,

on the Beam side, you have probably looked into writing your own Coder
(the equivalent of a TypeSerializer in Flink). If so, did that not work
out for you? And if not, why?
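
To make the question concrete, here is a minimal, untested sketch of
what such a Coder could look like with the fingerprint scheme from your
GRKryoSerializer below (the class name is made up, and the exact Coder
signatures depend on the Beam version):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CustomCoder;

// Hypothetical: writes a 64-bit schema fingerprint followed by the Avro
// binary encoding, resolved against the preregistered schema map.
public class FingerprintGenericRecordCoder extends CustomCoder<GenericRecord> {

    @Override
    public void encode(GenericRecord value, OutputStream out) throws IOException {
        GRKryoSerializer.AvroStuff stuff = GRKryoSerializer.getStuffFor(value);
        // fixed-width fingerprint first, then the record fields only
        new DataOutputStream(out).writeLong(stuff.key);
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        stuff.writer.write(value, encoder);
        encoder.flush();
    }

    @Override
    public GenericRecord decode(InputStream in) throws IOException {
        long fingerprint = new DataInputStream(in).readLong();
        GRKryoSerializer.AvroStuff stuff = GRKryoSerializer.schemaMap.get(fingerprint);
        if (stuff == null) {
            throw new IOException("No schema preregistered for fingerprint " + fingerprint);
        }
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
        return stuff.reader.read(null, decoder);
    }
}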


Best,

Aljoscha

On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:

> Hi!
> 
> I can write some more details later; here is the short answer:
> 
>   - Your serializer would go into something like the AvroSerializer.
> 
>   - When you instantiate the AvroSerializer in
>     GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register
>     the type of the generic type, plus all types in
>     "ExecutionConfig.getRegisteredKryoTypes()"
>     (we abuse the Kryo type registration here as an Avro type
>     registration initially; we would have to polish that later).
> 
>   - The registered types are classes, but since they are Avro types,
>     you should be able to get their schema (via ReflectData or so).
> 
> That way, Flink would internally forward all the registrations for you
> (similarly to how it forwards Kryo registrations), and you don't have
> to do that manually. A rough sketch of the idea:
> 
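> Purely as an untested illustration (the preregister helper and the use
> of ReflectData are assumptions on my part; only the Flink/Avro classes
> named above are real):
> 
> // Hypothetical body for GenericTypeInfo.createSerializer(ExecutionConfig)
> public TypeSerializer<T> createSerializer(ExecutionConfig config) {
>     // pre-register the generic type itself plus everything registered
>     // with Kryo, abusing the Kryo registration as an Avro registration
>     preregister(typeClass);
>     for (Class<?> registered : config.getRegisteredKryoTypes()) {
>         preregister(registered);
>     }
>     return new AvroSerializer<T>(typeClass);
> }
> 
> private static void preregister(Class<?> type) {
>     if (SpecificRecord.class.isAssignableFrom(type)) {
>         // for Avro types, the schema can be recovered from the class
>         Schema schema = ReflectData.get().getSchema(type);
>         GRKryoSerializer.preregisterSchema(type.getName(), schema);
>     }
> }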

> Stephan

> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy
> <billy.newp...@gs.com> wrote:
>> This is what we're using as our serializer:
>> 
>> Somewhere:
>> 
>>     env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
>> 
>> then:
>> 
>> import java.io.IOException;
>> import java.util.Map;
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.logging.Logger;
>> 
>> import org.apache.avro.Schema;
>> import org.apache.avro.SchemaNormalization;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericData.Record;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.BinaryDecoder;
>> import org.apache.avro.io.BinaryEncoder;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.io.DecoderFactory;
>> import org.apache.avro.io.EncoderFactory;
>> 
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> 
>> public class GRKryoSerializer extends Serializer<GenericData.Record> {
>> 
>>     // schema plus the reader/writer and fingerprint that go with it
>>     static class AvroStuff {
>>         Schema schema;
>>         String comment;
>>         long key;
>>         DatumReader<GenericRecord> reader;
>>         DatumWriter<GenericRecord> writer;
>>     }
>> 
>>     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
>>     static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
>>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>> 
>>     public static void preregisterSchema(String comment, Schema schema) {
>>         if (!schemaToFingerprintMap.containsKey(schema)) {
>>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             AvroStuff stuff = new AvroStuff();
>>             stuff.schema = schema;
>>             stuff.comment = comment;
>>             stuff.key = fingerprint;
>>             stuff.reader = new GenericDatumReader<>(schema);
>>             stuff.writer = new GenericDatumWriter<>(schema);
>>             log.info(String.format("Preregistering schema for %s with fingerprint %d",
>>                     comment, fingerprint));
>>             schemaMap.put(fingerprint, stuff);
>>             schemaToFingerprintMap.put(schema, fingerprint);
>>         }
>>     }
>> 
>>     public GRKryoSerializer() {
>>     }
>> 
>>     public static void clearSchemaCache() {
>>         schemaToFingerprintMap.clear();
>>         schemaMap.clear();
>>     }
>> 
>>     public static AvroStuff getStuffFor(GenericRecord o) {
>>         return getStuffFor(o.getSchema());
>>     }
>> 
>>     public static AvroStuff getStuffFor(Schema schema) {
>>         Long fingerprint = schemaToFingerprintMap.get(schema);
>>         if (fingerprint == null) {
>>             // not preregistered; log the fingerprint for debugging but do
>>             // not cache it, so later lookups fail loudly here as well
>>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             log.info(String.format("No fingerprint. Generated %d for schema %s",
>>                     fingerprint, schema.toString(true)));
>>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>>         }
>>         return schemaMap.get(fingerprint);
>>     }
>> 
>>     @Override
>>     public void write(Kryo kryo, Output output, GenericData.Record object) {
>>         AvroStuff stuff = getStuffFor(object);
>>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
>>         try {
>>             // write the schema key, not the schema
>>             encoder.writeLong(stuff.key);
>>             // write the binary version of the fields only
>>             stuff.writer.write(object, encoder);
>>             encoder.flush();
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>> 
>>     @Override
>>     public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
>>         BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
>>         try {
>>             // read the key
>>             long fingerPrint = decoder.readLong();
>>             // find the preregistered schema for it
>>             AvroStuff stuff = schemaMap.get(fingerPrint);
>>             // inflate using the correct preregistered inflator
>>             return (Record) stuff.reader.read(null, decoder);
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>> }
>> 
>> We add an instance of one of these to all our Flink Rich operations:
>> 
>> import java.io.IOException;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.Serializable;
>> 
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> 
>> public class GRBuilder implements Serializable {
>> 
>>     private static final long serialVersionUID = -3441080975441473751L;
>> 
>>     String schemaString;
>>     String comment;
>> 
>>     transient GenericRecordBuilder builder = null;
>>     transient Schema schema = null;
>> 
>>     public GRBuilder() {
>>     }
>> 
>>     public GRBuilder(String comment, Schema s) {
>>         schemaString = s.toString();
>>         builder = null;
>>         this.comment = comment;
>>         GRKryoSerializer.preregisterSchema(comment, s);
>>     }
>> 
>>     public String getComment() {
>>         return comment;
>>     }
>> 
>>     public void setSchema(Schema schema) {
>>         this.schema = schema;
>>     }
>> 
>>     public void registerSchema() {
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>> 
>>     private void readObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException {
>>         realReadObject(input);
>>     }
>> 
>>     private void writeObject(ObjectOutputStream output) throws IOException {
>>         realWriteObject(output);
>>     }
>> 
>>     // Ensure on inflation that the schema is registered against
>>     // the fingerprint locally so we can inflate that type.
>>     protected void realReadObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException {
>>         schemaString = input.readUTF();
>>         comment = input.readUTF();
>>         builder = null;
>>         schema = null;
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>> 
>>     protected void realWriteObject(ObjectOutputStream output) throws IOException {
>>         output.writeUTF(schemaString);
>>         output.writeUTF(comment);
>>     }
>> 
>>     public synchronized GenericRecordBuilder getBuilder() {
>>         if (builder == null) {
>>             builder = new GenericRecordBuilder(getSchema());
>>         }
>>         return builder;
>>     }
>> 
>>     public synchronized Schema getSchema() {
>>         if (schema == null) {
>>             schema = new Schema.Parser().parse(schemaString);
>>         }
>>         return schema;
>>     }
>> }
>> 
>> Our mappers and such use the GRBuilder on the rich FlatMap class, for
>> example, to get a builder to create the output records for collection.
>> We need a GRBuilder for each possible GenericRecord schema as a field
>> on the map function object, as in the sketch below.
>> 
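>> For example (the class, schema, and field names are made up):
>> 
>> // Hypothetical mapper: the GRBuilder travels with the function via Java
>> // serialization, and its readObject preregisters the schema on each worker
>> public class EnrichRecords
>>         extends RichFlatMapFunction<GenericData.Record, GenericData.Record> {
>> 
>>     private final GRBuilder outBuilder;
>> 
>>     public EnrichRecords(GRBuilder outBuilder) {
>>         this.outBuilder = outBuilder;
>>     }
>> 
>>     @Override
>>     public void flatMap(GenericData.Record in, Collector<GenericData.Record> out) {
>>         GenericRecordBuilder b = outBuilder.getBuilder();
>>         b.set("key", in.get("key"));       // field names are made up
>>         b.set("value", in.get("value"));
>>         out.collect(b.build());
>>     }
>> }
>> 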
>> If we were to refactor this using the GenericTypeInfo or
>> AvroSerializer, how would you suggest doing it?
>> 
>> Thanks
>> 
>> From: Stephan Ewen [mailto:se...@apache.org]
>> Sent: Thursday, March 02, 2017 3:07 PM
>> To: user@flink.apache.org; Aljoscha Krettek
>> Subject: Re: Serialization performance
>> 
>> Hi!
>> 
>> Thanks for this write-up, very cool stuff!
>> 
>> For part (1) - serialization: I think that can be made a bit nicer.
>> Avro is a bit of an odd citizen in Flink, because Flink serialization
>> is actually schema-aware, but does not integrate with Avro. That's
>> why Avro types go through Kryo.
>> 
>> We should try and make Avro a first-class citizen.
>> 
>>   - The first step is to have a proper AvroSerializer. We have
>>     implemented one already, see
>>     "org.apache.flink.api.java.typeutils.runtime.AvroSerializer". It
>>     works with the ReflectDatumReader/Writer, but would be a good
>>     baseline for all types of Avro-based serializers in Flink.
>> 
>>   - Then we need to figure out how the AvroSerializer is
>>     instantiated. We could just let the "GenericTypeInfo" create an
>>     Avro serializer for Avro types, and Kryo for all else.
>> 
>>   - The change would eventually have to be behind a config flag in
>>     the ExecutionConfig (see "GenericTypeInfo.createSerializer()") to
>>     make sure we do not break the default serialization format within
>>     a major release version (a sketch of that dispatch is below).
>> 
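>> For illustration, the dispatch could look roughly like this (untested;
>> the exact flag and the Avro-type test are just one possibility):
>> 
>> // Hypothetical GenericTypeInfo.createSerializer(ExecutionConfig)
>> @Override
>> public TypeSerializer<T> createSerializer(ExecutionConfig config) {
>>     if (config.isForceAvroEnabled()
>>             && SpecificRecordBase.class.isAssignableFrom(typeClass)) {
>>         // Avro types get the schema-aware AvroSerializer
>>         return new AvroSerializer<T>(typeClass);
>>     }
>>     // everything else keeps going through Kryo, as today
>>     return new KryoSerializer<T>(typeClass, config);
>> }
>> 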
>> A side note: if you actually use that through Beam, I am not sure
>> what will happen, because as far as I know, Beam uses its completely
>> own serialization system and Flink sees only byte coders from Beam.
>> Aljoscha can probably add more detail here.
>> 
>> For part (2) - the filters: if I understand correctly, you "split"
>> the data into different result sets that go to different sinks? The
>> DataStream API has a split/select type of construct which would help
>> there (rough example below); the DataSet API does not have something
>> like that. If you are looking for peak performance, the demux output
>> format seems like a good workaround on the DataSet side.
>> 
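>> On the DataStream side, that would look something like this (untested;
>> the RecordKind enum and tag names are made up):
>> 
>> // Hypothetical split/select over a Tuple2<RecordKind, GenericData.Record>
>> // stream: each record is tagged with its enum name, then selected per tag.
>> SplitStream<Tuple2<RecordKind, GenericData.Record>> split = stream.split(
>>         new OutputSelector<Tuple2<RecordKind, GenericData.Record>>() {
>>             @Override
>>             public Iterable<String> select(Tuple2<RecordKind, GenericData.Record> value) {
>>                 return Collections.singletonList(value.f0.name());
>>             }
>>         });
>> 
>> DataStream<Tuple2<RecordKind, GenericData.Record>> inserts = split.select("INSERT");
>> DataStream<Tuple2<RecordKind, GenericData.Record>> updates = split.select("UPDATE");
>> 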
>> Greetings,
>> Stephan
>> 
>> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <billy.newp...@gs.com>
>> wrote:
>> We've been working on performance for the last while. We're using
>> Flink 1.2 right now. We are writing batch jobs which process Avro and
>> Parquet input files and produce Parquet files.
>> 
>> Flink serialization costs seem to be the most significant part of our
>> wall-clock time. We have written a custom Kryo serializer for
>> GenericRecord which is similar to the Spark one in that it reuses the
>> Avro datum reader and writer but writes a hash fingerprint for the
>> schema instead of the schema itself.
>> 
>> We have subclassed most of the Rich* classes in Flink and now also
>> pass to the constructors a builder class which has the Avro schema.
>> When Flink inflates these, the builders are inflated and preregister
>> the Avro schema for the fingerprint in a static map which the
>> inflation/writing code uses later.
>> 
>> This is working pretty well for us; it's like 10-20x faster than just
>> using GenericRecords normally. The question is this: is this the
>> right way to do it? If it is, then we can contribute it, and then the
>> question is how to modify Beam so that it uses this stuff under the
>> covers; we can't use Beam at all right now, as far as I can tell,
>> because of the performance issues with GenericRecord.
>> 
>> The other performance optimization is basically removing filters,
>> which again seem to double wall-clock time. We wrote an embedded
>> demux output format which receives a Tuple2<Enum, GenericRecord> and
>> writes to a different Parquet file depending on the enum; a sketch is
>> below. This was 2x faster than a naïve 4 filters going to 4 Parquet
>> output formats.
>> 
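>> Roughly like this (a minimal sketch; the Parquet writer wiring is
>> elided, and the class name and the per-kind delegate map are made up):
>> 
>> // Hypothetical demultiplexing output format: routes each record to one
>> // of several wrapped output formats keyed by the enum, so the data set
>> // is written in a single pass instead of one filter pass per sink.
>> public class DemuxOutputFormat<K extends Enum<K>>
>>         extends RichOutputFormat<Tuple2<K, GenericData.Record>> {
>> 
>>     private final Map<K, OutputFormat<GenericData.Record>> delegates;
>> 
>>     public DemuxOutputFormat(Map<K, OutputFormat<GenericData.Record>> delegates) {
>>         this.delegates = delegates;
>>     }
>> 
>>     @Override
>>     public void configure(Configuration parameters) {
>>         for (OutputFormat<GenericData.Record> f : delegates.values()) {
>>             f.configure(parameters);
>>         }
>>     }
>> 
>>     @Override
>>     public void open(int taskNumber, int numTasks) throws IOException {
>>         for (OutputFormat<GenericData.Record> f : delegates.values()) {
>>             f.open(taskNumber, numTasks);
>>         }
>>     }
>> 
>>     @Override
>>     public void writeRecord(Tuple2<K, GenericData.Record> record) throws IOException {
>>         // one write, no per-sink filter pass over the whole data set
>>         delegates.get(record.f0).writeRecord(record.f1);
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         for (OutputFormat<GenericData.Record> f : delegates.values()) {
>>             f.close();
>>         }
>>     }
>> }
>> 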
>> Do these optimizations seem unnecessary to some? Is there some trick
>> we're missing?