Hi,
I use MaxMind GeoIP with Spark (batch, not streaming). To make it work you
should use mapPartitions, so that the non-serializable IpLookups object is
created once per partition on the executors instead of being shipped from
the driver. I don't know whether something similar exists for Spark
Streaming.
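The "not Serializable" error itself comes from Spark using Java serialization to ship closures to the executors: any object captured from the driver must be Serializable. A minimal sketch of that failure mode, with no Spark involved (`GeoDb` is a hypothetical stand-in for the lookup class, not the MaxMind API):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the geoip lookup class: it holds file
// handles / native resources and does not extend Serializable.
class GeoDb

object SerializationDemo {
  // Returns true if Java serialization (what Spark uses for closures)
  // can write the object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize("a plain string")) // true: String is Serializable
    println(canSerialize(new GeoDb))        // false: Spark cannot ship it
  }
}
```

Creating the object on the executor side (inside mapPartitions) avoids the problem entirely, because nothing non-serializable ever crosses the wire.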

my code for reference:

  def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
    // performLookups returns a tuple; _1 is Option[IpLocation]
    val location = ipLookups.performLookups(ip)._1
    val countryName = location.map(_.countryName).getOrElse("")
    val city = location.flatMap(_.city).getOrElse("")
    val latitude = location.map(_.latitude.toString).getOrElse("")
    val longitude = location.map(_.longitude.toString).getOrElse("")
    List(countryName, city, latitude, longitude)
  }
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data into the my_data RDD

my_data.mapPartitions { rows =>
  // create the lookup once per partition, on the executor
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
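Why mapPartitions helps can be shown without Spark at all. The sketch below (illustrative names and lookup logic only, not the MaxMind API) simulates mapPartitions over two partitions: the expensive lookup object is built once per partition, not once per row.

```scala
object PartitionInitDemo {
  // Stand-in for IpLookups: pretend private 10.x addresses have no country.
  class Lookup {
    def country(ip: String): String = if (ip.startsWith("10.")) "" else "US"
  }

  // Simulates rdd.mapPartitions: each inner list is one partition of rows.
  // Returns the enriched rows and how many Lookup objects were created.
  def enrich(partitions: List[List[List[String]]]): (List[List[String]], Int) = {
    var created = 0
    val out = partitions.flatMap { rows =>
      val lookup = new Lookup() // once per partition, as in mapPartitions
      created += 1
      rows.map(row => row :+ lookup.country(row.last))
    }
    (out, created)
  }

  def main(args: Array[String]): Unit = {
    val parts = List(
      List(List("a", "1.2.3.4"), List("b", "10.0.0.1")), // partition 1
      List(List("c", "8.8.8.8"))                         // partition 2
    )
    val (rows, created) = enrich(parts)
    println(created)                        // 2 lookups for 3 rows
    println(rows.map(_.last).mkString("|")) // US||US
  }
}
```

For streaming, the same per-partition initialization should be applicable on each batch RDD via DStream's transform or foreachRDD, which hand you plain RDDs.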

On 23/02/2016 at 14:28, Zhun Shen wrote:

Hi all,

Currently I send nginx logs to Kafka, and I want to use Spark Streaming to
parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git,
but Spark Streaming threw an error saying the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many
thanks.
