Hi, I have a kafka topic on which the key is serialized in a custom format and the value is serialized as JSON. How do I create a FlinkKafakConsumer that has different deserialization schemas for the key and value? Here's what I tried:
FlinkKafkaConsumer<Tuple2<MyClass, ObjectNode>> advancedFeatureData = new FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new TypeInformationKeyValueSerializationSchema<MyClass, ObjectNode>( TypeInformation.of(new TypeHint<MyClass>() {}), TypeInformation.of(new TypeHint<ObjectNode>() {}), env.getConfig() ), properties); However, I get the error: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 121 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112) at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) Is there something I am missing with my approach or am I supposed to use a completely different class than TypeInformationKeyValueSerializationSchema?