By the way, could you try just using KafkaIO.Read? It is the official Beam Kafka source and should work well with the Flink runner.
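An untested sketch of what that could look like: read raw byte[] records so nothing Protobuf-typed ever hits Flink's Kryo path in the source. The exact builder methods vary across Beam versions, and `pipeline` / `kafkaOptions` / `Employee.Packet` are borrowed from your code below:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Read plain byte[] key/value pairs with the official Beam Kafka source;
// no Flink DeserializationSchema and no Kryo-serialized Protobuf types.
PCollection<KV<byte[], byte[]>> records = pipeline
    .apply(KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(kafkaOptions.getBroker())
        .withTopic(kafkaOptions.getKafkaTopic())
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withoutMetadata());

From there a ParDo can decode the Protobuf bytes with ProtoCoder or parseFrom() (see the sketch at the end of this thread).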
On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:

> Yes! I have tried it; it works.
>
> -Viswadeep
>
> Viswadeep Veguru.
>
> On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
>
>> I'm not too familiar with Flink, but is there a way to simply read bytes
>> off Kafka and, in the next step (a map?), deserialize the bytes that
>> represent this object and transform it into a "Kryo-compliant" object?
>> This should avoid Kryo, right?
>>
>> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>>
>>> Hi Max,
>>>
>>> I tried approach (1); it did not work. I am still getting the same
>>> exception. (2) is not possible unless the Protobuf compiler changes.
>>>
>>> Setting that in the config did not work either.
>>>
>>> I think somewhere inside Flink it is not able to process this correctly.
>>>
>>> Thanks,
>>> Viswadeep
>>>
>>> Viswadeep Veguru.
>>>
>>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
>>>
>>>> Hi Viswadeep,
>>>>
>>>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>>>> Flink directly. However, we don't expose the ExecutionConfig in Beam,
>>>> so setting custom Kryo serializers is not possible.
>>>>
>>>> Two other options I see:
>>>>
>>>> 1) Could you try using the following in your deserialization schema?
>>>>
>>>> @Override
>>>> public TypeInformation<T> getProducedType() {
>>>>     return new CoderTypeInformation<>(coder);
>>>> }
>>>>
>>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>>> Not sure if that is possible, because it seems to be a field in your
>>>> Protobuf class.
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>>>
>>>>> Thanks Amit,
>>>>>
>>>>> It did not solve my issue. Flink itself has the following comment in
>>>>> ExecutionConfig.java:
>>>>>
>>>>> /**
>>>>>  * Force TypeExtractor to use Kryo serializer for POJOS even though
>>>>>  * we could analyze as POJO.
>>>>>  * In some cases this might be preferable. For example, when using
>>>>>  * interfaces with subclasses that cannot be analyzed as POJO.
>>>>>  */
>>>>>
>>>>> This applies here, because the generated Protobuf class extends
>>>>> GeneratedMessage and implements Employee.PacketOrBuilder.
>>>>>
>>>>> Thanks for any help.
>>>>> Viswadeep
>>>>>
>>>>> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>>>>
>>>>>> I think it has to do with Flink's internal use of Kryo - take a look at this:
>>>>>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>>>> I'm sure Flink committers will soon reach out to correct me if I'm missing something.
>>>>>>
>>>>>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am using Apache Beam with Flink and Protobuf for encoding and decoding.
>>>>>>>
>>>>>>> The following is the builder method for the FlinkKafka consumer:
>>>>>>> // T extends Message is declared on the enclosing builder class.
>>>>>>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>>>>     Properties p = new Properties();
>>>>>>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>>>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>>>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>>>>>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>>>>         kafkaOptions.getKafkaTopic(),
>>>>>>>         new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
>>>>>>>         p);
>>>>>>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>>>>>> }
>>>>>>>
>>>>>>> The helper for serialization and deserialization is this:
>>>>>>>
>>>>>>> public class SerializationDeserializationSchema<T>
>>>>>>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>>>>
>>>>>>>     private final Class<T> tClass;
>>>>>>>     private final Coder<T> coder;
>>>>>>>     private transient ByteArrayOutputStream out;
>>>>>>>
>>>>>>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>>>>>>         this.tClass = clazz;
>>>>>>>         this.coder = coder;
>>>>>>>         this.out = new ByteArrayOutputStream();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public byte[] serialize(T element) {
>>>>>>>         if (out == null) {
>>>>>>>             // 'out' is transient, so it is null after the schema is shipped to a worker.
>>>>>>>             out = new ByteArrayOutputStream();
>>>>>>>         }
>>>>>>>         try {
>>>>>>>             out.reset();
>>>>>>>             coder.encode(element, out, Coder.Context.NESTED);
>>>>>>>         } catch (IOException e) {
>>>>>>>             throw new RuntimeException("encoding failed.", e);
>>>>>>>         }
>>>>>>>         return out.toByteArray();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public T deserialize(byte[] message) throws IOException {
>>>>>>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public boolean isEndOfStream(T nextElement) {
>>>>>>>         return false;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public TypeInformation<T> getProducedType() {
>>>>>>>         return TypeExtractor.getForClass(tClass);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> I am getting the following Kryo exception:
>>>>>>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>>>> Serialization trace:
>>>>>>> records_ (com.model.Employee$Packet)
>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>>>>     ... 3 more
>>>>>>> Caused by: java.lang.UnsupportedOperationException
>>>>>>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>>
>>>>>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Viswadeep
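If KafkaIO.Read doesn't pan out, here is a rough sketch of the byte-passthrough idea Amit describes above: keep the source emitting plain byte[], which Flink can serialize without Kryo trouble, and parse the Protobuf message in a separate step. Untested; the annotation-based DoFn style assumes a recent Beam SDK, and ParsePacketFn is a hypothetical name using Employee.Packet from this thread:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import com.model.Employee;

// Parses raw Kafka payloads into Protobuf messages after the source, so the
// source itself only ever hands byte[] to Flink's serialization stack.
public class ParsePacketFn extends DoFn<byte[], Employee.Packet> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // parseFrom(byte[]) is generated for every Protobuf message class.
        c.output(Employee.Packet.parseFrom(c.element()));
    }
}

// Usage, assuming 'rawBytes' is the PCollection<byte[]> read from Kafka.
// Setting ProtoCoder on the result keeps Beam (not Kryo) in charge of the
// message type downstream:
// PCollection<Employee.Packet> packets = rawBytes
//     .apply(ParDo.of(new ParsePacketFn()))
//     .setCoder(ProtoCoder.of(Employee.Packet.class));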
