Hi Kamal,
… interesting, seeing what other people do 😊
First, a number of general comments/observations on your code:
* Indeed, you are using GenericRecord as a means to encode a Coproduct,
I’ll come back to that later
* FlinkGenericRecordTypeSafeJob.java:
* Up to line 103 you emulate a Kafka source, i.e. byte[] is ok here;
however, you don’t need to use byte[] in any other place throughout, use the
Flink API to your favor …
* E.g. your DelayFunction can work directly with GenericRecord instead of
byte[]:
* Line 206, Line 216 … that way you don’t need to serialize (227) or
deserialize (254) explicitly
* You don’t need the separate timerState (207), as timers in Flink are
idempotent; double registration has no effect, i.e. you can
* Remove lines 233-242, 260, just keep line 241 (see the sketch after these
comments)
* State access is sort of expensive: one unit of parallelism gives you
roughly 15000 accesses to RocksDB, depending on your hardware
* Right now, your job uses keyed streams only to have timers available
* Should you want to use keying to consistently distribute/shuffle events
by key in order to implement by-key logic, you need to keyBy at each stage that
depends on it
* Otherwise consistent keying is not guaranteed, especially since you
disabled chaining
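To make that concrete, here is a minimal sketch of the delay logic working
directly on GenericRecord, with no byte[] round-trip and no guard timerState.
The DELAY_MS constant, the constructor-injected TypeInformation, and the
grid-aligned timer are my assumptions; your actual DelayFunction will differ:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayFunction extends KeyedProcessFunction<String, GenericRecord, GenericRecord> {

    private static final long DELAY_MS = 60_000L; // assumed delay

    // e.g. your custom TypeInformation<GenericRecord>, so state does not fall back to Kryo
    private final TypeInformation<GenericRecord> recordTypeInfo;
    private transient ListState<GenericRecord> buffered;

    public DelayFunction(TypeInformation<GenericRecord> recordTypeInfo) {
        this.recordTypeInfo = recordTypeInfo;
    }

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", recordTypeInfo));
    }

    @Override
    public void processElement(GenericRecord value, Context ctx, Collector<GenericRecord> out)
            throws Exception {
        buffered.add(value);
        // Align the firing time to a grid so elements of the same key re-register
        // the same timestamp; registration is idempotent, so no timerState is needed.
        long now = ctx.timerService().currentProcessingTime();
        long fireAt = (now / DELAY_MS + 1) * DELAY_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<GenericRecord> out)
            throws Exception {
        for (GenericRecord record : buffered.get()) {
            out.collect(record);
        }
        buffered.clear();
    }
}

Pass in the same custom TypeInformation you use elsewhere in the job, so the
ListState does not fall back to Kryo either.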
Now for the TypeInformation:
* you have an enum (coproduct) of two different record types (products) in
the same stream
* you use GenericRecord throughout the job
* that means that in any place that works with data in the record, AVRO
needs to interpret the schema and possibly read the whole blob to find the
specific field of interest
* that is quite inefficient in most situations
* POJOs are much more efficient to use, and Flink can also generate
TypeInformation for Java enums (see the sketch right below)
* (can’t help you much with this, I code in Scala and have other means
to encode algebraic data types), you’ll find the documentation …
* POJOs also support schema migration in case you want to upgrade your
job but continue with the collected state in your checkpoints/savepoints
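A hypothetical POJO/enum encoding of your coproduct could look like the
following (all names are invented for illustration):

// Hypothetical POJO carrying one of two record types, tagged by a Java enum.
// Flink’s PojoTypeInfo applies when the class is public, has a public no-arg
// constructor, and all fields are public or have getters/setters.
public class Event {

    public enum Kind { ORDER, CUSTOMER } // Flink generates EnumTypeInfo for this

    // the two products, themselves POJOs (fields invented for illustration)
    public static class Order { public String orderId; public double amount; public Order() {} }
    public static class Customer { public String customerId; public String name; public Customer() {} }

    public Kind kind;          // which product this event carries
    public Order order;        // set when kind == ORDER, otherwise null
    public Customer customer;  // set when kind == CUSTOMER, otherwise null

    public Event() {} // public no-arg constructor, required for POJO serialization
}

Flink’s PojoSerializer then handles the schema migration mentioned above:
fields can be added or removed between savepoints.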
* Your TypeSerializer stores the whole schema with each event, which is also
very inefficient
* Extra I/O footprint: a serialized schema is easily 20x the size of
the actual record to be serialized
* You force AVRO to serialize/deserialize the schema with each record,
when you only have two of them
* How about (if you don’t want to go the POJO/Enum way) storing the two
schemas in use in the serializer, together with a specific tag per schema, and
* Then serialize the tag first, then the record
* Deserialize the tag, extract the stored schema, deserialize the
record
* DynamicGenericRecordSerializer.java:
* Lines 60-62: target.writeUTF(schemaStr)
* Lines 81-84: String schemaStr = source.readUTF()
* However, rather read/write the tag instead (1 byte should be
sufficient) and resolve the schema from the local Map[tag, Schema], as in
the sketch below
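A sketch of what the two paths could then look like; the standalone class
name, the two maps, and the length-prefixed layout are my assumptions, not
your code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public final class TaggedAvroSketch {

    // both maps known up front, e.g. {0 -> orderSchema, 1 -> customerSchema}
    private final Map<Byte, Schema> tagToSchema;
    private final Map<Schema, Byte> schemaToTag;

    public TaggedAvroSketch(Map<Byte, Schema> tagToSchema, Map<Schema, Byte> schemaToTag) {
        this.tagToSchema = tagToSchema;
        this.schemaToTag = schemaToTag;
    }

    public void serialize(GenericRecord record, DataOutputView target) throws IOException {
        // 1 byte of tag instead of the full schema JSON per record
        target.writeByte(schemaToTag.get(record.getSchema()));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(bytes, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();

        byte[] payload = bytes.toByteArray();
        target.writeInt(payload.length); // length prefix so deserialize() knows how much to read
        target.write(payload);
    }

    public GenericRecord deserialize(DataInputView source) throws IOException {
        Schema schema = tagToSchema.get(source.readByte()); // resolved locally, no schema on the wire

        byte[] payload = new byte[source.readInt()];
        source.readFully(payload);

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}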
* That brings us to the TypeSerializerSnapshot
* Flink supports schema migration: creating a savepoint using the
original implementation, then reading the savepoint using the migrated job
* In order to support that, Flink uses SerializerSnapshots to store the
configuration of the original serializer together with the savepoint, and then
* When loading the savepoint into the new job, it uses the serializer with
the old configuration to load the state, but once the state is loaded it uses
the serializer with the new configuration to run the job
* If you want to support schema migration,
* e.g. a later job supports an additional schema, you would store the
serializer configuration by means of the Snapshot (see the sketch after this
list)
* i.e. the Map[tag, Schema] with all its tags and respective schemas
* also, the record schemas can be updated, if the new schema versions
are AVRO FORWARD compatible
* for the job runtime, that Map[tag, Schema] works like a
mini-schema-registry that covers just the last active schema of each respective
record
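Such a snapshot could look roughly like this; the class name and the
DynamicGenericRecordSerializer constructor are assumptions, the compatibility
check is simplified, and its exact signature differs between Flink versions:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class TaggedGenericRecordSerializerSnapshot
        implements TypeSerializerSnapshot<GenericRecord> {

    private Map<Byte, Schema> tagToSchema = new HashMap<>();

    public TaggedGenericRecordSerializerSnapshot() {} // required for restoring

    public TaggedGenericRecordSerializerSnapshot(Map<Byte, Schema> tagToSchema) {
        this.tagToSchema = tagToSchema;
    }

    @Override
    public int getCurrentVersion() {
        return 1;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        // the schema JSON is written once per snapshot, not once per record
        out.writeInt(tagToSchema.size());
        for (Map.Entry<Byte, Schema> entry : tagToSchema.entrySet()) {
            out.writeByte(entry.getKey());
            out.writeUTF(entry.getValue().toString());
        }
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
            throws IOException {
        int count = in.readInt();
        tagToSchema = new HashMap<>();
        for (int i = 0; i < count; i++) {
            byte tag = in.readByte();
            tagToSchema.put(tag, new Schema.Parser().parse(in.readUTF()));
        }
    }

    @Override
    public TypeSerializer<GenericRecord> restoreSerializer() {
        // rebuild the serializer with the schemas that were active at savepoint time
        return new DynamicGenericRecordSerializer(tagToSchema); // assumed constructor
    }

    @Override
    public TypeSerializerSchemaCompatibility<GenericRecord> resolveSchemaCompatibility(
            TypeSerializer<GenericRecord> newSerializer) {
        // Simplified; a real implementation would diff the tag maps and check
        // AVRO FORWARD compatibility of changed schemas. The exact method
        // signature differs between Flink versions.
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
}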
So much … for now
Do you consider visiting the Flink Forward conference in Barcelona? You could
find me there for additional discussion 😊
Sincere greetings
Thias
From: Kamal Mittal <[email protected]>
Sent: Tuesday, September 23, 2025 3:33 PM
To: Schwalbe Matthias <[email protected]>; [email protected]
Subject: [External] RE: Flink kafka source with Avro Generic Record
Thanks for such a nice explanation.
1. Kryo is not used now as per the attached classes (custom type information,
custom type serializer, snapshot etc.); can you please have a look and let me
know if any reflection is still involved? It is working on the Flink machine.
Also, I intentionally disabled the fallback to Kryo at the start of the program
- env.getConfig().disableGenericTypes();
2. This is persisting the complete GenericRecord right now, and as I
understand it, when using GenericDatumWriter it will just persist the fields
as per the schema and not the schema itself, because Kryo is not used here.
3. I am a little bit confused here w.r.t. the snapshot change you called out
below for different Avro schemas (product, customer, order etc.): why does the
serializer snapshot need to persist schemas?
4. Also, yes, we are using GenericRecord between source and sink as well, and
we also have a custom schema registry integration. Attached is a dummy program
simulating, to some extent, what is done in the actual application.
From: Schwalbe Matthias <[email protected]>
Sent: 23 September 2025 17:07
To: Kamal Mittal <[email protected]>; [email protected]
Subject: RE: Flink kafka source with Avro Generic Record
Hi Kamal,
Interesting topic 😊
The answer really depends on what you exactly want to do.
* GenericRecord always contains the AVRO schema; if serialization by Kryo
were possible, it would still serialize the (big) schema with each record
* Hence you need to arrange things such that the schema does not end up in
the serialized record
* We use AVRO GenericRecord in just one map() operator before the sink (i.e.
not for state etc.); we transfer the GenericRecord from the map() operator to
the sink, but
* reconfigure the chaining strategy:
setChainingStrategy(ChainingStrategy.ALWAYS) on the operator, such that the
GenericRecords don’t get serialized between the map() and the sink (see the
sketch below)
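For illustration only (our actual code differs; the generic input type and the
convert() placeholder are assumptions), a custom operator pinning the chaining
strategy could look like this:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Maps events to GenericRecord right before the sink; ChainingStrategy.ALWAYS
// keeps this operator in the same task as the sink, so GenericRecords are
// passed by reference and never serialized on the wire.
public class ToGenericRecordOperator<IN> extends AbstractStreamOperator<GenericRecord>
        implements OneInputStreamOperator<IN, GenericRecord> {

    public ToGenericRecordOperator() {
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    @Override
    public void processElement(StreamRecord<IN> element) {
        output.collect(element.replace(convert(element.getValue())));
    }

    private GenericRecord convert(IN event) {
        // placeholder: build the GenericRecord from your event here
        throw new UnsupportedOperationException("placeholder");
    }
}

You would attach it with stream.transform("toGenericRecord", typeInfo, new
ToGenericRecordOperator<>()), keeping operator chaining enabled, of course.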
* We don’t work with GenericRecord in any place other than sources and sinks
* From your description I assume you use GenericRecord in other places than
sources/sinks
* That means you need to have/create a TypeInformation[GenericRecord] that
contains the schema in use and serializes by means of AVRO, plus a
TypeSerializerSnapshot that persists the AVRO schema
* That is some 50 lines of code (see the sketch after this list)
* However, your description also indicates that you use AVRO to support
different event types in the same place, like a Coproduct (i.e. Sum type,
enum type) ??!
* In that case the TypeInformation needs to be a little more complicated: the
Serializer Snapshot needs to persist all AVRO schemas together with some
respective invariant tag, and
* The TypeSerializer needs to store the respective tag before the
serialized AVRO record is stored, and on deserialization the other way around:
load the tag, get the schema, use the schema for deserialization
* All that is not rocket science, but a bit elaborate
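For the single-schema case, if I remember correctly, flink-avro already ships
such a TypeInformation, so you may not have to write those 50 lines yourself.
In the sketch below, rawBytes, schemaJson and decode() are placeholders for
your own code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

Schema schema = new Schema.Parser().parse(schemaJson);

// carries the schema, serializes via AVRO, and its snapshot persists the schema
DataStream<GenericRecord> records = rawBytes
        .map(bytes -> decode(bytes, schema))
        .returns(new GenericRecordAvroTypeInfo(schema)); // no Kryo fallback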
Hope that helps
Thias
From: Kamal Mittal via user <[email protected]>
Sent: Tuesday, September 23, 2025 5:05 AM
To: [email protected]
Subject: [External] RE: Flink kafka source with Avro Generic Record
Can someone please give input for below?
From: Kamal Mittal <[email protected]<mailto:[email protected]>>
Sent: 22 September 2025 17:16
To: Kamal Mittal <[email protected]<mailto:[email protected]>>;
[email protected]<mailto:[email protected]>
Subject: RE: Flink kafka source with Avro Generic Record
Hello,
I tried this and Flink fails later, when it tries to serialize/deserialize the
GenericRecord object for communication between operators (e.g. from map() to
another operator, or writing checkpoints, or shuffling).
It's a serialization issue during operator chaining or data exchange in Flink’s
runtime.
Probable reason:
GenericRecord from Avro holds schema metadata internally, which includes
unmodifiable maps, especially:
schema (org.apache.avro.generic.GenericData$Record)
↳ fieldMap (org.apache.avro.Schema$RecordSchema)
↳ reserved (java.util.Collections$UnmodifiableMap)
These types (like UnmodifiableMap) are not easily serializable by Kryo, which
Flink falls back to if:
* No proper TypeInformation or TypeSerializer is provided.
* Flink cannot infer a more optimized serializer.
Error stack:
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
Can you please confirm the above understanding and also a possible way to
resolve this? Probably a custom serializer and custom type information
solution is needed here; is that recommended?
Rgds,
Kamal
From: Kamal Mittal via user <[email protected]>
Sent: 22 September 2025 13:56
To: [email protected]
Subject: Flink kafka source with Avro Generic Record
Hello,
I need to support a Flink application accepting Avro binary events with
different schemas over the Flink Kafka source. I need to use a custom schema
registry to fetch the schema at runtime and decode the incoming event.
I will use Avro GenericRecord to decode incoming events with different Avro
schemas.
I have gone through the page Flink Serialization Tuning Vol. 1: Choosing your
Serializer — if you can | Apache Flink
(https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#avro-generic).
Can you please tell whether, as the schema is not available at compile/job-graph
time, it will use Kryo as the serializer? Also, can anything be done here to
improve this, as performance is impacted with Kryo?
Rgds,
Kamal