Here is an example of working code that takes a CSV of lat/lon points, intersects them with the polygons of Mexico's municipalities, and generates a new version of the file with the new attributes.
Do you think that could be improved? Thanks. The Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.geometry.builder._
import com.vividsolutions.jts._
import com.vividsolutions.jts.index.strtree.STRtree
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text._
import java.util._

/**
 * Spark job that enriches every `usuario,lat,lon,date` CSV record with an epoch-millis
 * timestamp and the state / municipality codes of the first municipality polygon that
 * intersects the point ("0" for both codes when no polygon matches).
 *
 * Usage: SimpleApp [inputCsv] [outputDir]; both default to the original HDFS paths.
 */
object SimpleApp {

  private val DefaultCsvPath = "hdfs://x01/user/acoronado/mov/movilidad.csv" // ~70 million rows
  // NOTE(review): input lives on x01 but output on m01 — confirm both hosts are intended.
  private val DefaultOutPath = "hdfs://m01/user/acoronado/mov/mov_all.csv"

  def main(args: Array[String]): Unit = {
    val csvPath = args.lift(0).getOrElse(DefaultCsvPath)
    val outPath = args.lift(1).getOrElse(DefaultOutPath)

    val sc = new SparkContext(new SparkConf().setAppName("Csv Clipper"))
    try {
      val clipPoints = sc.textFile(csvPath).mapPartitions { lines =>
        // SimpleDateFormat is not thread-safe and not cheap to build:
        // one instance per partition instead of one per record.
        val isoFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
        lines.map(line => clipLine(line, isoFormat))
      }
      // A single part file is written by one task; shuffle = true keeps the clipping
      // itself parallel. Note that outPath is a directory, not a single .csv file.
      clipPoints.coalesce(1, shuffle = true).saveAsTextFile(outPath)
      println("Spark Clip Exito!!!")
    } finally {
      sc.stop()
    }
  }

  /**
   * Enriches one CSV record.
   *
   * @param line      `usuario,lat,lon,date`; a record that does not split into exactly four
   *                  fields throws a MatchError and fails the job (unchanged behaviour)
   * @param isoFormat parser for `date` in `yyyy-MM-dd'T'HH:mm:ss.SSS` form with a `Z` or
   *                  numeric offset suffix
   * @return `line,time,cve_est,cve_mun`, where time is epoch milliseconds
   */
  private def clipLine(line: String, isoFormat: SimpleDateFormat): String = {
    val Array(_, lat, lon, date) = line.split(",").map(_.trim)
    val punto = Point(lon.toDouble, lat.toDouble)
    // The timestamp does not depend on the location, so it is converted for every record
    // (previously it stayed "0" whenever no municipality matched). SimpleDateFormat's `Z`
    // expects a numeric offset, hence the literal trailing Z is rewritten to +0000.
    val time = isoFormat.parse(date.replaceAll("Z$", "+0000")).getTime.toString
    val (cveEst, cveMun) = geoData.firstIntersecting(punto)
      .map(f => (f.getAttribute(1).toString, f.getAttribute(2).toString)) // (state code, municipality code)
      .getOrElse(("0", "0"))
    s"$line,$time,$cveEst,$cveMun"
  }

  /** Municipality polygons, loaded once per executor JVM when a task first touches this object. */
  object geoData {
    private val estatal = Shapefile("/geoData/MunicipiosLatLon.shp") // This file must exist on every node.
    private val estatalColl = estatal.getFeatures

    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = estatalColl

    /** A feature tagged with its position in the shapefile, so overlapping matches resolve in file order. */
    private final case class RankedFeature(order: Int, feature: org.geoscript.feature.Feature)

    // A GeoTools FeatureCollection streams from the data store on each traversal, so the old
    // per-record `filter` re-read and tested every polygon for every point. Read the file once
    // and index the bounding boxes instead.
    private val index: STRtree = {
      val tree = new STRtree()
      val features = get.features()
      try {
        var order = 0
        while (features.hasNext) {
          val f = features.next()
          // Features without a geometry can never intersect a point; skip them.
          Option(f.geometry).foreach(g => tree.insert(g.getEnvelopeInternal, RankedFeature(order, f)))
          order += 1
        }
      } finally {
        features.close()
      }
      // Build eagerly so that concurrent tasks on this executor only ever read the tree.
      tree.build()
      tree
    }

    /** First feature, in shapefile order, whose geometry intersects `g`; None when there is none. */
    def firstIntersecting(g: com.vividsolutions.jts.geom.Geometry): Option[org.geoscript.feature.Feature] = {
      // The index only filters by bounding box; the exact test is still `intersects`.
      val hits = index.query(g.getEnvelopeInternal).toArray().collect {
        case c: RankedFeature if c.feature.geometry.intersects(g) => c
      }
      if (hits.isEmpty) None else Some(hits.minBy(_.order).feature)
    }
  }
}