Hi there,
In my case, I want to use GeoIP2 in Flink Streaming. I know I need to serialize
the GeoIP2-related classes using Kryo, but I don't know how to do it.
Flink version: 1.0.0
Kafka version: 0.9.0.0
Deploy Mode: Local
My demo code is below:
    File database = new File("/home/user/GeoIP2-City.mmdb");
    final DatabaseReader reader = new DatabaseReader.Builder(database).build();

    DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

    messageStream
        .rebalance()
        .map(new MapFunction<String, String>() {
            public String map(String value) throws Exception {
                InetAddress ipAddress = InetAddress.getByName(value);
                CityResponse response = reader.city(ipAddress);
                Country country = response.getCountry();
                return "Kafka and Flink says: " + value + " " + country;
            }
        }).print();

    env.execute();
I got the error below:
Object FlinkTest$1@7c7d3c46 not serializable
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
FlinkTest.main(FlinkTest.java:36)
Can anyone help me?
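
One possible reading of the trace, offered as a sketch and not a confirmed diagnosis: the anonymous MapFunction captures the local `reader`, and the ClosureCleaner check in the trace appears to require that the function object itself be serializable with plain Java serialization, so Kryo registration may not be the relevant step here. The example below reproduces that situation without Flink or GeoIP2. `FakeReader`, `Mapper`, `CapturingMapper`, and `LazyMapper` are hypothetical stand-ins, not classes from either library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationSketch {

    // Hypothetical stand-in for DatabaseReader: deliberately NOT Serializable.
    static class FakeReader {
        String country(String ip) {
            return "country-of-" + ip;
        }
    }

    // Hypothetical stand-in for a serializable map function interface.
    interface Mapper extends Serializable {
        String map(String value) throws Exception;
    }

    // Mirrors the failing pattern: the reader is stored as a normal field,
    // which is what a captured local variable becomes in an anonymous class.
    static class CapturingMapper implements Mapper {
        private final FakeReader reader;

        CapturingMapper(FakeReader reader) {
            this.reader = reader;
        }

        public String map(String value) {
            return reader.country(value);
        }
    }

    // Alternative pattern: the reader is transient, so it is skipped during
    // serialization and created lazily on first use after deserialization.
    static class LazyMapper implements Mapper {
        private transient FakeReader reader;

        public String map(String value) {
            if (reader == null) {
                reader = new FakeReader();
            }
            return reader.country(value);
        }
    }

    // Returns true if Java serialization accepts the object.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes and deserializes the object, as shipping it to a worker would.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The capturing variant is rejected because FakeReader is not Serializable.
        System.out.println(isSerializable(new CapturingMapper(new FakeReader()))); // false

        // The lazy variant serializes, and rebuilds its reader after the round trip.
        System.out.println(isSerializable(new LazyMapper())); // true
        LazyMapper copy = roundTrip(new LazyMapper());
        System.out.println(copy.map("1.2.3.4")); // country-of-1.2.3.4
    }
}
```

In Flink, the analogous change would presumably be to extend RichMapFunction and build the DatabaseReader inside open(), so that the reader is created on the worker and never shipped with the function. The database file would then need to be readable at that path on every worker.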