Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Raghav Shankar
Thank you for your responses!

You mention that it only works as long as the data fits on a single
machine. What I am trying to do is obtain the sorted contents of my
dataset, so the entire dataset has to fit on a single machine anyway. Are
you saying that sorting the entire dataset and collecting it on the driver
node is not a typical use case? If I wanted to do this using sortBy(), I
would call sortBy() followed by collect(), and collect() also involves
gathering all the data on a single machine.
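Concretely, I mean something like this (just a sketch, assuming rdd is the
JavaRDD<String> loaded from the census file, with the same key extraction
as in my original post below):

// Sort the whole dataset by the 25th CSV field, then bring the sorted
// result back to the driver; collect() needs it to fit on one machine.
List<String> sortedLines = rdd.sortBy(new Function<String, Double>() {
    @Override
    public Double call(String line) throws Exception {
        return Double.parseDouble(line.split(",")[24]);
    }
}, true, 9).collect();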

Thanks,
Raghav

On Tuesday, June 9, 2015, Mark Hamstra wrote:

> Correct.  Trading away scalability for increased performance is not an
> option for the standard Spark API.
>
> On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:
>
>> It would be even faster to load the data on the driver and sort it there
>> without using Spark :). Using reduce() is cheating, because it only works
>> as long as the data fits on one machine. That is not the targeted use case
>> of a distributed computation system. You can repeat your test with more
>> data (that doesn't fit on one machine) to see what I mean.
>>


Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Mark Hamstra
Correct.  Trading away scalability for increased performance is not an
option for the standard Spark API.

On Tue, Jun 9, 2015 at 3:05 AM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> It would be even faster to load the data on the driver and sort it there
> without using Spark :). Using reduce() is cheating, because it only works
> as long as the data fits on one machine. That is not the targeted use case
> of a distributed computation system. You can repeat your test with more
> data (that doesn't fit on one machine) to see what I mean.
>


Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Daniel Darabos
It would be even faster to load the data on the driver and sort it there
without using Spark :). Using reduce() is cheating, because it only works
as long as the data fits on one machine. That is not the targeted use case
of a distributed computation system. You can repeat your test with more
data (that doesn't fit on one machine) to see what I mean.
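To make that baseline concrete, this is roughly what I mean (a plain
single-machine sort with no Spark at all; the file path and column index
are just copied from your example):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class LocalSort {
    public static void main(String[] args) throws Exception {
        // Read the whole file into memory -- only viable while it fits.
        List<String> lines = Files.readAllLines(Paths.get("/data.txt"),
                StandardCharsets.UTF_8);
        // Sort by the 25th CSV field, the same key as in your Spark code.
        Collections.sort(lines, new Comparator<String>() {
            @Override
            public int compare(String a, String b) {
                return Double.compare(Double.parseDouble(a.split(",")[24]),
                                      Double.parseDouble(b.split(",")[24]));
            }
        });
        System.out.println("Sorted " + lines.size() + " lines locally");
    }
}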



Different Sorting RDD methods in Apache Spark

2015-06-08 Thread raggy
For a research project, I tried sorting the elements in an RDD. I did this
using two different approaches.

In the first method, I applied a mapPartitions() function to the RDD so
that it sorts the contents of each partition and produces a result RDD
whose only record per partition is that partition's sorted list. Then I
applied a reduce function that merges sorted lists.
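Roughly, the merge done in the reduce step looks like this (a sketch only,
not my exact code; rdd3 is the RDD of per-partition sorted lists produced
by the mapPartitions() call shown further down):

// Two-way merge of two already-sorted per-partition lists into one
// sorted list. Illustrative only.
LinkedList<Tuple2<Double, String>> sorted = rdd3.reduce(
    new Function2<LinkedList<Tuple2<Double, String>>,
                  LinkedList<Tuple2<Double, String>>,
                  LinkedList<Tuple2<Double, String>>>() {
        @Override
        public LinkedList<Tuple2<Double, String>> call(
                LinkedList<Tuple2<Double, String>> a,
                LinkedList<Tuple2<Double, String>> b) {
            LinkedList<Tuple2<Double, String>> out =
                new LinkedList<Tuple2<Double, String>>();
            Iterator<Tuple2<Double, String>> ia = a.iterator(), ib = b.iterator();
            Tuple2<Double, String> x = ia.hasNext() ? ia.next() : null;
            Tuple2<Double, String> y = ib.hasNext() ? ib.next() : null;
            while (x != null || y != null) {
                // Take the smaller head; drain the other list once one is empty
                if (y == null || (x != null && x._1() <= y._1())) {
                    out.add(x);
                    x = ia.hasNext() ? ia.next() : null;
                } else {
                    out.add(y);
                    y = ib.hasNext() ? ib.next() : null;
                }
            }
            return out;
        }
    });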

I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
using the spark-ec2 script. The data file was stored in HDFS.

In the second approach I used the sortBy method in Spark.

I performed these operations on the US census data (100MB) found here.

A single line looks like this:

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
in universe or children, Not in universe, White, All other, Female, Not in
universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not
in universe, Not in universe, Child <18 never marr not in subfamily, Child
under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
universe, 0, Both parents present, United-States, United-States,
United-States, Native- Born in the United States, 0, Not in universe, 0, 0,
94, - 5.
I sorted based on the 25th value in the CSV. In this line, that is 1758.14.

I noticed that sortBy performs worse than the other method. Is this
expected? If so, why isn't the mapPartitions()-and-reduce() approach the
default way to sort?

Here is my implementation:

public static void sortBy(JavaSparkContext sc){
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
    long start = System.currentTimeMillis();
    rdd.sortBy(new Function<String, Double>(){

        @Override
        public Double call(String v1) throws Exception {
            // Sort key: the 25th CSV field, parsed as a double
            String[] arr = v1.split(",");
            return Double.parseDouble(arr[24]);
        }
    }, true, 9).collect();
    long end = System.currentTimeMillis();
    System.out.println("SortBy: " + (end - start));
}

public static void sortList(JavaSparkContext sc){
    JavaRDD<String> rdd = sc.textFile("/data.txt", 32); //parallelize(l, 8);
    long start = System.currentTimeMillis();
    JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
        rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
                LinkedList<Tuple2<Double, String>>>(){

            @Override
            public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
                    throws Exception {
                // Build (value, line) pairs for every line in this partition
                LinkedList<Tuple2<Double, String>> lines =
                    new LinkedList<Tuple2<Double, String>>();
                while(t.hasNext()){
                    String s = t.next();
                    String[] arr1 = s.split(",");
                    Tuple2<Double, String> t1 =
                        new Tuple2<Double, String>(Double.parseDouble(arr1[24]), s);
                    lines.add(t1);
                }
                // Sort this partition's pairs, then emit the sorted list
                // as the single record for this partition
                Collections.sort(lines, new IncomeComparator());
                LinkedList<LinkedList<Tuple2<Double, String>>> list =
                    new LinkedList<LinkedList<Tuple2<Double, String>>>();
                list.add(lines);
                return list;
            }
        });
    // ... followed by a reduce() over rdd3 that merges the sorted lists
    // (see the sketch above)
}
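IncomeComparator is not shown above; it just orders the tuples by the
parsed value from column 25 (the tuple's first element). A minimal sketch
of what it looks like (assumed, not the exact class I used):

// Orders (value, line) pairs by the parsed double, ascending.
public static class IncomeComparator
        implements Comparator<Tuple2<Double, String>>, Serializable {
    @Override
    public int compare(Tuple2<Double, String> a, Tuple2<Double, String> b) {
        return Double.compare(a._1(), b._1());
    }
}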


