I'm not too familiar with Flink, but is there a way to simply read the raw bytes off Kafka and then, in a next step (a map?), deserialize the bytes that represent this object and transform it into a "Kryo-compliant" object? That should avoid Kryo, right? Roughly like the sketch below.
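A minimal sketch of that idea in plain-Flink terms (untested; RawBytesSchema and EmployeePojo are placeholder names I made up, Employee.Packet stands in for the generated Protobuf class from the stack trace below, and in Beam the consumer would still be wrapped with UnboundedFlinkSource as in the original post, with the decoding done in a ParDo):

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class RawBytesExample {

        // Pass-through schema: the source emits plain byte[], a type Flink
        // serializes natively, so Kryo never sees the Protobuf class here.
        public static class RawBytesSchema implements DeserializationSchema<byte[]> {
            @Override
            public byte[] deserialize(byte[] message) {
                return message;
            }

            @Override
            public boolean isEndOfStream(byte[] nextElement) {
                return false;
            }

            @Override
            public TypeInformation<byte[]> getProducedType() {
                return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");
            p.setProperty("group.id", "example");

            DataStream<byte[]> raw = env.addSource(
                    new FlinkKafkaConsumer09<>("employee-topic", new RawBytesSchema(), p));

            // Decode the Protobuf bytes inside the map and immediately convert
            // to a Kryo-friendly POJO (EmployeePojo.fromProto is hypothetical),
            // so the Protobuf type never travels between operators.
            DataStream<EmployeePojo> employees =
                    raw.map(new MapFunction<byte[], EmployeePojo>() {
                        @Override
                        public EmployeePojo map(byte[] bytes) throws Exception {
                            return EmployeePojo.fromProto(Employee.Packet.parseFrom(bytes));
                        }
                    });

            env.execute();
        }
    }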
On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:

> Hi Max,
>
> I tried your approach (1); it did not work. I am still getting the same
> exception. (2) is not possible unless the protobuf compiler changes.
>
> Yes, setting that in the config did not work either.
>
> I think somewhere inside Flink it is not able to process this correctly.
>
> Thanks
> Viswadeep
>
> Viswadeep Veguru.
>
> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
>
>> Hi Viswadeep,
>>
>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>> Flink. However, we don't expose the ExecutionConfig in Beam, so setting
>> custom Kryo serializers is not possible.
>>
>> Two other options I see:
>>
>> 1) Could you try using the following in your deserialization schema?
>>
>>     @Override
>>     public TypeInformation<T> getProducedType() {
>>         return new CoderTypeInformation<>(coder);
>>     }
>>
>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>> Not sure if that is possible, because it seems to be a field in your
>> Protobuf class.
>>
>> Thanks,
>> Max
>>
>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>
>>> Thanks Amit,
>>>
>>> It did not solve my issue. Flink itself has the following comment in
>>> ExecutionConfig.java:
>>>
>>>     /**
>>>      * Force TypeExtractor to use Kryo serializer for POJOS even though
>>>      * we could analyze as POJO. In some cases this might be preferable.
>>>      * For example, when using interfaces with subclasses that cannot
>>>      * be analyzed as POJO.
>>>      */
>>>
>>> This applies here because the generated Protobuf class extends
>>> GeneratedMessage and implements Employee.PacketOrBuilder.
>>>
>>> Thanks for any help.
>>> Viswadeep
>>>
>>> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>>
>>>> I think it has to do with Flink's internal use of Kryo - take a look
>>>> at this:
>>>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>> I'm sure Flink committers will soon reach out to correct me if I'm
>>>> missing something..
>>>>
>>>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am using Apache Beam with Flink and Protobuf for encoding and
>>>>> decoding.
>>>>>
>>>>> The following is the builder method for the Flink Kafka consumer
>>>>> (T extends Message is declared on the enclosing builder class):
>>>>>
>>>>>     public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>>         Properties p = new Properties();
>>>>>         p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>>         p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>>         p.setProperty("group.id", kafkaOptions.getGroup());
>>>>>         FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>>                 kafkaOptions.getKafkaTopic(),
>>>>>                 new SerializationDeserializationSchema<>(typedSource,
>>>>>                         ProtoCoder.of(typedSource)),
>>>>>                 p);
>>>>>         return UnboundedFlinkSource.of(kafkaConsumer);
>>>>>     }
>>>>>
>>>>> and this is the helper for serialization and deserialization:
>>>>>
>>>>>     public class SerializationDeserializationSchema<T>
>>>>>             implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>>
>>>>>         private final Class<T> tClass;
>>>>>         private final Coder<T> coder;
>>>>>         private transient ByteArrayOutputStream out;
>>>>>
>>>>>         public SerializationDeserializationSchema(Class<T> clazz,
>>>>>                 Coder<T> coder) {
>>>>>             this.tClass = clazz;
>>>>>             this.coder = coder;
>>>>>             this.out = new ByteArrayOutputStream();
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public byte[] serialize(T element) {
>>>>>             if (out == null) {
>>>>>                 out = new ByteArrayOutputStream();
>>>>>             }
>>>>>             try {
>>>>>                 out.reset();
>>>>>                 coder.encode(element, out, Coder.Context.NESTED);
>>>>>             } catch (IOException e) {
>>>>>                 throw new RuntimeException("encoding failed.", e);
>>>>>             }
>>>>>             return out.toByteArray();
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public T deserialize(byte[] message) throws IOException {
>>>>>             return coder.decode(new ByteArrayInputStream(message),
>>>>>                     Coder.Context.NESTED);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public boolean isEndOfStream(T nextElement) {
>>>>>             return false;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public TypeInformation<T> getProducedType() {
>>>>>             return TypeExtractor.getForClass(tClass);
>>>>>         }
>>>>>     }
>>>>>
>>>>> I am getting the following Kryo exception:
>>>>>
>>>>>     java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>>     Serialization trace:
>>>>>     records_ (com.model.Employee$Packet)
>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>>         ... 3 more
>>>>>     Caused by: java.lang.UnsupportedOperationException
>>>>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>
>>>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>>
>>>>> Thanks
>>>>> Viswadeep.
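For reference, here is what Amit's earlier suggestion (registering custom Kryo serializers on Flink's ExecutionConfig) might look like in plain Flink; as Max notes above, this is not reachable from Beam, which does not expose the ExecutionConfig. A rough sketch, assuming the chill-protobuf and kryo-serializers libraries are on the classpath, and again using Employee.Packet as a stand-in for the generated class:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.twitter.chill.protobuf.ProtobufSerializer;
    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

    public class KryoRegistrationSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Serialize the generated Protobuf class through its own binary
            // format (toByteArray/parseFrom) instead of Kryo's field-by-field
            // FieldSerializer, which trips over the unmodifiable records_ list.
            env.getConfig().registerTypeWithKryoSerializer(
                    Employee.Packet.class, ProtobufSerializer.class);

            // Alternatively (or additionally), teach Kryo how to rebuild
            // Collections$UnmodifiableCollection; Kryo default serializers
            // also apply to subclasses, so this covers the UnmodifiableList
            // behind the UnsupportedOperationException in the stack trace.
            Class<?> unmodifiable =
                    Class.forName("java.util.Collections$UnmodifiableCollection");
            env.getConfig().addDefaultKryoSerializer(
                    unmodifiable, UnmodifiableCollectionsSerializer.class);

            // ... build and execute the pipeline as usual ...
        }
    }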
