Your code has to ship the variable "DatabaseReader reader" into the cluster
together with the map function, because the anonymous MapFunction captures it.
DatabaseReader is not serializable, so the function cannot be shipped like that.
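
To illustrate why the function is rejected, here is a small sketch in plain
Java, without Flink. FakeReader and ShippableMap are made-up stand-ins for
DatabaseReader and MapFunction, and I am assuming the check behind
ClosureCleaner.ensureSerializable boils down to a Java serialization attempt
like the one in canSerialize().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for DatabaseReader: it does NOT implement Serializable.
class FakeReader {
    String lookup(String ip) { return "country-of-" + ip; }
}

// Hypothetical stand-in for MapFunction: functions must be serializable to be shipped.
interface ShippableMap extends Serializable {
    String map(String value);
}

class CaptureDemo {
    // Returns true if Java serialization accepts the object, false if it rejects it.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The anonymous class uses 'reader', so the compiler stores it in a hidden
    // field of the function object. Serializing the function then has to
    // serialize the reader as well, which fails.
    static ShippableMap capturing() {
        final FakeReader reader = new FakeReader();
        return new ShippableMap() {
            public String map(String value) { return reader.lookup(value); }
        };
    }

    // No captured state, so nothing non-serializable travels with the function.
    static ShippableMap selfContained() {
        return new ShippableMap() {
            public String map(String value) { return "seen " + value; }
        };
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(capturing()));     // false
        System.out.println(canSerialize(selfContained())); // true
    }
}
```

The first function mirrors your map(): it drags the reader along, and that is
the object the serializer trips over.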

If you control the code of DatabaseReader, try to make the class
serializable.

If you cannot change the code of DatabaseReader, you can try the
following:

  - (1) Copy the database file into a distributed filesystem.
  - (2) Use a RichMapFunction, and in open(), load the database from
        a stream opened on the distributed filesystem.
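
The idea behind step (2) can be sketched in plain Java, without Flink or GeoIP2
on the classpath. FakeGeoReader, GeoLookupFunction, ship() and the hdfs path
are all made up for illustration. The function only carries the path (a
String), keeps the reader in a transient field, and builds it in open() after
it has arrived on the worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for DatabaseReader: not Serializable, built from a location.
class FakeGeoReader {
    private final String source;
    FakeGeoReader(String source) { this.source = source; }
    String country(String ip) { return "country(" + ip + ") from " + source; }
}

// Mirrors the shape of a rich function: serializable, with an open() hook
// that is called once on the worker before the first map() call.
class GeoLookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String databasePath;       // shipped with the function
    private transient FakeGeoReader reader;  // skipped by serialization, rebuilt in open()

    GeoLookupFunction(String databasePath) { this.databasePath = databasePath; }

    void open() { reader = new FakeGeoReader(databasePath); }

    String map(String ip) { return reader.country(ip); }
}

class OpenDemo {
    // Simulates shipping to the cluster: serialize, then deserialize a copy.
    static GeoLookupFunction ship(GeoLookupFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (GeoLookupFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // What happens on the worker: receive the copy, open it, then map records.
    static String runOnWorker(String databasePath, String ip) {
        GeoLookupFunction shipped = ship(new GeoLookupFunction(databasePath));
        shipped.open();
        return shipped.map(ip);
    }

    public static void main(String[] args) {
        // The path is a hypothetical location on a distributed filesystem.
        System.out.println(runOnWorker("hdfs:///data/GeoIP2-City.mmdb", "1.2.3.4"));
    }
}
```

In the real job, the class would extend RichMapFunction<String, String> and
override open(Configuration). There you would open the file through the
filesystem API and hand the stream to the reader builder (as far as I know,
DatabaseReader.Builder also accepts an InputStream, but please check this
against the GeoIP2 version you use).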


On Mon, Apr 4, 2016 at 4:52 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:

> Hi there,
>
> In my case, I want to use GeoIP2 in Flink Streaming. I know I need
> to serialize the GeoIP2-related classes using Kryo, but I don't know how to do it.
>
> Flink version: 1.0.0
> Kafka version: 0.9.0.0
> Deploy Mode: Local
>
> My demo code as below:
>
>         File database = new File("/home/user/GeoIP2-City.mmdb");
>         final DatabaseReader reader = new DatabaseReader.Builder(database).build();
>         DataStream<String> messageStream = env
>                 .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));
>
>         messageStream
>                 .rebalance()
>                 .map(new MapFunction<String, String>() {
>                     public String map(String value) throws Exception {
>
>                         InetAddress ipAddress = InetAddress.getByName(value);
>                         CityResponse response = reader.city(ipAddress);
>                         Country country = response.getCountry();
>                         return "Kafka and Flink says: " + value + " " + country;
>                     }
>                 }).print();
>
>         env.execute();
>
>
> I got the error below:
>
> Object FlinkTest$1@7c7d3c46 not serializable
>
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
> FlinkTest.main(FlinkTest.java:36)
>
> Can anyone help me?
>
