Which Flink version are you using (so that line numbers can be matched with source code) ?
On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com> wrote: > DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource( > getStreamSource(env, parameterTool); > ); > > > > public RichParallelSourceFunction<MyKafkaMessage> > getStreamSource(StreamExecutionEnvironment > env, ParameterTool parameterTool) { > > // MyKAfkaMessage is a ProtoBuf message > > env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, > ProtobufSerializer.class); > > KafkaDataSource<MyKafkaMessage> flinkCepConsumer = > new KafkaDataSource<MyKafkaMessage>(parameterTool, > new MyKafkaMessageSerDeSchema()); > > return flinkCepConsumer; > } > > > public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> { > > public KafkaDataSource(ParameterTool parameterTool, > DeserializationSchema<T> deserializer) { > super( > Arrays.asList(parameterTool.getRequired("topic").split("," > )), > deserializer, > parameterTool.getProperties() > ); > > } > > } > > public class MyKafkaMessageSerDeSchema implements > DeserializationSchema<MyKafkaMessage>, > SerializationSchema<MyKafkaMessage> { > > @Override > public MyKafkaMessage deserialize(byte[] message) throws IOException { > MyKafkaMessage MyKafkaMessage = null; > try { > MyKafkaMessage = MyKafkaMessage.parseFrom(message); > } catch (InvalidProtocolBufferException e) { > e.printStackTrace(); > } finally { > return MyKafkaMessage; > } > } > > @Override > public boolean isEndOfStream(MyKafkaMessage nextElement) { > return false; > } > > @Override > public TypeInformation<MyKafkaMessage> getProducedType() { > return null; > } > > @Override > public byte[] serialize(MyKafkaMessage element) { > return new byte[0]; > } > } > > On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> Which version of Flink / Kafka are you using ? >> >> Can you show the snippet of code where you create the DataStream ? >> >> Cheers >> >> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com> >> wrote: >> >>> Folks, >>> >>> I have a KafkaConsumer that I am trying to read messages from. When I >>> try to create a DataStream from the KafkConsumer (env.addSource()) I get >>> the following exception : >>> >>> Any idea on how can this happen? >>> >>> java.lang.NullPointerException >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) >>> at >>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103) >>> at >>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110) >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264) >>> at >>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86) >>> at >>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149) >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449) >>> at >>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) >>> at >>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) >>> at >>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> >> >