On Thursday 03 March 2016 09:15 PM, Gourav Sengupta wrote:
Hi,

why not read the table into a DataFrame directly using the Spark CSV package? You are trying to solve the problem in a roundabout way.


Yes, that will simplify things and avoid the explicit split/map (though the code below is simple enough as is). However, the basic performance problem is not due to that. Note that a DataFrame, whether created using the spark-csv package or otherwise, is just an access point into the underlying database.txt file, so multiple scans of the DataFrame, as in the code below, lead to multiple tokenization/parse passes over database.txt, which is quite expensive. The join approach reduces the case below to a single scan and should definitely be used if possible, but if more queries have to be executed on the DataFrame then saving it to parquet/orc (or using cacheTable if possible) is faster in my experience.
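
A minimal sketch of that spark-csv read, assuming Spark 1.x with the spark-csv package on the classpath and no header line in database.txt; only the "Address" column name is known from the code below, the other names are placeholders:

// Assumed sketch: reading database.txt with the spark-csv package (Spark 1.x).
// Column names other than "Address" are placeholders, not from the original post.
val dfCustomers1 = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")    // let spark-csv infer the integer columns
  .load("/root/Desktop/database.txt")
  .toDF("Name", "Value1", "Value2", "Address")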


Regards,
Gourav Sengupta


thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)

On Thu, Mar 3, 2016 at 12:33 PM, Sumedh Wale <sw...@snappydata.io> wrote:
On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:
Hello Sir/Madam,

I am writing an application using Spark SQL.

I made a very big table using the following command:

val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
  .map(_.split(","))
  .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3)))
  .toDF()


Now I want to search the Address field (for many addresses) in the table and then build up a new table from the matching rows.

var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))



for (a <- 1 until 1500) {
  val temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
  k = temp.unionAll(k)
}

k.show


For the above case, one approach that can help a lot is to convert the lines into a DataFrame and then do a join on it instead of individual searches. Something like:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val linesRDD = sc.parallelize(lines, 1) // number of lines is small, so 1 partition should be fine
val schema = StructType(Array(StructField("Address", StringType)))
val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
val result = dfCustomers1.join(linesDF, "Address")
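
Assuming you just want to display the matching rows, the equivalent of the original k.show is then simply:

result.show()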

If you do need to scan the DataFrame multiple times, then this will end up scanning and re-parsing the CSV file in every loop. I would suggest caching it in memory or saving it to parquet/orc format for faster access. If there is enough memory, the SQLContext.cacheTable API can be used; otherwise you can save to a parquet file:

dfCustomers1.write.parquet("database.parquet")
val dfCustomers2 = sqlContext.read.parquet("database.parquet")

Normally parquet file scanning should be much faster than CSV scan+parse, so use dfCustomers2 everywhere. You can also try various values of "spark.sql.parquet.compression.codec" (lzo, snappy, uncompressed) instead of the default gzip and see if that reduces the runtime. Fastest would be if there is enough memory for sqlContext.cacheTable, but I doubt that will be possible since you say it is a big table.
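
A minimal sketch of the cacheTable path and the codec setting, assuming Spark 1.x; the table name "customers" is just an example:

// Cache as an in-memory columnar table (only if it fits in memory).
dfCustomers1.registerTempTable("customers")
sqlContext.cacheTable("customers")
val cached = sqlContext.table("customers")   // later scans read the cached columnar data

// Or choose a parquet compression codec before writing the file.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
dfCustomers1.write.parquet("database.parquet")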


But this is taking a very long time. Can you suggest an optimized way so I can reduce the execution time?


My cluster has 3 slaves and 1 master.


Thanks.



thanks
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)


