Hi,

using DataFrames you can use SQL, and SQL supports JOIN, BETWEEN,
IN and LIKE operations. Why would someone use a DataFrame and then use it
as an RDD? :)
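
For illustration, a minimal sketch of the pure-SQL route (the table names
"customers" and "addresses" and the LIKE pattern are just placeholders;
dfCustomers1 and linesDF come from the thread below):

dfCustomers1.registerTempTable("customers")
linesDF.registerTempTable("addresses")
val result = sqlContext.sql(
  """SELECT c.*
     FROM customers c JOIN addresses a ON c.Address = a.Address
     WHERE c.Address LIKE '%Main%'""")   // '%Main%' is only an example pattern
result.show()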

Regards,
Gourav Sengupta

On Thu, Mar 3, 2016 at 4:28 PM, Sumedh Wale <sw...@snappydata.io> wrote:

> On Thursday 03 March 2016 09:15 PM, Gourav Sengupta wrote:
>
> Hi,
>
> why not read the table into a DataFrame directly using the Spark CSV package?
> You are trying to solve the problem in a roundabout way.
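>
> For illustration, a minimal sketch of that route (assumes the spark-csv
> package is on the classpath; the column names and their order are
> placeholders except Address, since the real layout is not shown in the
> thread):
>
> val dfCustomers1 = sqlContext.read
>   .format("com.databricks.spark.csv")
>   .option("header", "false")        // database.txt appears to have no header row
>   .option("inferSchema", "true")
>   .load("/root/Desktop/database.txt")
>   .toDF("name", "col1", "col2", "Address")  // placeholder names except Address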
>
>
> Yes, that will simplify things and avoid the explicit split/map a bit
> (though the code below is simple enough as is). However, the basic problem
> with performance is not due to that. Note that a DataFrame, whether using
> the spark-csv package or otherwise, is just an access point into the
> underlying database.txt file, so multiple scans of the DataFrame, as in the
> code below, lead to multiple tokenization/parse passes over database.txt,
> which is quite expensive. The join approach reduces the case below to a
> single scan and should definitely be used if possible, but if more queries
> have to be executed on the DataFrame, then saving it to parquet/orc (or
> cacheTable if possible) is faster in my experience.
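>
> For illustration, a minimal sketch of simply persisting the DataFrame in
> memory so that repeated scans do not re-parse the file (an alternative to
> the cacheTable/parquet routes shown further down):
>
> dfCustomers1.cache()   // mark the DataFrame for in-memory caching
> dfCustomers1.count()   // force materialization; later scans hit the cache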
>
>
> Regards,
> Gourav Sengupta
>
>
> thanks
>
> --
> Sumedh Wale
> SnappyData (http://www.snappydata.io)
>
>
> On Thu, Mar 3, 2016 at 12:33 PM, Sumedh Wale <sw...@snappydata.io> wrote:
>
>> On Thursday 03 March 2016 11:03 AM, Angel Angel wrote:
>>
>> Hello Sir/Madam,
>>
>> I am writing one application using spark sql.
>>
>> I made a very big table using the following command:
>>
>> val dfCustomers1 = sc.textFile("/root/Desktop/database.txt")
>>   .map(_.split(","))
>>   .map(p => Customer1(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3)))
>>   .toDF()
>>
>>
>> Now I want to search for many addresses in the Address field of the table
>> and then build a new table from the search results.
>>
>> var k = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(0)))
>>
>> for (a <- 1 until 1500) {
>>   var temp = dfCustomers1.filter(dfCustomers1("Address").equalTo(lines(a)))
>>   k = temp.unionAll(k)
>> }
>>
>> k.show
>>
>> For the above case, one approach that can help a lot is to convert the
>> lines array into a table and then do a join on it instead of individual
>> searches. Something like:
>>
>> // imports assumed for the explicit schema and Row construction
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> // the number of lines is small, so 1 partition should be fine
>> val linesRDD = sc.parallelize(lines, 1)
>> val schema = StructType(Array(StructField("Address", StringType)))
>> val linesDF = sqlContext.createDataFrame(linesRDD.map(Row(_)), schema)
>> val result = dfCustomers1.join(linesDF, "Address")
>>
>>
>> If you do need to scan the DataFrame multiple times, then this will end
>> up scanning the CSV file, formatting, etc. in every loop. I would suggest
>> caching in memory or saving to parquet/orc format for faster access. If
>> there is enough memory then the SQLContext.cacheTable API can be used,
>> else you can save to a parquet file:
>>
>> dfCustomers1.write.parquet("database.parquet")
>> val dfCustomers2 = sqlContext.read.parquet("database.parquet")
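>>
>> For illustration, a minimal sketch of the cacheTable route (the table name
>> "customers" is just a placeholder):
>>
>> dfCustomers1.registerTempTable("customers")
>> sqlContext.cacheTable("customers")   // caches in the in-memory columnar format
>> sqlContext.sql("SELECT COUNT(*) FROM customers").show()  // force materialization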
>>
>>
>> Normally parquet file scanning should be much faster than a CSV scan+format,
>> so use dfCustomers2 everywhere. You can also try various values of
>> "spark.sql.parquet.compression.codec" (lzo, snappy, uncompressed) instead
>> of the default gzip and see if that reduces the runtime. The fastest option
>> would be sqlContext.cacheTable if there is enough memory, but I doubt that
>> will be possible since you say it is a big table.
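>>
>> For illustration, a minimal sketch of switching the codec (snappy here is
>> just one of the values to try; set it before writing the parquet file):
>>
>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
>> dfCustomers1.write.parquet("database.parquet")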
>>
>>
>> But this is taking a long time, so can you suggest any optimized way to
>> reduce the execution time?
>>
>>
>> My cluster has 3 slaves and 1 master.
>>
>>
>> Thanks.
>>
>>
>>
>> thanks
>>
>> --
>> Sumedh Wale
>> SnappyData (http://www.snappydata.io)
>>
>
>
>
>
