Thanks, Evan and Andy. Here is a very functional version. I still need to improve the syntax, but it works very well: the initial version took around 36 hours on 9 machines with 8 cores each, and this version takes 36 minutes on a cluster of 7 machines with 8 cores each:
/**
 * Spark job ("Csv Clipper") that appends three columns to every row of a CSV file:
 * the row's timestamp in epoch milliseconds and two attributes of the first
 * shapefile feature whose geometry intersects the row's point.
 *
 * Input rows are expected to have exactly four comma-separated fields,
 * `usuario,lat,lon,date`. Each output row is the untouched input line followed by
 * `,time,cve_est,cve_mun`. Any appended value that cannot be computed is written
 * as "0", so one bad row never aborts the whole job.
 */
object SimpleApp {
  import scala.util.Try

  /** Placeholder written for every appended column that cannot be computed. */
  private val Unknown = "0"

  /**
   * Reads the input CSV from HDFS, enriches every line and writes the result
   * back to HDFS in 5 partitions.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    try {
      val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
      // Deliberately not cached: the RDD feeds a single action, so persisting it
      // would only occupy executor memory without saving any recomputation.
      val csv = sc.textFile(csvPath)

      val clipPoints = csv.mapPartitions { lines =>
        // Build the parser and formatter once per partition rather than once per
        // row. Each partition is consumed by a single task, so these instances
        // are never shared between threads.
        val reader = new WKTReader(JTSFactoryFinder.getGeometryFactory())
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
        lines.map(line => clipLine(line, reader, dateFormat))
      }

      clipPoints
        .coalesce(5, shuffle = true)
        .saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_5_parts.csv")
    } finally {
      sc.stop()
    }
  }

  /**
   * Builds the output row for one input line.
   *
   * @param line       raw CSV line
   * @param reader     WKT parser used to turn lon/lat into a point
   * @param dateFormat formatter used to parse the date field
   * @return `line` followed by `,time,cve_est,cve_mun`; all three are "0" when the
   *         line does not have exactly four fields
   */
  private def clipLine(line: String, reader: WKTReader, dateFormat: SimpleDateFormat): String =
    line.split(",").map(_.trim) match {
      case Array(_, lat, lon, date) =>
        val (cveEst, cveMun) = locate(lat, lon, reader)
        val time = epochMillis(date, dateFormat)
        s"$line,$time,$cveEst,$cveMun"
      case _ =>
        // Blank or malformed row: keep it in the output, flagged with placeholders.
        s"$line,$Unknown,$Unknown,$Unknown"
    }

  /**
   * Finds the first indexed feature whose geometry intersects the given point.
   *
   * @return the two String attributes stored with that feature, or ("0", "0")
   *         when the coordinates cannot be parsed or nothing intersects
   */
  private def locate(lat: String, lon: String, reader: WKTReader): (String, String) =
    Try(reader.read(s"POINT ($lon $lat)")).toOption
      .flatMap { point =>
        // The index only pre-filters by bounding box; intersects() is the exact test.
        geoDataMun.get(point.getEnvelopeInternal).collectFirst {
          case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
        }
      }
      .getOrElse((Unknown, Unknown))

  /**
   * Parses an ISO-like timestamp; a trailing "Z" is rewritten to "+0000" so the
   * `Z` pattern letter of the formatter accepts it.
   *
   * @return epoch milliseconds as a string, or "0" when `date` cannot be parsed
   */
  private def epochMillis(date: String, dateFormat: SimpleDateFormat): String =
    Try(dateFormat.parse(date.replaceAll("Z$", "+0000")).getTime.toString).getOrElse(Unknown)

  /**
   * Spatial index over the features of the shapefile at `path`.
   *
   * Being a Scala `object`, it is initialised at most once per JVM, on first use.
   * Because `get` is called from inside the Spark closure, the shapefile must be
   * readable at the same local path on every machine that runs tasks.
   * Each index entry is the tuple (geometry, attribute 1, attribute 2) of a feature.
   */
  object geoDataMun {
    val spatialIndex = new STRtree()
    val path = new URL("file:////geoData/MunicipiosLatLon.shp")
    val store = FileDataStoreFinder.getDataStore(path)
    val source = store.getFeatureSource()
    val features = source.getFeatures()
    val it = features.features()

    try {
      while (it.hasNext) {
        val feature = it.next()
        feature.getDefaultGeometry match {
          case null =>
            () // feature without geometry: nothing to index
          case geom: Geometry =>
            val env = geom.getEnvelopeInternal
            if (!env.isNull) {
              spatialIndex.insert(env, (geom, feature.getAttribute(1), feature.getAttribute(2)))
            }
          case _ =>
            throw new ClassCastException
        }
      }
    } finally {
      // Feature iterators hold on to the underlying file and must be closed explicitly.
      it.close()
    }

    // An STRtree otherwise builds itself on the first query. Building it here, inside
    // the object initialiser, keeps that one-time mutation out of concurrently
    // running tasks.
    spatialIndex.build()

    /** Returns every indexed item whose bounding box intersects `env`. */
    def get(env: Envelope) = spatialIndex.query(env).asScala
  }
}
-- View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14752.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org