The NPE came from this line:

    StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null. I wonder if this has been fixed in the 1.3.2 release.

On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <flinken...@gmail.com> wrote:

> Kafka Version is 0.10.0
>
> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>
>> 1.3.0
>>
>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which Flink version are you using (so that line numbers can be matched
>>> with source code)?
>>>
>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>>
>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>>         getStreamSource(env, parameterTool)
>>>> );
>>>>
>>>> public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(
>>>>         StreamExecutionEnvironment env, ParameterTool parameterTool) {
>>>>
>>>>     // MyKafkaMessage is a ProtoBuf message
>>>>     env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
>>>>
>>>>     KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>>             new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>>>                     new MyKafkaMessageSerDeSchema());
>>>>
>>>>     return flinkCepConsumer;
>>>> }
>>>>
>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>
>>>>     public KafkaDataSource(ParameterTool parameterTool,
>>>>                            DeserializationSchema<T> deserializer) {
>>>>         super(
>>>>                 Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>>>                 deserializer,
>>>>                 parameterTool.getProperties()
>>>>         );
>>>>     }
>>>> }
>>>>
>>>> public class MyKafkaMessageSerDeSchema implements
>>>>         DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {
>>>>
>>>>     @Override
>>>>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>>>>         MyKafkaMessage MyKafkaMessage = null;
>>>>         try {
>>>>             MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>>         } catch (InvalidProtocolBufferException e) {
>>>>             e.printStackTrace();
>>>>         } finally {
>>>>             return MyKafkaMessage;
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>>         return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>>         return null;
>>>>     }
>>>>
>>>>     @Override
>>>>     public byte[] serialize(MyKafkaMessage element) {
>>>>         return new byte[0];
>>>>     }
>>>> }
>>>>
>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Which version of Flink / Kafka are you using?
>>>>>
>>>>> Can you show the snippet of code where you create the DataStream?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>>>>
>>>>>> Folks,
>>>>>>
>>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>>> try to create a DataStream from the KafkaConsumer (env.addSource()) I get
>>>>>> the following exception:
>>>>>>
>>>>>> Any idea how this can happen?
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
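
To illustrate the diagnosis at the top of this message: the failing statement dereferences both castRecord and serializer, so a null in either one produces the NullPointerException reported at OperatorChain.java:526. Below is a minimal, self-contained Java sketch of that failure mode. It does not use Flink at all; Record, unsafeCopy, checkedCopy and diagnose are hypothetical names made up for illustration, and a plain UnaryOperator stands in for the serializer's copy step. One unverified guess worth checking separately: the quoted MyKafkaMessageSerDeSchema returns null from getProducedType(), which may be how a null serializer came about.

```java
import java.util.Objects;
import java.util.function.UnaryOperator;

public class NullCopySketch {

    /** Hypothetical stand-in for a stream record: a value holder with copy(). */
    static final class Record<T> {
        private final T value;

        Record(T value) {
            this.value = value;
        }

        T getValue() {
            return value;
        }

        Record<T> copy(T valueCopy) {
            return new Record<>(valueCopy);
        }
    }

    /** Same shape as the failing statement: throws NPE if either argument is null. */
    static <T> Record<T> unsafeCopy(Record<T> castRecord, UnaryOperator<T> serializer) {
        return castRecord.copy(serializer.apply(castRecord.getValue()));
    }

    /** Same logic, but the exception message names the null reference. */
    static <T> Record<T> checkedCopy(Record<T> castRecord, UnaryOperator<T> serializer) {
        Objects.requireNonNull(castRecord, "castRecord is null");
        Objects.requireNonNull(serializer, "serializer is null");
        return castRecord.copy(serializer.apply(castRecord.getValue()));
    }

    /** Returns the message of the NPE thrown by checkedCopy, or "no NPE". */
    static <T> String diagnose(Record<T> castRecord, UnaryOperator<T> serializer) {
        try {
            checkedCopy(castRecord, serializer);
            return "no NPE";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Record<String> record = new Record<>("payload");
        System.out.println(diagnose(record, null));   // prints "serializer is null"
        System.out.println(diagnose(null, s -> s));   // prints "castRecord is null"
        System.out.println(diagnose(record, s -> s)); // prints "no NPE"
    }
}
```

If the guess about getProducedType() holds, returning a non-null TypeInformation from that method (for example TypeInformation.of(MyKafkaMessage.class)) would be the first thing to try. I have not verified this against the 1.3.0 source.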