The NPE came from this line:

        StreamRecord<T> copy =
castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.

I wonder if this has been fixed in 1.3.2 release.

On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <flinken...@gmail.com>
wrote:

> Kafka Version is 0.10.0
>
> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> 1.3.0
>>
>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which Flink version are you using (so that line numbers can be matched
>>> with source code) ?
>>>
>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com
>>> > wrote:
>>>
>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>>                 getStreamSource(env, parameterTool);
>>>>         );
>>>>
>>>>
>>>>
>>>>         public RichParallelSourceFunction<MyKafkaMessage>
>>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>>> parameterTool) {
>>>>
>>>>            // MyKAfkaMessage is a ProtoBuf message
>>>>             env.getConfig().registerTypeWi
>>>> thKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
>>>>
>>>>             KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>>                     new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>>> new MyKafkaMessageSerDeSchema());
>>>>
>>>>             return flinkCepConsumer;
>>>>         }
>>>>
>>>>
>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>
>>>>     public KafkaDataSource(ParameterTool parameterTool,
>>>> DeserializationSchema<T> deserializer) {
>>>>         super(
>>>>                 Arrays.asList(parameterTool.ge
>>>> tRequired("topic").split(",")),
>>>>                 deserializer,
>>>>                 parameterTool.getProperties()
>>>>         );
>>>>
>>>>     }
>>>>
>>>> }
>>>>
>>>> public class MyKafkaMessageSerDeSchema implements
>>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>>> {
>>>>
>>>>     @Override
>>>>     public MyKafkaMessage deserialize(byte[] message) throws
>>>> IOException {
>>>>         MyKafkaMessage MyKafkaMessage = null;
>>>>         try {
>>>>             MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
>>>>         } catch (InvalidProtocolBufferException e) {
>>>>             e.printStackTrace();
>>>>         } finally {
>>>>             return MyKafkaMessage;
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>>         return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>>         return null;
>>>>     }
>>>>
>>>>     @Override
>>>>     public byte[] serialize(MyKafkaMessage element) {
>>>>         return new byte[0];
>>>>     }
>>>> }
>>>>
>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Which version of Flink / Kafka are you using ?
>>>>>
>>>>> Can you show the snippet of code where you create the DataStream ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>>> flinken...@gmail.com> wrote:
>>>>>
>>>>>> Folks,
>>>>>>
>>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>>>> the following exception :
>>>>>>
>>>>>> Any idea on how can this happen?
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>>  at 
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to