DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
                getStreamSource(env, parameterTool);
        );



        public RichParallelSourceFunction<MyKafkaMessage>
getStreamSource(StreamExecutionEnvironment env, ParameterTool
parameterTool) {

           // MyKAfkaMessage is a ProtoBuf message

env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
ProtobufSerializer.class);

            KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
                    new KafkaDataSource<MyKafkaMessage>(parameterTool, new
MyKafkaMessageSerDeSchema());

            return flinkCepConsumer;
        }


public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {

    public KafkaDataSource(ParameterTool parameterTool,
DeserializationSchema<T> deserializer) {
        super(

Arrays.asList(parameterTool.getRequired("topic").split(",")),
                deserializer,
                parameterTool.getProperties()
        );

    }

}

public class MyKafkaMessageSerDeSchema implements
DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage MyKafkaMessage = null;
        try {
            MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return MyKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return null;
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return new byte[0];
    }
}

On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which version of Flink / Kafka are you using ?
>
> Can you show the snippet of code where you create the DataStream ?
>
> Cheers
>
> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> Folks,
>>
>> I have a KafkaConsumer that I am trying to read messages from. When I try
>> to create a DataStream from the KafkConsumer (env.addSource()) I get the
>> following exception :
>>
>> Any idea on how can this happen?
>>
>> java.lang.NullPointerException
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>
>

Reply via email to