Hive on Spark Vs Spark SQL

2015-11-15 Thread kiran lonikar
I would like to know whether Hive on Spark uses or shares the execution code
with Spark SQL or DataFrames.

More specifically, does Hive on Spark benefit from the changes made to
Spark SQL under Project Tungsten? Or is it a completely different execution
path where it creates its own plan and executes it on RDDs?

-Kiran


Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread kiran lonikar
So it does not benefit from Project Tungsten, right?


On Mon, Nov 16, 2015 at 12:07 PM, Reynold Xin <r...@databricks.com> wrote:

> It's a completely different path.
>
>
> On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar <loni...@gmail.com> wrote:
>
>> I would like to know if Hive on Spark uses or shares the execution code
>> with Spark SQL or DataFrames?
>>
>> More specifically, does Hive on Spark benefit from the changes made to
>> Spark SQL, project Tungsten? Or is it completely different execution path
>> where it creates its own plan and executes on RDD?
>>
>> -Kiran
>>
>>
>


Fwd: Code generation for GPU

2015-09-03 Thread kiran lonikar
Hi,

I am speaking at the Spark Europe Summit on exploiting GPUs for columnar
DataFrame operations. I have been going through various blogs, talks, and
JIRAs by the key Spark folks, trying to figure out where to make changes for
this proposal.

First of all, I must thank the recent progress in Project Tungsten, which has
made my job easier. The changes for code generation make it possible for me to
generate OpenCL code for expressions instead of the existing Java/Scala code,
and to run that OpenCL code on GPUs through the Java library JavaCL.

However, before starting the work, I have a few questions/doubts as below:


   1. I found where code generation happens in the Spark code from the blogs
   https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html,
   https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
   and
   https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html.
   However, I could not find where the generated code is executed. A major
   part of my changes will be there, since this executor will now have to send
   vectors of columns to GPU RAM, invoke execution, and get the results back
   to CPU RAM. Thus, the existing executor will change significantly.
   2. On the Project Tungsten blog, in the third section, Code Generation,
   it is mentioned that you plan to increase the level of code generation
   from record-at-a-time expression evaluation to vectorized expression
   evaluation. Has this been implemented? If not, how do I implement this? I
   will need access to the columnar ByteBuffer objects in DataFrame to do
   this; having row-by-row access to the data would defeat the purpose of
   this exercise. In particular, I need access to
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
   in the executor of the generated code.
   3. One thing that confuses me is the change from 1.4 to 1.5, possibly
   due to the JIRA https://issues.apache.org/jira/browse/SPARK-7956 and pull
   request https://github.com/apache/spark/pull/6479/files. This changed the
   code generation from quasiquotes (q) to the string s operator, which makes
   it simpler for me to generate OpenCL code, which is string based. The
   question is: is this branch stable now? Should I make my changes on Spark
   1.4, Spark 1.5, or the master branch?
   4. How do I tune the batch size (the number of rows in the ByteBuffer)? Is
   it through the property spark.sql.inMemoryColumnarStorage.batchSize? (See
   the sketch below.)
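
For reference, a minimal sketch of how these two properties could be tuned
before caching a DataFrame (only an illustration; the path, column batch size
and values are hypothetical, assuming sc is an existing SparkContext):

// Hypothetical tuning sketch: enlarge the in-memory columnar batch size and
// disable compression before caching, so each cached batch is one large,
// uncompressed vector per column.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "100000")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
val df = sqlContext.parquetFile("/data/events.parquet")  // hypothetical path
df.cache()  // cached in columnar batches of up to 100000 rows each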


Thanks in anticipation,

Kiran
PS:

Other things I found useful were:

*Spark DataFrames*: https://www.brighttalk.com/webcast/12891/166495
*Apache Spark 1.5*: https://www.brighttalk.com/webcast/12891/168177

The links to JavaCL/ScalaCL:

*Library to execute OpenCL code through Java*:
https://github.com/nativelibs4java/JavaCL
*Library to convert Scala code to OpenCL and execute on GPUs*:
https://github.com/nativelibs4java/ScalaCL


Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
Possibly in the future, if and when the Spark architecture allows workers to
launch Spark jobs (the functions passed to the transformation or action APIs
of an RDD), it will be possible to have an RDD of RDDs.

On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:

 A similar question was asked before:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

 Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- RDD is only a handle to the actual data partitions. It has a
reference/pointer to the *SparkContext* object (*sc*) and a list of
partitions.
- The *SparkContext *is an object in the Spark Application/Driver
Program's JVM. Similarly, the list of partitions is also in the JVM of the
driver program. Each partition contains kind of remote references to the
partition data on the worker JVMs.
- The functions passed to RDD's transformations and actions execute in
the worker's JVMs on different nodes. For example, in *rdd.map { x =>
x*x }*, the function performing *x*x* runs on the JVMs of the
worker nodes where the partitions of the RDD reside. These JVMs do not have
access to the *sc* since it's only on the driver's JVM.
- Thus, in case of your *RDD of RDD*: *outerRDD.map { innerRDD =>
innerRDD.filter { x => x*x } }*, the worker nodes will not be able to
execute the *filter* on *innerRDD* as the code in the worker does not
have access to sc and cannot launch a Spark job.


 Hope it helps. You need to consider List[RDD] or some other collection.

 -Kiran

 On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs and seems
 like it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupbyKey() and then write each group into a
 file, and read them back as individual rdds to process..

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721





Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
A similar question was asked before:
http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

Here is one of the reasons why I think RDD[RDD[T]] is not possible:

   - An RDD is only a handle to the actual data partitions. It has a
   reference/pointer to the *SparkContext* object (*sc*) and a list of
   partitions.
   - The *SparkContext* is an object in the Spark application/driver
   program's JVM. Similarly, the list of partitions is also in the JVM of the
   driver program. Each partition contains a kind of remote reference to the
   partition data on the worker JVMs.
   - The functions passed to RDD's transformations and actions execute in
   the workers' JVMs on different nodes. For example, in *rdd.map { x =>
   x*x }*, the function computing *x*x* runs on the JVMs of the worker
   nodes where the partitions of the RDD reside. These JVMs do not have access
   to *sc*, since it's only on the driver's JVM.
   - Thus, in the case of your *RDD of RDD*: *outerRDD.map { innerRDD =>
   innerRDD.filter { x => x*x } }*, the worker nodes will not be able to
   execute the *filter* on *innerRDD*, as the code in the worker does not
   have access to sc and cannot launch a Spark job.


Hope it helps. You need to consider List[RDD] or some other collection.
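
For example, a rough sketch of the List[RDD] approach in Scala (only an
illustration; assumes sc is a SparkContext in scope, the log path and format
are made up, and processEachUser is your existing function taking an RDD):

import org.apache.spark.rdd.RDD

// Build one RDD per user on the driver. Each list element is a lazily
// evaluated, filtered view of the same parent RDD; practical only when the
// set of userIds is small enough to collect to the driver.
val logs: RDD[(String, String)] =
  sc.textFile("/logs/users.log").map(line => (line.split(",")(0), line))
val userIds: Array[String] = logs.keys.distinct().collect()
val perUserRdds: List[(String, RDD[String])] =
  userIds.toList.map(id => (id, logs.filter { case (k, _) => k == id }.values))

perUserRdds.foreach { case (_, rdd) => processEachUser(rdd) }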

-Kiran

On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs and seems like
 it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupbyKey() and then write each group into a
 file, and read them back as individual rdds to process..

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721




Re: RDD of RDDs

2015-06-09 Thread kiran lonikar
Yes, true. That's why I said "if and when".

But hopefully I have given a correct explanation of why an RDD of RDDs is not
possible.
On 09-Jun-2015 10:22 pm, Mark Hamstra m...@clearstorydata.com wrote:

 That would constitute a major change in Spark's architecture.  It's not
 happening anytime soon.

 On Tue, Jun 9, 2015 at 1:34 AM, kiran lonikar loni...@gmail.com wrote:

 Possibly in future, if and when spark architecture allows workers to
 launch spark jobs (the functions passed to transformation or action APIs of
 RDD), it will be possible to have RDD of RDD.

 On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar loni...@gmail.com wrote:

 A similar question was asked before:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html

 Here is one of the reasons why I think RDD[RDD[T]] is not possible:

- RDD is only a handle to the actual data partitions. It has a
reference/pointer to the *SparkContext* object (*sc*) and a list of
partitions.
- The *SparkContext* is an object in the Spark Application/Driver
Program's JVM. Similarly, the list of partitions is also in the JVM of the
driver program. Each partition contains a kind of remote reference to the
partition data on the worker JVMs.
- The functions passed to RDD's transformations and actions execute
in the worker's JVMs on different nodes. For example, in *rdd.map {
x => x*x }*, the function performing *x*x* runs on the JVMs of
the worker nodes where the partitions of the RDD reside. These JVMs do not
have access to the *sc* since it's only on the driver's JVM.
- Thus, in case of your *RDD of RDD*: *outerRDD.map { innerRDD =>
innerRDD.filter { x => x*x } }*, the worker nodes will not be able
to execute the *filter* on *innerRDD* as the code in the worker does
not have access to sc and cannot launch a Spark job.


 Hope it helps. You need to consider List[RDD] or some other collection.

 -Kiran

 On Tue, Jun 9, 2015 at 2:25 AM, ping yan sharon...@gmail.com wrote:

 Hi,


 The problem I am looking at is as follows:

 - I read in a log file of multiple users as a RDD

 - I'd like to group the above RDD into *multiple RDDs* by userIds (the
 key)

 - my processEachUser() function then takes in each RDD mapped into
 each individual user, and calls for RDD.map or DataFrame operations on
 them. (I already had the function coded, I am therefore reluctant to work
 with the ResultIterable object coming out of rdd.groupByKey() ... )

 I've searched the mailing list and googled on RDD of RDDs and seems
 like it isn't a thing at all.

 A few choices left seem to be: 1) groupByKey() and then work with the
 ResultIterable object; 2) groupbyKey() and then write each group into a
 file, and read them back as individual rdds to process..

 Anyone got a better idea or had a similar problem before?


 Thanks!
 Ping






 --
 Ping Yan
 Ph.D. in Management
 Dept. of Management Information Systems
 University of Arizona
 Tucson, AZ 85721







Re: Optimisation advice for Avro-Parquet merge job

2015-06-08 Thread kiran lonikar
James,

As far as I can see, there are three distinct parts to your program:

   - for loop
   - synchronized block
   - final outputFrame.save statement

Can you do a separate timing measurement by putting a simple
System.currentTimeMillis() around each of these blocks to see how much time
each one takes, and then try to optimize the one that takes longest? In the
second block, you may want to measure the time for the two statements
separately. Improving this boils down to playing with Spark settings.

Now consider the first block: I think this is a classic case of merge sort,
or a reduce tree. You have already tried to improve this by submitting jobs in
parallel using an executor pool/Callable etc.

To further improve the parallelization, I suggest you use a reduce-tree-like
approach. For example, let's say you want to compute the sum of all the
elements of an array in parallel. The way it's solved for a GPU-like platform
is that you first pair up the elements of the input array, compute those n/2
sums in parallel on separate threads, and save each result in the first
element of its pair. In the next iteration, you compute n/4 sums of the
earlier sums in parallel, and so on, until you are left with only two elements
whose sum gives you the final sum.

You are performing many sequential unionAll operations over inputs.size()
Avro files. Assuming that unionAll() on DataFrame is blocking (and not a
simple transformation as on RDDs) and actually performs the union operation,
you will certainly benefit from parallelizing this loop. You could change the
loop to something like the following:

// Pseudo code only. Assumes inputPaths (a List<String> of the Avro input
// paths, with a power-of-two size) and sqlContext are in scope; needs
// java.util.* and java.util.concurrent.* plus org.apache.spark.sql.DataFrame.
final int n = inputPaths.size();
final ExecutorService executor = Executors.newFixedThreadPool(n / 2);
final DataFrame[] dfInput = new DataFrame[n / 2];
List<Future<?>> level = new ArrayList<Future<?>>();

// First level of the reduce tree: union input i with input i + n/2, in parallel.
for (int i = 0; i < n / 2; i++) {
    final int idx = i;
    level.add(executor.submit(new Runnable() {
        public void run() {
            dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
        }
    }));
}
for (Future<?> f : level) f.get();  // wait for this level (exception handling omitted)

// Remaining levels: halve the number of DataFrames on each pass until
// dfInput[0] holds the union of all inputs.
int steps = (int) (Math.log(n) / Math.log(2.0));
for (int s = 2; s <= steps; s++) {
    final int stride = n / (1 << s);  // n / 2^s
    level = new ArrayList<Future<?>>();
    for (int i = 0; i < stride; i++) {
        final int idx = i;
        level.add(executor.submit(new Runnable() {
            public void run() {
                dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
            }
        }));
    }
    for (Future<?> f : level) f.get();  // finish this level before starting the next
}
executor.shutdown();

Let me know if it helped.

-Kiran


On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com wrote:

 Thanks for the confirmation! We're quite new to Spark, so a little
 reassurance is a good thing to have sometimes :-)

 The thing that's concerning me at the moment is that my job doesn't seem
 to run any faster with more compute resources added to the cluster, and
 this is proving a little tricky to debug. There are a lot of variables, so
 here's what we've tried already and the apparent impact. If anyone has any
 further suggestions, we'd love to hear!

 * Increase the minimum number of output files (targetPartitions above),
 so that input groups smaller than our minimum chunk size can still be
 worked on by more than one executor. This does measurably speed things up,
 but obviously it's a trade-off, as the original goal for this job is to
 merge our data into fewer, larger files.

 * Submit many jobs in parallel, by running the above code in a Callable,
 on an executor pool. This seems to help, to some extent, but I'm not sure
 what else needs to be configured alongside it -- driver threads, scheduling
 policy, etc. We set scheduling to FAIR when doing this, as that seemed
 like the right approach, but we're not 100% confident. It seemed to help
 quite substantially anyway, so perhaps this just needs further tuning?

 * Increasing executors, RAM, etc. This doesn't make a difference by itself
 for this job, so I'm thinking we're already not fully utilising the
 resources we have in a smaller cluster.

 Again, any recommendations appreciated. Thanks for the help!


 James.

 On 4 June 2015 at 15:00, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi

 2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the most optimal approach.

 I was wondering if anyone on this list might have some advice to make to
 make this job as efficient as possible. Here's some code:

 DataFrame dfInput = sqlContext.load(inputPaths.get(0),
 com.databricks.spark.avro);
 long totalSize = 

Re: Column operation on Spark RDDs.

2015-06-08 Thread kiran lonikar
Two simple suggestions:
1. No need to call zipWithIndex twice. Use the earlier RDD dt.
2. Replace zipWithIndex with zipWithUniqueId, which does not trigger a Spark
job.

Below is your code with the above changes:

var dataRDD = sc.textFile("/test.csv").map(_.split(","))
val dt = dataRDD.zipWithUniqueId.map(_.swap)
val newCol1 = dt.map { case (i, x) => (i, x(1) + x(18)) }
val newCol2 = newCol1.join(dt).map(x => function(...))

Hope this helps.
Kiran


On Fri, Jun 5, 2015 at 8:15 AM, Carter gyz...@hotmail.com wrote:

 Hi, I have a RDD with MANY columns (e.g., hundreds), and most of my
 operation
 is on columns, e.g., I need to create many intermediate variables from
 different columns, what is the most efficient way to do this?

 For example, if my dataRDD[Array[String]] is like below:

 123, 523, 534, ..., 893
 536, 98, 1623, ..., 98472
 537, 89, 83640, ..., 9265
 7297, 98364, 9, ..., 735
 ..
 29, 94, 956, ..., 758

 I will need to create a new column or a variable as newCol1 =
 2ndCol+19thCol, and another new column based on newCol1 and the existing
 columns: newCol2 = function(newCol1, 34thCol), what is the best way of
 doing
 this?

 I have been thinking using index for the intermediate variables and the
 dataRDD, and then join them together on the index to do my calculation:
 var dataRDD = sc.textFile("/test.csv").map(_.split(","))
 val dt = dataRDD.zipWithIndex.map(_.swap)
 val newCol1 = dataRDD.map(x => x(1) + x(18)).zipWithIndex.map(_.swap)
 val newCol2 = newCol1.join(dt).map(x => function(...))

 Is there a better way of doing this?

 Thank you very much!












 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Column-operation-on-Spark-RDDs-tp23165.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: columnar structure of RDDs from Parquet or ORC files

2015-06-08 Thread kiran lonikar
Thanks. Can you point me to a place in the SQL programming guide or the
DataFrame scaladoc where these transformations and actions are grouped, as
they are for RDDs?

Also, could you tell me whether sqlContext.load and unionAll are
transformations or actions?

I answered a question on the forum assuming unionAll is a blocking call, and
said that executing multiple load and df.unionAll calls in different threads
would benefit performance :)

Kiran
On 08-Jun-2015 4:37 pm, Cheng Lian lian.cs@gmail.com wrote:

  For DataFrame, there are also transformations and actions. And
 transformations are also lazily evaluated. However, DataFrame
 transformations like filter(), select(), agg() return a DataFrame rather
 than an RDD. Other methods like show() and collect() are actions.

 Cheng

 On 6/8/15 1:33 PM, kiran lonikar wrote:

 Thanks for replying twice :) I think I sent this question by email and
 somehow thought I did not sent it, hence created the other one on the web
 interface. Lets retain this thread since you have provided more details
 here.

  Great, it confirms my intuition about DataFrame. It's similar to Shark
 columnar layout, with the addition of compression. There it used java nio's
 ByteBuffer to hold actual data. I will go through the code you pointed.

  I have another question about DataFrame: The RDD operations are divided
 in two groups: *transformations *which are lazily evaluated and return a
 new RDD and *actions *which evaluate lineage defined by transformations,
 invoke actions and return results. What about DataFrame operations like
 join, groupBy, agg, unionAll etc which are all transformations in RDD? Are
 they lazily evaluated or immediately executed?






Re: columnar structure of RDDs from Parquet or ORC files

2015-06-08 Thread kiran lonikar
Hi Cheng, Ayan,

Thanks for the answers. I like the rule of thumb. I cursorily went through
the DataFrame, SQLContext and sql.execution.basicOperators.scala code. It is
apparent that these functions are lazily evaluated. The SQLContext.load
functions are similar to SparkContext.textFile-style functions, which simply
create an RDD; the data load actually happens only when an action is invoked.
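
To make the lazy/eager distinction concrete, a small illustrative sketch
(assuming sqlContext is an existing SQLContext; the paths and column name are
made up):

// Nothing below touches the data until count(): load, unionAll and filter
// only build up a logical plan.
val df1 = sqlContext.load("/data/part1.parquet", "parquet")   // lazy
val df2 = sqlContext.load("/data/part2.parquet", "parquet")   // lazy
val merged = df1.unionAll(df2)                                // lazy
val positive = merged.filter(merged("value") > 0)             // lazy
val n = positive.count()                                      // action: runs the job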

I would like to modify the formula for DF. I think it should be *DF = RDD +
Schema + additional methods (like loading/saving from/to JDBC) + columnar
layout*. A DF is not a simple wrapper around an RDD.

As another interest, I wanted to check whether some of the DF execution
functions can be executed on GPUs. For that to happen, the columnar layout is
important; this is where a DF scores over ordinary RDDs.

It seems the batch size defined by
spark.sql.inMemoryColumnarStorage.batchSize is set to a default of 10,000
rows. I wonder if it can be set to 100K or 1 million so that computations
involving primitive data types can be carried out efficiently on a GPU (if
present). Higher batch sizes mean data can be transferred to/from GPU RAM
with fewer transfers. Also, at least initially, the other parameter
spark.sql.inMemoryColumnarStorage.compressed will have to be set to false,
since decompressing on the GPU is not so straightforward (issues of how much
data each GPU thread should handle, and uncoalesced memory access).

-Kiran


On Mon, Jun 8, 2015 at 8:25 PM, Cheng Lian lian.cs@gmail.com wrote:

  You may refer to DataFrame Scaladoc
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

 Methods listed in Language Integrated Queries and RDD Options can be
 viewed as transformations, and those listed in Actions are, of course,
 actions.  As for SQLContext.load, it's listed in the Generic Data Sources
 section
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext

 I think a simple rule can be: if a DataFrame method or a SQLContext method
 returns a DataFrame or an RDD, then it is lazily evaluated, since DataFrame
 and RDD are both lazily evaluated.

 Cheng


 On 6/8/15 8:11 PM, kiran lonikar wrote:

 Thanks. Can you point me to a place in the documentation of SQL
 programming guide or DataFrame scaladoc where this transformation and
 actions are grouped like in the case of RDD?

 Also if you can tell me if sqlContext.load and unionAll are
 transformations or actions...

 I answered a question on the forum assuming unionAll is a blocking call
 and said execution of multiple load and df.unionAll in different threads
 would benefit performance :)

 Kiran
 On 08-Jun-2015 4:37 pm, Cheng Lian lian.cs@gmail.com wrote:

  For DataFrame, there are also transformations and actions. And
 transformations are also lazily evaluated. However, DataFrame
 transformations like filter(), select(), agg() return a DataFrame rather
 than an RDD. Other methods like show() and collect() are actions.

 Cheng

 On 6/8/15 1:33 PM, kiran lonikar wrote:

 Thanks for replying twice :) I think I sent this question by email and
 somehow thought I did not sent it, hence created the other one on the web
 interface. Lets retain this thread since you have provided more details
 here.

  Great, it confirms my intuition about DataFrame. It's similar to Shark
 columnar layout, with the addition of compression. There it used java nio's
 ByteBuffer to hold actual data. I will go through the code you pointed.

  I have another question about DataFrame: The RDD operations are divided
 in two groups: *transformations *which are lazily evaluated and return a
 new RDD and *actions *which evaluate lineage defined by transformations,
 invoke actions and return results. What about DataFrame operations like
 join, groupBy, agg, unionAll etc which are all transformations in RDD? Are
 they lazily evaluated or immediately executed?







Re: Optimisation advice for Avro-Parquet merge job

2015-06-08 Thread kiran lonikar
It turns out my assumption about load and unionAll being blocking is not
correct; they are transformations. So instead of running only the load and
unionAll in the run() methods, I think you will have to save the intermediate
dfInput[i] to temporary (Parquet) files (possibly on an in-memory DFS like
http://tachyon-project.org/) in the run() methods. The second for loop will
then have to load from those intermediate Parquet files. Then finally save the
final dfInput[0] to HDFS.
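
As a rough sketch of what each run() body could then do (Scala for brevity;
only an illustration, assuming the spark-avro package is on the classpath and
the temporary output path is made up; unionToTemp is a hypothetical helper):

import org.apache.spark.sql.{DataFrame, SQLContext}

// Union two Avro inputs and force the work to happen now by writing the
// result out, then hand back a DataFrame over the materialized Parquet file
// for the next level of the reduce tree.
def unionToTemp(sqlContext: SQLContext, leftPath: String, rightPath: String,
                outPath: String): DataFrame = {
  val merged = sqlContext.load(leftPath, "com.databricks.spark.avro")
    .unionAll(sqlContext.load(rightPath, "com.databricks.spark.avro"))
  merged.saveAsParquetFile(outPath)   // action: actually executes the union
  sqlContext.parquetFile(outPath)     // lazily re-read the materialized result
}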

I think this way of parallelizing will force the cluster to utilize all of its
resources.

-Kiran

On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote:

 James,

 As I can see, there are three distinct parts to your program:

- for loop
- synchronized block
- final outputFrame.save statement

 Can you do a separate timing measurement by putting a simple
 System.currentTimeMillis() around these blocks to know how much they are
 taking and then try to optimize where it takes longest? In the second
 block, you may want to measure the time for the two statements. Improving
 this boils down to playing with spark settings.

 Now consider the first block: I think this is a classic case of merge sort
 or a reduce tree. You already tried to improve this by submitting jobs in
 parallel using executor pool/Callable etc.

 To further improve the parallelization, I suggest you use a reduce tree
 like approach. For example, lets say you want to compute sum of all
 elements of an array in parallel. The way its solved for a GPU like
 platform is you divide your input array initially in chunks of 2, compute
 those n/2 sums parallely on separate threads and save the result in the
 first of the two elements. In the next iteration, you compute n/4 sums
 parallely of the earlier sums and so on till you are left with only two
 elements whose sum gives you final sum.

 You are performing many sequential unionAll operations for inputs.size()
 avro files. Assuming the unionAll() on DataFrame is blocking (and not a
 simple transformation like on RDDs) and actually performs the union
 operation, you will certainly benefit by parallelizing this loop. You may
 change the loop to something like below:

 // Pseudo code only. Assumes inputPaths (a List<String> of the Avro input
 // paths, with a power-of-two size) and sqlContext are in scope; needs
 // java.util.* and java.util.concurrent.* plus org.apache.spark.sql.DataFrame.
 final int n = inputPaths.size();
 final ExecutorService executor = Executors.newFixedThreadPool(n / 2);
 final DataFrame[] dfInput = new DataFrame[n / 2];
 List<Future<?>> level = new ArrayList<Future<?>>();

 // First level of the reduce tree: union input i with input i + n/2, in parallel.
 for (int i = 0; i < n / 2; i++) {
     final int idx = i;
     level.add(executor.submit(new Runnable() {
         public void run() {
             dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                 .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
         }
     }));
 }
 for (Future<?> f : level) f.get();  // wait for this level (exception handling omitted)

 // Remaining levels: halve the number of DataFrames on each pass until
 // dfInput[0] holds the union of all inputs.
 int steps = (int) (Math.log(n) / Math.log(2.0));
 for (int s = 2; s <= steps; s++) {
     final int stride = n / (1 << s);  // n / 2^s
     level = new ArrayList<Future<?>>();
     for (int i = 0; i < stride; i++) {
         final int idx = i;
         level.add(executor.submit(new Runnable() {
             public void run() {
                 dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
             }
         }));
     }
     for (Future<?> f : level) f.get();  // finish this level before starting the next
 }
 executor.shutdown();

 Let me know if it helped.

 -Kiran


 On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com
 wrote:

 Thanks for the confirmation! We're quite new to Spark, so a little
 reassurance is a good thing to have sometimes :-)

 The thing that's concerning me at the moment is that my job doesn't seem
 to run any faster with more compute resources added to the cluster, and
 this is proving a little tricky to debug. There are a lot of variables, so
 here's what we've tried already and the apparent impact. If anyone has any
 further suggestions, we'd love to hear!

 * Increase the minimum number of output files (targetPartitions above),
 so that input groups smaller than our minimum chunk size can still be
 worked on by more than one executor. This does measurably speed things up,
 but obviously it's a trade-off, as the original goal for this job is to
 merge our data into fewer, larger files.

 * Submit many jobs in parallel, by running the above code in a Callable,
 on an executor pool. This seems to help, to some extent, but I'm not sure
 what else needs to be configured alongside it -- driver threads, scheduling
 policy, etc. We set scheduling to FAIR when doing this, as that seemed
 like the right approach, but we're not 100% confident. It seemed to help
 quite substantially anyway, so perhaps this just needs further tuning?

 * Increasing executors, RAM, etc. This doesn't make a difference by
 itself for this job, so I'm thinking we're already not fully utilising the
 resources we have in a smaller cluster.

 Again, any recommendations appreciated. Thanks for the help!


 James.

 On 4 June 2015 at 15

Re: columnar structure of RDDs from Parquet or ORC files

2015-06-07 Thread kiran lonikar
Thanks for replying twice :) I think I sent this question by email and
somehow thought I did not sent it, hence created the other one on the web
interface. Lets retain this thread since you have provided more details
here.

Great, it confirms my intuition about DataFrame. It's similar to Shark
columnar layout, with the addition of compression. There it used java nio's
ByteBuffer to hold actual data. I will go through the code you pointed.

I have another question about DataFrame: The RDD operations are divided in
two groups: *transformations *which are lazily evaluated and return a new
RDD and *actions *which evaluate lineage defined by transformations, invoke
actions and return results. What about DataFrame operations like join,
groupBy, agg, unionAll etc which are all transformations in RDD? Are they
lazily evaluated or immediately executed?


columnar structure of RDDs from Parquet or ORC files

2015-06-03 Thread kiran lonikar
When Spark reads Parquet files (sqlContext.parquetFile), it creates a
DataFrame. I would like to know if the resulting DataFrame has a columnar
structure (many rows of a column coalesced together in memory) or the
row-wise structure that a Spark RDD has. The section Spark SQL and DataFrames
http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
says you need to call sqlContext.cacheTable("tableName") or df.cache() to make
it columnar. What exactly is this columnar structure?

To be precise: what does the row represent in the expression
df.cache().map { row => ... }?

Is it a logical row which maintains an array of columns, where each column in
turn is an array of values for batchSize rows?

-Kiran