Hi,

Using DataFrames you can use SQL, and SQL supports JOIN, BETWEEN, IN and LIKE operations. Why would someone use a DataFrame and then use it as an RDD? :)
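For example, the whole filter loop in the thread below collapses into one SQL statement. A minimal sketch, assuming the Spark 1.x SQLContext used in this thread; linesDF is the single-column Address DataFrame from Sumedh's suggestion below, and the Id column with BETWEEN/LIKE is purely illustrative:

dfCustomers1.registerTempTable("customers")
linesDF.registerTempTable("addresses")

// JOIN replaces the per-address filter loop; BETWEEN and LIKE are shown
// only to illustrate the operators mentioned above ("Id" is a hypothetical
// integer column of Customer1).
val result = sqlContext.sql("""
  SELECT c.*
    FROM customers c
    JOIN addresses a ON c.Address = a.Address
   WHERE c.Id BETWEEN 100 AND 200
     AND c.Address LIKE '%Street%'
""")
result.show()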
Regards,
Gourav Sengupta

On Thu, Mar 3, 2016 at 4:28 PM, Sumedh Wale <sw...@snappydata.io> wrote:

> On Thursday 03 March 2016 09:15 PM, Gourav Sengupta wrote:
>
> Hi,
>
> Why not read the table into a DataFrame directly using the Spark CSV
> package? You are trying to solve the problem in a roundabout way.
>
>
> Yes, that will simplify things and avoid the explicit split/map (though
> the code below is simple enough as is). However, the basic performance
> problem is not due to that. Note that a DataFrame, whether it uses the
> spark-csv package or not, is just an access point into the underlying
> database.txt file, so multiple scans of the DataFrame, as in the code
> below, will lead to multiple tokenize/parse passes over database.txt,
> which is quite expensive. The join approach will reduce this to a single
> scan for the case below, which should definitely be done if possible;
> but if more queries have to be executed against the DataFrame, then
> saving it to parquet/orc (or cacheTable if possible) is faster in my
> experience.
>
>
> Regards,
> Gourav Sengupta
>
>
> thanks
>
> --
> Sumedh Wale
> SnappyData (http://www.snappydata.io)
>
>
> On Thu, Mar 3, 2016 at 12:33 PM, Sumedh Wale <sw...@snappydata.io> wrote:
>
>> On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:
>>
>> Hello Sir/Madam,
>>
>> I am writing one application using Spark SQL.
>>
>> I made a very big table using the following command:
>>
>> val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
>>   .map(_.split(","))
>>   .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3)))
>>   .toDF()
>>
>> Now I want to search the Address field for many addresses in the table
>> and build up a new table from the matches:
>>
>> var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))
>>
>> for (a <- 1 until 1500) {
>>   val temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
>>   k = temp.unionAll(k)
>> }
>>
>> k.show
>>
>>
>> For the above case, one approach that can help a lot is to convert the
>> lines collection to a table and then do a join on it instead of
>> individual searches. Something like:
>>
>> // the number of lines is small, so 1 partition should be fine
>> val linesRDD = sc.parallelize(lines, 1)
>> val schema = StructType(Array(StructField("Address", StringType)))
>> val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
>> val result = dfCustomers1.join(linesDF, "Address")
>>
>>
>> If you do need to scan the DataFrame multiple times, then this will end
>> up scanning the CSV file, formatting etc. in every loop. I would suggest
>> caching it in memory, or saving it in parquet/orc format for faster
>> access. If there is enough memory then the SQLContext.cacheTable API can
>> be used; otherwise you can save to a parquet file:
>>
>> dfCustomers1.write.parquet("database.parquet")
>> val dfCustomers2 = sqlContext.read.parquet("database.parquet")
>>
>>
>> Normally parquet file scanning is much faster than CSV scan+format, so
>> use dfCustomers2 everywhere. You can also try various values of
>> "spark.sql.parquet.compression.codec" (lzo, snappy, uncompressed)
>> instead of the default gzip, and see if that reduces the runtime.
>> Fastest of all is sqlContext.cacheTable, if there is enough memory for
>> it, but I doubt that will be possible since you say it is a big table.
>> (Minimal sketches of these suggestions appear after this thread.)
>>
>>
>> But this is taking a very long time. Can you suggest an optimized
>> approach so I can reduce the execution time?
>>
>>
>> My cluster has 3 slaves and 1 master.
>>
>>
>> Thanks.
>>
>> thanks
>>
>> --
>> Sumedh Wale
>> SnappyData (http://www.snappydata.io)
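For completeness, minimal sketches of the three suggestions above that have no code in the thread, assuming Spark 1.x APIs (file names, table names, and option values are illustrative):

// 1. Read the CSV directly with the spark-csv package instead of the
//    manual split/map (schema inference is optional).
val dfCsv = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("/root/Desktop/database.txt")

// 2. Try a different parquet compression codec instead of the default
//    gzip before writing the file.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
dfCustomers1.write.parquet("database.parquet")

// 3. If the table fits in memory, cache it once and run all queries
//    against the cached copy.
dfCustomers1.registerTempTable("customers")
sqlContext.cacheTable("customers")
// the first action materializes the cache; later scans read from memory
sqlContext.sql("SELECT COUNT(*) FROM customers").show()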