
With DataFrames you can use SQL, and SQL offers JOIN, BETWEEN, IN and LIKE
operations. Why would someone create a DataFrame and then use it like an
RDD? :)
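
To make that concrete, here is a minimal sketch of the lookup done in SQL
rather than in a loop. It uses the Spark 1.x SQLContext API that appears in
the quoted messages below. The Cust case class, the sample addresses, the
table names and the local[*] master are all made up for illustration; they
are not from the original poster's code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical two-column stand-in for the Customer1 class in the quoted message.
case class Cust(Name: String, Address: String)

object SqlLookup {
  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch runs without a cluster.
    val sc = new SparkContext(
      new SparkConf().setAppName("sql-lookup").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Made-up sample data standing in for the big table and the search list.
    val customers = sc.parallelize(Seq(
      Cust("a", "12 High St"), Cust("b", "3 Low Rd"), Cust("c", "9 Mid Ave"))).toDF()
    val wanted = sc.parallelize(Seq("12 High St", "9 Mid Ave"))
      .map(Tuple1(_)).toDF("Address")

    customers.registerTempTable("customers")
    wanted.registerTempTable("wanted")

    // A single JOIN covers every address in the search list.
    sqlContext.sql(
      "SELECT c.Name, c.Address FROM customers c " +
      "JOIN wanted w ON c.Address = w.Address").show()

    // For a short, fixed list of values, IN is enough.
    sqlContext.sql(
      "SELECT Name, Address FROM customers " +
      "WHERE Address IN ('12 High St', '3 Low Rd')").show()

    sc.stop()
  }
}
```

Either query lets Spark plan the whole lookup at once instead of building
up a long chain of unions.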

> Hi,
> Why not read the table into a DataFrame directly using the spark-csv
> package? You are trying to solve the problem in a roundabout way.
> Yes, that will simplify things and avoid the explicit split/map a bit
> (though the code below is simple enough as is). However, the basic
> performance problem is not due to that. Note that a DataFrame, whether
> created with the spark-csv package or otherwise, is just an access point
> into the underlying database.txt file, so multiple scans of the DataFrame
> as in the code below will tokenize and parse database.txt multiple times,
> which is quite expensive. The join approach reduces this to a single scan
> for the case below and should definitely be used if possible, but if more
> queries need to be executed on the DataFrame, then saving it to
> parquet/orc (or using cacheTable if possible) is faster in my experience.
>> Hello Sir/Madam,
>> I am writing an application using Spark SQL.
>> I made a very big table using the following command:
>> val dfCustomers1 =
>>   sc.textFile("/root/Desktop/database.txt").map(_.split(",")).map(p =>
>>     Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF()
>> Now I want to search the Address field of the table for many addresses
>> and build a new table from the matching rows.
>> var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))
>> for (a <- 1 until 1500) {
>>   val temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
>>   k = temp.unionAll(k)
>> }
>> k.show
>> For the above case, one approach that can help a lot is to convert
>> lines to a table and then do a join on it instead of individual
>> searches. Something like:
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>> // the number of lines is small, so 1 partition should be fine
>> val linesRDD = sc.parallelize(lines, 1)
>> val schema = StructType(Array(StructField("Address", StringType)))
>> val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
>> val result = dfCustomers1.join(linesDF, "Address")
>> If you do need to scan the DataFrame multiple times, then every loop
>> iteration will end up scanning the CSV file, formatting it, etc. I would
>> suggest caching in memory or saving to parquet/orc format for faster
>> access. If there is enough memory then the SQLContext.cacheTable API can
>> be used; otherwise you can save to a parquet file:
>> dfCustomers1.write.parquet("database.parquet")
>> val dfCustomers2 = sqlContext.read.parquet("database.parquet")
>> Normally parquet file scanning should be much faster than CSV
>> scan+format, so use dfCustomers2 everywhere. You can also try various
>> values of "spark.sql.parquet.compression.codec" (lzo, snappy,
>> uncompressed) instead of the default gzip and see whether this reduces
>> the runtime. The fastest option is sqlContext.cacheTable if there is
>> enough memory, but I doubt that will be possible since you say it is a
>> big table.
>> But this is taking a very long time. Can you suggest an optimized way
>> to reduce the execution time?
>> My cluster has 3 slaves and 1 master.
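
The caching and Parquet suggestions quoted above can be sketched roughly as
follows. This is only an illustration under stated assumptions: the Cust
case class, the sample rows, the temporary output directory and the
local[*] master are made up, and snappy is just one of the codec values
mentioned in the quoted message, not a recommendation.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical stand-in for the Customer1 class in the quoted message.
case class Cust(Name: String, Address: String)

object CacheOrParquet {
  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch runs without a cluster.
    val sc = new SparkContext(
      new SparkConf().setAppName("cache-or-parquet").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Made-up sample rows standing in for the parsed database.txt.
    val df = sc.parallelize(Seq(
      Cust("a", "12 High St"), Cust("b", "3 Low Rd"))).toDF()

    // Option 1: keep the table in memory if it fits.
    df.registerTempTable("customers")
    sqlContext.cacheTable("customers")
    // The cache is filled lazily, by the first query that scans the table.
    sqlContext.sql("SELECT COUNT(*) FROM customers").show()

    // Option 2: write once to Parquet, then query the Parquet copy.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    val out = Files.createTempDirectory("customers")
      .resolve("database.parquet").toString
    df.write.parquet(out)
    val df2 = sqlContext.read.parquet(out)
    df2.filter(df2("Address") === "3 Low Rd").show()

    sqlContext.uncacheTable("customers")
    sc.stop()
  }
}
```

Whichever option is used, later queries avoid re-tokenizing the text file
on every scan.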
