I’ve used the code below with SparkSQL. I was using it with Spark 1.4, but it should still work with 1.6. In this case I have a UDF to do the lookup; for Streaming you’d just have a lambda to apply in a map function, so no UDF wrapper.
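For the Streaming case, a rough sketch (untested, and assuming the imports and the GeoIPLookup holder from the Spark SQL code below; ipStream is a hypothetical DStream[String] of IP addresses):

// Sketch only: a plain lambda in map does the lookup. The reader lives in
// a singleton object, so it is initialized once per executor JVM rather
// than being serialized with the closure.
val geoStream = ipStream.map { s =>
  val record = GeoIPLookup.reader.city(InetAddress.getByName(s))
  (s, record.getCity.getName, record.getCountry.getName)
}

The Spark SQL version: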
import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
  // Build the MaxMind reader once per JVM; @transient lazy val keeps it
  // out of serialized closures.
  @transient lazy val reader = {
    val db = new File("/data/meetup/GeoLite2-City.mmdb")
    new DatabaseReader.Builder(db).build()
  }
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

val iplookup = udf { (s: String) =>
  val ip = InetAddress.getByName(s)
  val record = GeoIPLookup.reader.city(ip)
  Geo(
    record.getCity.getName,
    record.getCountry.getName,
    Location(record.getLocation.getLatitude, record.getLocation.getLongitude))
}

val withGeo = df.withColumn("geo", iplookup(column("ip")))

From: Zhun Shen <shenzhunal...@gmail.com>
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean <romain.sag...@hupi.fr>
Cc: user <user@spark.apache.org>
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Hi,

I checked the dependencies and fixed the bug. It works well on Spark but not on Spark Streaming, so I think I still need to find another way to do it.

On Feb 26, 2016, at 2:47 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:

Hi, thanks for your advice. I tried your method. I use Gradle to manage my Scala code, and 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' was imported in Gradle.

spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0

I ran my test and got the error below:

java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
    at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)

On Feb 24, 2016, at 1:10 AM, romain sagean <romain.sag...@hupi.fr> wrote:

I realize I forgot the sbt part:

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)

Otherwise, to process streaming logs I use logstash with Kafka as input. You can set Kafka as output if you need to do some extra calculation with Spark.

On 23/02/2016 15:07, Romain Sagean wrote:

Hi, I use MaxMind GeoIP with Spark (no streaming). To make it work you should use mapPartitions. I don't know if something similar exists for Spark Streaming.

My code for reference:

def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  val lookupResult = ipLookups.performLookups(ip)
  // lookupResult._1 is an Option[IpLocation]; fall back to "" when absent
  val countryName = lookupResult._1.map(_.countryName).getOrElse("")
  val city = lookupResult._1.flatMap(_.city).getOrElse("")
  val latitude = lookupResult._1.map(_.latitude.toString).getOrElse("")
  val longitude = lookupResult._1.map(_.longitude.toString).getOrElse("")
  List(countryName, city, latitude, longitude)
}

sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data in my_data rdd (a list of fields per row, IP at index 3)
my_data.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}

On 23/02/2016 14:28, Zhun Shen wrote:

Hi all,

Currently I send nginx logs to Kafka, and I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind. I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying that the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.
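For what it's worth, the mapPartitions pattern above carries over to Spark Streaming, since DStream also exposes mapPartitions; building the IpLookups instance inside each partition keeps the non-serializable reader out of the closure. A rough sketch (untested), reusing parseIP and the addFile call from Romain's reply, and assuming logStream is a hypothetical DStream[List[String]] of parsed log fields with the IP at index 3:

// Sketch only: create IpLookups once per partition, per batch, so nothing
// non-serializable is captured by the closure sent to the executors.
val enriched = logStream.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map(row => row ::: parseIP(row(3), ipLookups))
}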