I’ve used the code below with Spark SQL. I was using it with Spark 1.4, but it 
should still work with 1.6. In this case I wrap the lookup in a UDF; for 
Streaming you’d just apply a lambda in a map function, so no UDF wrapper is 
needed (see the sketch after the code).

import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
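    // The reader is built lazily on each executor JVM, so the non-serializable
    // DatabaseReader never travels with a closure.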
    @transient lazy val reader = {
        val db = new File("/data/meetup/GeoLite2-City.mmdb")

        val reader = new DatabaseReader.Builder(db).build()

        reader
    }
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

val iplookup = udf { (s: String) =>
    val ip = InetAddress.getByName(s)

    val record = GeoIPLookup.reader.city(ip)

    val city = record.getCity
    val country = record.getCountry
    val location = record.getLocation

    Geo(city.getName, country.getName,
        Location(location.getLatitude, location.getLongitude))
}

val withGeo = df.withColumn("geo", iplookup(column("ip")))
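
For the Streaming case mentioned at the top, a minimal sketch (assuming an 
existing DStream[String] of IP addresses named ipStream, which is not part of 
the original code) would reuse the same GeoIPLookup object inside a plain map, 
with no UDF wrapper:

val geoStream = ipStream.map { s =>
    // The reader is resolved lazily on each executor, so nothing
    // non-serializable is captured by this closure.
    val ip = InetAddress.getByName(s)
    val record = GeoIPLookup.reader.city(ip)
    Geo(record.getCity.getName, record.getCountry.getName,
        Location(record.getLocation.getLatitude, record.getLocation.getLongitude))
}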


From: Zhun Shen <shenzhunal...@gmail.com>
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean <romain.sag...@hupi.fr>
Cc: user <user@spark.apache.org>
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Hi,

I checked the dependencies and fixed the bug. It works well on Spark but not on 
Spark Streaming, so I think I still need to find another way to do it.


On Feb 26, 2016, at 2:47 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:

Hi,

Thanks for your advice. I tried your method; I use Gradle to manage my Scala 
code, and 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' was imported 
via Gradle.

spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0

I ran my test and got the error below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)




On Feb 24, 2016, at 1:10 AM, romain sagean <romain.sag...@hupi.fr> wrote:

I realized I forgot the sbt part:

resolvers += "SnowPlow Repo" at 
"http://maven.snplow.com/releases/";<http://maven.snplow.com/releases/>

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

Otherwise, to process streaming logs I use Logstash with Kafka as input. You can 
set Kafka as output if you need to do some extra computation with Spark.
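
As a sketch of the Spark side of that pipeline (it needs the spark-streaming-kafka 
artifact; the broker address and topic name below are placeholders, and an 
existing SparkContext sc is assumed), Spark 1.6 can read the Kafka topic with 
the direct stream API:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder broker
val logLines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("nginx-logs")) // placeholder topic
  .map(_._2) // keep only the message value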

On 23/02/2016 15:07, Romain Sagean wrote:
Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use 
mapPartitions. I don't know if something similar exists for Spark Streaming (a 
rough sketch appears after the code below).

my code for reference:

  import com.snowplowanalytics.maxmind.iplookups.IpLookups
  import org.apache.spark.SparkFiles

  def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
    // _1 of the lookup result is the Option[IpLocation]
    val lookupResult = ipLookups.performLookups(ip)
    val countryName = lookupResult._1.map(_.countryName).getOrElse("")
    val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
    val latitude = lookupResult._1.map(_.latitude).getOrElse(None).toString
    val longitude = lookupResult._1.map(_.longitude).getOrElse(None).toString
    List(countryName, city, latitude, longitude)
  }
sc.addFile("/home/your_user/GeoLiteCity.dat")

//load your data in my_data rdd

my_data.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
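
For Streaming, DStream exposes mapPartitions as well, so a similar pattern might 
work there too; a minimal sketch, assuming my_stream is a DStream[List[String]] 
of parsed log fields (not defined above):

my_stream.mapPartitions { rows =>
  // Build the lookup once per partition, on the executor, so it is never serialized.
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}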

On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,

Currently, I send nginx logs to Kafka, and then I want to use Spark Streaming to 
parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but 
Spark Streaming threw an error saying that the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.




