OK. I got past the problem. Basically, I had to change my SerDe schema to the following:

public class MyKafkaMessageSerDeSchema implements
        DeserializationSchema<MyKafkaMessage>,
        SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        // Parse the raw Kafka bytes into the generated protobuf message.
        // Keeps the original behaviour of returning null on a parse failure,
        // but without the return-from-finally, which would also swallow
        // unexpected errors.
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // Added type info (this previously returned null).
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        // Modified serializer (this previously returned new byte[0]);
        // protobuf messages serialize themselves via toByteArray().
        return element.toByteArray();
    }
}
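
As an aside, returning null from deserialize() when parsing fails means a
corrupt record silently turns into a null element. A stricter variant (just a
sketch on my side, not something I have verified in this job) would fail fast
instead:

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            // Surface the corrupt record instead of emitting null downstream.
            throw new IOException("Could not parse MyKafkaMessage", e);
        }
    }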


When I run my program, I get another exception:

java.lang.NullPointerException
        at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
        at java.util.AbstractList.add(AbstractList.java:108)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
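
For what it's worth, this trace fails inside Kryo's generic
CollectionSerializer while the operator chain makes its defensive copy of each
record, which suggests Kryo is copying the protobuf message's repeated fields
itself rather than going through the ProtobufSerializer registration. Two
things I plan to try next (assumptions on my part, not a confirmed fix):

    // Sketch only; ProtobufSerializer is the same class the job already
    // registers for MyKafkaMessage.
    // 1) Register the serializer on the environment before any source or
    //    operator is created, so the registration is actually picked up.
    env.getConfig().registerTypeWithKryoSerializer(
            MyKafkaMessage.class, ProtobufSerializer.class);

    // 2) Or avoid the per-record defensive copy between chained operators
    //    entirely (safe only when operators do not mutate incoming records).
    env.getConfig().enableObjectReuse();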


On Tue, Aug 29, 2017 at 8:43 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> The NPE came from this line:
>
>         StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>
> Either serializer or castRecord was null.
>
> I wonder if this has been fixed in 1.3.2 release.
>
> On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> Kafka Version is 0.10.0
>>
>> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com>
>> wrote:
>>
>>> 1.3.0
>>>
>>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Which Flink version are you using (so that line numbers can be matched
>>>> with source code) ?
>>>>
>>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <
>>>> flinken...@gmail.com> wrote:
>>>>
>>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream =
>>>>>         env.addSource(getStreamSource(env, parameterTool));
>>>>>
>>>>>
>>>>>
>>>>>         public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(
>>>>>                 StreamExecutionEnvironment env, ParameterTool parameterTool) {
>>>>>
>>>>>             // MyKafkaMessage is a ProtoBuf message
>>>>>             env.getConfig().registerTypeWithKryoSerializer(
>>>>>                     MyKafkaMessage.class, ProtobufSerializer.class);
>>>>>
>>>>>             KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>>>                     new KafkaDataSource<MyKafkaMessage>(
>>>>>                             parameterTool, new MyKafkaMessageSerDeSchema());
>>>>>
>>>>>             return flinkCepConsumer;
>>>>>         }
>>>>>
>>>>>
>>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>>
>>>>>     public KafkaDataSource(ParameterTool parameterTool,
>>>>>             DeserializationSchema<T> deserializer) {
>>>>>         super(
>>>>>                 Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>>>>                 deserializer,
>>>>>                 parameterTool.getProperties()
>>>>>         );
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> public class MyKafkaMessageSerDeSchema implements
>>>>>         DeserializationSchema<MyKafkaMessage>,
>>>>>         SerializationSchema<MyKafkaMessage> {
>>>>>
>>>>>     @Override
>>>>>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>>>>>         MyKafkaMessage MyKafkaMessage = null;
>>>>>         try {
>>>>>             MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>>>         } catch (InvalidProtocolBufferException e) {
>>>>>             e.printStackTrace();
>>>>>         } finally {
>>>>>             return MyKafkaMessage;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>>>         return false;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>>>         return null;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public byte[] serialize(MyKafkaMessage element) {
>>>>>         return new byte[0];
>>>>>     }
>>>>> }
>>>>>
>>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Which version of Flink / Kafka are you using ?
>>>>>>
>>>>>> Can you show the snippet of code where you create the DataStream ?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>>>> flinken...@gmail.com> wrote:
>>>>>>
>>>>>>> Folks,
>>>>>>>
>>>>>>> I have a KafkaConsumer that I am trying to read messages from. When
>>>>>>> I try to create a DataStream from the KafkaConsumer (env.addSource()),
>>>>>>> I get the following exception:
>>>>>>>
>>>>>>> Any idea on how can this happen?
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>>>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>>>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>>>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
