Hi Arvid,

Interestingly, my job runs successfully in a Docker container (image
*flink:1.9.0-scala_2.11*) but fails with
*java.lang.AbstractMethodError* on AWS EMR (non-Docker). I am compiling
with OpenJDK 1.8.0_242, which is the same version my EMR cluster is
running. Since the job runs successfully locally in a Docker container,
this points to an issue in our AWS environment setup. Oddly, we have been
running Flink on EMR for more than 2 years and have never come across
this until now.

Results of javap are:

public class com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema implements org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> {
  public static org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> create(com.jwplayer.flink.config.serde.SerDeConfig);
  public byte[] serializeKey(org.apache.flink.types.Row);
  public byte[] serializeValue(org.apache.flink.types.Row);
  public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> serialize(org.apache.flink.types.Row, java.lang.Long);
  public org.apache.kafka.clients.producer.ProducerRecord serialize(java.lang.Object, java.lang.Long);
}
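
For reference, here is a minimal sketch of how a compiler-generated bridge
method shows up through reflection. The `Schema` interface and
`StringSchema` class are hypothetical stand-ins invented for this example,
not Flink classes; only the `java.lang.reflect` calls are real APIs.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BridgeMethodDemo {

    // Hypothetical stand-in for a generic interface such as KafkaSerializationSchema<T>.
    interface Schema<T> {
        String serialize(T element, Long timestamp);
    }

    // Hypothetical implementation that fixes T to String.
    static class StringSchema implements Schema<String> {
        @Override
        public String serialize(String element, Long timestamp) {
            return element + "@" + timestamp;
        }
    }

    // Lists the methods declared by a class and marks compiler-generated bridges.
    static List<String> describeMethods(Class<?> type) {
        List<String> lines = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            StringBuilder params = new StringBuilder();
            for (Class<?> p : m.getParameterTypes()) {
                if (params.length() > 0) {
                    params.append(", ");
                }
                params.append(p.getName());
            }
            lines.add(m.getReturnType().getName() + " " + m.getName()
                    + "(" + params + ")" + (m.isBridge() ? " [bridge]" : ""));
        }
        // getDeclaredMethods() has no guaranteed order, so sort for stable output.
        Collections.sort(lines);
        return lines;
    }

    public static void main(String[] args) {
        describeMethods(StringSchema.class).forEach(System.out::println);
    }
}
```

The second `serialize(java.lang.Object, java.lang.Long)` entry in the javap
output above is the same kind of bridge. The JVM links a call by method
name plus the full descriptor (parameter types and return type), so a
bridge only satisfies the interface method if the descriptors match. The
quoted stack trace expects a return type of
`org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord`,
while the javap output shows `org.apache.kafka.clients.producer.ProducerRecord`;
whether that difference explains the error is not confirmed here.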


On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Steve,
>
> for some reason, it seems as if the Java compiler is not generating the
> bridge method [1].
>
> Could you double-check that the Java version of your build process and
> your cluster match?
>
> Could you run javap on your generated class file and report back?
>
> [1]
> https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html
>
> On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <swhe...@jwplayer.com> wrote:
>
>> Hi,
>>
>> I am attempting to create a Key/Value serializer for the Kafka table
>> connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
>> classes, updating the serializer.
>>
>> First, I created `JsonRowKeyedSerializationSchema` which implements
>> `KeyedSerializationSchema`[2], which is deprecated. The way it works is you
>> provide a list of indices in your Row output that are the Key. This works
>> successfully.
>>
>> When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
>> `KafkaSerializationSchema`[3], I got a `java.lang.AbstractMethodError`
>> exception. Normally, this would mean I'm using an old interface; however,
>> all my Flink dependencies are version 1.9. I cannot find this abstract
>> `serialize()` function in the Flink codebase. Has anyone come across this
>> before?
>>
>> When I print the methods of my `JsonRowKeyedSerializationSchema` class, I
>> see the one below, which seems to be the one called by FlinkKafkaProducer,
>> but I do not see it in `KafkaSerializationSchema`:
>>
>> public abstract
>> org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
>> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
>> java.lang.Object
>> java.lang.Long
>>
>>
>> *`JsonRowKeyedSerializationSchema` class*
>>
>> import javax.annotation.Nullable;
>>
>> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>> import org.apache.flink.types.Row;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {
>>
>>   // constructors and helpers (including serializeKey and serializeValue)
>>
>>   @Override
>>   public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
>>     return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
>>   }
>> }
>>
>>
>> *Stacktrace:*
>>
>> Caused by: java.lang.AbstractMethodError: Method com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; is abstract
>>   at com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
>>   at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
>>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
>> [2]
>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
>> [3]
>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>>
>>
>
