You need to differentiate between two serialization abstractions (which I
guess you already know). The first comes into play when reading from the
source: the DeserializationSchema translates the bytes coming from Kafka
into something that Flink can handle.

The second serialization happens within Flink through the TypeSerializer,
which Flink uses to pass data from one subtask to another. That's why your
custom DeserializationSchema needs to provide TypeInformation, which allows
Flink to pick the appropriate TypeSerializer.
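To make that concrete, here is a minimal sketch of such a
DeserializationSchema; MyEvent and its fromBytes helper are made-up
placeholders for your actual event type:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MyEventDeserializationSchema
        implements DeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] message) {
        // translate the raw Kafka bytes into something Flink can handle
        return MyEvent.fromBytes(message); // hypothetical parsing helper
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Kafka topics are unbounded
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        // this is what lets Flink pick an efficient TypeSerializer;
        // without usable TypeInformation, Flink falls back to Kryo
        return TypeInformation.of(MyEvent.class);
    }
}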

Now, you would probably not be able to provide consistent TypeInformation
for arbitrary types, so Flink has to fall back to Kryo, as you said. One
solution is to also provide a custom TypeSerializer that uses the schema
registry (I wouldn't go down the route of GenericRecords with the schema
attached again).
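Such a TypeSerializer could write just a schema id plus the Avro payload
instead of the full schema. A rough sketch of the idea follows; MyEvent,
its toAvroBytes/fromAvroBytes/schemaId helpers, and the RegistryClient
lookup are all placeholders, and a real implementation needs more care
around the serializer snapshot for state migration:

import java.io.IOException;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class RegistryAwareSerializer extends TypeSerializer<MyEvent> {

    @Override public boolean isImmutableType() { return true; }
    @Override public TypeSerializer<MyEvent> duplicate() { return this; } // stateless
    @Override public MyEvent createInstance() { return new MyEvent(); }
    @Override public MyEvent copy(MyEvent from) { return from; } // immutable
    @Override public MyEvent copy(MyEvent from, MyEvent reuse) { return from; }
    @Override public int getLength() { return -1; } // variable-length records

    @Override
    public void serialize(MyEvent record, DataOutputView target) throws IOException {
        byte[] payload = record.toAvroBytes(); // hypothetical helper
        target.writeInt(record.schemaId());    // schema id instead of the full schema
        target.writeInt(payload.length);
        target.write(payload);
    }

    @Override
    public MyEvent deserialize(DataInputView source) throws IOException {
        int schemaId = source.readInt();
        int len = source.readInt();
        byte[] payload = new byte[len];
        source.readFully(payload);
        // hypothetical: look up the writer schema in the registry by id
        return MyEvent.fromAvroBytes(RegistryClient.schemaFor(schemaId), payload);
    }

    @Override
    public MyEvent deserialize(MyEvent reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        target.writeInt(source.readInt()); // schema id
        int len = source.readInt();
        target.writeInt(len);
        target.write(source, len);
    }

    @Override public boolean equals(Object o) { return o instanceof RegistryAwareSerializer; }
    @Override public int hashCode() { return RegistryAwareSerializer.class.hashCode(); }

    @Override
    public TypeSerializerSnapshot<MyEvent> snapshotConfiguration() {
        return new Snapshot();
    }

    public static final class Snapshot extends SimpleTypeSerializerSnapshot<MyEvent> {
        public Snapshot() { super(RegistryAwareSerializer::new); }
    }
}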

Note that because of the missing TypeInformation, you will never be able to
use the Table API or SQL. If you ask me, that approach comes with quite a
few drawbacks: no schema enforcement, no proper schema evolution support,
no schema compatibility enforcement, custom serializers, and clumsy code
with lots of string-based field accesses and casts.

---

I forgot to highlight another rather simple approach that works quite well
for very generic workflows with few operations: use byte[]. The
DeserializationSchema then becomes as trivial as it sounds. You pass byte[]
all the way through until you reach your FlatMap (assuming you are doing
some filtering/validation), and only inside this FlatMap do you deserialize
into Avro, apply your custom logic, and serialize back into byte[]. You can
use the Table API / SQL later on with UDFs that do the same thing. Using
byte[] as Flink's internal serialization format is also blazingly fast
(there is not much to do except add a header). The only downside is that
you need to deserialize manually in each operator that touches the data,
but that can usually be factored out.
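A minimal sketch of that pattern; the AvroUtil helpers and the eventType
check are placeholders for your actual deserialization and validation
logic:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.util.Collector;

// the trivial pass-through schema for the Kafka source
class PassThroughSchema extends AbstractDeserializationSchema<byte[]> {
    @Override
    public byte[] deserialize(byte[] message) {
        return message; // really as trivial as it sounds
    }
}

// deserialize, validate, and serialize again, all in one place
class ValidateEvents implements FlatMapFunction<byte[], byte[]> {
    @Override
    public void flatMap(byte[] value, Collector<byte[]> out) throws Exception {
        GenericRecord record = AvroUtil.fromBytes(value); // hypothetical helper
        if (record.get("eventType") != null) {            // your custom logic
            out.collect(AvroUtil.toBytes(record));        // hypothetical helper
        }
    }
}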

I'd still recommend looking into using a single schema that captures all
events as subschemas.
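In Avro that could be an envelope record with a union of the event
subschemas, roughly like this (all names are made up):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "com.example.events",
  "fields": [
    {"name": "event", "type": [
      {"type": "record", "name": "EventA", "fields": [
        {"name": "a", "type": "string"}
      ]},
      {"type": "record", "name": "EventB", "fields": [
        {"name": "b", "type": "long"}
      ]}
    ]}
  ]
}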

On Thu, Nov 12, 2020 at 4:15 PM ashwinkonale <ashwin.kon...@gmail.com>
wrote:

> So in this case, Flink will fall back to the default Kryo serialiser, right?


-- 

Arvid Heise | Senior Java Developer

