Thanks, Evan and Andy:

Here is a fully working version. I still need to clean up the syntax, but it
performs very well: the initial version took around 36 hours on 9 machines
with 8 cores each, and this version takes 36 minutes on a cluster of 7
machines with 8 cores each:

import java.net.URL
import java.text.SimpleDateFormat

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkContext}

import com.vividsolutions.jts.geom.{Envelope, Geometry}
import com.vividsolutions.jts.index.strtree.STRtree
import com.vividsolutions.jts.io.WKTReader

import org.geotools.data.FileDataStoreFinder
import org.geotools.geometry.jts.JTSFactoryFinder

object SimpleApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
    val csv = sc.textFile(csvPath)
    csv.cache()

    val clipPoints = csv.map { line =>
      val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)

      // Build a JTS point from the lon/lat columns.
      val geometryFactory = JTSFactoryFinder.getGeometryFactory()
      val reader = new WKTReader(geometryFactory)
      val point = reader.read("POINT (" + lon + " " + lat + ")")

      // An envelope query against the STR-tree gives candidate municipalities;
      // an exact intersection test then picks the right one.
      val candidates = geoDataMun.get(point.getEnvelopeInternal)
      val (cve_est, cve_mun) = candidates.collectFirst {
        case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
      }.getOrElse(("0", "0"))

      // Parse the ISO-8601 timestamp into epoch millis; "0" on bad input.
      val time = try {
        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
          .parse(date.replaceAll("Z$", "+0000")).getTime.toString
      } catch {
        case e: Exception => "0"
      }

      line + "," + time + "," + cve_est + "," + cve_mun
    }

    // coalesce(5, shuffle = true) rebalances the result into 5 output parts.
    clipPoints.coalesce(5, true)
      .saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_5_parts.csv")
  }

  // Per-JVM singleton: the shapefile is read and indexed lazily, once per
  // executor, instead of being serialized into every task closure.
  object geoDataMun {
    val spatialIndex = new STRtree()
    val path = new URL("file:////geoData/MunicipiosLatLon.shp")
    val store = FileDataStoreFinder.getDataStore(path)
    val source = store.getFeatureSource()
    val features = source.getFeatures()
    val it = features.features()
    while (it.hasNext) {
      val feature = it.next()
      feature.getDefaultGeometry match {
        case geom: Geometry =>
          val env = geom.getEnvelopeInternal
          if (!env.isNull) {
            // Index each polygon with its state and municipality codes.
            spatialIndex.insert(env, (geom, feature.getAttribute(1), feature.getAttribute(2)))
          }
        case _ => // null or non-JTS geometry: skip it
      }
    }
    it.close()

    def get(env: Envelope) = spatialIndex.query(env).asScala
  }
}
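To compile this against Spark and GeoTools, something like the following
build.sbt should work; the version numbers and the OSGeo resolver are my
assumptions, adjust them to whatever your cluster runs:

name := "csv-clipper"

scalaVersion := "2.10.4"

resolvers += "osgeo" at "http://download.osgeo.org/webdav/geotools/"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"   % "1.1.0" % "provided",
  "org.geotools"        % "gt-shapefile" % "11.2",
  "org.geotools"        % "gt-main"      % "11.2",
  "com.vividsolutions"  % "jts"          % "1.13"
)

Then submit it with e.g. spark-submit --class SimpleApp yourapp.jar (the jar
name is a placeholder). Note that the shapefile URL is a local file path, so
/geoData/MunicipiosLatLon.shp has to exist on every worker node.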



