I have the following code:

//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka=topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

///////
MyCustomClassDeserializer is implemented as:

public MyCustomClass deserialize(String s, byte[] bytes) {
    return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
}

When I run this program locally, I get the following error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Input mismatch: Basic type expected.

Why do I get this error?
