Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions

2015-06-16 Thread Nathan McCarthy
Hi all,

Looks like data frame parquet writing is very broken in Spark 1.4.0. We had no 
problems with Spark 1.3.

When trying to save a data frame with 569610608 rows:

  dfc.write.format("parquet").save("/data/map_parquet_file")

We get random results between runs. Caching the data frame in memory makes no 
difference. It looks like the write-out misses some of the RDD partitions. We 
have an RDD with 6750 partitions. When we write out we get fewer files than the 
number of partitions, and when reading the data back in and running a count, we 
get a smaller number of rows.

I’ve tried counting the rows in several different ways. All return the same result, 
560214031 rows, missing about 9.4 million rows (~1.6%).

  qc.read.parquet("/data/map_parquet_file").count
  qc.read.parquet("/data/map_parquet_file").rdd.count
  qc.read.parquet("/data/map_parquet_file").mapPartitions { itr =>
    var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator
  }.reduce(_ + _)

Looking at the files on HDFS, there are 6643 .parquet files; 107 partitions are 
missing (about 1.6%).

Then writing out the same cached DF again to a new path gives 6717 files on 
HDFS (33 files missing, about 0.5%);

  dfc.write.parquet("/data/map_parquet_file_2")

And we get 566670107 rows back (about 3 million missing, ~0.5%);

  qc.read.parquet("/data/map_parquet_file_2").count

Writing the same DF out to JSON produces the expected number (6750) of part 
files and returns the right number of rows, 569610608.

  dfc.write.format("json").save("/data/map_parquet_file_3")
  qc.read.format("json").load("/data/map_parquet_file_3").count
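
As a rough sanity check, here is a sketch of comparing the DF's partition count 
with the number of .parquet part files written (this assumes the Hadoop 
FileSystem API is reachable from the shell; paths and counts are from the runs 
above):

  import org.apache.hadoop.fs.{FileSystem, Path}

  // One part file is expected per partition of the DF being written.
  val numPartitions = dfc.rdd.partitions.size   // 6750 for us

  // Count the .parquet part files actually present on the output path.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val numFiles = fs.listStatus(new Path("/data/map_parquet_file"))
    .count(_.getPath.getName.endsWith(".parquet"))  // 6643 on the first run

  println(s"partitions=$numPartitions, part files=$numFiles, missing=${numPartitions - numFiles}")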

One thing to note is that the Parquet part files on HDFS do not have the usual 
sequential part numbers that we see for the JSON output and for Parquet output 
in Spark 1.3.

part-r-06151.gz.parquet  part-r-118401.gz.parquet  part-r-146249.gz.parquet  
part-r-196755.gz.parquet  part-r-35811.gz.parquet   part-r-55628.gz.parquet  
part-r-73497.gz.parquet  part-r-97237.gz.parquet
part-r-06161.gz.parquet  part-r-118406.gz.parquet  part-r-146254.gz.parquet  
part-r-196763.gz.parquet  part-r-35826.gz.parquet   part-r-55647.gz.parquet  
part-r-73500.gz.parquet  _SUCCESS

We are using MapR 4.0.2 for HDFS.

Any ideas?

Cheers,
Nathan



Re: Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions

2015-06-17 Thread Nathan McCarthy
Thanks Cheng. Nice find!

Let me know if there is anything we can do to help on this end with 
contributing a fix or testing.

Side note - any ideas on the 1.4.1 ETA? There are a few bug fixes we need in 
there.

Cheers,
Nathan

From: Cheng Lian
Date: Wednesday, 17 June 2015 6:25 pm
To: Nathan, "user@spark.apache.org"
Subject: Re: Spark 1.4 DataFrame Parquet file writing - missing random 
rows/partitions

Hi Nathan,

Thanks a lot for the detailed report, especially the information about the 
nonconsecutive part numbers. It's confirmed to be a race condition bug and I've 
just filed https://issues.apache.org/jira/browse/SPARK-8406 to track it. We will 
deliver a fix ASAP and it will be included in 1.4.1.

Best,
Cheng





Spark SQL DATE_ADD function - Spark 1.3.1 & 1.4.0

2015-06-17 Thread Nathan McCarthy
Hi guys,

Running with a Parquet-backed table in Hive, 'dim_promo_date_curr_p', which has 
the following data;

scala> sqlContext.sql("select * from pz.dim_promo_date_curr_p").show(3)
15/06/18 00:53:21 INFO ParseDriver: Parsing command: select * from 
pz.dim_promo_date_curr_p
15/06/18 00:53:21 INFO ParseDriver: Parse Completed
+----------+-------------+-----------+
|clndr_date|pw_start_date|pw_end_date|
+----------+-------------+-----------+
|2015-02-18|   2015-02-18| 2015-02-24|
|2015-11-13|   2015-11-11| 2015-11-17|
|2015-03-31|   2015-03-25| 2015-03-31|
|2015-07-21|   2015-07-15| 2015-07-21|
+----------+-------------+-----------+

Running a query from the Spark 1.4 shell with the sqlContext (a HiveContext), 
DATE_ADD seems to work except for the value coming from the table. I've only 
seen it for the 31st of March, no other dates;

scala> sqlContext.sql("SELECT DATE_ADD(CLNDR_DATE, 7) as wrong, 
DATE_ADD('2015-03-30', 7) as right30, DATE_ADD('2015-03-31', 7) as right31, 
DATE_ADD('2015-04-01', 7) as right01 FROM pz.dim_promo_date_curr_p WHERE 
CLNDR_DATE='2015-03-31'").show

15/06/18 00:57:32 INFO ParseDriver: Parsing command: SELECT 
DATE_ADD(CLNDR_DATE, 7) as wrong, DATE_ADD('2015-03-30', 7) as right30, 
DATE_ADD('2015-03-31', 7) as right31, DATE_ADD('2015-04-01', 7) as right01 FROM 
pz.dim_promo_date_curr_p WHERE CLNDR_DATE='2015-03-31'
15/06/18 00:57:32 INFO ParseDriver: Parse Completed
+----------+----------+----------+----------+
|     wrong|   right30|   right31|   right01|
+----------+----------+----------+----------+
|2015-04-06|2015-04-06|2015-04-07|2015-04-08|
+----------+----------+----------+----------+

It seems to drop a day: DATE_ADD(CLNDR_DATE, 7) comes back as 2015-04-06 even 
though the WHERE clause selects the 31st, while the same date passed as a string 
literal works fine. The problem appears in Spark 1.3.1 as well.

Not sure if this is coming from Hive, but it seems like a bug. I’ve raised a 
JIRA https://issues.apache.org/jira/browse/SPARK-8421
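
For what it's worth, a minimal repro sketch along these lines (this assumes the 
shell's sqlContext is a HiveContext and uses the toDF implicits; the temp table 
name is made up, and whether it reproduces may depend on the session time zone):

  import sqlContext.implicits._

  // One-row table whose column is a real DATE (not a string), mirroring clndr_date.
  Seq(Tuple1("2015-03-31")).toDF("clndr_date")
    .selectExpr("cast(clndr_date as date) as clndr_date")
    .registerTempTable("date_add_repro")

  // On our cluster the column-based DATE_ADD comes back a day short of the literal.
  sqlContext.sql(
    "SELECT DATE_ADD(clndr_date, 7) AS from_column, " +
    "DATE_ADD('2015-03-31', 7) AS from_literal FROM date_add_repro").show()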

Cheers,
Nathan




SparkSubmit with Ivy jars is very slow to load with no internet access

2015-06-18 Thread Nathan McCarthy
Hey,

Spark Submit adds maven central & spark bintray to the ChainResolver before it 
adds any external resolvers. 
https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821

When running on a cluster without internet access, this means the spark shell 
takes forever to launch as it tries these two remote repos before the ones 
specified in the --repositories list. In our case we have a proxy which the 
cluster can access, and we supply it via --repositories. This is also a problem 
for users who maintain a proxy for Maven/Ivy repos with something like 
Nexus/Artifactory.

I see two options for a fix;

  *   Change the order repos are added to the ChainResolver, making the 
--repositories supplied repos come before anything else (sketched below). 
https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843
  *   Have a config option (like spark.jars.ivy.useDefaultRemoteRepos, default 
true) which, when false, won't add Maven Central & Bintray to the ChainResolver.
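
For the first option, a rough sketch of the reordering (based on how SparkSubmit 
builds its Ivy resolver chain in branch-1.4; untested, and the resolver names 
here are illustrative):

  import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

  // Sketch only: add the user-supplied --repositories to the chain first, so an
  // unreachable Maven Central / Bintray is only consulted as a fallback.
  def buildResolverChain(userRepos: Seq[String]): ChainResolver = {
    val cr = new ChainResolver
    cr.setName("list")

    // 1. Repos from --repositories (e.g. an internal Nexus/Artifactory proxy).
    userRepos.zipWithIndex.foreach { case (repo, i) =>
      val r = new IBiblioResolver
      r.setM2compatible(true)
      r.setUsepoms(true)
      r.setRoot(repo)
      r.setName(s"repo-${i + 1}")
      cr.add(r)
    }

    // 2. The built-in defaults (Maven Central, Spark Bintray) would be appended
    //    here, after the user repos, instead of before them as today.
    cr
  }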

Happy to do a PR now for this if someone can give me a recommendation on which 
option would be better.

JIRA here; https://issues.apache.org/jira/browse/SPARK-8475

Cheers,
Nathan



SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-06 Thread Nathan McCarthy
Hi,

I’m trying to use a combination of SparkSQL and 'normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release, 1.2.0.

Simple example: load up some sample data from Parquet on HDFS (about 380m rows, 
10 columns) on a 7-node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for 
each hour in the day, so I choose 3 of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Let's do the same thing but via a mapPartitions call (this isn't 
production-ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  try2.mapPartitions { hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

Now this takes around 49 seconds, even though the test1 table is 100% cached. 
The number of partitions remains the same.

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, 
sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1), 
r.getDouble(7), r.getDouble(8)) }.cache()
//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for(r <- hrs) {
val hr = r.hour
qtySum(hr) += r.qty
salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

This takes around 1.5 seconds, albeit with a much larger memory footprint.

My thinking is that because SparkSQL stores things in a columnar format, there 
is some unwrapping to be done out of the column array buffers, which takes time, 
and for some reason this just takes longer when I switch out to mapPartitions. 
Maybe it's unwrapping the entire row even though I'm using just a subset of 
columns, or maybe there is some object creation/autoboxing going on when calling 
getInt or getDouble…

I’ve tried simpler cases too, like just summing sales. Running the sum via SQL is 
fast (4.7 seconds), and running a mapPartitions sum on a plain double RDD is even 
faster (2.6 seconds). But mapPartitions on the SchemaRDD;

  sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
    Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum

This takes a long time (33 seconds). In all these examples everything is fully 
cached in memory. And yes, for these kinds of operations I can use SQL, but for 
more complex queries I'd much rather use a combo of SparkSQL to select the data 
(so I get nice things like Parquet pushdowns etc.) & functional Scala!

I think I'm doing something dumb… Is there something I should be doing to get 
faster performance on mapPartitions on SchemaRDDs? Is there some unwrapping 
going on in the background that Catalyst does in a smart way that I'm missing?

Cheers,
~N

Nathan McCarthy
QUANTIUM
Level 25, 8 Chifley, 8-12 Chifley Square
Sydney NSW 2000

T: +61 2 8224 8922
F: +61 2 9292 6444

W: quantium.com.au



linkedin.com/company/quantium

facebook.com/QuantiumAustralia

twitter.com/QuantiumAU


The contents of this email, including attachments, may be confidential 
information. If you are not the intended recipient, any use, disclosure or 
copying of the information is unauthorised. If you have received this email in 
error, we would be grateful if you would notify us immediately by email reply, 
phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from 
your system.


Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-08 Thread Nathan McCarthy
Any ideas? :)

From: Nathan <nathan.mccar...@quantium.com.au>
Date: Wednesday, 7 January 2015 2:53 pm
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?



Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-10 Thread Nathan McCarthy
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic Scala isn't performant. I'll definitely start using while loops or 
tail-recursive methods. I have noticed this in the Spark code base.

I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how 
performance compares to the primitive objects. Would you expect to see similar 
runtimes vs the primitive objects? We do have the luxury of lots of memory at 
the moment so this might give us an additional performance boost.

Regarding the defensive copying of row objects: can we switch this off and just 
be aware of the risks? Is mapPartitions on SchemaRDDs and operating on the Row 
object the most performant way to flip between SQL & Scala user code? Is there 
anything else I could be doing?

Cheers,
~N

From: Michael Armbrust <mich...@databricks.com>
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian <lian.cs@gmail.com>
Cc: Nathan <nathan.mccar...@quantium.com.au>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we 
switch into user code.  This probably explains the difference between 1 & 2.

The difference between 1 & 3 is likely the cost of decompressing the column 
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs@gmail.com> wrote:
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are inlined 
below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now lets do some operations on it; I want the total sales & quantities sold for 
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production 
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  try2.mapPartitions { case hrs =>
val qtySum = new Array[Double](24)
val salesSum = new Array[Double](24)

for(r <- hrs) {
  val hr = r.getInt(0)
  qtySum(hr) += r.getDouble(1)
  salesSum(hr) += r.getDouble(2)
}
(salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the for-loop. 
According to an early benchmark I did with Scala 2.9, a for-loop can be orders of 
magnitude slower than a simple while-loop, especially when the body of the loop 
only does something as trivial as in this case. The reason is that a Scala 
for-loop is translated into corresponding foreach/map/flatMap/withFilter function 
calls. That's exactly why Spark SQL tries to avoid for-loops or any other 
functional-style code in critical paths (where every row is touched); we also use 
reusable mutable row objects instead of the immutable version to improve 
performance. You may check HiveTableScan, ParquetTableScan, 
InMemoryColumnarTableScan etc. for reference. Also, the `sum` calls in your SQL 
code are translated into `o.a.s.s.execution.Aggregate` operators, which also use 
an imperative while-loop and reusable mutable rows.

Another thing to notice is that the `hrs` iterator physically points to the 
underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ... }` 
loop actually decompresses and extracts values from the required byte buffers 
(this is the "unwrapping" process you mentioned below).
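
For illustration, here is a sketch of the same per-partition aggregation written 
with a while-loop over the row iterator (names taken from the snippet above):

  // Sketch only: identical logic, but an imperative while-loop instead of a
  // for-comprehension, which avoids the per-row closure calls.
  try2.mapPartitions { hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)
    while (hrs.hasNext) {
      val r = hrs.next()
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)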


Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-15 Thread Nathan McCarthy
Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which would allow 
me to load up the low-level/base RDD of just RDD[Row], so I could avoid the 
defensive copy (maybe losing out on columnar storage etc.)?

We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the 
core RDD api (mapPartitions etc.). Any tips?

Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartitions 
call (e.g. Distinct). Does a defensive copy happen there too?

Keen to get the best performance and the best blend of SparkSQL and functional 
Spark.

Cheers,
Nathan

From: Cheng Lian <lian.cs@gmail.com>
Date: Monday, 12 January 2015 1:21 am
To: Nathan <nathan.mccar...@quantium.com.au>, Michael Armbrust <mich...@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?


On 1/11/15 1:40 PM, Nathan McCarthy wrote:
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while loops or 
tail recursive methods. I have noticed this in the spark code base.

I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how 
performance compares to the primitive objects. Would you expect to see similar 
runtimes vs the primitive objects? We do have the luxury of lots of memory at 
the moment so this might give us an additional performance boost.
Turning off compression should be faster, but still slower than directly using 
primitive objects, because Spark SQL also serializes all objects within a column 
into byte buffers in a compact format. However, this radically reduces the 
number of Java objects in the heap and is more GC-friendly. When running large 
queries, the cost introduced by GC can be significant.
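
(A one-line sketch of flipping that switch from the shell before caching, 
assuming the property name quoted above:)

  // Sketch: disable in-memory columnar compression before the table is cached.
  sqlC.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
  sqlC.cacheTable("test1")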

Regarding the defensive copying of row objects. Can we switch this off and just 
be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row 
object the most performant way to be flipping between SQL & Scala user code? Is 
there anything else I could be doing?
This can be very dangerous and error-prone. Whenever an operator tries to cache 
row objects, turning off defensive copying can introduce wrong query results. 
For example, sort-based shuffle caches rows to do sorting. In some cases, the 
sample operator may also cache row objects. This is very implementation-specific 
and may change between versions.

Cheers,
~N


SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-14 Thread Nathan McCarthy
Hi guys,

Trying to use a Spark SQL context's .load("jdbc", …) method to create a DF from 
a JDBC data source. All seems to work well locally (master = local[*]); however, 
as soon as we try to run on YARN we have problems.

We seem to be running into problems with the class path and loading up the JDBC 
driver. I’m using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> 
"CUBE.DIM_SUPER_STORE_TBL"))

java.sql.SQLException: No suitable driver found for 
jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force-load the driver, if I supply "driver" -> 
"net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> 
"net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))

java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I've tried both in the shell and running with 
spark-submit (packing the driver in with my application as a fat JAR). Nothing 
seems to work.

I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> 
DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I'm probably missing some class path setting here. In 
jdbc.DefaultSource.createRelation it looks like the call to Class.forName 
doesn't specify a class loader, so it just uses the default Java behaviour to 
reflectively get the class loader. It almost feels like it's using a different 
class loader.
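
As a rough way to see which loaders are involved, a purely diagnostic sketch 
(run from the shell; it only prints the loaders, nothing more):

  // --jars entries end up on Spark's mutable URL class loader, which is the
  // context class loader in the shell - that is why Class.forName works here.
  val driverClass = Class.forName("net.sourceforge.jtds.jdbc.Driver",
    true, Thread.currentThread().getContextClassLoader)
  println(driverClass.getClassLoader)

  // java.sql.DriverManager itself is loaded by the bootstrap/primordial loader
  // (printed as null), which never sees /tmp/jtds-1.3.1.jar.
  println(classOf[java.sql.DriverManager].getClassLoader)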

I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ =>
  java.sql.DriverManager.getDrivers().asScala.map(d =>
    s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}"
  )).collect().foreach(println)

This successfully returns;

15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at 
Main.scala:46, took 1.495597 s
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true

As a final test we tried with the Postgres driver and had the same problem. Any 
ideas?

Cheers,
Nathan


Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-14 Thread Nathan McCarthy
Just an update, tried with the old JdbcRDD and that worked fine.
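
(For reference, a rough sketch of what that JdbcRDD call looks like - the bound 
column, bounds and row mapping here are made up:)

  import java.sql.{DriverManager, ResultSet}
  import org.apache.spark.rdd.JdbcRDD

  // JdbcRDD takes a connection factory plus a query with two '?' placeholders
  // that it fills in with per-partition bounds.
  val superStore = new JdbcRDD(sc,
    () => {
      Class.forName("net.sourceforge.jtds.jdbc.Driver")
      DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
    },
    "SELECT * FROM CUBE.DIM_SUPER_STORE_TBL WHERE STORE_ID >= ? AND STORE_ID <= ?",
    1L, 1000L, 4,
    (rs: ResultSet) => rs.getString(1))

  superStore.count()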

From: Nathan <nathan.mccar...@quantium.com.au>
Date: Wednesday, 15 April 2015 1:57 pm
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0



RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread Nathan McCarthy
Tried with the 1.3.0 release (built myself) & the most recent 1.3.1 snapshot off 
the 1.3 branch.

Haven't tried with 1.4/master.


From: Wang, Daoyuan [daoyuan.w...@intel.com]
Sent: Wednesday, April 15, 2015 5:22 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide your spark version?

Thanks,
Daoyuan


Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread Nathan McCarthy
The problem lies with getting the driver classes into the primordial class 
loader when running on YARN.

Basically I need to somehow set SPARK_CLASSPATH or compute_classpath.sh when 
running on YARN. I'm not sure how to do this when YARN is handling all the file 
copying.


Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-16 Thread Nathan McCarthy
It's jTDS 1.3.1; http://sourceforge.net/projects/jtds/files/jtds/1.3.1/

I put that jar in /tmp on the driver/machine I’m running spark shell from.

Then I ran with ./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master 
yarn-client

So I'm guessing that --jars doesn't set the class path for the primordial class 
loader, and because the driver is only on the class path in 'user land', 
DriverManager can't see it.

Thinking a workaround would be to merge my Spark assembly jar with the jTDS 
driver… but that seems like a hack. The other thing I notice is that there is 
--files, which lets me pass files around via YARN's distributed cache, so I'm 
thinking I can somehow use this if --jars doesn't work.

Really I need to understand how the spark class path is set when running on 
YARN.


From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com>
Date: Thursday, 16 April 2015 3:02 pm
To: Nathan <nathan.mccar...@quantium.com.au>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide the JDBC connector jar version? Possibly the full JAR name and 
the full command you ran Spark with?
