Thanks Amit,

That did not solve my issue. Flink itself has the following comment in
ExecutionConfig.java:

/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 * In some cases this might be preferable. For example, when using interfaces
 * with subclasses that cannot be analyzed as POJO.
 */


I think this applies in my case, because the generated Protobuf class extends
GeneratedMessage and implements Employee.PacketOrBuilder.
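
In case it helps whoever picks this up, here is a minimal JDK-only sketch of what I believe the trace quoted below boils down to: the copy step ends up calling add() on a read-only collection (records_). Packet here is a hypothetical stand-in for the generated class, and addIsRejected and copyViaBytes are names I made up for illustration. None of this is Flink, Kryo, or Beam API. The byte round trip is only meant to show why a coder-style copy would not hit the same problem, assuming the serializer can be made to go through the coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class UnmodifiableCopyDemo {

    /** Hypothetical stand-in for Employee.Packet, not the generated class. */
    static final class Packet {
        final List<String> records;

        Packet(List<String> records) {
            // Read-only view, similar to the records_ field named in the trace.
            this.records = Collections.unmodifiableList(new ArrayList<>(records));
        }
    }

    /** Returns true if add() on the target is rejected, the step that fails in the trace. */
    static boolean addIsRejected(Collection<String> target) {
        try {
            target.add("extra");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    /** Copies by encoding to bytes and decoding again, without mutating any existing list. */
    static Packet copyViaBytes(Packet source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(source.records.size());
            for (String record : source.records) {
                out.writeUTF(record);
            }
            out.flush();

            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            int size = in.readInt();
            List<String> decoded = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                decoded.add(in.readUTF());
            }
            return new Packet(decoded);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Packet original = new Packet(Arrays.asList("a", "b"));
        System.out.println("add rejected: " + addIsRejected(original.records));
        Packet copy = copyViaBytes(original);
        System.out.println("copy equals original: " + copy.records.equals(original.records));
    }
}
```

So the open question for me is how to stop Flink from falling back to Kryo's field-by-field copy for this type.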


Thanks for any help.
Viswadeep


On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:

> I think it has to do with Flink's internal use of Kryo  - take a look at
> this:
> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
> I'm sure Flink committers will soon reach out to correct me if I'm missing
> something..
>
> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>
>> Hi
>>
>> I am using Apache Beam with Flink, and Protobuf for encoding and decoding.
>>
>> The following method builds the source from a FlinkKafkaConsumer09.
>>
>> // T is assumed to be declared on the enclosing class as <T extends Message>.
>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>     Properties p = new Properties();
>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>             kafkaOptions.getKafkaTopic(),
>>             new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
>>             p);
>>     return UnboundedFlinkSource.of(kafkaConsumer);
>> }
>>
>> The helper for serialization and deserialization is this:
>>
>> public class SerializationDeserializationSchema<T>
>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>
>>     private final Class<T> tClass;
>>
>>     private final Coder<T> coder;
>>     private transient ByteArrayOutputStream out;
>>
>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>         this.tClass = clazz;
>>         this.coder = coder;
>>         this.out = new ByteArrayOutputStream();
>>     }
>>
>>     @Override
>>     public byte[] serialize(T element) {
>>
>>         if (out == null) {
>>             out = new ByteArrayOutputStream();
>>         }
>>         try {
>>             out.reset();
>>             coder.encode(element, out, Coder.Context.NESTED);
>>         } catch (IOException e) {
>>             throw new RuntimeException("encoding failed.", e);
>>         }
>>         return out.toByteArray();
>>     }
>>
>>     @Override
>>     public T deserialize(byte[] message) throws IOException {
>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(T nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getProducedType() {
>>         return TypeExtractor.getForClass(tClass);
>>     }
>> }
>>
>>
>> I am getting the following *Kryo* exception.
>>
>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>         at java.lang.Thread.run(Thread.java:745)
>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>     Serialization trace:
>>     records_ (com.model.Employee$Packet)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>         ... 3 more
>>     Caused by: java.lang.UnsupportedOperationException
>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>
>>
>> I am not able to avoid this Kryo exception. Thanks for any help.
>>
>> Thanks
>>
>> Viswadeep.
>>
>>
>
