Hi Steve,

For some reason, it seems as if the Java compiler is not generating the bridge method [1].
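To illustrate what a bridge method is and how its presence can be checked, here is a minimal, self-contained sketch. The names `Schema`, `StringSchema`, and `countBridgeMethods` are hypothetical stand-ins and not Flink classes; `Schema<T>` only mirrors the shape of a generic interface such as `KafkaSerializationSchema<T>`. When a class implements a generic interface with a concrete type argument, javac normally emits a synthetic method with the erased signature that forwards to the typed one, and reflection can report whether it is there:

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class BridgeMethodDemo {

    /** Hypothetical generic interface; after erasure its method is serialize(Object, Long). */
    public interface Schema<T> {
        byte[] serialize(T element, Long timestamp);
    }

    /** Implements the interface for String, so the declared method is serialize(String, Long). */
    public static class StringSchema implements Schema<String> {
        @Override
        public byte[] serialize(String element, Long timestamp) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Counts the compiler-generated bridge methods with the given name declared on clazz. */
    public static int countBridgeMethods(Class<?> clazz, String methodName) {
        int count = 0;
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.getName().equals(methodName) && method.isBridge()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // List every declared method and mark the synthetic bridge, if any.
        for (Method method : StringSchema.class.getDeclaredMethods()) {
            System.out.println((method.isBridge() ? "bridge:   " : "declared: ") + method);
        }
        System.out.println("bridge methods named serialize: "
                + countBridgeMethods(StringSchema.class, "serialize"));
    }
}
```

If such a check reports no bridge method for a class that implements a generic interface, the typed method and the erased interface method did not line up at compile time, which is one way to end up with an AbstractMethodError at runtime. The same information should also be visible in the javap output for the class file, where a bridge appears as an additional method taking `java.lang.Object`.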
Could you double-check that the Java version used by your build process matches the one on your cluster? Could you also run javap on your generated class file and report back?

[1] https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html

On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <swhe...@jwplayer.com> wrote:

> Hi,
>
> I am attempting to create a Key/Value serializer for the Kafka table
> connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
> classes, updating the serializer.
>
> First, I created `JsonRowKeyedSerializationSchema` which implements
> `KeyedSerializationSchema`[2], which is deprecated. The way it works is you
> provide a list of indices in your Row output that are the Key. This works
> successfully.
>
> When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
> `KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError`
> exception. Normally, this would mean I'm using an old interface; however, all
> my Flink dependencies are version 1.9. I cannot find this abstract
> `serialize()` function in the Flink codebase. Has anyone come across this
> before?
>
> When I print the methods of my `JsonRowKeyedSerializationSchema` class, I
> do see the below, which seems to be getting called by the FlinkKafkaProducer,
> but I do not see it in `KafkaSerializationSchema`:
>
> public abstract
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
> java.lang.Object
> java.lang.Long
>
>
> *`JsonRowKeyedSerializationSchema` class*
>
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.flink.types.Row;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {
>
>     // constructors and helpers
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
>         return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
>     }
> }
>
>
> *Stacktrace:*
>
> Caused by: java.lang.AbstractMethodError: Method
> com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
> is abstract
>     at com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>
>
> [1] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
> [2] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
> [3] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java