Hi Max,

I tried your approach (1), but it did not work. I am still getting the same
exception.
(2) is not possible unless the Protobuf compiler changes.

Yes, setting that in the config did not work either.

I think somewhere inside Flink the type is not being processed correctly.
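
For what it's worth, the innermost cause in the quoted trace can be
reproduced without Flink or Kryo. This is only a minimal sketch. It assumes
the copy step creates a collection of the same type and then calls add() on
it, which is what the CollectionSerializer.read and
UnmodifiableCollection.add frames suggest. The names RefillDemo and
canRefill are made up for illustration and are not part of Flink, Kryo, or
Protobuf.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; not part of Flink, Kryo, or Protobuf.
final class RefillDemo {

    // Mimics the "add elements into the target collection" step of a
    // field-by-field copy. Returns false if the target rejects add().
    static boolean canRefill(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("r1", "r2");

        // Assumption: the generated message exposes its repeated field
        // (records_ in the trace) as an unmodifiable view like this one.
        List<String> records = Collections.unmodifiableList(new ArrayList<String>());

        System.out.println("unmodifiable target: " + canRefill(records, elements));
        System.out.println("ArrayList target: " + canRefill(new ArrayList<String>(), elements));
    }
}
```

Running it reports false for the unmodifiable target and true for the
ArrayList. If the assumption holds, the exception comes from refilling the
records_ list during the copy between chained operators, not from the
coder's decode() call.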


Thanks
Viswadeep


Viswadeep Veguru.


On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:

> Hi Viswadeep,
>
> What Amit recommended (thanks!) is indeed an easy fix if you're using
> Flink. However, we don't expose the ExecutionConfig in Beam. So
> setting custom Kryo serializers is not possible.
>
> Two other options I see:
>
> 1) Could you try using the following in your deserialization schema?
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return new CoderTypeInformation<>(coder);
>     }
>
> 2) Could you avoid using Collections.UnmodifiableList in your code?
> Not sure if it is possible because it seems to be a field in your
> Protobuf class.
>
> Thanks,
> Max
>
> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
> > Thanks Amit,
> >
> > It did not solve my issue. Flink itself has the following comment in
> > ExecutionConfig.java:
> >
> > /**
> >  * Force TypeExtractor to use Kryo serializer for POJOS even though we could
> >  * analyze as POJO.
> >  * In some cases this might be preferable. For example, when using interfaces
> >  * with subclasses that cannot be analyzed as POJO.
> >  */
> >
> >
> > I think that applies here, because the generated Protobuf class extends
> > GeneratedMessage and implements Employee.PacketOrBuilder.
> >
> >
> > Thanks for any help.
> > Viswadeep
> >
> >
> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
> >>
> >> I think it has to do with Flink's internal use of Kryo - take a look at
> >> this:
> >> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
> >> I'm sure Flink committers will soon reach out to correct me if I'm missing
> >> something.
> >>
> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
> >>>
> >>> Hi
> >>>
> >>> I am using Apache Beam with Flink and Protobuf for encoding and decoding.
> >>>
> >>> The following is the method that builds the FlinkKafkaConsumer09 source:
> >>>
> >>> public UnboundedSource<T extends Message, UnboundedSource.CheckpointMark> build() {
> >>>     Properties p = new Properties();
> >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
> >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
> >>>     p.setProperty("group.id", kafkaOptions.getGroup());
> >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(),
> >>>             new SerializationDeserializationSchema(typedSource, ProtoCoder.of(typedSource)), p);
> >>>     return UnboundedFlinkSource.of(kafkaConsumer);
> >>> }
> >>>
> >>> The helper for serialization and deserialization is this:
> >>>
> >>> public class SerializationDeserializationSchema<T>
> >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
> >>>
> >>>     private final Class<T> tClass;
> >>>
> >>>     private final Coder<T> coder;
> >>>     private transient ByteArrayOutputStream out;
> >>>
> >>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
> >>>         this.tClass = clazz;
> >>>         this.coder = coder;
> >>>         this.out = new ByteArrayOutputStream();
> >>>     }
> >>>
> >>>     @Override
> >>>     public byte[] serialize(T element) {
> >>>
> >>>         if (out == null) {
> >>>             out = new ByteArrayOutputStream();
> >>>         }
> >>>         try {
> >>>             out.reset();
> >>>             coder.encode(element, out, Coder.Context.NESTED);
> >>>         } catch (IOException e) {
> >>>             throw new RuntimeException("encoding failed.", e);
> >>>         }
> >>>         return out.toByteArray();
> >>>     }
> >>>
> >>>     @Override
> >>>     public T deserialize(byte[] message) throws IOException {
> >>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
> >>>     }
> >>>
> >>>     @Override
> >>>     public boolean isEndOfStream(T nextElement) {
> >>>         return false;
> >>>     }
> >>>
> >>>     @Override
> >>>     public TypeInformation<T> getProducedType() {
> >>>         return TypeExtractor.getForClass(tClass);
> >>>     }
> >>> }
> >>>
> >>>
> >>> I am getting the following Kryo exception:
> >>>
> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
> >>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> >>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> >>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
> >>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
> >>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> >>>     Serialization trace:
> >>>     records_ (com.model.Employee$Packet)
> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> >>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> >>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
> >>>         ... 3 more
> >>>     Caused by: java.lang.UnsupportedOperationException
> >>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> >>>
> >>>
> >>> I am not able to avoid this Kryo exception. Thanks for any help.
> >>>
> >>> Thanks
> >>>
> >>> Viswadeep.
> >>>
> >
> >
>
