Hi, I use MaxMind GeoIP with Spark (batch, not streaming). To make it work you should use mapPartitions, so the non-serializable lookup object is created once per partition on the executor instead of being shipped from the driver. I don't know offhand whether something similar exists for Spark Streaming.
My code for reference:

    def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
      val lookupResult = ipLookups.performLookups(ip)
      val countryName = lookupResult._1.map(_.countryName).getOrElse("")
      val city        = lookupResult._1.flatMap(_.city).getOrElse("")
      val latitude    = lookupResult._1.map(_.latitude.toString).getOrElse("")
      val longitude   = lookupResult._1.map(_.longitude.toString).getOrElse("")
      List(countryName, city, latitude, longitude)
    }

    sc.addFile("/home/your_user/GeoLiteCity.dat")

    // load your data into the my_data RDD (each row is a List[String])
    my_data.mapPartitions { rows =>
      val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
      rows.map { row => row ::: parseIP(row(3), ipLookups) }
    }

On 23/02/2016 14:28, Zhun Shen wrote:
Hi all, currently I send nginx logs to Kafka, and then I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind. I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying that the lib was not Serializable. Does anyone know a way to process the IP info in Spark Streaming? Many thanks.
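On the streaming part: DStream also exposes mapPartitions (and transform, which hands you the underlying RDD), so the same trick of constructing the non-serializable lookup object inside the partition closure should apply. Below is a minimal plain-Scala sketch of that pattern; FakeLookup is a hypothetical stand-in for IpLookups so the snippet runs without Spark or the MaxMind database — only the per-partition initialization structure mirrors the real code.

```scala
// FakeLookup is a stand-in for a non-serializable resource such as IpLookups:
// the point is that it gets constructed inside the partition closure,
// once per partition, and is never serialized and shipped from the driver.
class FakeLookup {
  def lookup(ip: String): String =
    if (ip.startsWith("10.")) "private" else "public"
}

object PerPartitionDemo {
  def main(args: Array[String]): Unit = {
    val ips = List("10.0.0.1", "8.8.8.8", "10.1.2.3", "1.1.1.1")

    // grouped(2) plays the role of partitions here; in Spark this would be
    // rdd.mapPartitions { rows => ... } or dstream.mapPartitions { rows => ... }
    val enriched = ips.grouped(2).flatMap { partition =>
      val lookup = new FakeLookup() // built once per "partition"
      partition.map(ip => (ip, lookup.lookup(ip)))
    }.toList

    enriched.foreach(println)
  }
}
```

In real Spark Streaming code the shape is the same: dstream.mapPartitions { rows => val ipLookups = IpLookups(...); rows.map(...) }, so the lookup object is never captured by the closure serializer.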