Now I have a better version, but the problem is that saveAsTextFile does not finish the job — in the HDFS directory only a partial temporary file exists. Can someone tell me what is wrong?
Thanks!!

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
    val csv = sc.textFile(csvPath)
    csv.cache()
    val clipPoints = csv.map({ line: String =>
      val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
      val punto = Point(lon.toDouble, lat.toDouble)
      val internal = geoDataExternal.get.find(f => f.geometry intersects punto)
      val (cve_est, cve_mun) = internal match {
        case Some(f: org.geoscript.feature.Feature) => {
          val index = f.getAttribute(1).toString()
          val existe = geoDataMun.get(index).find(f => f.geometry intersects punto)
          existe match {
            case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)
            case None => ("0", "0")
          }
        }
        case None => ("0", "0")
      }
      val time = try {
        (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+0000")).getTime().toString()
      } catch {
        case e: Exception => "0"
      }
      line + "," + time + "," + cve_est + "," + cve_mun
    })
    clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
    println("Spark Clip Exito!!!")
  }

  object geoDataMun {
    private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
    val features = shp.getFeatures.toIterable
    val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
      .getLines()
      .toList map { line: String =>
        val campos = line.split(",").map(_.trim)
        val cve_edo = campos(0)
        val cve_mun = campos(1)
        val index = campos(2)
        scala.collection.immutable.List(index.toInt, (cve_edo, cve_mun))
      }
    val mapaIx = result.groupBy(x => x(0)).mapValues(cves => cves.map(x => x(1)))
    def get(index: String) = {
      features.filter(f => mapaIx(index.toInt).contains((f.getAttribute(1).toString, f.getAttribute(2).toString)))
    }
  }

  object geoDataExternal {
    private val shp = Shapefile("/geoData/IndiceRecortado.shp")
    val features = shp.getFeatures
    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = features
  }
}

the
log of the driver is: 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp:// sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942 ] 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp:// sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942 ] 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp:// sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp:// sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942 On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas < acoronadoirue...@gmail.com> wrote: > Here an example of a working code that takes a csv with lat lon points and > 
intersects with polygons of municipalities of Mexico, generating a new > version of the file with new attributes. > > Do you think that could be improved? > > Thanks. > > The Code: > > import org.apache.spark.SparkContext > import org.apache.spark.SparkContext._ > import org.apache.spark.SparkConf > import org.geoscript.feature._ > import org.geoscript.geometry._ > import org.geoscript.geometry.builder._ > import com.vividsolutions.jts._ > import org.geoscript.layer.Shapefile > import org.geotools.feature.FeatureCollection > import java.text._ > import java.util._ > > object SimpleApp { > def main(args: Array[String]){ > val conf = new SparkConf().setAppName("Csv Clipper") > val sc = new SparkContext(conf) > val csvPath = > "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows > val csv = sc.textFile(csvPath) > val clipPoints = csv.map({line: String => > val Array(usuario, lat, > lon, date) = line.split(",").map(_.trim) > val punto = > Point(lon.toDouble,lat.toDouble) > val existe = > geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation > var cve_est = "0" > var cve_mun = "0" > var time = "0" > if(!existe.isEmpty){ > val f = existe.take(1) > val ff = f.toList(0) > cve_est = > ff.getAttribute(1).toString //State Code > cve_mun = > ff.getAttribute(2).toString // Municipality Code > time = (new > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", > "+0000")).getTime().toString() > } > > line+","+time+","+cve_est+","+cve_mun > }) > > clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv") > println("Spark Clip Exito!!!") > } > object geoData { > private val estatal = > Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all > the nodes. > private val estatalColl = estatal.getFeatures > def > get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature] > = estatalColl > } > } >