in getProducedType(), replace the implementation with:

return new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, TypeExtractor.getForClass(CustomObject.class));

On 13.06.2017 17:18, AndreaKinn wrote:
Can I ask you to help me? I trying to implement a CustomDeserializer
My kafka messages are composed by KeyedMessages where key and messages are
strings.
I created a new class named CustomObject to manage the message string
because it's more complex then a simple string.


public class CustomDeserializer implements
KeyedDeserializationSchema<Tuple2&lt;String,CustomObject>>{

        @Override
        public boolean isEndOfStream(Tuple2<String, CustomJSONObject> 
nextElement)
{
                return false;
        }

        @Override
        public TypeInformation<Tuple2&lt;String, CustomJSONObject>>
getProducedType() {
                return null;
        }

        @Override
        public Tuple2<String, CustomJSONObject> deserialize(byte[] messageKey,
byte[] message, String topic, int partition, long offset)
                        throws IOException {
                
                String key = new String(messageKey);
                String msg = new String(message);
                CustomObject customObj = new CustomObject(msg);
                
                Tuple2<String,CustomObject> tuple = new 
Tuple2<String,CustomObject>(key,
customObj);
                return tuple;
        }
}

Questions:

- I don't understand what is getProducedType method and its usefulness.
- Which methods have I to implement in my CustomObject class?

My main:

DataStream<Tuple2&lt;String,CustomJSONObject>> stream = env.addSource(new
FlinkKafkaConsumer010<>("topicTest", new CustomDeserializer(),
properties)).rebalance();

stream.print();

If I execute it I get a nullPointerException so I imagine miss something in
CustomObject class: I have implemented just a toString() method.




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13702.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Reply via email to