1.3.0

On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Which Flink version are you using (so that line numbers can be matched
> with source code) ?
>
> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>     getStreamSource(env, parameterTool);
>> );
>>
>> public RichParallelSourceFunction<MyKafkaMessage>
>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>> parameterTool) {
>>
>>     // MyKAfkaMessage is a ProtoBuf message
>>
>>     env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>>         ProtobufSerializer.class);
>>
>>     KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>         new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>             new MyKafkaMessageSerDeSchema());
>>
>>     return flinkCepConsumer;
>> }
>>
>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>
>>     public KafkaDataSource(ParameterTool parameterTool,
>>             DeserializationSchema<T> deserializer) {
>>         super(
>>             Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>             deserializer,
>>             parameterTool.getProperties()
>>         );
>>     }
>> }
>>
>> public class MyKafkaMessageSerDeSchema implements
>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>> {
>>
>>     @Override
>>     public MyKafkaMessage deserialize(byte[] message) throws IOException {
>>         MyKafkaMessage MyKafkaMessage = null;
>>         try {
>>             MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>         } catch (InvalidProtocolBufferException e) {
>>             e.printStackTrace();
>>         } finally {
>>             return MyKafkaMessage;
>>         }
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>         return null;
>>     }
>>
>>     @Override
>>     public byte[] serialize(MyKafkaMessage element) {
>>         return new byte[0];
>>     }
>> }
>>
>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which version of Flink / Kafka are you using ?
>>>
>>> Can you show the snippet of code where you create the DataStream ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>>>
>>>> Folks,
>>>>
>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>>> the following exception :
>>>>
>>>> Any idea on how can this happen?
>>>>
>>>> java.lang.NullPointerException
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>     at java.lang.Thread.run(Thread.java:748)