yes!, i have tried it works.
-Viswadeep

Viswadeep Veguru.


On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:

> I'm not too familiar with Flink but is there a way to simply read bytes of
> Kafka and in the next step (map ?) to Deserialize the bytes that represent
> this object and transform it into a "Kryo-compliant" object ? This should
> avoid Kryo, right ?
>
> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>
>> Hi Max,
>>
>> I tried your approach (1) it did not work. I am still getting the same
>> exception.
>> (2) is not possible, unless protobuff complier changes.
>>
>> Yes setting that in config also did not work as well.
>>
>> I think some where inside flink it is not able to process correctly.
>>
>>
>> Thanks
>> Viswadeep
>>
>>
>> Viswadeep Veguru.
>>
>>
>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Hi Viswadeep,
>>>
>>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>>> Flink. However, we don't expose the ExecutionConfig in Beam. So
>>> setting custom Kryo serializers is not possible.
>>>
>>> Two other options I see:
>>>
>>> 1) Could you try using the following in your deserialization schema?
>>>
>>>    @Override
>>>     public TypeInformation<T> getProducedType() {
>>>         return new CoderTypeInformation<>(coder);
>>>     }
>>>
>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>> Not sure if it is possible because it seems to be a field in your
>>> Protobuf class.
>>>
>>> Thanks,
>>> Max
>>>
>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>> > Thanks Amit,
>>> >
>>> > It did not solve my issue, Flink it self has the following comment in
>>> > ExecutionConfig.java
>>> >
>>> > /**
>>> >  * Force TypeExtractor to use Kryo serializer for POJOS even though we
>>> could
>>> > analyze as POJO.
>>> >  * In some cases this might be preferable. For example, when using
>>> > interfaces
>>> >  * with subclasses that cannot be analyzed as POJO.
>>> >  */
>>> >
>>> >
>>> > Because generated Protobuf class extends GeneratedMessage implements
>>> > Employee.PacketOrBuilder
>>> >
>>> >
>>> > Thanks for any help.
>>> > Viswadeep
>>> >
>>> >
>>> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]>
>>> wrote:
>>> >>
>>> >> I think it has to do with Flink's internal use of Kryo  - take a look
>>> at
>>> >> this:
>>> >>
>>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>> >> I'm sure Flink committers will soon reach out to correct me if I'm
>>> missing
>>> >> something..
>>> >>
>>> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>> >>>
>>> >>> Hi
>>> >>>
>>> >>> I am using apache beam with flink and ProtoBuf for encoding and
>>> decoding.
>>> >>>
>>> >>> The following is the method for the FlinkKafka Consumer.
>>> >>>
>>> >>> public UnboundedSource<T extends
>>> Message,UnboundedSource.CheckpointMark>
>>> >>> build() {
>>> >>>     Properties p = new Properties();
>>> >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>> >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>> >>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>> >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new
>>> >>> FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(),
>>> >>>             new SerializationDeserializationSchema(typedSource,
>>> >>> ProtoCoder.of(typedSource)), p);
>>> >>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>> >>> }
>>> >>>
>>> >>> and the helper for serialization and DeSerialization is this.
>>> >>>
>>> >>> public class SerializationDeserializationSchema<T>
>>> >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>> >>>
>>> >>>     private final Class<T> tClass;
>>> >>>
>>> >>>     private final Coder<T> coder;
>>> >>>     private transient ByteArrayOutputStream out;
>>> >>>
>>> >>>     public SerializationDeserializationSchema(Class<T> clazz,
>>> Coder<T>
>>> >>> coder) {
>>> >>>         this.tClass = clazz;
>>> >>>         this.coder = coder;
>>> >>>         this.out = new ByteArrayOutputStream();
>>> >>>     }
>>> >>>
>>> >>>     @Override
>>> >>>     public byte[] serialize(T element) {
>>> >>>
>>> >>>         if (out == null) {
>>> >>>             out = new ByteArrayOutputStream();
>>> >>>         }
>>> >>>         try {
>>> >>>             out.reset();
>>> >>>             coder.encode(element, out, Coder.Context.NESTED);
>>> >>>         } catch (IOException e) {
>>> >>>             throw new RuntimeException("encoding failed.", e);
>>> >>>         }
>>> >>>         return out.toByteArray();
>>> >>>     }
>>> >>>
>>> >>>     @Override
>>> >>>     public T deserialize(byte[] message) throws IOException {
>>> >>>         return coder.decode(new ByteArrayInputStream(message),
>>> >>> Coder.Context.NESTED);
>>> >>>     }
>>> >>>
>>> >>>     @Override
>>> >>>     public boolean isEndOfStream(T nextElement) {
>>> >>>         return false;
>>> >>>     }
>>> >>>
>>> >>>     @Override
>>> >>>     public TypeInformation<T> getProducedType() {
>>> >>>         return TypeExtractor.getForClass(tClass);
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>>
>>> >>> I am getting the following, Kyro exception.
>>> >>>
>>> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could
>>> not
>>> >>> forward element to next operator
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> >>>         at
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> >>>         at java.lang.Thread.run(Thread.java:745)
>>> >>>     Caused by: java.lang.RuntimeException: Could not forward element
>>> to
>>> >>> next operator
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>> >>>     Caused by: com.esotericsoftware.kryo.KryoException:
>>> >>> java.lang.UnsupportedOperationException
>>> >>>     Serialization trace:
>>> >>>     records_ (com.model.Employee$Packet)
>>> >>>         at
>>> >>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> >>>         at
>>> >>>
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>> >>>         at
>>> >>>
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>> >>>         at
>>> >>>
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>> >>>         ... 3 more
>>> >>>     Caused by: java.lang.UnsupportedOperationException
>>> >>>         at
>>> >>>
>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>> >>>         at
>>> >>>
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>> >>>         at
>>> >>>
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>> >>>         at
>>> >>>
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>> >>>
>>> >>>
>>> >>> I am not able to avoid this "Kryo" Exception, Thanks for any help.
>>> >>>
>>> >>> Thanks
>>> >>>
>>> >>> Viswadeep.
>>> >>>
>>> >
>>> >
>>>
>>
>>

Reply via email to