org.apache.beam.sdk.io.kafka.KafkaIO

-Aljoscha
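For illustration, a minimal sketch of what reading the Protobuf-encoded values through this Beam source could look like. It reuses the kafkaOptions, Employee.Packet, and ProtoCoder names from the thread below; the builder methods follow the 2016-era incubating Beam API, so import paths and method names may differ in later releases:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.coders.protobuf.ProtoCoder; // path as of the incubating release
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import com.google.common.collect.ImmutableList;

    // KafkaIO.Read is runner-independent, so no Flink DeserializationSchema
    // (and no Kryo copy of the Protobuf object) is involved at the source.
    PCollection<KV<byte[], Employee.Packet>> packets = pipeline.apply(
        KafkaIO.read()
            .withBootstrapServers(kafkaOptions.getBroker())
            .withTopics(ImmutableList.of(kafkaOptions.getKafkaTopic()))
            .withValueCoder(ProtoCoder.of(Employee.Packet.class))
            .withoutMetadata());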
On Fri, 3 Jun 2016 at 01:03 Viswadeep <[email protected]> wrote:

> Are you suggesting this org.apache.beam.runners.spark.io.KafkaIO class,
> which is part of Spark?
>
> Thanks,
> Viswadeep Veguru
>
> On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]> wrote:
>
>> By the way, could you try just using KafkaIO.Read? This is the official
>> Beam Kafka source and it should work well with the Flink runner.
>>
>> On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:
>>
>>> Yes, I have tried it; it works.
>>> -Viswadeep
>>>
>>> On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
>>>
>>>> I'm not too familiar with Flink, but is there a way to simply read
>>>> bytes off Kafka and then, in the next step (a map?), deserialize the
>>>> bytes that represent this object and transform it into a
>>>> "Kryo-compliant" object? This should avoid Kryo, right?
>>>>
>>>> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> I tried your approach (1); it did not work, and I am still getting
>>>>> the same exception. (2) is not possible unless the Protobuf compiler
>>>>> changes.
>>>>>
>>>>> Setting that in the config did not work either.
>>>>>
>>>>> I think somewhere inside Flink it is not able to process this
>>>>> correctly.
>>>>>
>>>>> Thanks,
>>>>> Viswadeep Veguru
>>>>>
>>>>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
>>>>>
>>>>>> Hi Viswadeep,
>>>>>>
>>>>>> What Amit recommended (thanks!) is indeed an easy fix if you're
>>>>>> using Flink directly. However, we don't expose the ExecutionConfig
>>>>>> in Beam, so setting custom Kryo serializers is not possible.
>>>>>>
>>>>>> Two other options I see:
>>>>>>
>>>>>> 1) Could you try using the following in your deserialization schema?
>>>>>>
>>>>>> @Override
>>>>>> public TypeInformation<T> getProducedType() {
>>>>>>     return new CoderTypeInformation<>(coder);
>>>>>> }
>>>>>>
>>>>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>>>>> Not sure if that is possible, because it seems to be a field in
>>>>>> your Protobuf class.
>>>>>>
>>>>>> Thanks,
>>>>>> Max
>>>>>>
>>>>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Amit,
>>>>>>>
>>>>>>> It did not solve my issue. Flink itself has the following comment
>>>>>>> in ExecutionConfig.java:
>>>>>>>
>>>>>>> /**
>>>>>>>  * Force TypeExtractor to use Kryo serializer for POJOS even
>>>>>>>  * though we could analyze as POJO. In some cases this might be
>>>>>>>  * preferable. For example, when using interfaces with subclasses
>>>>>>>  * that cannot be analyzed as POJO.
>>>>>>>  */
>>>>>>>
>>>>>>> This applies here because the generated Protobuf class extends
>>>>>>> GeneratedMessage and implements Employee.PacketOrBuilder.
>>>>>>>
>>>>>>> Thanks for any help.
>>>>>>> Viswadeep
>>>>>>>
>>>>>>> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>>>>>>
>>>>>>>> I think it has to do with Flink's internal use of Kryo - take a
>>>>>>>> look at this:
>>>>>>>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>>>>>> I'm sure Flink committers will soon reach out to correct me if
>>>>>>>> I'm missing something.
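The usual fix along the lines of that Stack Overflow link is to register a Kryo serializer that understands the problematic type. Where the ExecutionConfig is accessible (plain Flink, rather than through Beam, as Max notes above), a hedged sketch is to let Kryo delegate the generated message class to Protobuf's own parser via Twitter's chill-protobuf, assuming the com.twitter.chill:chill-protobuf dependency is on the classpath:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.twitter.chill.protobuf.ProtobufSerializer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Serialize Employee.Packet with Protobuf's own parser instead of Kryo's
    // reflective FieldSerializer, which fails on the unmodifiable records_ list.
    env.getConfig().registerTypeWithKryoSerializer(
        Employee.Packet.class, ProtobufSerializer.class);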
>>>>>>>>
>>>>>>>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am using Apache Beam with Flink, and Protobuf for encoding and
>>>>>>>>> decoding.
>>>>>>>>>
>>>>>>>>> The following is the builder method for the Flink Kafka consumer:
>>>>>>>>>
>>>>>>>>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>>>>>>     Properties p = new Properties();
>>>>>>>>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>>>>>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>>>>>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>>>>>>>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>>>>>>         kafkaOptions.getKafkaTopic(),
>>>>>>>>>         new SerializationDeserializationSchema<>(typedSource,
>>>>>>>>>             ProtoCoder.of(typedSource)),
>>>>>>>>>         p);
>>>>>>>>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> and this is the helper for serialization and deserialization:
>>>>>>>>>
>>>>>>>>> public class SerializationDeserializationSchema<T>
>>>>>>>>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>>>>>>
>>>>>>>>>     private final Class<T> tClass;
>>>>>>>>>     private final Coder<T> coder;
>>>>>>>>>     // Transient: re-created on the workers after deserialization.
>>>>>>>>>     private transient ByteArrayOutputStream out;
>>>>>>>>>
>>>>>>>>>     public SerializationDeserializationSchema(Class<T> clazz,
>>>>>>>>>             Coder<T> coder) {
>>>>>>>>>         this.tClass = clazz;
>>>>>>>>>         this.coder = coder;
>>>>>>>>>         this.out = new ByteArrayOutputStream();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public byte[] serialize(T element) {
>>>>>>>>>         if (out == null) {
>>>>>>>>>             out = new ByteArrayOutputStream();
>>>>>>>>>         }
>>>>>>>>>         try {
>>>>>>>>>             out.reset();
>>>>>>>>>             coder.encode(element, out, Coder.Context.NESTED);
>>>>>>>>>         } catch (IOException e) {
>>>>>>>>>             throw new RuntimeException("encoding failed.", e);
>>>>>>>>>         }
>>>>>>>>>         return out.toByteArray();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public T deserialize(byte[] message) throws IOException {
>>>>>>>>>         return coder.decode(new ByteArrayInputStream(message),
>>>>>>>>>             Coder.Context.NESTED);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public boolean isEndOfStream(T nextElement) {
>>>>>>>>>         return false;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public TypeInformation<T> getProducedType() {
>>>>>>>>>         return TypeExtractor.getForClass(tClass);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> I am getting the following Kryo exception:
>>>>>>>>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>>>>>> Serialization trace:
>>>>>>>>> records_ (com.model.Employee$Packet)
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>>>>>>     ... 3 more
>>>>>>>>> Caused by: java.lang.UnsupportedOperationException
>>>>>>>>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>>>>
>>>>>>>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Viswadeep
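The trace shows Kryo's reflective FieldSerializer failing while copying the Protobuf message between chained operators. As a hedged sketch of the byte-passthrough approach Amit suggested earlier in the thread: names such as kafkaOptions, p, and Employee.Packet come from the code above, and the DoFn style follows the 2016-era Beam API, so this is an illustration rather than a tested fix:

    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

    // The source emits raw byte[]; Flink serializes primitive byte arrays
    // natively, so Kryo never touches the Protobuf object at this stage.
    FlinkKafkaConsumer09<byte[]> rawConsumer = new FlinkKafkaConsumer09<>(
        kafkaOptions.getKafkaTopic(),
        new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] message) {
                return message; // pass the bytes through untouched
            }
        },
        p);

    pipeline
        .apply(Read.from(UnboundedFlinkSource.of(rawConsumer)))
        .apply(ParDo.of(new DoFn<byte[], Employee.Packet>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                // Decode with Protobuf's own parser in a separate step,
                // so the generated class is never reconstructed by Kryo.
                c.output(Employee.Packet.parseFrom(c.element()));
            }
        }))
        .setCoder(ProtoCoder.of(Employee.Packet.class));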
