Here is an example of working code that takes a CSV of lat/lon points and
intersects them with the polygons of the municipalities of Mexico, generating
a new version of the file with new attributes.

Do you think it could be improved?

Thanks.

The Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.geometry.builder._
import com.vividsolutions.jts._
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text._
import java.util._

/**
 * Spark job that reads a CSV of `usuario, lat, lon, date` rows, finds the
 * municipality polygon that intersects each point, and writes every input line
 * back out with three extra columns appended: `time,cve_est,cve_mun`.
 *
 * Rows whose point falls in no polygon, or that cannot be parsed, keep the
 * sentinel value `0` in all three extra columns.
 */
object SimpleApp {

  /**
   * Entry point: runs the clip over the whole input file and stores the result
   * as a single output part.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    try {
      // Roughly 70 million rows.
      val csvPath = "hdfs://x01/user/acoronado/mov/movilidad.csv"
      val csv = sc.textFile(csvPath)

      // SimpleDateFormat is not thread-safe and is costly to construct, so one
      // instance is created per partition instead of one per matched row.
      val clipPoints = csv.mapPartitions { lines =>
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
        lines.map(line => clipLine(line, dateFormat))
      }

      // shuffle = true keeps the clipping itself parallel; only the final write
      // is funnelled into one partition.
      clipPoints
        .coalesce(1, shuffle = true)
        .saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
      println("Spark Clip Exito!!!")
    } finally {
      sc.stop()
    }
  }

  /**
   * Appends `time,cve_est,cve_mun` to one CSV line.
   *
   * The timestamp is only resolved when a polygon matches, as in the original
   * implementation; otherwise all three columns are `0`. A line that does not
   * have exactly four fields, or whose coordinates are not numeric, is treated
   * as unmatched instead of aborting the whole job.
   *
   * @param line       raw input line, expected as `usuario,lat,lon,date`
   * @param dateFormat formatter used to parse the `date` field
   * @return the input line followed by the three extra columns
   */
  private def clipLine(line: String, dateFormat: SimpleDateFormat): String = {
    val matched = line.split(",").map(_.trim) match {
      case Array(_, lat, lon, date) =>
        parsePoint(lat, lon)
          .flatMap(punto => geoData.firstIntersecting(punto))
          .map { f =>
            (epochMillis(date, dateFormat),
             f.getAttribute(1).toString, // State code
             f.getAttribute(2).toString) // Municipality code
          }
      case _ => None
    }
    val (time, cveEst, cveMun) = matched.getOrElse(("0", "0", "0"))
    s"$line,$time,$cveEst,$cveMun"
  }

  /** Builds the point for a row, or None when either coordinate is not a number. */
  private def parsePoint(lat: String, lon: String): Option[com.vividsolutions.jts.geom.Geometry] =
    try Some(Point(lon.toDouble, lat.toDouble))
    catch { case _: NumberFormatException => None }

  /**
   * Converts the row timestamp to epoch milliseconds, rewriting a trailing `Z`
   * as `+0000` so the `Z` pattern letter can parse it. Returns `"0"` when the
   * value does not match the expected pattern.
   */
  private def epochMillis(date: String, dateFormat: SimpleDateFormat): String =
    try dateFormat.parse(date.replaceAll("Z$", "+0000")).getTime.toString
    catch { case _: ParseException => "0" }

  /**
   * Municipality polygons, loaded from a shapefile on the local file system.
   * The object is initialised lazily, once per JVM, on first use.
   */
  object geoData {
    // This path must exist on every node.
    private val estatal = Shapefile("/geoData/MunicipiosLatLon.shp")
    private val estatalColl = estatal.getFeatures

    /** A feature together with its position in the shapefile's iteration order. */
    private final case class IndexedFeature(position: Int, feature: org.geoscript.feature.Feature)

    // Spatial index over the feature envelopes, so each lookup only tests the
    // few polygons whose bounding box contains the point instead of scanning
    // (and presumably re-reading) the whole collection for every row.
    // NOTE(review): assumes the collection exposes `toList` through the same
    // enrichment that provides `filter`; confirm against the GeoScript version in use.
    private val index = {
      val tree = new com.vividsolutions.jts.index.strtree.STRtree()
      estatalColl.toList.zipWithIndex.foreach { case (f, position) =>
        tree.insert(f.geometry.getEnvelopeInternal, IndexedFeature(position, f))
      }
      // Build eagerly so that concurrent tasks only ever perform read-only queries.
      tree.build()
      tree
    }

    /** The raw feature collection, as read from the shapefile. */
    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] =
      estatalColl

    /**
     * Finds the first feature, in shapefile order, whose geometry intersects `punto`.
     *
     * @param punto geometry to locate
     * @return the matching feature, or None when no polygon intersects it
     */
    def firstIntersecting(punto: com.vividsolutions.jts.geom.Geometry): Option[org.geoscript.feature.Feature] = {
      import scala.collection.JavaConverters._
      val hits = index.query(punto.getEnvelopeInternal).asScala.collect {
        case candidate: IndexedFeature if candidate.feature.geometry intersects punto => candidate
      }
      // The index returns candidates in no particular order; picking the lowest
      // position keeps the same winner a sequential scan would have produced.
      if (hits.isEmpty) None else Some(hits.minBy(_.position).feature)
    }
  }
}

Reply via email to