Hi

I am using Apache Beam with Flink, and ProtoBuf for encoding and decoding.

The following method builds the FlinkKafkaConsumer09 source.

// Assumption: T is declared as <T extends Message> on the enclosing class.
public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
    Properties p = new Properties();
    p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
    p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
    p.setProperty("group.id", kafkaOptions.getGroup());
    FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
            kafkaOptions.getKafkaTopic(),
            new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
            p);
    return UnboundedFlinkSource.of(kafkaConsumer);
}

The helper for serialization and deserialization is this:

public class SerializationDeserializationSchema<T>
        implements SerializationSchema<T>, DeserializationSchema<T> {

    private final Class<T> tClass;

    private final Coder<T> coder;
    private transient ByteArrayOutputStream out;

    public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
        this.tClass = clazz;
        this.coder = coder;
        this.out = new ByteArrayOutputStream();
    }

    @Override
    public byte[] serialize(T element) {

        if (out == null) {
            out = new ByteArrayOutputStream();
        }
        try {
            out.reset();
            coder.encode(element, out, Coder.Context.NESTED);
        } catch (IOException e) {
            throw new RuntimeException("encoding failed.", e);
        }
        return out.toByteArray();
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(tClass);
    }
}


I am getting the following *Kryo* exception:

java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
        at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
    Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    records_ (com.model.Employee$Packet)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
        ... 3 more
    Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
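
The innermost frame is Collections$UnmodifiableCollection.add, so it looks as though Kryo's CollectionSerializer is trying to rebuild the records_ list by calling add() on an unmodifiable collection while KryoSerializer.copy copies the element between chained operators. Below is a minimal sketch of just that last step in plain Java, with no Flink, Kryo, or ProtoBuf involved. The class name UnmodifiableAddDemo and the helper addFails are made up for illustration, and the assumption that records_ is wrapped with Collections.unmodifiableList is inferred from the trace, not confirmed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; it only reproduces the innermost frame of the trace.
final class UnmodifiableAddDemo {

    // Returns true if add() is rejected with UnsupportedOperationException.
    static boolean addFails(List<String> list) {
        try {
            list.add("record");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Assumption: stands in for the records_ field of Employee.Packet.
        List<String> records = Collections.unmodifiableList(new ArrayList<String>());

        // A deserializer that repopulates a collection via add() fails here.
        System.out.println(addFails(records));                 // prints "true"

        // The same call succeeds on a plain mutable list.
        System.out.println(addFails(new ArrayList<String>())); // prints "false"
    }
}
```

If that reading is right, the failure comes from Kryo handling the message type generically, not from the ProtoCoder in my schema class.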


I have not been able to avoid this Kryo exception. Any help would be appreciated.

Thanks

Viswadeep.
