It's probably sloooow, as you say, because it's actually also running the map
phase (the RTree search and so on) and only then saving to HDFS across 60
partitions. If you want to see the time spent saving to HDFS alone, you could
do a count, for instance, before saving. Also, saving from 60 partitions might
be overkill, so what you can do is first coalesce down to the number of
physical nodes that you have (without shuffling).
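
For instance (just a sketch, using the names from your code below; the 4 is
an assumption, put your real node count there):

> clipPoints.cache()  // keep the mapped results so the save below doesn't redo the work
> clipPoints.count()  // forces the map phase (the RTree lookups) to run, so you can time it alone
>
> clipPoints
>   .coalesce(4, false)  // no shuffle: merge the 60 partitions down to ~one per physical node
>   .saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")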

On the other hand, I don't know if you're running this on a cluster, but
geoDataMun looks rather heavy to serialize into every task, so it would be
preferable to broadcast it once (since it won't change).
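
Something along these lines (a rough sketch; it assumes the STRtree is built
once on the driver, as your geoDataMun object does, and that JTS's STRtree is
Serializable so it can be shipped):

> // build the index on the driver, ship it once to each executor
> val bcIndex = sc.broadcast(geoDataMun.spatialIndex)
>
> // then inside the map, query the broadcast value instead:
> val internal = bcIndex.value.query(envelope).asScala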

Also, it might only be a syntax improvement, but the construction of (cve_est,
cve_mun) is rather long and it seems it can be replaced by these three lines
alone:

> val (cve_est, cve_mun) = internal collectFirst {
>   case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
> } getOrElse (("0", "0"))


HTH

aℕdy ℙetrella
about.me/noootsab

On Sat, Sep 20, 2014 at 5:50 AM, Abel Coronado Iruegas <
acoronadoirue...@gmail.com> wrote:

> Hi Evan,
>
> here is an improved version, thanks for your advice. But, you know, the last
> step, saveAsTextFile, is very slooow :(
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import java.net.URL
> import java.text.SimpleDateFormat
> import com.vividsolutions.jts.geom._
> import com.vividsolutions.jts.index.strtree.STRtree
> import com.vividsolutions.jts.io._
> import org.geotools.data.FileDataStoreFinder
> import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
> import org.opengis.feature.{simple, Feature, FeatureVisitor}
> import scala.collection.JavaConverters._
>
>
> object SimpleApp {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("Csv Clipper")
>     val sc = new SparkContext(conf)
>     val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
>     val csv = sc.textFile(csvPath)
>     //csv.coalesce(60,true)
>     csv.cache()
>
>     val clipPoints = csv.map { line: String =>
>       val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
>       val geometryFactory = JTSFactoryFinder.getGeometryFactory()
>       val reader = new WKTReader(geometryFactory)
>       val point = reader.read("POINT (" + lon + " " + lat + ")")
>       val envelope = point.getEnvelopeInternal
>       val internal = geoDataMun.get(envelope)
>       val (cve_est, cve_mun) = internal match {
>         case l =>
>           val existe = l.find(f => f match {
>             case (g: Geometry, e: String, m: String) => g.intersects(point)
>             case _ => false
>           })
>           existe match {
>             case Some(t) => t match {
>               case (g: Geometry, e: String, m: String) => (e, m)
>               case _ => ("0", "0")
>             }
>             case None => ("0", "0")
>           }
>         case _ => ("0", "0")
>       }
>       val time = try {
>         new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
>           .parse(date.replaceAll("Z$", "+0000")).getTime().toString()
>       } catch { case e: Exception => "0" }
>       line + "," + time + "," + cve_est + "," + cve_mun
>     }
>
>     clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
>   }
>
>   object geoDataMun {
>     var spatialIndex = new STRtree()
>     val path = new URL("file:////geoData/MunicipiosLatLon.shp")
>     val store = FileDataStoreFinder.getDataStore(path)
>     val source = store.getFeatureSource()
>     val features = source.getFeatures()
>     val it = features.features()
>     while (it.hasNext) {
>       val feature = it.next()
>       val geom = feature.getDefaultGeometry
>       if (geom != null) {
>         val geomClass = geom match {
>           case g2: Geometry => g2
>           case _ => throw new ClassCastException
>         }
>         val env = geomClass.getEnvelopeInternal()
>         if (!env.isNull) {
>           spatialIndex.insert(env, (geomClass, feature.getAttribute(1), feature.getAttribute(2)))
>         }
>       }
>     }
>     def get(env: Envelope) = spatialIndex.query(env).asScala
>   }
> }
