Hi there,

In my case, I want to use GeoIP2 in Flink Streaming, I know I need to serialize 
geoip2 related classes using Kryo. But I did know how to do it.

Flink version: 1.0.0
Kafka version: 0.9.0.0
Deploy Mode: Local

My demo code as below:

        File database = new File(“/home/user/GeoIP2-City.mmdb");
        final DatabaseReader reader = new 
DatabaseReader.Builder(database).build();
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>("test", new 
SimpleStringSchema(), properties));

        messageStream
                .rebalance()
                .map(new MapFunction<String, String>() {
                    public String map(String value) throws Exception {

                        InetAddress ipAddress = InetAddress.getByName(value);
                        CityResponse response = reader.city(ipAddress);
                        Country country = response.getCountry();
                        return "Kafka and Flink says: " + value + " " + country;
                    }
                }).print();

        env.execute(); 


I got the error below:

Object FlinkTest$1@7c7d3c46 not serializable
        
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
        
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
        
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
        FlinkTest.main(FlinkTest.java:36)

Any one can help me ?

Reply via email to