Hi Benyi, thanks for the reply. Yes, I process each Hive partition / HDFS
directory in its own thread to make the job faster; without threads it is even
slower. As I mentioned, I have to process 2000 Hive partitions, i.e. 2000 HDFS
directories containing ORC files. If I don't use threads, these 2000
directories get processed one by one. By using an ExecutorService with a
thread pool of 20, I can run 20 of these jobs at a time inside the one main
Spark application.
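
To make it concrete, the driver-side fan-out looks roughly like the sketch
below (simplified; listPartitionPaths() and processPartition() are
placeholders for my real helpers, and processPartition() ends up calling the
method shown further down in this thread):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

ExecutorService pool = Executors.newFixedThreadPool(20); // at most 20 jobs at a time
for (final String path : listPartitionPaths()) {         // placeholder: ~2000 HDFS partition dirs
    pool.submit(new Runnable() {
        @Override
        public void run() {
            processPartition(path); // placeholder: runs the method shown below for one partition
        }
    });
}
pool.shutdown();                          // stop accepting new tasks
pool.awaitTermination(1, TimeUnit.DAYS);  // wait for all partition jobs to finish

Spark itself still schedules the stages; the ExecutorService only controls how
many of these per-partition jobs are submitted concurrently from the driver.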

On Fri, Dec 11, 2015 at 12:49 AM, Benyi Wang <bewang.t...@gmail.com> wrote:

> I don't understand this: "I have the following method, which I call from
> threads spawned by the Spark driver. So about 2000 threads ..."
>
> Why do you call it from a thread?
> Are you processing one partition in one thread?
>
> On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang <bewang.t...@gmail.com>
> wrote:
>
>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>
>> except is really expensive. Do you actually want this:
>>
>> sourceFrame.filter(! col("col1").contains("xyz"))
>>
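In the Java DataFrame API I am using, I believe that suggestion would look
roughly like this (untested sketch, using org.apache.spark.sql.functions.not):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

// Keep only rows whose col1 does NOT contain "xyz",
// instead of building filterFrame1 and calling the expensive except().
DataFrame frameToProcess = sourceFrame.filter(not(col("col1").contains("xyz")));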
>>
>> On Thu, Dec 10, 2015 at 9:57 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi, I have a Spark job which reads Hive ORC data, processes it, and
>>> generates a CSV file at the end. These ORC files are Hive partitions, and
>>> I have around 2000 partitions to process every day; together they are
>>> around 800 GB in HDFS. I have the following method, which I call from
>>> threads spawned by the Spark driver. So about 2000 threads get run, and
>>> the job is painfully slow, around 12 hours, with huge data shuffling:
>>> each executor shuffles around 50 GB of data. I am using 40 executors with
>>> 4 cores and 30 GB of memory each, on Hadoop 2.6 and the Spark 1.5.2
>>> release.
>>>
>>> public void callThisFromThread() {
>>>     DataFrame sourceFrame =
>>>         hiveContext.read().format("orc").load("/path/in/hdfs");
>>>     DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>>>     DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>>     JavaRDD<Row> updatedRDD = frameToProcess.toJavaRDD().mapPartitions(
>>>         new FlatMapFunction<Iterator<Row>, Row>() {
>>>             @Override
>>>             public Iterable<Row> call(Iterator<Row> rows) {
>>>                 ..... // per-partition row processing
>>>             }
>>>         });
>>>     DataFrame updatedFrame =
>>>         hiveContext.createDataFrame(updatedRDD, sourceFrame.schema());
>>>     DataFrame selectFrame = updatedFrame.select("col1", "col2...", "col8");
>>>     DataFrame groupFrame =
>>>         selectFrame.groupBy("col1", "col2....", "col8").agg("......"); // 8 column group by
>>>     groupFrame.coalesce(1).save(); // save as csv, only one file, so coalesce(1)
>>> }
>>>
>>> Please guide me on how I can optimize the above code. I can't avoid the
>>> group by, which I know is evil; I have to group on the 8 fields mentioned
>>> above.
>>>
>>>
>>>
>>
>
