Hi All,

I am new to Spark. I have written a program that reads data from a large
local file, sorts it with Spark SQL, and then filters it against some
validation rules. I tested the program on a file of 23,860,746 lines and it
took 39 secs (2 cores, -Xmx set to 6 GB). But when I try to save the result
to a local file, it takes far too long (I stopped the execution). For a
100K-line file, the program takes 8 secs without saveAsTextFile, whereas
writing to file takes 20 mins. BTW, I am using Kryo serialization and the
StorageLevel.MEMORY_ONLY_SER option for persisting the data in RAM. The
program is roughly as follows:
package main.scala

import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import com.esotericsoftware.kryo.Kryo

object SparkSqlApplication extends App {

  val txtFile = "/home/admin/scala/bigfile.txt"
  val outputDir = "file:///home/admin/scala/spark-poc/sample_data1_spark-sql"

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("Spark App")
    .setSparkHome("$SPARK_HOME")
    .setJars(List("target/scala-2.10/spark-project_2.10-1.0.jar"))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "main.scala.ReconRegistrator")

  val sc = new SparkContext(conf)
  val sqlCtx = new SQLContext(sc)
  import sqlCtx.createSchemaRDD

  // Read the CSV input, keep it in memory as serialized bytes, and register
  // it as a table for Spark SQL.
  val patient = sc.textFile(txtFile)
    .persist(StorageLevel.MEMORY_ONLY_SER)
    .map(_.split(","))
    .map(arr => Patient(arr(0).trim(), arr(1), arr(2)))
    .registerAsTable("patient")

  val sortedPat = sqlCtx.sql("select * from patient order by pcode")

  // Drop the rows that fail the Groovy validation rules.
  val validator = new GroovyIntegrator()
  val filteredInvalidPat =
    sortedPat.filter(patientRow => !validator.applyRules("" + patientRow(0)))

  // Write the result out as a single part-file.
  filteredInvalidPat.coalesce(1, false).saveAsTextFile(outputDir)
}

case class Patient(pcode: String, disease: String, dcategory: String)

class ReconRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Patient])
  }
}

Can anyone help with this?

Thanks,
Sudip
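
P.S. One thing I am starting to suspect myself is the coalesce(1, false)
call. If I understand the RDD API correctly, with shuffle = false the single
output partition pulls the whole upstream sort and filter into one task, so
nothing runs in parallel during the save. A variant I have not tried yet
(just a sketch on my part): pass shuffle = true so the upstream stages keep
their parallelism, or drop the coalesce entirely and merge the part-files
outside Spark.

  // Untested sketch: shuffle = true lets the upstream stages run in
  // parallel and only funnels the final write into a single partition.
  filteredInvalidPat
    .coalesce(1, shuffle = true)
    .saveAsTextFile(outputDir)

  // Alternative: write one part-file per partition and concatenate them
  // afterwards, e.g. cat sample_data1_spark-sql/part-* > merged.txt
  filteredInvalidPat.saveAsTextFile(outputDir)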


