OK. I got past the problem. Basically, I had to change my schema class to add the type information and fix the serializer:

public class MyKafkaMessageSerDeSchema implements
        DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // --------> added type info (was: return null)
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        // --------> modified serializer (was: return new byte[0])
        return element.toByteArray();
    }
}

When I run my program, I get another exception:

java.lang.NullPointerException
        at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
        at java.util.AbstractList.add(AbstractList.java:108)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
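[This second NPE looks like a different problem: the trace shows Flink's PojoSerializer copying the record and falling back to Kryo's CollectionSerializer for the protobuf UnmodifiableLazyStringList field, which cannot be add()-ed to. A minimal sketch of one possible direction, not verified against this exact setup: keep the chill-protobuf registration from the code quoted below, but also make Flink treat the message as a generic type, so the registered ProtobufSerializer handles the whole object instead of Kryo copying it field by field. GenericTypeInfo and the com.twitter:chill-protobuf artifact are assumptions here, not something confirmed in this thread.]

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.twitter.chill.protobuf.ProtobufSerializer;

public class JobSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Serialize/copy the protobuf message through its own
        // toByteArray()/parseFrom() via chill-protobuf, instead of Kryo's
        // field-by-field copy of the unmodifiable list.
        env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
        // ... add the Kafka source and the rest of the pipeline here ...
    }
}

// And in MyKafkaMessageSerDeSchema, return a generic type info so the
// serializer registered above is actually used for the produced records:
@Override
public TypeInformation<MyKafkaMessage> getProducedType() {
    return new GenericTypeInfo<>(MyKafkaMessage.class);
}

[If copying is the only problem, env.getConfig().enableObjectReuse() may also avoid the copy in the chained output entirely, at the usual object-reuse caveats.]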
On Tue, Aug 29, 2017 at 8:43 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> The NPE came from this line:
>
>     StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>
> Either serializer or castRecord was null.
>
> I wonder if this has been fixed in the 1.3.2 release.
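[Ted's pointer is consistent with the schema quoted further down in this thread: its getProducedType() returns null, so the source presumably never gets a usable record serializer and the copy in CopyingChainingOutput NPEs. A minimal sketch of a non-null implementation; TypeInformation.of with a TypeHint is simply an alternative to the TypeExtractor.getForClass call used in the fix above:]

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Inside MyKafkaMessageSerDeSchema:
@Override
public TypeInformation<MyKafkaMessage> getProducedType() {
    // Any non-null TypeInformation avoids the null serializer;
    // TypeInformation.of(new TypeHint<...>() {}) is equivalent here to
    // TypeExtractor.getForClass(MyKafkaMessage.class).
    return TypeInformation.of(new TypeHint<MyKafkaMessage>() {});
}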
> On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>
>> Kafka Version is 0.10.0
>>
>> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>
>>> 1.3.0
>>>
>>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Which Flink version are you using (so that line numbers can be matched with source code)?
>>>>
>>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>>>
>>>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream =
>>>>>         env.addSource(getStreamSource(env, parameterTool));
>>>>>
>>>>> public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(StreamExecutionEnvironment env, ParameterTool parameterTool) {
>>>>>     // MyKafkaMessage is a ProtoBuf message
>>>>>     env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
>>>>>
>>>>>     KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>>>             new KafkaDataSource<MyKafkaMessage>(parameterTool, new MyKafkaMessageSerDeSchema());
>>>>>
>>>>>     return flinkCepConsumer;
>>>>> }
>>>>>
>>>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>>>
>>>>>     public KafkaDataSource(ParameterTool parameterTool, DeserializationSchema<T> deserializer) {
>>>>>         super(
>>>>>                 Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>>>>                 deserializer,
>>>>>                 parameterTool.getProperties()
>>>>>         );
>>>>>     }
>>>>> }
>>>>>
>>>>> public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {
>>>>>
>>>>>     @Override
>>>>>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>>>>>         MyKafkaMessage myKafkaMessage = null;
>>>>>         try {
>>>>>             myKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>>>         } catch (InvalidProtocolBufferException e) {
>>>>>             e.printStackTrace();
>>>>>         } finally {
>>>>>             return myKafkaMessage;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>>>         return false;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>>>         return null;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public byte[] serialize(MyKafkaMessage element) {
>>>>>         return new byte[0];
>>>>>     }
>>>>> }
>>>>>
>>>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Which version of Flink / Kafka are you using?
>>>>>>
>>>>>> Can you show the snippet of code where you create the DataStream?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>>>>>
>>>>>>> Folks,
>>>>>>>
>>>>>>> I have a KafkaConsumer that I am trying to read messages from. When I try to create a DataStream from the KafkaConsumer (env.addSource()) I get the following exception.
>>>>>>>
>>>>>>> Any idea how this can happen?
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
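[For completeness, a minimal sketch of how the pieces from this thread fit together once the schema is fixed. KafkaDataSource and MyKafkaMessageSerDeSchema are the classes quoted above; the main-class name and the use of ParameterTool.fromArgs are illustrative assumptions, and the usual Kafka properties (bootstrap.servers, group.id) are expected in parameterTool.getProperties():]

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyKafkaJob {
    public static void main(String[] args) throws Exception {
        // Expects --topic plus the Kafka connection properties as arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Uses the fixed schema (non-null getProducedType, real serialize).
        DataStream<MyKafkaMessage> stream = env.addSource(
                new KafkaDataSource<>(parameterTool, new MyKafkaMessageSerDeSchema()));

        stream.print(); // stand-in for the real pipeline
        env.execute("MyKafkaMessage consumer");
    }
}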