Re: Null Pointer Exception on Trying to read a message from Kafka
OK. I got past the problem. Basically, I had to change the schema class as follows:

public class MyKafkaMessageSerDeSchema
        implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage MyKafkaMessage = null;
        try {
            MyKafkaMessage = MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return MyKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return TypeExtractor.getForClass(MyKafkaMessage.class);  // added type info
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return element.toByteArray();  // modified serializer
    }
}

When I run my program, I now get another exception:

java.lang.NullPointerException
    at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
    at java.util.AbstractList.add(AbstractList.java:108)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
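The second trace in the message above fails inside Kryo's CollectionSerializer.copy, at the point where AbstractList.add calls size() on a protobuf UnmodifiableLazyStringList. One plausible reading, offered here as an assumption and not a confirmed diagnosis, is that the generic copy path produced an empty instance of the wrapper without running its constructor, so the list it wraps was still null. The plain-Java sketch below reproduces only that shape. WrappedList, uninitialized() and naiveCopy() are hypothetical names, and the sketch uses neither Kryo nor protobuf.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;

final class WrapperCopySketch {

    /** Hypothetical read-only view that forwards every call to a wrapped list. */
    static final class WrappedList extends AbstractList<String> {
        private final List<String> delegate;

        WrappedList(List<String> delegate) {
            this.delegate = delegate;
        }

        /** Stand-in for an instance created without running the real constructor. */
        static WrappedList uninitialized() {
            return new WrappedList(null);
        }

        @Override
        public String get(int index) {
            return delegate.get(index);
        }

        @Override
        public int size() {
            return delegate.size(); // throws NullPointerException when delegate is null
        }
    }

    /** Generic element-by-element copy: take an empty target, then add each element. */
    static List<String> naiveCopy(List<String> original, List<String> emptyTarget) {
        for (String element : original) {
            emptyTarget.add(element); // AbstractList.add(e) calls size() first
        }
        return emptyTarget;
    }

    public static void main(String[] args) {
        List<String> original = new WrappedList(Arrays.asList("a", "b"));
        try {
            naiveCopy(original, WrappedList.uninitialized());
        } catch (NullPointerException e) {
            System.out.println("NPE while copying: wrapped list was never set");
        }
    }
}
```

The trace also passes through PojoSerializer.copy and then KryoSerializer.copy before reaching the collection copy, so the sketch models only the last step of that chain.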
Re: Null Pointer Exception on Trying to read a message from Kafka
The NPE came from this line:

    StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.

I wonder if this has been fixed in the 1.3.2 release.
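To make the diagnosis above concrete: the code posted earlier in this thread returns null from getProducedType(), and the latest message in the thread reports that returning real type information got past this NPE. The following is a minimal plain-Java sketch of that failure mode, assuming that missing type information leaves the copying step without a serializer. It is not Flink code. ElementSerializer, serializerFor() and this pushToOperator() are hypothetical stand-ins chosen to mirror the shape of the failing line.

```java
final class NullSerializerSketch {

    /** Hypothetical stand-in for a serializer that can copy a record value. */
    interface ElementSerializer<T> {
        T copy(T value);
    }

    /** Assumption: when no type information is supplied, no serializer is created. */
    static <T> ElementSerializer<T> serializerFor(Class<T> producedType) {
        if (producedType == null) {
            return null;
        }
        return value -> value; // an identity copy is enough for this sketch
    }

    /** Mirrors the failing line: copy the value before handing it downstream. */
    static <T> T pushToOperator(ElementSerializer<T> serializer, T value) {
        return serializer.copy(value); // throws NullPointerException if serializer is null
    }

    public static void main(String[] args) {
        // With type information the copy succeeds.
        ElementSerializer<String> present = serializerFor(String.class);
        System.out.println(pushToOperator(present, "record"));

        // Without it, the same call fails in the copy step.
        ElementSerializer<String> missing = serializerFor(null);
        try {
            pushToOperator(missing, "record");
        } catch (NullPointerException e) {
            System.out.println("NPE: serializer was null");
        }
    }
}
```

In the sketch the null is introduced where the type is declared, but the exception only surfaces later, when the first record is copied. That matches the trace in this thread, which fails at pushToOperator and not at env.addSource().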
Re: Null Pointer Exception on Trying to read a message from Kafka
Kafka version is 0.10.0
Re: Null Pointer Exception on Trying to read a message from Kafka
1.3.0

On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu wrote:
> Which Flink version are you using (so that line numbers can be matched
> with source code)?
Re: Null Pointer Exception on Trying to read a message from Kafka
Which Flink version are you using (so that line numbers can be matched with source code)?
Re: Null Pointer Exception on Trying to read a message from Kafka
DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
        getStreamSource(env, parameterTool)
);

public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(
        StreamExecutionEnvironment env, ParameterTool parameterTool) {

    // MyKafkaMessage is a ProtoBuf message
    env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);

    KafkaDataSource flinkCepConsumer =
            new KafkaDataSource(parameterTool, new MyKafkaMessageSerDeSchema());

    return flinkCepConsumer;
}

public class KafkaDataSource extends FlinkKafkaConsumer010<MyKafkaMessage> {

    public KafkaDataSource(ParameterTool parameterTool,
                           DeserializationSchema<MyKafkaMessage> deserializer) {
        super(
                Arrays.asList(parameterTool.getRequired("topic").split(",")),
                deserializer,
                parameterTool.getProperties()
        );
    }
}

public class MyKafkaMessageSerDeSchema
        implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage MyKafkaMessage = null;
        try {
            MyKafkaMessage = MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return MyKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return null;
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return new byte[0];
    }
}
Re: Null Pointer Exception on Trying to read a message from Kafka
Which version of Flink / Kafka are you using?

Can you show the snippet of code where you create the DataStream?

Cheers

On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa wrote:
> Folks,
>
> I have a KafkaConsumer that I am trying to read messages from. When I try
> to create a DataStream from the KafkaConsumer (env.addSource()) I get the
> following exception:
>
> Any idea how this can happen?
>
> java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)