Hi Max, I tried your approach (1), but it did not work; I am still getting the same exception. (2) is not possible unless the Protobuf compiler changes.
Yes, setting that in the config did not work either. I think somewhere inside Flink it is not being processed correctly.

Thanks,
Viswadeep

Viswadeep Veguru.

On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
> Hi Viswadeep,
>
> What Amit recommended (thanks!) is indeed an easy fix if you're using
> Flink. However, we don't expose the ExecutionConfig in Beam. So
> setting custom Kryo serializers is not possible.
>
> Two other options I see:
>
> 1) Could you try using the following in your deserialization schema?
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return new CoderTypeInformation<>(coder);
>     }
>
> 2) Could you avoid using Collections.UnmodifiableList in your code?
> Not sure if it is possible because it seems to be a field in your
> Protobuf class.
>
> Thanks,
> Max
>
> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
> > Thanks Amit,
> >
> > It did not solve my issue, Flink it self has the following comment in
> > ExecutionConfig.java
> >
> > /**
> >  * Force TypeExtractor to use Kryo serializer for POJOS even though we could
> >  * analyze as POJO.
> >  * In some cases this might be preferable. For example, when using interfaces
> >  * with subclasses that cannot be analyzed as POJO.
> >  */
> >
> >
> > Because generated Protobuf class extends GeneratedMessage implements
> > Employee.PacketOrBuilder
> >
> >
> > Thanks for any help.
> > Viswadeep
> >
> >
> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
> >>
> >> I think it has to do with Flink's internal use of Kryo - take a look at
> >> this:
> >> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
> >> I'm sure Flink committers will soon reach out to correct me if I'm missing
> >> something..
> >>
> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
> >>>
> >>> Hi
> >>>
> >>> I am using apache beam with flink and ProtoBuf for encoding and decoding.
> >>>
> >>> The following is the method for the FlinkKafka Consumer.
> >>>
> >>> public UnboundedSource<T extends Message,UnboundedSource.CheckpointMark>
> >>> build() {
> >>>     Properties p = new Properties();
> >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
> >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
> >>>     p.setProperty("group.id", kafkaOptions.getGroup());
> >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new
> >>>         FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(),
> >>>             new SerializationDeserializationSchema(typedSource,
> >>>                 ProtoCoder.of(typedSource)), p);
> >>>     return UnboundedFlinkSource.of(kafkaConsumer);
> >>> }
> >>>
> >>> and the helper for serialization and DeSerialization is this.
> >>>
> >>> public class SerializationDeserializationSchema<T>
> >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
> >>>
> >>>     private final Class<T> tClass;
> >>>
> >>>     private final Coder<T> coder;
> >>>     private transient ByteArrayOutputStream out;
> >>>
> >>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T>
> >>>             coder) {
> >>>         this.tClass = clazz;
> >>>         this.coder = coder;
> >>>         this.out = new ByteArrayOutputStream();
> >>>     }
> >>>
> >>>     @Override
> >>>     public byte[] serialize(T element) {
> >>>
> >>>         if (out == null) {
> >>>             out = new ByteArrayOutputStream();
> >>>         }
> >>>         try {
> >>>             out.reset();
> >>>             coder.encode(element, out, Coder.Context.NESTED);
> >>>         } catch (IOException e) {
> >>>             throw new RuntimeException("encoding failed.", e);
> >>>         }
> >>>         return out.toByteArray();
> >>>     }
> >>>
> >>>     @Override
> >>>     public T deserialize(byte[] message) throws IOException {
> >>>         return coder.decode(new ByteArrayInputStream(message),
> >>>             Coder.Context.NESTED);
> >>>     }
> >>>
> >>>     @Override
> >>>     public boolean isEndOfStream(T nextElement) {
> >>>         return false;
> >>>     }
> >>>
> >>>     @Override
> >>>     public TypeInformation<T> getProducedType() {
> >>>         return TypeExtractor.getForClass(tClass);
> >>>     }
> >>> }
> >>>
> >>>
> >>> I am getting the following, Kyro exception.
> >>>
> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not
> >>> forward element to next operator
> >>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
> >>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> >>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
> >>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
> >>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
> >>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> >>> Serialization trace:
> >>> records_ (com.model.Employee$Packet)
> >>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> >>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> >>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> >>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
> >>>     ... 3 more
> >>> Caused by: java.lang.UnsupportedOperationException
> >>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
> >>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> >>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> >>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> >>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> >>>
> >>>
> >>> I am not able to avoid this "Kryo" Exception, Thanks for any help.
> >>>
> >>> Thanks
> >>>
> >>> Viswadeep.
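
P.S. For anyone reading this thread later, the innermost "Caused by" in the trace can be reproduced without Flink or Kryo. The following is only a minimal sketch, not Kryo's actual code: the class name UnmodifiableCopyDemo and the method fillByAdd are made up for illustration. It assumes only what the stack trace shows, namely that the copy step ends up calling add() on a java.util.Collections$UnmodifiableCollection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableCopyDemo {

    // Hypothetical stand-in for a generic collection deserializer:
    // it rebuilds a collection by calling add() on the target, one element at a time.
    // Returns true if every add() succeeded, false if the target rejected the write.
    public static boolean fillByAdd(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the innermost cause in the trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b");

        // A plain ArrayList accepts add(), so the rebuild works.
        System.out.println(fillByAdd(new ArrayList<String>(), records));

        // An unmodifiable wrapper rejects add(), so the rebuild fails.
        Collection<String> readOnly = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println(fillByAdd(readOnly, records));
    }
}
```

This is why avoiding the unmodifiable wrapper (option 2) or bypassing Kryo for this type (option 1) were the two directions suggested above; the sketch itself does not show which of them works with Beam on Flink.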
