Hi
I am using apache beam with flink and ProtoBuf for encoding and decoding.
The following is the method for the FlinkKafka Consumer.
public UnboundedSource<T extends
Message,UnboundedSource.CheckpointMark> build() {
Properties p = new Properties();
p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
p.setProperty("group.id", kafkaOptions.getGroup());
FlinkKafkaConsumer09<T> kafkaConsumer = new
FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(),
new SerializationDeserializationSchema(typedSource,
ProtoCoder.of(typedSource)), p);
return UnboundedFlinkSource.of(kafkaConsumer);
}
and the helper for serialization and DeSerialization is this.
public class SerializationDeserializationSchema<T>
implements SerializationSchema<T>, DeserializationSchema<T> {
private final Class<T> tClass;
private final Coder<T> coder;
private transient ByteArrayOutputStream out;
public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
this.tClass = clazz;
this.coder = coder;
this.out = new ByteArrayOutputStream();
}
@Override
public byte[] serialize(T element) {
if (out == null) {
out = new ByteArrayOutputStream();
}
try {
out.reset();
coder.encode(element, out, Coder.Context.NESTED);
} catch (IOException e) {
throw new RuntimeException("encoding failed.", e);
}
return out.toByteArray();
}
@Override
public T deserialize(byte[] message) throws IOException {
return coder.decode(new ByteArrayInputStream(message),
Coder.Context.NESTED);
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(tClass);
}
}
I am getting the following, *Kyro* exception.
java.lang.RuntimeException: ConsumerThread threw an exception: Could not
forward element to next operator
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to
next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
records_ (com.model.Employee$Packet)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
... 3 more
Caused by: java.lang.UnsupportedOperationException
at
java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
I am not able to avoid this "Kryo" Exception, Thanks for any help.
Thanks
Viswadeep.