Hi there, In my case, I want to use GeoIP2 in Flink Streaming, I know I need to serialize geoip2 related classes using Kryo. But I did know how to do it.
Flink version: 1.0.0 Kafka version: 0.9.0.0 Deploy Mode: Local My demo code as below: File database = new File(“/home/user/GeoIP2-City.mmdb"); final DatabaseReader reader = new DatabaseReader.Builder(database).build(); DataStream<String> messageStream = env .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties)); messageStream .rebalance() .map(new MapFunction<String, String>() { public String map(String value) throws Exception { InetAddress ipAddress = InetAddress.getByName(value); CityResponse response = reader.city(ipAddress); Country country = response.getCountry(); return "Kafka and Flink says: " + value + " " + country; } }).print(); env.execute(); I got the error below: Object FlinkTest$1@7c7d3c46 not serializable org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99) org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219) org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160) org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505) FlinkTest.main(FlinkTest.java:36) Any one can help me ?