It's probably sloooow, as you say, because the save is actually also running the map phase (which does the RTree search and so on), and only then writing to HDFS across 60 partitions. If you want to see the time spent just saving to HDFS, you could do a count, for instance, before saving, so the map phase is materialized first. Also, saving from 60 partitions might be overkill, so what you can do is first coalesce down to the number of physical nodes that you have (without shuffling).
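A minimal sketch of those two suggestions, using the clipPoints RDD and output path from your code below (numPhysicalNodes is a placeholder for your cluster size; note that coalesce returns a new RDD rather than modifying the existing one in place):

```scala
// Cache and count first: this forces the map phase (parsing + RTree lookups)
// to run, so the subsequent save time reflects only the HDFS write.
clipPoints.cache()
clipPoints.count()

// coalesce returns a NEW RDD; its result must be used.
// shuffle = false keeps it a narrow dependency that just merges partitions.
val numPhysicalNodes = 6 // placeholder: set to your actual node count
val fewerParts = clipPoints.coalesce(numPhysicalNodes, shuffle = false)
fewerParts.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
```

This also explains why the commented-out `csv.coalesce(60,true)` in your code had no effect: the returned RDD was discarded.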
On the other hand, I don't know if you're running this in a cluster, but geoDataMun looks rather heavy to serialize, so it would be preferable to broadcast it once (since it won't change). Also, it might only be a syntax improvement, but the construction of (cve_est, cve_mun) is rather long and it seems it can be replaced by these three lines only:

val (cve_est, cve_mun) = internal collectFirst {
  case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
} getOrElse (("0", "0"))

HTH

aℕdy ℙetrella
about.me/noootsab <http://about.me/noootsab>

On Sat, Sep 20, 2014 at 5:50 AM, Abel Coronado Iruegas <
acoronadoirue...@gmail.com> wrote:

> Hi Evan,
>
> Here is an improved version, thanks for your advice. But you know, the last
> step, the saveAsTextFile, is very sloooow :(
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import java.net.URL
> import java.text.SimpleDateFormat
> import com.vividsolutions.jts.geom._
> import com.vividsolutions.jts.index.strtree.STRtree
> import com.vividsolutions.jts.io._
> import org.geotools.data.FileDataStoreFinder
> import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
> import org.opengis.feature.{simple, Feature, FeatureVisitor}
> import scala.collection.JavaConverters._
>
> object SimpleApp {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("Csv Clipper")
>     val sc = new SparkContext(conf)
>     val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
>     val csv = sc.textFile(csvPath)
>     //csv.coalesce(60,true)
>     csv.cache()
>     val clipPoints = csv.map({ line: String =>
>       val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
>       val geometryFactory = JTSFactoryFinder.getGeometryFactory();
>       val reader = new WKTReader(geometryFactory);
>       val point = reader.read("POINT (" + lon + " " + lat + ")")
>       val envelope = point.getEnvelopeInternal
>       val internal = geoDataMun.get(envelope)
>       val (cve_est, cve_mun) = internal match {
>         case l => {
>           val existe = l.find(f => f match {
>             case (g: Geometry, e: String, m: String) => g.intersects(point)
>             case _ => false
>           })
>           existe match {
>             case Some(t) => t match {
>               case (g: Geometry, e: String, m: String) => (e, m)
>               case _ => ("0", "0")
>             }
>             case None => ("0", "0")
>           }
>         }
>         case _ => ("0", "0")
>       }
>       val time = try {
>         (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
>           .parse(date.replaceAll("Z$", "+0000")).getTime().toString()
>       } catch { case e: Exception => "0" }
>       line + "," + time + "," + cve_est + "," + cve_mun
>     })
>
>     clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
>   }
>
>   object geoDataMun {
>     var spatialIndex = new STRtree()
>     val path = new URL("file:////geoData/MunicipiosLatLon.shp")
>     val store = FileDataStoreFinder.getDataStore(path)
>     val source = store.getFeatureSource();
>     val features = source.getFeatures();
>     val it = features.features();
>     while (it.hasNext) {
>       val feature = it.next()
>       val geom = feature.getDefaultGeometry
>       if (geom != null) {
>         val geomClass = geom match {
>           case g2: Geometry => g2
>           case _ => throw new ClassCastException
>         }
>         val env = geomClass.getEnvelopeInternal();
>         if (!env.isNull) {
>           spatialIndex.insert(env,
>             (geomClass, feature.getAttribute(1), feature.getAttribute(2)));
>         }
>       }
>     }
>     def get(env: Envelope) = spatialIndex.query(env).asScala
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14710.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
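To make the broadcast suggestion concrete, here is a sketch of how the map could use a broadcast spatial index instead of the geoDataMun singleton (buildSpatialIndex is a hypothetical helper wrapping the same shapefile loop as geoDataMun; the point is that the STRtree is built once on the driver and shipped to each executor a single time, rather than being re-initialized per JVM):

```scala
// Build the STRtree once on the driver (same shapefile loop as in geoDataMun),
// then broadcast it so every executor receives one read-only copy.
val index: STRtree = buildSpatialIndex() // hypothetical helper, not in the original code
val bIndex = sc.broadcast(index)

val clipPoints = csv.map { line =>
  // ... parse lat/lon and build `point` as before ...
  // Query the broadcast copy instead of the geoDataMun singleton.
  val internal = bIndex.value.query(point.getEnvelopeInternal).asScala
  val (cve_est, cve_mun) = internal collectFirst {
    case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
  } getOrElse (("0", "0"))
  // ... append time, cve_est, cve_mun to the output line as before ...
  line + "," + cve_est + "," + cve_mun
}
```

Note that JTS STRtree is not thread-safe during construction but is fine for concurrent queries once built, which is what makes the broadcast-once pattern safe here.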