RE: Spark Streaming - graceful shutdown when stream has no more data

2016-02-24 Thread Cheng, Hao
This is very interesting: how to shut down the streaming job gracefully once there has been no input data for some time.

A doable solution: count the input records using an Accumulator, and have another thread (on the driver / master node) keep polling the latest accumulator value; if the value has not changed for some time, shut down the streaming job.
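A minimal sketch of that approach (assuming a socket source on localhost:9999 and illustrative names like recordCount and idleTimeoutMs; the batch interval and timeouts are placeholders to tune):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object IdleShutdownSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IdleShutdownSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    val sc = ssc.sparkContext

    // Accumulator bumped on the executors for every record received.
    val recordCount = sc.accumulator(0L, "records seen")

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd => rdd.foreach(_ => recordCount += 1L) }
    // ... the real processing of `lines` goes here ...

    ssc.start()

    // Driver-side monitor: stop gracefully once the count stops moving.
    val idleTimeoutMs = 60000L
    var lastCount = -1L
    var lastChange = System.currentTimeMillis()
    while (!ssc.awaitTerminationOrTimeout(10000)) {
      val current = recordCount.value
      if (current != lastCount) {
        lastCount = current
        lastChange = System.currentTimeMillis()
      } else if (System.currentTimeMillis() - lastChange > idleTimeoutMs) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }
}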

From: Daniel Siegmann [mailto:daniel.siegm...@teamaol.com]
Sent: Wednesday, February 24, 2016 12:30 AM
To: Ashutosh Kumar 
Cc: Hemant Bhanawat ; Ted Yu ; Femi 
Anthony ; user 
Subject: Re: Spark Streaming - graceful shutdown when stream has no more data

During testing you will typically be using some finite data. You want the 
stream to shut down automatically when that data has been consumed so your test 
shuts down gracefully.
Of course once the code is running in production you'll want it to keep waiting 
for new records. So whether the stream shuts down when there's no more data 
should be configurable.


On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar 
> wrote:
Just out of curiosity, I would like to know why a streaming program should shut down when no new data is arriving. I think it should keep waiting for the arrival of new records.
Thanks
Ashutosh

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
> wrote:
A guess - parseRecord is returning None in some cases (probably empty lines), and then entry.get is throwing the exception.
You may want to filter the None values out of accessLogDStream before you run the map function over it.
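A minimal sketch of that filtering, assuming parseRecord returns an Option (which the None.get stack trace suggests):

val accessLogDStream = lines.map(line => parser.parseRecord(line))

// Drop the None entries so the downstream map never calls .get on a None:
val validRecords = accessLogDStream.filter(_.isDefined).map(_.get)

// Or, equivalently, flatten the Options in a single step:
val validRecords2 = lines.flatMap(line => parser.parseRecord(line))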
Hemant

Hemant Bhanawat
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu 
> wrote:
Which line is line 42 in your code ?

When variable lines becomes empty, you can stop your program.

Cheers

On Feb 23, 2016, at 12:25 AM, Femi Anthony 
> wrote:

I am working on Spark Streaming API and I wish to stream a set of 
pre-downloaded web log files continuously to simulate a real-time stream. I 
wrote a script that gunzips the compressed logs and pipes the output to nc on 
port .

The script looks like this:

BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files
do
  echo "Unzipping $zfile..."
  gunzip -c $zfile | nc -l -p  -q 20
done
I have streaming code written in Scala that processes the streams. It works well for the most part, but when it runs out of files to stream I get the following error in Spark:



16/02/19 23:04:35 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Socket data stream had no more data
16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated to only 0 peer(s) instead of 1 peers

16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How do I implement a graceful shutdown so that the program exits gracefully when it no longer detects any data in the stream?

My Spark Streaming code looks like this:

object StreamingLogEnhanced {
 def main(args: Array[String]) {
  val master = args(0)
  val conf = new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
  // Create a StreamingContext with a n second batch size
  val ssc = new StreamingContext(conf, Seconds(10))
  // Create a DStream from all the input on port 
  val log = Logger.getLogger(getClass.getName)

  sys.ShutdownHookThread {
   log.info("Gracefully stopping Spark Streaming Application")
   ssc.stop(true, true)
   log.info("Application stopped")
  }

  val lines = ssc.socketTextStream("localhost", )
  // Create a count of log hits by ip
  var ipCounts = countByIp(lines)
  ipCounts.print()

  // start our streaming context and wait for it to "finish"
  ssc.start()
  // Wait for 600 seconds then exit
  ssc.awaitTermination(1*600)
  ssc.stop()
 }

 def countByIp(lines: DStream[String]) = {
   val parser = new AccessLogParser
   val accessLogDStream = lines.map(line => parser.parseRecord(line))
   val ipDStream = accessLogDStream.map(entry =>

RE: Spark SQL joins taking too long

2016-01-27 Thread Cheng, Hao
Another possibility is the parallelism: it is probably 1 or some other small value, since the input data size is not that big.

If that is the case, you can try something like:

df1.repartition(10).registerTempTable("hospitals");
df2.repartition(10).registerTempTable("counties");
...

And then do the join.


From: Raghu Ganti [mailto:raghuki...@gmail.com]
Sent: Thursday, January 28, 2016 3:06 AM
To: Ted Yu; Дмитро Попович
Cc: user
Subject: Re: Spark SQL joins taking too long

The problem is with the way the Spark query plan is being created. IMO, what was happening before is that the order of the tables mattered: when the larger table was given first, it took a very long time (~53 min to complete). I changed the order of the tables so the smaller one comes first (and replaced the one-row table with the full one) and modified the query to look like this:

SELECT c.NAME, h.name FROM counties c, hospitals h WHERE c.NAME 
= 'Dutchess' AND ST_Intersects(c.shape, h.location)
With the above query, things worked like a charm (<1min to finish the entire 
execution and join on 3141 polygons with 6.5k points).
Do let me know if you need more info in order to pin point the issue.
Regards,
Raghu

On Tue, Jan 26, 2016 at 5:13 PM, Ted Yu 
> wrote:
What's the type of shape column ?

Can you disclose what SomeUDF does (by showing the code) ?

Cheers

On Tue, Jan 26, 2016 at 12:41 PM, raghukiran 
> wrote:
Hi,

I create two tables, one counties with just one row (it actually has 2k
rows, but I used only one) and another hospitals, which has 6k rows. The
join command I use is as follows, which takes way too long to run and has
never finished successfully (even after nearly 10mins). The following is
what I have:

DataFrame df1 = ...
df1.registerTempTable("hospitals");
DataFrame df2 = ...
df2.registerTempTable("counties"); // has only one row right now
DataFrame joinDf = sqlCtx.sql("SELECT h.name, c.name FROM hospitals h JOIN counties c ON SomeUDF(c.shape, h.location)");
long count = joinDf.count(); // this takes too long!

// whereas the following, which is the exact equivalent of the above, gets done very quickly!
DataFrame joinDf = sqlCtx.sql("SELECT h.name FROM hospitals WHERE SomeUDF('c.shape as string', h.location)");
long count = joinDf.count(); // gives me the correct answer of 8

Any suggestions on what I can do to optimize and debug this piece of code?

Regards,
Raghu







RE: JSON to SQL

2016-01-27 Thread Cheng, Hao
Have you ever tried the DataFrame API, e.g. sqlContext.read.json("/path/to/file.json")? Spark SQL will automatically infer the type/schema for you.

And lateral view will help with the flattening issues
(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView), as will the "a.b[0].c" style of expression.
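A minimal sketch of that approach, assuming JSON lines shaped like the sample later in this thread ({"a":1,"b":[2,3],"c":"Field","d":[4,5,6,7,8]}) and a HiveContext so LATERAL VIEW is available; the file path is a placeholder:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
val df = sqlContext.read.json("/path/to/file.json")   // schema is inferred automatically
df.printSchema()
df.registerTempTable("events")

// Flatten the array column b with LATERAL VIEW explode, and index into d directly:
val flat = sqlContext.sql(
  "SELECT a, bElem, c, d[0] AS d0 FROM events LATERAL VIEW explode(b) t AS bElem")
flat.show()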


From: Andrés Ivaldi [mailto:iaiva...@gmail.com]
Sent: Thursday, January 28, 2016 3:39 AM
To: Sahil Sareen
Cc: Al Pivonka; user
Subject: Re: JSON to SQL

I'm really brand new to Scala, but if I'm defining a case class, isn't that because I already know what the JSON's structure is beforehand?

If I were able to define a case class dynamically from the JSON structure, then even with Spark I would be able to extract the data.

On Wed, Jan 27, 2016 at 4:01 PM, Sahil Sareen 
> wrote:
Isn't this just about defining a case class and using 
parse(json).extract[CaseClassName]  using Jackson?

-Sahil

On Wed, Jan 27, 2016 at 11:08 PM, Andrés Ivaldi 
> wrote:
We don't have domain objects; it's a service like a pipeline: data is read from the source and saved into a relational database.

I can read the structure from DataFrames and do some transformations. I would prefer to do it with Spark to be consistent with the rest of the process.


On Wed, Jan 27, 2016 at 12:25 PM, Al Pivonka 
> wrote:
Are you using a relational database?
If so, why not use a NoSQL DB and then pull from it into your relational one?

Or utilize a library that understands JSON structure, like Jackson, to obtain the data from the JSON and then persist the domain objects?

On Wed, Jan 27, 2016 at 9:45 AM, Andrés Ivaldi 
> wrote:
Sure,
The job is like an ETL, but without an interface, so I decide the rules for how the JSON will be saved into a SQL table.

I need to flatten the hierarchies where possible (in the case of lists, flatten them as well); nested objects won't be processed for now.

{"a":1,"b":[2,3],"c":"Field", "d":[4,5,6,7,8] }
{"a":11,"b":[22,33],"c":"Field1", "d":[44,55,66,77,88] }
{"a":111,"b":[222,333],"c":"Field2", "d":[44,55,666,777,888] }

I would like something like this in my SQL table:

a    b        c       d
1    2,3      Field   4,5,6,7,8
11   22,33    Field1  44,55,66,77,88
111  222,333  Field2  44,55,666,777,888

Right now this is what I need. I will later add more intelligence, like detecting lists or nested objects and creating relations in other tables.
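A hedged sketch of one way to produce the flattened table above: turn each array column into a comma-separated string with a UDF, then write over JDBC. The column names a, b, c, d follow the sample JSON; the file path, JDBC URL, and target table name are placeholders:

import org.apache.spark.sql.functions.udf

val mkCsv = udf((xs: Seq[Long]) => if (xs == null) null else xs.mkString(","))

val df = sqlContext.read.json("/path/to/file.json")
import sqlContext.implicits._
val flat = df.select($"a", mkCsv($"b").as("b"), $"c", mkCsv($"d").as("d"))
flat.show()

// Write the flattened rows to the relational database over JDBC:
flat.write.jdbc("jdbc:postgresql://host/db", "target_table", new java.util.Properties)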



On Wed, Jan 27, 2016 at 11:25 AM, Al Pivonka 
> wrote:
More detail is needed.
Can you provide some context to the use-case ?

On Wed, Jan 27, 2016 at 8:33 AM, Andrés Ivaldi 
> wrote:
Hello, I'm trying to save a JSON file into a SQL table.

If I try to do this directly, an IllegalArgumentException is raised; I suppose this is because JSON has a hierarchical structure, is that correct?

If that is the problem, how can I flatten the JSON structure? The JSON structure to be processed would be unknown, so I need to do it programmatically.

regards
--
Ing. Ivaldi Andres



--
Those who say it can't be done, are usually interrupted by those doing it.


--
Ing. Ivaldi Andres



--
Those who say it can't be done, are usually interrupted by those doing it.


--
Ing. Ivaldi Andres




--
Ing. Ivaldi Andres


RE: Problem with WINDOW functions?

2015-12-29 Thread Cheng, Hao
Which version are you using? Have you tried the 1.6?

From: Vadim Tkachenko [mailto:apache...@gmail.com]
Sent: Wednesday, December 30, 2015 10:17 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Problem with WINDOW functions?

When I allocate 200g to the executor, it is able to make better progress; that is, I see 189 tasks executed instead of 169 previously.
But eventually it fails with the same error.

On Tue, Dec 29, 2015 at 5:58 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Is there any improvement if you set a bigger memory for executors?

-Original Message-
From: va...@percona.com<mailto:va...@percona.com> 
[mailto:va...@percona.com<mailto:va...@percona.com>] On Behalf Of Vadim 
Tkachenko
Sent: Wednesday, December 30, 2015 9:51 AM
To: Cheng, Hao
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Problem with WINDOW functions?

Hi,

I am getting the same error with write.parquet("/path/to/file") :
 WARN HeartbeatReceiver: Removing executor 0 with no recent
heartbeats: 160714 ms exceeds timeout 12 ms
15/12/30 01:49:05 ERROR TaskSchedulerImpl: Lost executor 0 on
10.10.7.167<http://10.10.7.167>: Executor heartbeat timed out after 160714 ms


On Tue, Dec 29, 2015 at 5:35 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
> Can you try to write the result into another file instead? Let's see if there 
> any issue in the executors side .
>
> sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").write.parquet("/path/to/file")
>
> -Original Message-
> From: vadimtk [mailto:apache...@gmail.com<mailto:apache...@gmail.com>]
> Sent: Wednesday, December 30, 2015 9:29 AM
> To: user@spark.apache.org<mailto:user@spark.apache.org>
> Subject: Problem with WINDOW functions?
>
> Hi,
>
> I can't successfully execute a query with WINDOW function.
>
> The statements are following:
>
> val orcFile =
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(pro
> ject)='EN'")
> orcFile.registerTempTable("d1")
>  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").collect().foreach(println)
>
> with default
> spark.driver.memory
>
> I am getting java.lang.OutOfMemoryError: Java heap space.
> The same if I set spark.driver.memory=10g.
>
> When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution 
> fails with a different error:
>
> 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
> recent
> heartbeats: 129324 ms exceeds timeout 12 ms
>
> And I see that GC takes a lot of time.
>
> What is a proper way to execute statements above?
>
> I see the similar problems reported
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-f
> etchfailedexception
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-
> count-action-in-a-big-table
>
>
>
>
>
>
>
>
>



RE: Problem with WINDOW functions?

2015-12-29 Thread Cheng, Hao
Can you try to write the result into another file instead? Let's see if there is any issue on the executor side.

sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day ORDER BY pageviews DESC) as rank FROM d1").filter("rank <= 20").sort($"day",$"rank").write.parquet("/path/to/file")

-Original Message-
From: vadimtk [mailto:apache...@gmail.com] 
Sent: Wednesday, December 30, 2015 9:29 AM
To: user@spark.apache.org
Subject: Problem with WINDOW functions?

Hi,

I can't successfully execute a query with WINDOW function.

The statements are following:

val orcFile =
sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
orcFile.registerTempTable("d1")
 sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day ORDER BY 
pageviews DESC) as rank FROM d1").filter("rank <=
20").sort($"day",$"rank").collect().foreach(println)

with default
spark.driver.memory 

I am getting java.lang.OutOfMemoryError: Java heap space.
The same if I set spark.driver.memory=10g.

When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution 
fails with a different error:

15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no recent
heartbeats: 129324 ms exceeds timeout 12 ms

And I see that GC takes a lot of time.

What is a proper way to execute statements above?

I see the similar problems reported
http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table












RE: Problem with WINDOW functions?

2015-12-29 Thread Cheng, Hao
Is there any improvement if you set a bigger memory for executors?

-Original Message-
From: va...@percona.com [mailto:va...@percona.com] On Behalf Of Vadim Tkachenko
Sent: Wednesday, December 30, 2015 9:51 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Problem with WINDOW functions?

Hi,

I am getting the same error with write.parquet("/path/to/file") :
 WARN HeartbeatReceiver: Removing executor 0 with no recent
heartbeats: 160714 ms exceeds timeout 12 ms
15/12/30 01:49:05 ERROR TaskSchedulerImpl: Lost executor 0 on
10.10.7.167: Executor heartbeat timed out after 160714 ms


On Tue, Dec 29, 2015 at 5:35 PM, Cheng, Hao <hao.ch...@intel.com> wrote:
> Can you try to write the result into another file instead? Let's see if there 
> any issue in the executors side .
>
> sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day 
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").write.parquet("/path/to/file")
>
> -Original Message-
> From: vadimtk [mailto:apache...@gmail.com]
> Sent: Wednesday, December 30, 2015 9:29 AM
> To: user@spark.apache.org
> Subject: Problem with WINDOW functions?
>
> Hi,
>
> I can't successfully execute a query with WINDOW function.
>
> The statements are following:
>
> val orcFile =
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(pro
> ject)='EN'")
> orcFile.registerTempTable("d1")
>  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day 
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").collect().foreach(println)
>
> with default
> spark.driver.memory
>
> I am getting java.lang.OutOfMemoryError: Java heap space.
> The same if I set spark.driver.memory=10g.
>
> When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution 
> fails with a different error:
>
> 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no 
> recent
> heartbeats: 129324 ms exceeds timeout 12 ms
>
> And I see that GC takes a lot of time.
>
> What is a proper way to execute statements above?
>
> I see the similar problems reported
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-f
> etchfailedexception 
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-
> count-action-in-a-big-table
>
>
>
>
>
>
>
>
>


RE: Problem with WINDOW functions?

2015-12-29 Thread Cheng, Hao
It's not released yet; probably you need to compile it yourself. In the meantime, can you increase the partition number by setting "spark.sql.shuffle.partitions" to a bigger value?

More details about your cluster size, partition size, yarn/standalone, executor resources, etc. will also help in understanding your problem.
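A minimal sketch of bumping the shuffle partition count before running the window query; 400 is only an illustrative value to tune against your data size:

sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// or equivalently via SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=400")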

From: Vadim Tkachenko [mailto:apache...@gmail.com]
Sent: Wednesday, December 30, 2015 10:49 AM
To: Cheng, Hao
Subject: Re: Problem with WINDOW functions?

I use 1.5.2.

Where can I get 1.6? I do not see it on http://spark.apache.org/downloads.html

Thanks,
Vadim


On Tue, Dec 29, 2015 at 6:47 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Which version are you using? Have you tried the 1.6?

From: Vadim Tkachenko [mailto:apache...@gmail.com<mailto:apache...@gmail.com>]
Sent: Wednesday, December 30, 2015 10:17 AM

To: Cheng, Hao
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Problem with WINDOW functions?

When I allocate 200g to executor, it is able to make better progress,
that is I see 189 tasks executed instead of 169 previously.
But eventually it fails with the same error.

On Tue, Dec 29, 2015 at 5:58 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Is there any improvement if you set a bigger memory for executors?

-Original Message-
From: va...@percona.com<mailto:va...@percona.com> 
[mailto:va...@percona.com<mailto:va...@percona.com>] On Behalf Of Vadim 
Tkachenko
Sent: Wednesday, December 30, 2015 9:51 AM
To: Cheng, Hao
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Problem with WINDOW functions?

Hi,

I am getting the same error with write.parquet("/path/to/file") :
 WARN HeartbeatReceiver: Removing executor 0 with no recent
heartbeats: 160714 ms exceeds timeout 12 ms
15/12/30 01:49:05 ERROR TaskSchedulerImpl: Lost executor 0 on
10.10.7.167<http://10.10.7.167>: Executor heartbeat timed out after 160714 ms


On Tue, Dec 29, 2015 at 5:35 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
> Can you try to write the result into another file instead? Let's see if there 
> any issue in the executors side .
>
> sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").write.parquet("/path/to/file")
>
> -Original Message-
> From: vadimtk [mailto:apache...@gmail.com<mailto:apache...@gmail.com>]
> Sent: Wednesday, December 30, 2015 9:29 AM
> To: user@spark.apache.org<mailto:user@spark.apache.org>
> Subject: Problem with WINDOW functions?
>
> Hi,
>
> I can't successfully execute a query with WINDOW function.
>
> The statements are following:
>
> val orcFile =
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(pro
> ject)='EN'")
> orcFile.registerTempTable("d1")
>  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
> ORDER BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").collect().foreach(println)
>
> with default
> spark.driver.memory
>
> I am getting java.lang.OutOfMemoryError: Java heap space.
> The same if I set spark.driver.memory=10g.
>
> When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution 
> fails with a different error:
>
> 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
> recent
> heartbeats: 129324 ms exceeds timeout 12 ms
>
> And I see that GC takes a lot of time.
>
> What is a proper way to execute statements above?
>
> I see the similar problems reported
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-f
> etchfailedexception
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-
> count-action-in-a-big-table
>
>
>
>
>
>
>
>
>




RE: Does Spark SQL support rollup like HQL

2015-12-29 Thread Cheng, Hao
Hi, currently the simple SQL parser of SQLContext is quite weak and doesn't support ROLLUP, but you can check the code at

https://github.com/apache/spark/pull/5080/ , which aimed to add that support, in case you want to patch it into your own branch.

In Spark 2.0, the simple SQL parser will be replaced by the HQL parser, so this will no longer be a problem.

Hao

From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID]
Sent: Wednesday, December 30, 2015 11:41 AM
To: User
Subject: Does Spark SQL support rollup like HQL

Hi guys,

As we know, hqlContext support rollup like this:

hiveContext.sql("select a, b, sum(c) from t group by a, b with rollup")

And I also knows that dataframe provides rollup function to support it:

dataframe.rollup($"a", $"b").agg(Map("c" -> "sum"))

But in my scenario, I'd rather use SQL syntax in SQLContext to support rollup, the way HiveContext does. Any suggestion?

Thanks.

Regards,
Yi Zhang


RE: Rule Engine for Spark

2015-11-04 Thread Cheng, Hao
Or try Streaming SQL, which is a simple layer on top of Spark Streaming. ☺

https://github.com/Intel-bigdata/spark-streamingsql


From: Cassa L [mailto:lcas...@gmail.com]
Sent: Thursday, November 5, 2015 8:09 AM
To: Adrian Tanase
Cc: Stefano Baghino; user
Subject: Re: Rule Engine for Spark

Thanks for the reply. How about Drools? Does it work with Spark?

LCassa

On Wed, Nov 4, 2015 at 3:02 AM, Adrian Tanase 
> wrote:
Another way to do it is to extract your filters as SQL code and load it in a transform, which allows you to change the filters at runtime.

Inside the transform you could apply the filters by going RDD -> DF -> SQL -> RDD.

Lastly, depending on how complex your filters are, you could skip SQL and create your own mini-DSL that runs inside transform. I'd definitely start here if the filter predicates are simple enough…
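A hedged sketch of the RDD -> DF -> SQL -> RDD idea above, with an illustrative Event case class and filter string (both are assumptions, not part of the thread):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Event(user: String, action: String, value: Double)

// The filter lives outside the job logic, so it can be swapped at runtime.
@volatile var filterSql = "SELECT user, action, value FROM events WHERE value > 100"

def applyRules(events: DStream[Event]): DStream[Event] =
  events.transform { rdd =>
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    rdd.toDF().registerTempTable("events")
    sqlContext.sql(filterSql)
      .rdd
      .map(r => Event(r.getString(0), r.getString(1), r.getDouble(2)))
  }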

-adrian

From: Stefano Baghino
Date: Wednesday, November 4, 2015 at 10:15 AM
To: Cassa L
Cc: user
Subject: Re: Rule Engine for Spark

Hi LCassa,
unfortunately I don't have actual experience on this matter, however for a 
similar use case I have briefly evaluated 
Decision (then called literally Streaming 
CEP Engine) and it looked interesting. I hope it may help.

On Wed, Nov 4, 2015 at 1:42 AM, Cassa L 
> wrote:
Hi,
 Has anyone used a rule engine with Spark Streaming? I have a case where data is streaming from Kafka and I need to apply some rules to it (instead of hard-coding them in the code).
Thanks,
LCassa



--
BR,
Stefano Baghino
Software Engineer @ Radicalbit



RE: Sort Merge Join

2015-11-02 Thread Cheng, Hao
No, as far as I can tell. @Michael @YinHuai @Reynold, any comments on this optimization?

From: Jonathan Coveney [mailto:jcove...@gmail.com]
Sent: Tuesday, November 3, 2015 4:17 AM
To: Alex Nastetsky
Cc: Cheng, Hao; user
Subject: Re: Sort Merge Join

Additionally, I'm curious whether there are any JIRAs around making DataFrames support ordering better. There are a lot of operations that can be optimized if you know that you have a total ordering on your data... are there any plans, or at least JIRAs, around having the Catalyst optimizer handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky 
<alex.nastet...@vervemobile.com<mailto:alex.nastet...@vervemobile.com>>:
Thanks for the response.

Treating the file-system-based data source as "UnknownPartitioning" is a simple and SAFE way to handle the JOIN, as it's hard to guarantee that records from different data sets with identical join keys will be loaded by the same node/task, since lots of factors need to be considered: task pool size, cluster size, source format, storage, data locality, etc.
I'll agree it's worth optimizing for performance; in Hive this is actually called a bucket join. I am not sure whether that will happen soon in Spark SQL.

Yes, this is supported in

  *   Hive with bucket join
  *   Pig with USING 
"merge"<https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins>
  *   MR with CompositeInputFormat
But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires that records with identical join keys MUST BE shuffled to the same "reducer" node/task; hashpartitioning is just a strategy to tell the Spark shuffle service how to achieve that. In theory we could even use `RangePartitioning` instead (but it's less efficient, which is why we don't choose it for JOIN). So conceptually the JOIN operator doesn't care so much about the shuffle strategy, as long as it satisfies the required data distribution.

2) If both datasets have already been previously partitioned/sorted the same 
and stored on the file system (e.g. in a previous job), is there a way to tell 
Spark this so that it won't want to do a "hashpartitioning" on them? It looks 
like Spark just considers datasets that have been just read from the the file 
system to have UnknownPartitioning. In the example below, I try to join a 
dataframe to itself, and it still wants to hash repartition.

[Hao:] Take this as example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
SortMergeJoin [key#19], [key#21]
 TungstenSort [key#19 ASC], false, 0
  TungstenExchange hashpartitioning(key#19,200)
   ConvertToUnsafe
HiveTableScan [key#19,value#20], (MetastoreRelation default, src, 
Some(a))
 TungstenSort [key#21 ASC], false, 0
  TungstenExchange hashpartitioning(key#21,200)
   ConvertToUnsafe
HiveTableScan [key#21,value#22], (MetastoreRelation default, src, 
Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
ConvertToUnsafe
 HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b 
ON a.key=b.key”, as we didn’t change the data distribution after it, so we can 
join another table “JOIN src c ON b.key=c.key” directly, which only require the 
table “c” for repartitioning on “key”.

Treating the file-system-based data source as "UnknownPartitioning" is a simple and SAFE way to handle the JOIN, as it's hard to guarantee that records from different data sets with identical join keys will be loaded by the same node/task, since lots of factors need to be considered: task pool size, cluster size, source format, storage, data locality, etc.
I'll agree it's worth optimizing for performance; in Hive this is actually called a bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky 
[mailto:alex.nastet...@vervemobile.com<mailto:alex.nastet...@vervemobile.com>]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it sti

RE: Sort Merge Join

2015-11-01 Thread Cheng, Hao
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires that records with identical join keys MUST BE shuffled to the same "reducer" node/task; hashpartitioning is just a strategy to tell the Spark shuffle service how to achieve that. In theory we could even use `RangePartitioning` instead (but it's less efficient, which is why we don't choose it for JOIN). So conceptually the JOIN operator doesn't care so much about the shuffle strategy, as long as it satisfies the required data distribution.

2) If both datasets have already been previously partitioned/sorted the same 
and stored on the file system (e.g. in a previous job), is there a way to tell 
Spark this so that it won't want to do a "hashpartitioning" on them? It looks 
like Spark just considers datasets that have been just read from the the file 
system to have UnknownPartitioning. In the example below, I try to join a 
dataframe to itself, and it still wants to hash repartition.

[Hao:] Take this as example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
SortMergeJoin [key#19], [key#21]
 TungstenSort [key#19 ASC], false, 0
  TungstenExchange hashpartitioning(key#19,200)
   ConvertToUnsafe
HiveTableScan [key#19,value#20], (MetastoreRelation default, src, 
Some(a))
 TungstenSort [key#21 ASC], false, 0
  TungstenExchange hashpartitioning(key#21,200)
   ConvertToUnsafe
HiveTableScan [key#21,value#22], (MetastoreRelation default, src, 
Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
ConvertToUnsafe
 HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b 
ON a.key=b.key”, as we didn’t change the data distribution after it, so we can 
join another table “JOIN src c ON b.key=c.key” directly, which only require the 
table “c” for repartitioning on “key”.

Treating the file-system-based data source as "UnknownPartitioning" is a simple and SAFE way to handle the JOIN, as it's hard to guarantee that records from different data sets with identical join keys will be loaded by the same node/task, since lots of factors need to be considered: task pool size, cluster size, source format, storage, data locality, etc.
I'll agree it's worth optimizing for performance; in Hive this is actually called a bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

CODE:
   val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin","true")

sparkConf.setMaster("local-cluster[3,1,1024]")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = input.gz.parquet

val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" === 
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Repartition 3, true
Scan 
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
  TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
Repartition 5, true
Scan 
ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been previously partitioned/sorted the same 
and stored on the file system (e.g. in a previous job), is there a way to tell 
Spark this so that it won't want to do a "hashpartitioning" on them? It looks 
like Spark just considers datasets that have been just read from the the file 

RE: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-28 Thread Cheng, Hao
Hi Jerry, I've filed a bug in JIRA, along with the fix:

https://issues.apache.org/jira/browse/SPARK-11364

It will be greatly appreciated if you can verify the PR against your case.

Thanks,
Hao
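For reference, a minimal sketch of the SparkConf-based workaround Marcelo suggests further down the thread (set the s3a keys before the SparkContext exists, so every job, including SQL, sees them); the key values are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3a-example")
  .set("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
  .set("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
val sc = new SparkContext(conf)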

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Wednesday, October 28, 2015 8:51 AM
To: Jerry Lam; Marcelo Vanzin
Cc: user@spark.apache.org
Subject: RE: [Spark-SQL]: Unable to propagate hadoop configuration after 
SparkContext is initialized

After a quick glance, this seems to be a bug in Spark SQL. Do you mind creating a JIRA for it? Then I can start fixing it.

Thanks,
Hao

From: Jerry Lam [mailto:chiling...@gmail.com]
Sent: Wednesday, October 28, 2015 3:13 AM
To: Marcelo Vanzin
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: [Spark-SQL]: Unable to propagate hadoop configuration after 
SparkContext is initialized

Hi Marcelo,

I tried setting the properties before instantiating spark context via 
SparkConf. It works fine.
Originally, the code I have reads the Hadoop configuration from hdfs-site.xml, which works perfectly fine as well.
Therefore, can I conclude that sparkContext.hadoopConfiguration.set("key", "value") does not propagate to all SQL jobs within the same SparkContext? I haven't tried with Spark Core, so I cannot tell.

Is there a workaround given it seems to be broken? I need to do this 
programmatically after the SparkContext is instantiated not before...

Best Regards,

Jerry

On Tue, Oct 27, 2015 at 2:30 PM, Marcelo Vanzin 
<van...@cloudera.com<mailto:van...@cloudera.com>> wrote:
If setting the values in SparkConf works, there's probably some bug in
the SQL code; e.g. creating a new Configuration object instead of
using the one in SparkContext. But I'm not really familiar with that
code.

On Tue, Oct 27, 2015 at 11:22 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
> Hi Marcelo,
>
> Thanks for the advice. I understand that we could set the configurations
> before creating SparkContext. My question is
> SparkContext.hadoopConfiguration.set("key","value") doesn't seem to
> propagate to all subsequent SQLContext jobs. Note that I mentioned I can
> load the parquet file but I cannot perform a count on the parquet file
> because of the AmazonClientException. It means that the credential is used
> during the loading of the parquet but not when we are processing the parquet
> file. How this can happen?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin 
> <van...@cloudera.com<mailto:van...@cloudera.com>> wrote:
>>
>> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam 
>> <chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
>> > Anyone experiences issues in setting hadoop configurations after
>> > SparkContext is initialized? I'm using Spark 1.5.1.
>> >
>> > I'm trying to use s3a which requires access and secret key set into
>> > hadoop
>> > configuration. I tried to set the properties in the hadoop configuration
>> > from sparktcontext.
>> >
>> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
>> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
>>
>> Try setting "spark.hadoop.fs.s3a.access.key" and
>> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
>> SparkContext.
>>
>> --
>> Marcelo
>
>

--
Marcelo



RE: SparkSQL on hive error

2015-10-27 Thread Cheng, Hao
Hi Anand, can you paste the table creation statement? I'd like to reproduce this locally first. BTW, which version are you using?

Hao

From: Anand Nalya [mailto:anand.na...@gmail.com]
Sent: Tuesday, October 27, 2015 11:35 PM
To: spark users
Subject: SparkSQL on hive error

Hi,
I've a partitioned table in Hive (Avro) that I can query fine from the Hive CLI.
When using SparkSQL, I'm able to query some of the partitions, but I get an exception on others.
The query is:
sqlContext.sql("select * from myTable where source='http' and date = 
20150812").take(5).foreach(println)
The exception is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 
5, node1): java.lang.IllegalArgumentException: Error: type expected at the 
position 0 of 
'BIGINT:INT:INT:INT:INT:string:INT:string:string:string:string:string:string:string:string:string:string:string:string:string:string:INT:INT:string:BIGINT:string:string:BIGINT:BIGINT:string:string:string:string:string:FLOAT:FLOAT:string:string:string:string:BIGINT:BIGINT:string:string:string:string:string:string:BIGINT:string:string'
 but 'BIGINT' is found.
at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:348)
at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:392)
at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:762)
at 
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:105)
at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:191)
at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:188)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Any pointers, what might be wrong here?

Regards,
Anand



RE: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Cheng, Hao
After a quick glance, this seems to be a bug in Spark SQL. Do you mind creating a JIRA for it? Then I can start fixing it.

Thanks,
Hao

From: Jerry Lam [mailto:chiling...@gmail.com]
Sent: Wednesday, October 28, 2015 3:13 AM
To: Marcelo Vanzin
Cc: user@spark.apache.org
Subject: Re: [Spark-SQL]: Unable to propagate hadoop configuration after 
SparkContext is initialized

Hi Marcelo,

I tried setting the properties before instantiating spark context via 
SparkConf. It works fine.
Originally, the code I have reads the Hadoop configuration from hdfs-site.xml, which works perfectly fine as well.
Therefore, can I conclude that sparkContext.hadoopConfiguration.set("key", "value") does not propagate to all SQL jobs within the same SparkContext? I haven't tried with Spark Core, so I cannot tell.

Is there a workaround given it seems to be broken? I need to do this 
programmatically after the SparkContext is instantiated not before...

Best Regards,

Jerry

On Tue, Oct 27, 2015 at 2:30 PM, Marcelo Vanzin 
> wrote:
If setting the values in SparkConf works, there's probably some bug in
the SQL code; e.g. creating a new Configuration object instead of
using the one in SparkContext. But I'm not really familiar with that
code.

On Tue, Oct 27, 2015 at 11:22 AM, Jerry Lam 
> wrote:
> Hi Marcelo,
>
> Thanks for the advice. I understand that we could set the configurations
> before creating SparkContext. My question is
> SparkContext.hadoopConfiguration.set("key","value") doesn't seem to
> propagate to all subsequent SQLContext jobs. Note that I mentioned I can
> load the parquet file but I cannot perform a count on the parquet file
> because of the AmazonClientException. It means that the credential is used
> during the loading of the parquet but not when we are processing the parquet
> file. How this can happen?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin 
> > wrote:
>>
>> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam 
>> > wrote:
>> > Anyone experiences issues in setting hadoop configurations after
>> > SparkContext is initialized? I'm using Spark 1.5.1.
>> >
>> > I'm trying to use s3a which requires access and secret key set into
>> > hadoop
>> > configuration. I tried to set the properties in the hadoop configuration
>> > from sparktcontext.
>> >
>> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
>> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
>>
>> Try setting "spark.hadoop.fs.s3a.access.key" and
>> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
>> SparkContext.
>>
>> --
>> Marcelo
>
>


--
Marcelo



RE: HiveContext ignores ("skip.header.line.count"="1")

2015-10-26 Thread Cheng, Hao
I am not sure if we really want to support that with HiveContext, but a workaround is to use the Spark package at https://github.com/databricks/spark-csv
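A hedged sketch of that workaround: read the CSV directly and let spark-csv drop the header row. The path is a placeholder, and the com.databricks:spark-csv package must be on the classpath:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // first line is treated as a header, not data
  .option("inferSchema", "true")
  .load("/path/to/table/data.csv")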


From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Tuesday, October 27, 2015 10:54 AM
To: Daniel Haviv; user
Subject: RE: HiveContext ignores ("skip.header.line.count"="1")

Please open a JIRA?



Date: Mon, 26 Oct 2015 15:32:42 +0200
Subject: HiveContext ignores ("skip.header.line.count"="1")
From: daniel.ha...@veracity-group.com
To: user@spark.apache.org
Hi,
I have a csv table in Hive which is configured to skip the header row using 
TBLPROPERTIES("skip.header.line.count"="1").
When querying from Hive the header row is not included in the data, but when 
running the same query via HiveContext I get the header row.

I made sure that HiveContext sees the skip.header.line.count setting by running 
"show create table"

Any ideas?

Thank you.
Daniel


RE: Hive with apache spark

2015-10-11 Thread Cheng, Hao
One option is to read the data via JDBC; however, that is probably the worst option, as you would likely need some hacky work to enable parallel reading in Spark SQL.
Another option is to copy the hive-site.xml of your Hive Server to $SPARK_HOME/conf; then Spark SQL will see everything the Hive Server does, and you can load the Hive tables as needed.
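A minimal sketch of the second option, assuming hive-site.xml has already been copied to $SPARK_HOME/conf; the database and table names are placeholders:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM mydb.mytable")
df.show()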


-Original Message-
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] 
Sent: Monday, October 12, 2015 1:43 AM
To: user@spark.apache.org
Subject: Hive with apache spark

Hi

How can we read data from an external Hive server? The Hive server is running and I want to read the data remotely using Spark. Is there any example?


thanks






RE: Saprk 1.5 - How to join 3 RDDs in a SQL DF?

2015-10-11 Thread Cheng, Hao
A join B join C === (A join B) join C
Semantically they are equivalent, right?
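A minimal sketch of chaining two-way joins over three DataFrames, as the replies in this thread describe; df1/df2/df3 and the key column "id" are illustrative:

val abc = df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))

// Or the same thing through SQL:
df1.registerTempTable("a"); df2.registerTempTable("b"); df3.registerTempTable("c")
val abcSql = sqlContext.sql("SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.id = c.id")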

From: Richard Eggert [mailto:richard.egg...@gmail.com]
Sent: Monday, October 12, 2015 5:12 AM
To: Subhajit Purkayastha
Cc: User
Subject: Re: Saprk 1.5 - How to join 3 RDDs in a SQL DF?


It's the same as joining 2. Join two together, and then join the third one to 
the result of that.
On Oct 11, 2015 2:57 PM, "Subhajit Purkayastha" 
> wrote:
Can I join 3 different RDDs together in a Spark SQL DF? I can find examples for 
2 RDDs but not 3.

Thanks



RE: Join Order Optimization

2015-10-11 Thread Cheng, Hao
Spark SQL supports a very basic join reordering optimization based on the raw table data size; this was added a couple of major releases back.

And the "EXPLAIN EXTENDED <query>" command is a very informative tool to verify whether the optimization is taking effect.
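A minimal sketch of checking the plan that way; the table names and join condition are illustrative:

sqlContext.sql(
  "EXPLAIN EXTENDED SELECT * FROM small_table s JOIN big_table b ON s.id = b.id"
).collect().foreach(println)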

From: Raajay [mailto:raaja...@gmail.com]
Sent: Sunday, October 11, 2015 9:22 AM
To: user@spark.apache.org
Subject: Join Order Optimization

Hello,
Does Spark SQL support join order optimization as of the 1.5.1 release? From the release notes, I did not see support for this feature, but I figured I would ask the users list to be sure.
Thanks
Raajay


RE: Saprk 1.5 - How to join 3 RDDs in a SQL DF?

2015-10-11 Thread Cheng, Hao
Thank you Ted, that's very informative. From the DB optimization point of view, cost-based join re-ordering and multi-way joins do provide better performance.

But from the API design point of view, two arguments (relations) for JOIN in the DF API are probably enough for multi-table join cases, as we can always use nested two-way joins to represent a multi-way join.
For example: A join B join C join D (multi-way join) =>
((A join B) join C) join D
(A join (B join C)) join D
(A join B) join (C join D) etc.
That also means we leave the optimization work to Spark SQL, not to users, and we believe Spark SQL can do most of the dirty work for us.

However, sometimes people do write a more optimal SQL (e.g. with a better join ordering) than the Spark SQL optimizer produces, and then we'd better resort to the SqlContext.sql("") API.

Cheers
Hao

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Monday, October 12, 2015 8:37 AM
To: Cheng, Hao
Cc: Richard Eggert; Subhajit Purkayastha; User
Subject: Re: Saprk 1.5 - How to join 3 RDDs in a SQL DF?

Some weekend reading:
http://stackoverflow.com/questions/20022196/are-left-outer-joins-associative

Cheers

On Sun, Oct 11, 2015 at 5:32 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
A join B join C === (A join B) join C
Semantically they are equivalent, right?

From: Richard Eggert 
[mailto:richard.egg...@gmail.com<mailto:richard.egg...@gmail.com>]
Sent: Monday, October 12, 2015 5:12 AM
To: Subhajit Purkayastha
Cc: User
Subject: Re: Saprk 1.5 - How to join 3 RDDs in a SQL DF?


It's the same as joining 2. Join two together, and then join the third one to 
the result of that.
On Oct 11, 2015 2:57 PM, "Subhajit Purkayastha" 
<spurk...@p3si.net<mailto:spurk...@p3si.net>> wrote:
Can I join 3 different RDDs together in a Spark SQL DF? I can find examples for 
2 RDDs but not 3.

Thanks




RE: Join Order Optimization

2015-10-11 Thread Cheng, Hao
Probably you have to read the source code; I am not sure if there are any slides or presentations.

Hao

From: VJ Anand [mailto:vjan...@sankia.com]
Sent: Monday, October 12, 2015 11:43 AM
To: Cheng, Hao
Cc: Raajay; user@spark.apache.org
Subject: Re: Join Order Optimization

Hi - Is there a design document for the operations that have been implemented in 1.4.0? If so, where can I find them?
-VJ

On Sun, Oct 11, 2015 at 7:27 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Yes, I think SPARK-2211 is the right place to follow the CBO work, but probably that will not happen right away.

The JIRA issue that introduced the statistics info can be found at:
https://issues.apache.org/jira/browse/SPARK-2393

Hao

From: Raajay [mailto:raaja...@gmail.com<mailto:raaja...@gmail.com>]
Sent: Monday, October 12, 2015 10:17 AM
To: Cheng, Hao
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Join Order Optimization

Hi Cheng,
Could you point me to the JIRA that introduced this change ?

Also, is this SPARK-2211 the right issue to follow for cost-based optimization?
Thanks
Raajay


On Sun, Oct 11, 2015 at 7:57 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Spark SQL supports very basic join reordering optimization, based on the raw 
table data size, this was added couple major releases back.

And the “EXPLAIN EXTENDED query” command is a very informative tool to verify 
whether the optimization taking effect.

From: Raajay [mailto:raaja...@gmail.com<mailto:raaja...@gmail.com>]
Sent: Sunday, October 11, 2015 9:22 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Join Order Optimization

Hello,
Does Spark-SQL support join order optimization as of the 1.5.1 release ? From 
the release notes, I did not see support for this feature, but figured will ask 
the users-list to be sure.
Thanks
Raajay




--
VJ Anand
Founder
Sankia
vjan...@sankia.com<mailto:vjan...@sankia.com>
925-640-1340
www.sankia.com<http://www.sankia.com>

Confidentiality Notice: This e-mail message, including any attachments, is for 
the sole use of the intended recipient(s) and may contain confidential and 
privileged information. Any unauthorized review, use, disclosure or 
distribution is prohibited. If you are not the intended recipient, please 
contact the sender by reply e-mail and destroy all copies of the original 
message


RE: Join Order Optimization

2015-10-11 Thread Cheng, Hao
Yes, I think SPARK-2211 is the right place to follow the CBO work, but probably that will not happen right away.

The JIRA issue that introduced the statistics info can be found at:
https://issues.apache.org/jira/browse/SPARK-2393

Hao

From: Raajay [mailto:raaja...@gmail.com]
Sent: Monday, October 12, 2015 10:17 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Join Order Optimization

Hi Cheng,
Could you point me to the JIRA that introduced this change ?

Also, is this SPARK-2211 the right issue to follow for cost-based optimization?
Thanks
Raajay


On Sun, Oct 11, 2015 at 7:57 PM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
Spark SQL supports very basic join reordering optimization, based on the raw 
table data size, this was added couple major releases back.

And the “EXPLAIN EXTENDED query” command is a very informative tool to verify 
whether the optimization taking effect.

From: Raajay [mailto:raaja...@gmail.com<mailto:raaja...@gmail.com>]
Sent: Sunday, October 11, 2015 9:22 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Join Order Optimization

Hello,
Does Spark-SQL support join order optimization as of the 1.5.1 release ? From 
the release notes, I did not see support for this feature, but figured will ask 
the users-list to be sure.
Thanks
Raajay



RE: Insert via HiveContext is slow

2015-10-09 Thread Cheng, Hao
I think the DF API performs the same as the SQL API does for multi-inserts, if you don't use a cached table.

Hao

From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: Friday, October 9, 2015 3:09 PM
To: Cheng, Hao
Cc: user
Subject: Re: Insert via HiveContext is slow

Thanks Hao.
It seems like one issue.
The other issue to me seems the renaming of files at the end of the insert.
would DF.save perform the task better?

Thanks,
Daniel

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
I think that's a known performance issue (compared to Hive) of Spark SQL in multi-inserts.
A workaround is to create a temp cached table for the projection first, and then do the multiple inserts based on the cached table.

We are actually working on a POC of some similar cases; hopefully it comes out soon.

Hao

From: Daniel Haviv 
[mailto:daniel.ha...@veracity-group.com<mailto:daniel.ha...@veracity-group.com>]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid

""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv 
<daniel.ha...@veracity-group.com<mailto:daniel.ha...@veracity-group.com>> wrote:
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement passed 
via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are ways 
to speed things up.
Would saving the DF like this 
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") 
be faster ?


Thank you.
Daniel




RE: Insert via HiveContext is slow

2015-10-08 Thread Cheng, Hao
I think that’s a known performance issue (compared to Hive) of Spark SQL in 
multi-inserts.
A workaround is to create a temp cached table for the projection first, and then 
do the multiple inserts based on the cached table.

We are actually working on the POC of some similar cases, hopefully it comes 
out soon.

Hao

From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid

""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv 
> wrote:
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement passed 
via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are ways 
to speed things up.
Would saving the DF like this 
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") 
be faster ?


Thank you.
Daniel



RE: Performance Spark SQL vs Dataframe API faster

2015-09-22 Thread Cheng, Hao
Yes, it should be the same, as they are just different frontends over the same 
optimization / execution.
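As a quick check (a sketch; the table and column names are hypothetical), the two frontends can be compared by looking at their plans:

sqlContext.sql("SELECT name, COUNT(*) FROM people GROUP BY name").explain(true)
sqlContext.table("people").groupBy("name").count().explain(true)
// both should print the same optimized / physical plan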

-Original Message-
From: sanderg [mailto:s.gee...@wimionline.be] 
Sent: Tuesday, September 22, 2015 10:06 PM
To: user@spark.apache.org
Subject: Performance Spark SQL vs Dataframe API faster

Is there a difference in performance between writing a spark job using only SQL 
statements and writing it using the dataframe api or does it translate to the 
same thing under the hood?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Spark-SQL-vs-Dataframe-API-faster-tp24768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: RE: spark sql hook

2015-09-16 Thread Cheng, Hao
Probably a workable solution is to create your own SQLContext by extending the 
class HiveContext, override the `analyzer`, and add your own rule to do the 
hacking.
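A rough sketch of such a rule for the test-to-production rewrite mentioned below (this relies on internal Catalyst APIs, and the exact shape of UnresolvedRelation differs between Spark versions, so treat it only as a starting point):

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: redirect tables in the "test" database to "production"
// before resolution.
object RedirectTestToProd extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(Seq("test", table), alias) =>
      UnresolvedRelation(Seq("production", table), alias)
  }
}

The rule would then be added to the analyzer of the HiveContext subclass; note that `analyzer` is an internal, protected member, so in practice the subclass may need to live in the org.apache.spark.sql package.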

From: r7raul1...@163.com [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 11:08 AM
To: Cheng, Hao; user
Subject: Re: RE: spark sql hook

Example:
select * from test.table changed to select * from production.table


r7raul1...@163.com<mailto:r7raul1...@163.com>

From: Cheng, Hao<mailto:hao.ch...@intel.com>
Date: 2015-09-17 11:05
To: r7raul1...@163.com<mailto:r7raul1...@163.com>; 
user<mailto:user@spark.apache.org>
Subject: RE: spark sql hook
Catalyst TreeNode is very fundamental API, not sure what kind of hook you need. 
Any concrete example will be more helpful to understand your requirement.

Hao

From: r7raul1...@163.com<mailto:r7raul1...@163.com> [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook


I want to modify some SQL tree nodes before execution. I can do this with a Hive 
hook in Hive. Does Spark support such a hook? Any advice?

r7raul1...@163.com<mailto:r7raul1...@163.com>


RE: spark sql hook

2015-09-16 Thread Cheng, Hao
Catalyst TreeNode is a very fundamental API, and I am not sure what kind of hook you 
need. A concrete example would be more helpful for understanding your requirement.

Hao

From: r7raul1...@163.com [mailto:r7raul1...@163.com]
Sent: Thursday, September 17, 2015 10:54 AM
To: user
Subject: spark sql hook


I want to modify some SQL tree nodes before execution. I can do this with a Hive 
hook in Hive. Does Spark support such a hook? Any advice?

r7raul1...@163.com


RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-11 Thread Cheng, Hao
Do you mean the performance is still as slow as with SMJ in Spark 1.5?

Can you set spark.shuffle.reduceLocality.enabled=false when you start the 
spark-shell/spark-sql? It’s a new feature in Spark 1.5, and it’s true by 
default, but we found that it can degrade performance dramatically.
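As a sketch of how the two settings discussed in this thread can be applied from a standalone application (the app name is a placeholder; for the shells the same keys can be passed with --conf at launch): core confs must be set before the SparkContext starts, while SQL confs can be changed at runtime.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("join-regression-test")                          // placeholder name
  .set("spark.shuffle.reduceLocality.enabled", "false")        // core conf
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false") // SQL conf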


From: Todd [mailto:bit1...@163.com]
Sent: Friday, September 11, 2015 2:17 PM
To: Cheng, Hao
Cc: Jesse F Chen; Michael Armbrust; user@spark.apache.org
Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with 
spark 1.4.1 SQL

Thanks Hao for the reply.
I turn the merge sort join off, the physical plan is below, but the performance 
is roughly the same as it on...

== Physical Plan ==
TungstenProject 
[ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
  TungstenExchange hashpartitioning(ss_item_sk#2)
   ConvertToUnsafe
Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
  TungstenExchange hashpartitioning(ss_item_sk#25)
   ConvertToUnsafe
Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]

Code Generation: true




At 2015-09-11 13:48:23, "Cheng, Hao" 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:

This is not a big surprise the SMJ is slower than the HashJoin, as we do not 
fully utilize the sorting yet, more details can be found at 
https://issues.apache.org/jira/browse/SPARK-2926 .

Anyway, can you disable the sort merge join by 
“spark.sql.planner.sortMergeJoin=false;” in Spark 1.5, and run the query again? 
In our previous testing, it’s about 20% slower for sort merge join. I am not 
sure if there anything else slow down the performance.

Hao


From: Jesse F Chen [mailto:jfc...@us.ibm.com<mailto:jfc...@us.ibm.com>]
Sent: Friday, September 11, 2015 1:18 PM
To: Michael Armbrust
Cc: Todd; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL


Could this be a build issue (i.e., sbt package)?

If I ran the same jar build for 1.4.1 in 1.5, I am seeing large regression too 
in queries (all other things identical)...

I am curious, to build 1.5 (when it isn't released yet), what do I need to do 
with the build.sbt file?

any special parameters i should be using to make sure I load the latest hive 
dependencies?

[Inactive hide details for Michael Armbrust ---09/10/2015 11:07:28 AM---I've 
been running TPC-DS SF=1500 daily on Spark 1.4.1 an]Michael Armbrust 
---09/10/2015 11:07:28 AM---I've been running TPC-DS SF=1500 daily on Spark 
1.4.1 and Spark 1.5 on S3, so this is surprising.  I

From: Michael Armbrust <mich...@databricks.com<mailto:mich...@databricks.com>>
To: Todd <bit1...@163.com<mailto:bit1...@163.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date: 09/10/2015 11:07 AM
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL





I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so 
this is surprising.  In my experiments Spark 1.5 is either the same or faster 
than 1.4 with only small exceptions.  A few thoughts,

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever 
reporting performance changes.

On Thu, Sep 10, 2015 at 1:24 AM, Todd <bit1...@163.com<mailto:bit1...@163.com>> 
wrote:
Hi,

I am using data generated with 
sparksqlperf(https://github.com/databricks/spark-sql-perf) to test the spark 
sql performance (spark on yarn, with 10 nodes) with the following code (The 
table store_sales is about 90 million records, 6G in size)

val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name="store_sales"
sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE ${name}
  |USING org.apache.spark.sql.parquet
  |OPTIONS (
  |  path '${outputDir}'
  |)
""".stripMargin)

val sql="""
 |select
 |  t1.ss_quantity,
 |  t1.ss_list_price,
 |  t1.ss_coupon_amt,
 |  t1.ss_cdemo_sk,
 |  t1.ss_item_sk,
 |  t1.ss_promo_sk,
 |  t1.ss_sold_date_sk
 |from store_sales t1 join store_sales t2 on t1.ss_item_sk = 
t2.ss_item_sk
 |where
 |  t1.ss_sold_date_sk between 2450815 and 2451179
   """.stripMargin

val df = sqlContext.sql(sql)
df.rdd.foreach(row=>Unit)

With 1.4.1, I can finish the query in 6 minutes, but I need 10+ minutes with 1.5.

RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-11 Thread Cheng, Hao
Can you confirm whether the query really runs in cluster mode, not local mode? 
Can you print the call stack of the executor while the query is running?

BTW: spark.shuffle.reduceLocality.enabled is a Spark configuration, not a 
Spark SQL one.

From: Todd [mailto:bit1...@163.com]
Sent: Friday, September 11, 2015 3:39 PM
To: Todd
Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
compared with spark 1.4.1 SQL

I added the following two options:
spark.sql.planner.sortMergeJoin=false
spark.shuffle.reduceLocality.enabled=false

But it still performs the same as without setting those two options.

One thing is that on the Spark UI, when I click the SQL tab, it shows an empty 
page with only the header title 'SQL'; there is no table showing queries and 
execution plan information.




At 2015-09-11 14:39:06, "Todd" <bit1...@163.com<mailto:bit1...@163.com>> wrote:


Thanks Hao.
Yes, it is still as slow as with SMJ. Let me try the options you suggested.


At 2015-09-11 14:34:46, "Cheng, Hao" 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:

You mean the performance is still slow as the SMJ in Spark 1.5?

Can you set the spark.shuffle.reduceLocality.enabled=false when you start the 
spark-shell/spark-sql? It’s a new feature in Spark 1.5, and it’s true by 
default, but we found it probably causes the performance reduce dramatically.


From: Todd [mailto:bit1...@163.com<mailto:bit1...@163.com>]
Sent: Friday, September 11, 2015 2:17 PM
To: Cheng, Hao
Cc: Jesse F Chen; Michael Armbrust; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with 
spark 1.4.1 SQL

Thanks Hao for the reply.
I turn the merge sort join off, the physical plan is below, but the performance 
is roughly the same as it on...

== Physical Plan ==
TungstenProject 
[ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
  TungstenExchange hashpartitioning(ss_item_sk#2)
   ConvertToUnsafe
Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
  TungstenExchange hashpartitioning(ss_item_sk#25)
   ConvertToUnsafe
Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]

Code Generation: true



At 2015-09-11 13:48:23, "Cheng, Hao" 
<hao.ch...@intel.com<mailto:hao.ch...@intel.com>> wrote:
This is not a big surprise the SMJ is slower than the HashJoin, as we do not 
fully utilize the sorting yet, more details can be found at 
https://issues.apache.org/jira/browse/SPARK-2926 .

Anyway, can you disable the sort merge join by 
“spark.sql.planner.sortMergeJoin=false;” in Spark 1.5, and run the query again? 
In our previous testing, it’s about 20% slower for sort merge join. I am not 
sure if there anything else slow down the performance.

Hao


From: Jesse F Chen [mailto:jfc...@us.ibm.com<mailto:jfc...@us.ibm.com>]
Sent: Friday, September 11, 2015 1:18 PM
To: Michael Armbrust
Cc: Todd; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL


Could this be a build issue (i.e., sbt package)?

If I ran the same jar build for 1.4.1 in 1.5, I am seeing large regression too 
in queries (all other things identical)...

I am curious, to build 1.5 (when it isn't released yet), what do I need to do 
with the build.sbt file?

any special parameters i should be using to make sure I load the latest hive 
dependencies?

[Inactive hide details for Michael Armbrust ---09/10/2015 11:07:28 AM---I've 
been running TPC-DS SF=1500 daily on Spark 1.4.1 an]Michael Armbrust 
---09/10/2015 11:07:28 AM---I've been running TPC-DS SF=1500 daily on Spark 
1.4.1 and Spark 1.5 on S3, so this is surprising.  I

From: Michael Armbrust <mich...@databricks.com<mailto:mich...@databricks.com>>
To: Todd <bit1...@163.com<mailto:bit1...@163.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date: 09/10/2015 11:07 AM
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL





I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so 
this is surprising.  In my experiments Spark 1.5 is either the same or faster 
than 1.4 with only small exceptions.  A few thoughts,

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever 
reporting performance changes.

On Thu, Sep 10, 2015 at 1:24 AM, Todd wrote:

RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Cheng, Hao
It is not a big surprise that SMJ is slower than the HashJoin, as we do not 
fully utilize the sorting yet; more details can be found at 
https://issues.apache.org/jira/browse/SPARK-2926 .

Anyway, can you disable the sort merge join with 
“spark.sql.planner.sortMergeJoin=false;” in Spark 1.5 and run the query again? 
In our previous testing, sort merge join was about 20% slower. I am not sure if 
anything else is slowing down the performance.

Hao


From: Jesse F Chen [mailto:jfc...@us.ibm.com]
Sent: Friday, September 11, 2015 1:18 PM
To: Michael Armbrust
Cc: Todd; user@spark.apache.org
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL


Could this be a build issue (i.e., sbt package)?

If I ran the same jar build for 1.4.1 in 1.5, I am seeing large regression too 
in queries (all other things identical)...

I am curious, to build 1.5 (when it isn't released yet), what do I need to do 
with the build.sbt file?

any special parameters i should be using to make sure I load the latest hive 
dependencies?

[Inactive hide details for Michael Armbrust ---09/10/2015 11:07:28 AM---I've 
been running TPC-DS SF=1500 daily on Spark 1.4.1 an]Michael Armbrust 
---09/10/2015 11:07:28 AM---I've been running TPC-DS SF=1500 daily on Spark 
1.4.1 and Spark 1.5 on S3, so this is surprising.  I

From: Michael Armbrust >
To: Todd >
Cc: "user@spark.apache.org" 
>
Date: 09/10/2015 11:07 AM
Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 
1.4.1 SQL





I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so 
this is surprising.  In my experiments Spark 1.5 is either the same or faster 
than 1.4 with only small exceptions.  A few thoughts,

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever 
reporting performance changes.

On Thu, Sep 10, 2015 at 1:24 AM, Todd > 
wrote:
Hi,

I am using data generated with 
sparksqlperf(https://github.com/databricks/spark-sql-perf) to test the spark 
sql performance (spark on yarn, with 10 nodes) with the following code (The 
table store_sales is about 90 million records, 6G in size)

val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name="store_sales"
sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE ${name}
  |USING org.apache.spark.sql.parquet
  |OPTIONS (
  |  path '${outputDir}'
  |)
""".stripMargin)

val sql="""
 |select
 |  t1.ss_quantity,
 |  t1.ss_list_price,
 |  t1.ss_coupon_amt,
 |  t1.ss_cdemo_sk,
 |  t1.ss_item_sk,
 |  t1.ss_promo_sk,
 |  t1.ss_sold_date_sk
 |from store_sales t1 join store_sales t2 on t1.ss_item_sk = 
t2.ss_item_sk
 |where
 |  t1.ss_sold_date_sk between 2450815 and 2451179
   """.stripMargin

val df = sqlContext.sql(sql)
df.rdd.foreach(row=>Unit)

With 1.4.1, I can finish the query in 6 minutes,  but  I need 10+ minutes with 
1.5.

The configuration are basically the same, since I copy the configuration from 
1.4.1 to 1.5:

sparkVersion1.4.11.5.0
scaleFactor3030
spark.sql.shuffle.partitions600600
spark.sql.sources.partitionDiscovery.enabledtruetrue
spark.default.parallelism200200
spark.driver.memory4G4G4G
spark.executor.memory4G4G
spark.executor.instances1010
spark.shuffle.consolidateFilestruetrue
spark.storage.memoryFraction0.40.4
spark.executor.cores33

I am not sure where is going wrong,any ideas?



RE: Driver OOM after upgrading to 1.5

2015-09-09 Thread Cheng, Hao
Would it help to add JVM options like:
-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, September 10, 2015 5:31 AM
To: Sandy Ryza
Cc: user@spark.apache.org
Subject: Re: Driver OOM after upgrading to 1.5

It's likely that with codegen, you need a bigger permgen space. Also I found 
that Java 7 doesn't do very well w.r.t. flushing code cache. As a result, Spark 
SQL and DataFrames now run much better under Java 8, because it flushes code 
cache better.


On Wed, Sep 9, 2015 at 2:12 PM, Sandy Ryza 
> wrote:
Java 7.

FWIW I was just able to get it to work by increasing MaxPermSize to 256m.

-Sandy

On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin 
> wrote:
Java 7 / 8?

On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza 
> wrote:
I just upgraded the 
spark-timeseries project to run 
on top of 1.5, and I'm noticing that tests are failing with OOMEs.

I ran a jmap -histo on the process and discovered the top heap items to be:
   1:163428   22236064  
   2:163428   21112648  
   3: 12638   14459192  
   4: 12638   13455904  
   5: 105397642528  

Not sure whether this is suspicious.  Any ideas?

-Sandy





RE: Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18

2015-08-30 Thread Cheng, Hao
Hi, can you try something like:


val rowRDD = sc.textFile("/user/spark/short_model").map { line =>
  val p = line.split("\\t")
  if (p.length >= 72) {
    Row(p(0), p(1), …)
  } else {
    throw new RuntimeException(s"failed in parsing $line")
  }
}

Judging from the log “java.lang.ArrayIndexOutOfBoundsException: 71”, it seems 
something is wrong with your data; is that expected?

Thanks,
Hao

From: our...@cnsuning.com [mailto:our...@cnsuning.com]
Sent: Friday, August 28, 2015 7:20 PM
To: Terry Hole
Cc: user
Subject: Re: Re: Job aborted due to stage failure: 
java.lang.StringIndexOutOfBoundsException: String index out of range: 18

Terry:
Unfortunately, the error remains when using your advice, but it has changed; now 
the error is java.lang.ArrayIndexOutOfBoundsException: 71.
  The error log is as follows:

15/08/28 19:13:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
9, 10.104.74.7): java.lang.ArrayIndexOutOfBoundsException: 71
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:23)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:23)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)









From: Terry Holemailto:hujie.ea...@gmail.com
Date: 2015-08-28 17:22
To: our...@cnsuning.commailto:our...@cnsuning.com
CC: usermailto:user@spark.apache.org; hao.chengmailto:hao.ch...@intel.com; 
Huang, Jiemailto:jie.hu...@intel.com
Subject: Re: Job aborted due to stage failure: 
java.lang.StringIndexOutOfBoundsException: String index out of range: 18
Ricky,


You may need to use map instead of flatMap in your case

val 
rowRDD=sc.textFile(/user/spark/short_model).map(_.split(\\tfile:///\\t)).map(p
 = Row(...))



Thanks!

-Terry

On Fri, Aug 28, 2015 at 5:08 PM, 
our...@cnsuning.commailto:our...@cnsuning.com 
our...@cnsuning.commailto:our...@cnsuning.com wrote:
hi all,

When using Spark SQL, a problem has been bothering me.

The code is as follows:
 val schemaString = 

RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-25 Thread Cheng, Hao
Ok, I see, thanks for the correction, but this should be optimized.

From: Shixiong Zhu [mailto:zsxw...@gmail.com]
Sent: Tuesday, August 25, 2015 2:08 PM
To: Cheng, Hao
Cc: Jeff Zhang; user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

That's two jobs. `SparkPlan.executeTake` will call `runJob` twice in this case.


Best Regards,
Shixiong Zhu

2015-08-25 14:01 GMT+08:00 Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com:
O, Sorry, I miss reading your reply!

I know the minimum tasks will be 2 for scanning, but Jeff is talking about 2 
jobs, not 2 tasks.

From: Shixiong Zhu [mailto:zsxw...@gmail.commailto:zsxw...@gmail.com]
Sent: Tuesday, August 25, 2015 1:29 PM
To: Cheng, Hao
Cc: Jeff Zhang; user@spark.apache.orgmailto:user@spark.apache.org

Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hao,

I can reproduce it using the master branch. I'm curious why you cannot 
reproduce it. Did you check if the input HadoopRDD did have two partitions? My 
test code is

val df = sqlContext.read.json("examples/src/main/resources/people.json")
df.show()



Best Regards,
Shixiong Zhu

2015-08-25 13:01 GMT+08:00 Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com:
Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in 
the `df.show()` with latest code, we did refactor the code for json data source 
recently, not sure you’re running an earlier version of it.

And a known issue is Spark SQL will try to re-list the files every time when 
loading the data for JSON, it’s probably causes longer time for ramp up with 
large number of files/partitions.

From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com]
Sent: Tuesday, August 25, 2015 8:11 AM
To: Cheng, Hao
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hi Cheng,

I know that sqlContext.read will trigger one spark job to infer the schema. 
What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 
jobs.

Here's the command I use:

 val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
 // trigger one spark job to infer schema
 df.show()  // trigger 2 spark jobs which is weird




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
The first job is to infer the json schema, and the second one is what you mean 
of the query.
You can provide the schema while loading the json file, like below:

sqlContext.read.schema(xxx).json(“…”)?

Hao
From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function will cost 2 spark jobs. 
DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang



--
Best Regards

Jeff Zhang




RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-25 Thread Cheng, Hao
Oh, sorry, I missed your reply!

I know the minimum number of tasks will be 2 for scanning, but Jeff is talking 
about 2 jobs, not 2 tasks.

From: Shixiong Zhu [mailto:zsxw...@gmail.com]
Sent: Tuesday, August 25, 2015 1:29 PM
To: Cheng, Hao
Cc: Jeff Zhang; user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hao,

I can reproduce it using the master branch. I'm curious why you cannot 
reproduce it. Did you check if the input HadoopRDD did have two partitions? My 
test code is

val df = sqlContext.read.json("examples/src/main/resources/people.json")
df.show()



Best Regards,
Shixiong Zhu

2015-08-25 13:01 GMT+08:00 Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com:
Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in 
the `df.show()` with latest code, we did refactor the code for json data source 
recently, not sure you’re running an earlier version of it.

And a known issue is Spark SQL will try to re-list the files every time when 
loading the data for JSON, it’s probably causes longer time for ramp up with 
large number of files/partitions.

From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com]
Sent: Tuesday, August 25, 2015 8:11 AM
To: Cheng, Hao
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hi Cheng,

I know that sqlContext.read will trigger one spark job to infer the schema. 
What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 
jobs.

Here's the command I use:

 val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
 // trigger one spark job to infer schema
 df.show()  // trigger 2 spark jobs which is weird




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
The first job is to infer the json schema, and the second one is what you mean 
of the query.
You can provide the schema while loading the json file, like below:

sqlContext.read.schema(xxx).json(“…”)?

Hao
From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function will cost 2 spark jobs. 
DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang



--
Best Regards

Jeff Zhang



RE: Spark thrift server on yarn

2015-08-25 Thread Cheng, Hao
Did you register the temp table via beeline or in a new Spark SQL CLI?

As far as I know, a temp table cannot cross HiveContext instances.
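A sketch of the intended setup (the path, data source, and table name are placeholders; startWithContext requires a Spark build that includes the thrift-server module): start the thrift server on the very HiveContext that registers the temp table, so beeline sessions share it.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
hiveContext.read.parquet("/path/to/data").registerTempTable("temp_table")
HiveThriftServer2.startWithContext(hiveContext)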

Hao

From: Udit Mehta [mailto:ume...@groupon.com]
Sent: Wednesday, August 26, 2015 8:19 AM
To: user
Subject: Spark thrift server on yarn

Hi,
I am trying to start a spark thrift server using the following command on Spark 
1.3.1 running on yarn:

 ./sbin/start-thriftserver.sh --master yarn://resourcemanager.snc1:8032 
--executor-memory 512m --hiveconf hive.server2.thrift.bind.host=test-host.sn1 
--hiveconf hive.server2.thrift.port=10001 --queue public
It starts up fine and is able to connect to the hive metastore.
I now need to view some temporary tables using this thrift server so I start up 
SparkSql and register a temp table.
But the problem is that I am unable to view the temp table using the beeline 
client. I am pretty sure I am going wrong somewhere and the spark documentation 
does not clearly say how to run the thrift server in yarn mode or maybe I 
missed something.
Could someone tell me how this is to be done or point me to some documentation?
Thanks in advance,
Udit


RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Cheng, Hao
The first job is to infer the json schema, and the second one is the query you 
actually intend to run.
You can provide the schema while loading the json file, like below:

sqlContext.read.schema(xxx).json(“…”)?
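A minimal sketch using the age/name schema shown in the quoted plans below, so the schema-inference job is skipped entirely:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))
val df = sqlContext.read.schema(schema)
  .json("examples/src/main/resources/people.json")
df.show()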

Hao
From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function will cost 2 spark jobs. 
DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang


RE: Loading already existing tables in spark shell

2015-08-24 Thread Cheng, Hao
And be sure that hive-site.xml is on the classpath or under $SPARK_HOME/conf.

Hao

From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in]
Sent: Monday, August 24, 2015 8:57 PM
To: user
Subject: Re: Loading already existing tables in spark shell


Hi Jeetendra,



I faced this issue. I had not specified the database where this table exists. 
Please set the database by using the `use <database>` command before executing 
the query.
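For example (the database name is hypothetical):

sqlContext.sql("USE my_database")
sqlContext.sql("FROM event_impressions select count(*)")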



Regards,

Ishwardeep


From: Jeetendra Gangele gangele...@gmail.commailto:gangele...@gmail.com
Sent: Monday, August 24, 2015 5:47 PM
To: user
Subject: Loading already existing tables in spark shell

Hi all, I have a few tables in Hive and I want to run queries against them with 
Spark as the execution engine.

Can I directly load these tables in the spark shell and run queries?

I tried with
1. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
2. sqlContext.sql("FROM event_impressions select count(*)") where 
event_impressions is the table name.

It gives me an error saying org.apache.spark.sql.AnalysisException: no such table 
event_impressions; line 1 pos 5

Does anybody hit similar issues?


regards
jeetendra








NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.


RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Cheng, Hao
Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in 
the `df.show()` with the latest code; we refactored the code for the json data 
source recently, so you may be running an earlier version of it.

And a known issue is that Spark SQL will try to re-list the files every time it 
loads JSON data, which probably causes a longer ramp-up time with a large 
number of files/partitions.

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Tuesday, August 25, 2015 8:11 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hi Cheng,

I know that sqlContext.read will trigger one spark job to infer the schema. 
What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 
jobs.

Here's the command I use:

 val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
 // trigger one spark job to infer schema
 df.show()  // trigger 2 spark jobs which is weird




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
The first job is to infer the json schema, and the second one is what you mean 
of the query.
You can provide the schema while loading the json file, like below:

sqlContext.read.schema(xxx).json(“…”)?

Hao
From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function will cost 2 spark jobs. 
DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang



--
Best Regards

Jeff Zhang


RE: Test case for the spark sql catalyst

2015-08-24 Thread Cheng, Hao
Yes, check the source code under: 
https://github.com/apache/spark/tree/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst

From: Todd [mailto:bit1...@163.com]
Sent: Tuesday, August 25, 2015 1:01 PM
To: user@spark.apache.org
Subject: Test case for the spark sql catalyst

Hi, Are there test cases for the spark sql catalyst, such as testing the rules 
of transforming unsolved query plan?
Thanks!


RE: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Cheng, Hao
Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false.
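For example (a sketch; set it before the table is loaded, since the flag is consulted when the relation is created):

sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")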

BTW, which version are you using?

Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 12:16 PM
To: Philip Weaver
Cc: user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does spark have to do partition discovery with all 
partitions when the query only needs to look at one partition? Is there a conf 
flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote:
I've had the same problem. It turns out that Spark (specifically parquet) is 
very slow at partition discovery. It got better in 1.5 (not yet released), but 
was still unacceptably slow. Sadly, we ended up reading parquet files manually 
in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote:
Hi all,

I did a simple experiment with Spark SQL. I created a partitioned parquet table 
with only one partition (date=20140701). A simple `select count(*) from table 
where date=20140701` would run very fast (0.1 seconds). However, as I added 
more partitions the query takes longer and longer. When I added about 10,000 
partitions, the query took way too long. I feel like querying for a single 
partition should not be affected by having more partitions. Is this a known 
behaviour? What does spark try to do here?

Thanks,
Jerrick




RE: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Cheng, Hao
Can you do some more profiling? I am wondering if the driver is busy scanning 
HDFS / S3.
For example, run jstack against the driver process.

Also, it would be great if you could paste the physical plan for the simple 
query.
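For example (a sketch; the table name is a placeholder):

sqlContext.sql("SELECT COUNT(*) FROM my_partitioned_table WHERE date=20140701").explain(true)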

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 1:46 PM
To: Cheng, Hao
Cc: Philip Weaver; user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs 
trying to speed up spark sql with tables with a huge number of partitions, I've 
made sure that those CLs are included but it's still very slow

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false.

BTW, which version are you using?

Hao

From: Jerrick Hoang 
[mailto:jerrickho...@gmail.commailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 12:16 PM
To: Philip Weaver
Cc: user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does spark have to do partition discovery with all 
partitions when the query only needs to look at one partition? Is there a conf 
flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote:
I've had the same problem. It turns out that Spark (specifically parquet) is 
very slow at partition discovery. It got better in 1.5 (not yet released), but 
was still unacceptably slow. Sadly, we ended up reading parquet files manually 
in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote:
Hi all,

I did a simple experiment with Spark SQL. I created a partitioned parquet table 
with only one partition (date=20140701). A simple `select count(*) from table 
where date=20140701` would run very fast (0.1 seconds). However, as I added 
more partitions the query takes longer and longer. When I added about 10,000 
partitions, the query took way too long. I feel like querying for a single 
partition should not be affected by having more partitions. Is this a known 
behaviour? What does spark try to do here?

Thanks,
Jerrick





RE: Refresh table

2015-08-11 Thread Cheng, Hao
Refreshing a table only works for Spark SQL data source tables, in my 
understanding; apparently here you’re running against a Hive table.

Can you try to create a table like:

|CREATE TEMPORARY TABLE parquetTable (a int, b string)
|USING org.apache.spark.sql.parquet.DefaultSource
|OPTIONS (
|  path '/root_path'
|)

And then df2.write.parquet("hdfs://root_path/test_table/key=2") …

Cheng

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Tuesday, August 11, 2015 2:15 PM
To: user
Subject: Refresh table

Hi all,

I'm a little confused about how refresh table (SPARK-5833) should work. So I 
did the following,

val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.write.parquet("hdfs://path/test_table/key=1")

Then I created an external table by doing,

CREATE EXTERNAL TABLE `tmp_table` (
`single`: int,
`double`: int)
PARTITIONED BY (
  `key` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://path/test_table/'

Then I added the partition to the table by `alter table tmp_table add partition 
(key=1) location 'hdfs://..`

Then I added a new partition with different schema by,


val df2 = sc.makeRDD(1 to 5).map(i => (i, i * 3)).toDF("single", "triple")

df2.write.parquet("hdfs://path/test_table/key=2")

And added the new partition to the table by `alter table ..`,

But when I did `refresh table tmp_table` and `describe table` it couldn't pick 
up the new column `triple`. Can someone explain to me how partition discovery 
and schema merging of refresh table should work?

Thanks


RE: Spark DataFrames uses too many partition

2015-08-11 Thread Cheng, Hao
That's a good question. We don't support reading small files in a single 
partition yet, but it's definitely an issue we need to optimize; do you mind 
creating a jira issue for this? Hopefully we can merge that in the 1.6 release.

200 is the default partition number for parallel tasks after the data shuffle, 
and we have to change that value according to the file size, cluster size etc..
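For small inputs it can simply be lowered, e.g. (the value 8 is an arbitrary example):

sqlContext.setConf("spark.sql.shuffle.partitions", "8")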

Ideally, this number would be set dynamically and automatically; however, Spark 
SQL doesn't support a complex cost-based model yet, particularly for multi-stage 
jobs. (https://issues.apache.org/jira/browse/SPARK-4630)

Hao

-Original Message-
From: Al M [mailto:alasdair.mcbr...@gmail.com] 
Sent: Tuesday, August 11, 2015 11:31 PM
To: user@spark.apache.org
Subject: Spark DataFrames uses too many partition

I am using DataFrames with Spark 1.4.1.  I really like DataFrames but the 
partitioning makes no sense to me.

I am loading lots of very small files and joining them together.  Every file is 
loaded by Spark with just one partition.  Each time I join two small files the 
partition count increases to 200.  This makes my application take 10x as long 
as if I coalesce everything to 1 partition after each join.

With normal RDDs it would not expand out the partitions to 200 after joining 
two files with one partition each.  It would either keep it at one or expand it 
to two.

Why do DataFrames expand out the partitions so much?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Cheng, Hao
Definitely worth a try. You can sort the records before writing them out, and 
then you will get parquet files without overlapping keys.
Let us know if that helps.
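A rough sketch of that (the sort column and output path are placeholders); a global sort range-partitions the data, so each output file covers a distinct key range:

df.sort("key").write.parquet("hdfs:///path/to/sorted_output")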

Hao

From: Philip Weaver [mailto:philip.wea...@gmail.com]
Sent: Wednesday, August 12, 2015 4:05 AM
To: Cheng Lian
Cc: user
Subject: Re: Very high latency to initialize a DataFrame from partitioned 
parquet database.

Do you think it might be faster to put all the files in one directory but still 
partitioned the same way? I don't actually need to filter on the values of the 
partition keys, but I need to rely on there be no overlap in the value of the 
keys between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver 
philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote:
Thanks, I also confirmed that the partition discovery is slow by writing a 
non-Spark application that uses the parquet library directly to load that 
partitions.

It's so slow that my colleague's Python application can read the entire 
contents of all the parquet data files faster than my application can even 
discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian 
lian.cs@gmail.commailto:lian.cs@gmail.com wrote:
However, it's weird that the partition discovery job only spawns 2 tasks. It 
should use the default parallelism, which is probably 8 according to the logs 
of the next Parquet reading job. Partition discovery is already done in a 
distributed manner via a Spark job. But the parallelism is mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of the time are spent on 
partition discovery. The code snippet you provided actually issues two jobs. 
The first one is for listing the input directories to find out all leaf 
directories (and this actually requires listing all leaf files, because we can 
only assert that a directory is a leaf one when it contains no 
sub-directories). Then partition information is extracted from leaf directory 
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in 
parallel under: file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there are a 
lot of leaf directories and files out there. My guess is that the local file 
system spent lots of time listing FileStatus-es of all these files.

I also noticed that Mesos job scheduling takes more time then expected. It is 
probably because this is the first Spark job executed in the application, and 
the system is not warmed up yet. For example, there’s a 6s gap between these 
two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now 
TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually 
finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is 
also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 
8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) 
in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) 
in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) 
in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library you 
mentioned (is it parquet-cpp?) is an order of magnitude faster?

Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:
With DEBUG, the log output was over 10MB, so I opted for just INFO output. The 
(sanitized) log is attached.

The driver is essentially this code:

info(A)

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(sInit time: ${elapsed} ms)

We've also observed that it is very slow to read the contents of the parquet 
files. My colleague wrote a PySpark application that gets the list of files, 
parallelizes it, maps across it and reads each file manually using a C 

RE: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-15 Thread Cheng, Hao
Have you tried querying “select * from temp_table” from the spark shell? Or 
can you try the --jars option when starting the spark shell?

From: Srikanth [mailto:srikanth...@gmail.com]
Sent: Thursday, July 16, 2015 9:36 AM
To: user
Subject: Re: HiveThriftServer2.startWithContext error with registerTempTable

Hello,

Re-sending this to see if I'm second time lucky!
I've not managed to move past this error.

Srikanth

On Mon, Jul 13, 2015 at 9:14 PM, Srikanth 
srikanth...@gmail.commailto:srikanth...@gmail.com wrote:
Hello,

I want to expose result of Spark computation to external tools. I plan to do 
this with Thrift server JDBC interface by registering result Dataframe as temp 
table.
I wrote a sample program in spark-shell to test this.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
HiveThriftServer2.startWithContext(hiveContext)
val myDF = hiveContext.read.format("com.databricks.spark.csv").option("header", 
"true").load("/datafolder/weblog/pages.csv")
myDF.registerTempTable("temp_table")

I'm able to see the temp table in Beeline

+-+--+
|  tableName  | isTemporary  |
+-+--+
| temp_table  | true |
| my_table| false|
+-+--+

Now when I issue select * from temp_table from Beeline, I see below exception 
in spark-shell

15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: 
com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I'm able to read the other table(my_table) from Beeline though.
Any suggestions on how to overcome this?

This is with the Spark 1.4 pre-built version. Spark-shell was started with 
--packages to include spark-csv.

Srikanth



RE: Python DataFrames: length of ArrayType

2015-07-15 Thread Cheng, Hao
Actually it's supposed to be part of the Spark 1.5 release, see 
https://issues.apache.org/jira/browse/SPARK-8230
You're definitely welcome to contribute to it; let me know if you have any 
questions about implementing it.

Cheng Hao


-Original Message-
From: pedro [mailto:ski.rodrig...@gmail.com] 
Sent: Thursday, July 16, 2015 7:31 AM
To: user@spark.apache.org
Subject: Python DataFrames: length of ArrayType

Resubmitting after fixing subscription to mailing list.

Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe 
without defining a UDF. 

What I would be looking for is something like this (except length_udf would be 
pyspark.sql.functions.length or something similar):

length_udf = UserDefinedFunction(len, IntegerType())
test_schema = StructType([
    StructField('arr', ArrayType(IntegerType())),
    StructField('letter', StringType())
])
test_df = sql.createDataFrame(sc.parallelize([
    [[1, 2, 3], 'a'],
    [[4, 5, 6, 7, 8], 'b']
]), test_schema)
test_df.select(length_udf(test_df.arr)).collect()

Output: 
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)] 

Is there currently a way to accomplish this? If this doesn't exist and seems 
useful, I would be happy to contribute a PR with the function. 

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-ArrayType-tp23869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How do you access a cached Spark SQL Table from a JBDC connection?

2015-07-14 Thread Cheng, Hao
Can you describe how you cached the tables? In another HiveContext? AFAIK, a 
cached table is only visible within the same HiveContext; you probably need to 
execute a sql query like
“cache table mytable as SELECT xxx” in the JDBC connection as well.
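As a sketch (the source table name is a placeholder), the same statement can be issued either from the beeline session or through the shared HiveContext:

hiveContext.sql("CACHE TABLE mytable AS SELECT * FROM my_hive_table")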

Cheng Hao

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Wednesday, July 15, 2015 8:26 AM
To: user
Subject: How do you access a cached Spark SQL Table from a JBDC connection?

Hello there,

I have a JBDC connection setup to my Spark cluster but I cannot see the tables 
that I cache in memory. The only tables I can see are those that are in my Hive 
instance. I use a HiveContext to register a table and cache it in memory. How 
can I enable my JBDC connection to query this in memory table?

Brandon


RE: How do you access a cached Spark SQL Table from a JBDC connection?

2015-07-14 Thread Cheng, Hao
So you're using different HiveContext instances for the caching. Tables cached in 
one HiveContext instance are not expected to be visible from another.

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Wednesday, July 15, 2015 8:48 AM
To: Cheng, Hao
Cc: user
Subject: Re: How do you access a cached Spark SQL Table from a JBDC connection?

I cache the table with

hiveContext.cacheTable(tableName)

On Tue, Jul 14, 2015 at 5:43 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Can you describe how you cached the tables? In another HiveContext? AFAIK, a 
cached table is only visible within the same HiveContext, so you probably need to 
execute a SQL statement like
"CACHE TABLE mytable AS SELECT xxx" through the JDBC connection as well.

Cheng Hao

From: Brandon White 
[mailto:bwwintheho...@gmail.commailto:bwwintheho...@gmail.com]
Sent: Wednesday, July 15, 2015 8:26 AM
To: user
Subject: How do you access a cached Spark SQL Table from a JBDC connection?

Hello there,

I have a JBDC connection setup to my Spark cluster but I cannot see the tables 
that I cache in memory. The only tables I can see are those that are in my Hive 
instance. I use a HiveContext to register a table and cache it in memory. How 
can I enable my JBDC connection to query this in memory table?

Brandon



RE: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Cheng, Hao
Never mind, I’ve created the jira issue at 
https://issues.apache.org/jira/browse/SPARK-8972.

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Friday, July 10, 2015 9:15 AM
To: yana.kadiy...@gmail.com; ayan guha
Cc: user
Subject: RE: [SparkSQL] Incorrect ROLLUP results

Yes, this is a bug. Do you mind creating a JIRA issue for it? I will fix it ASAP.

BTW, what’s your spark version?
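
In the meantime, a possible workaround (an untested sketch, based on the observation below that grouping by a plain column works) is to materialize the expression as a column first:

val bucketed = sqlContext.sql("select key % 100 as bucket, value from foo")
bucketed.registerTempTable("foo_bucketed")
// ROLLUP now groups on a plain attribute instead of the expression key % 100.
sqlContext.sql("select count(*) as cnt, bucket, GROUPING__ID from foo_bucketed group by bucket with rollup").show(100)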

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Friday, July 10, 2015 12:16 AM
To: ayan guha
Cc: user
Subject: Re: [SparkSQL] Incorrect ROLLUP results


+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+

On Thu, Jul 9, 2015 at 11:54 AM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:

Can you please post result of show()?
On 10 Jul 2015 01:00, Yana Kadiyska 
yana.kadiy...@gmail.commailto:yana.kadiy...@gmail.com wrote:
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup and 
I'm seeing some unexpected behavior. I'll open a JIRA if needed but wanted to 
check if this is user error. Here is my code:


case class KeyValue(key: Int, value: String)

val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF

df.registerTempTable("foo")

sqlContext.sql("select count(*) as cnt, value as key, GROUPING__ID from foo group by value with rollup").show(100)

sqlContext.sql("select count(*) as cnt, key % 100 as key, GROUPING__ID from foo group by key%100 with rollup").show(100)

Grouping by value does the right thing, I get one group 0 with the overall 
count. But grouping by expression (key%100) produces weird results -- appears 
that group 1 results are replicated as group 0. Am I doing something wrong or 
is this a bug?



RE: [SparkSQL] Incorrect ROLLUP results

2015-07-09 Thread Cheng, Hao
Yes, this is a bug. Do you mind creating a JIRA issue for it? I will fix it ASAP.

BTW, what’s your spark version?

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Friday, July 10, 2015 12:16 AM
To: ayan guha
Cc: user
Subject: Re: [SparkSQL] Incorrect ROLLUP results


+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+

On Thu, Jul 9, 2015 at 11:54 AM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:

Can you please post result of show()?
On 10 Jul 2015 01:00, Yana Kadiyska 
yana.kadiy...@gmail.commailto:yana.kadiy...@gmail.com wrote:
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup and 
I'm seeing some unexpected behavior. I'll open a JIRA if needed but wanted to 
check if this is user error. Here is my code:


case class KeyValue(key: Int, value: String)

val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF

df.registerTempTable("foo")

sqlContext.sql("select count(*) as cnt, value as key, GROUPING__ID from foo group by value with rollup").show(100)

sqlContext.sql("select count(*) as cnt, key % 100 as key, GROUPING__ID from foo group by key%100 with rollup").show(100)

Grouping by value does the right thing, I get one group 0 with the overall 
count. But grouping by expression (key%100) produces weird results -- appears 
that group 1 results are replicated as group 0. Am I doing something wrong or 
is this a bug?



RE: Hive UDFs

2015-07-07 Thread Cheng, Hao
dataframe.limit(1).selectExpr(xxx).collect()?
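
For example, a minimal sketch reusing the column and UDF from the question below:

// limit(1) restricts the DataFrame to a single row before the Hive UDF is applied;
// collect() then brings that one Row back to the driver.
val firstReversed = dataframe
  .limit(1)
  .selectExpr("reverse(testString) as reversedString")
  .collect()
  .headOption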

-Original Message-
From: chrish2312 [mailto:c...@palantir.com] 
Sent: Wednesday, July 8, 2015 6:20 AM
To: user@spark.apache.org
Subject: Hive UDFs

I know the typical way to apply a hive UDF to a dataframe is basically 
something like:

dataframe.selectExpr("reverse(testString) as reversedString")

Is there a way to apply the hive UDF just to a single row and get a row back? 
Something like:

dataframe.first.selectExpr("reverse(testString) as reversedString")

Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-UDFs-tp23707.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: HiveContext throws org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2015-07-07 Thread Cheng, Hao
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.derby.jdbc.EmbeddedDriver

It is usually included in the assembly jar, so I'm not sure what's wrong. But can 
you try adding the derby jar to the driver classpath and try again?
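
For example (the jar path below is only a placeholder; it depends on where derby is installed on your system):

spark-shell --driver-class-path /path/to/derby-10.10.1.1.jar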

-Original Message-
From: bdev [mailto:buntu...@gmail.com] 
Sent: Tuesday, July 7, 2015 5:07 PM
To: user@spark.apache.org
Subject: HiveContext throws 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

Just trying to get started with Spark and attempting to use HiveContext using 
spark-shell to interact with existing Hive tables on my CDH cluster but keep 
running into the errors (pls see below) when I do 'hiveContext.sql("show tables")'. 
Wanted to know what all JARs need to be included to have this 
working. Thanks!


java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:472)
at
org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229)
at
org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225)
at
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:241)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:240)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:86)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44)
at $iwC$$iwC$$iwC$$iwC.init(console:46)
at $iwC$$iwC$$iwC.init(console:48)
at $iwC$$iwC.init(console:50)
at $iwC.init(console:52)
at init(console:54)
at .init(console:58)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to 

RE: Support for Windowing and Analytics functions in Spark SQL

2015-06-22 Thread Cheng, Hao
Yes, it should be with HiveContext, not SQLContext.
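
For example, a minimal sketch (the table and column names are placeholders, not from your query):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("""
  SELECT name, dept, salary,
         LEAD(salary, 1) OVER (PARTITION BY dept ORDER BY salary) AS next_salary
  FROM employees
""").show()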

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Tuesday, June 23, 2015 2:51 AM
To: smazumder
Cc: user
Subject: Re: Support for Windowing and Analytics functions in Spark SQL


1.4 supports it
On 23 Jun 2015 02:59, Sourav Mazumder 
sourav.mazumde...@gmail.commailto:sourav.mazumde...@gmail.com wrote:
Hi,
Though the documentation does not explicitly mention support for windowing and 
analytics functions in Spark SQL, it looks like they are not supported.
I tried running a query like "Select Lead(<column name>, 1) over (Partition By 
<column name> order by <column name>) from <table name>" and I got an error saying 
that this feature is unsupported.
I tried it in Databricks cloud and that supports Spark 1.4.
Can anyone please confirm this ?
Regards,
Sourav


RE: Question about SPARK_WORKER_CORES and spark.task.cpus

2015-06-22 Thread Cheng, Hao
It's actually not that tricky.
SPARK_WORKER_CORES is the maximum size of the executor's task thread pool, which is 
what "one executor with 32 cores and the executor could execute 32 tasks 
simultaneously" means. Spark doesn't care how many real physical CPUs/cores you 
have (the OS does), so the user needs to give a value that reflects the real 
physical machine settings; otherwise the thread context switching will probably 
become an overhead for CPU-intensive tasks.

"spark.task.cpus": I copied how it's used from the Spark source code:

  // TODO: The default value of 1 for spark.executor.cores works right now 
because dynamic
  // allocation is only supported for YARN and the default number of cores per 
executor in YARN is
  // 1, but it might need to be attained differently for different cluster 
managers
  private val tasksPerExecutor =
    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

It means that the number of tasks per executor (i.e. the number of tasks that can 
run in parallel in one executor) = SPARK_WORKER_CORES / "spark.task.cpus".

"spark.task.cpus" gives the user an opportunity to reserve resources for a task 
that creates more threads internally (for example, a task that runs a 
multithreaded external app).
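
As an illustration only (the numbers below are arbitrary, not a recommendation):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.cores", "8")  // task slots offered per executor
  .set("spark.task.cpus", "2")       // slots reserved by each task
// => at most 8 / 2 = 4 tasks run concurrently in each executor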

Hope it helps.


From: Rui Li [mailto:spark.ru...@gmail.com]
Sent: Tuesday, June 23, 2015 8:56 AM
To: user@spark.apache.org
Subject: Question about SPARK_WORKER_CORES and spark.task.cpus

Hi,

I was running a WordCount application on Spark, and the machine I used has 4 
physical cores. However, in spark-env.sh file, I set  SPARK_WORKER_CORES = 32. 
The web UI says it launched one executor with 32 cores and the executor could 
execute 32 tasks simultaneously. Does spark create 32 vCores out of 4 physical 
cores? How much physical CPU resource can each task get then?

Also, I found a parameter “spark.task.cpus”, but I don’t quite understand this 
parameter. If I set it to 2, does Spark allocate 2 CPU cores for one task? I 
think “task” is a “thread” within executor (“process”), so how can a thread 
utilize two CPU cores simultaneously?

I am looking forward to your reply, thanks!

Best,
Rui


RE: Is HiveContext Thread Safe?

2015-06-17 Thread Cheng, Hao
Yes, it is thread safe. That’s how Spark SQL JDBC Server works.
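
For example, a minimal sketch of submitting queries concurrently through one shared HiveContext (the table names are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val queries = Seq("SELECT count(*) FROM t1", "SELECT count(*) FROM t2")
// Each future submits its query independently through the same HiveContext instance.
val futures = queries.map(q => Future(hiveContext.sql(q).collect()))
val results = futures.map(f => Await.result(f, 10.minutes))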

Cheng Hao

From: V Dineshkumar [mailto:developer.dines...@gmail.com]
Sent: Wednesday, June 17, 2015 9:44 PM
To: user@spark.apache.org
Subject: Is HiveContext Thread Safe?

Hi,

I have a HiveContext which I am using from multiple threads to submit Spark SQL 
queries via the sql method.
I just wanted to know whether this method is thread-safe or not. Will all my 
queries be submitted at the same time, independent of each other, or will they be 
submitted sequentially, one after the other?

Thanks,
Dinesh


RE: generateTreeString causes huge performance problems on dataframe persistence

2015-06-17 Thread Cheng, Hao
Seems you're hitting the self-join case; currently Spark SQL doesn't cache any 
result/logical tree of a self-join for further analysis or computation. Since the 
logical tree is huge, it's expected to take a long time to generate its tree 
string recursively. And I also doubt the computation can finish within a 
reasonable time, as there will probably be lots of partitions (growing 
exponentially) in the intermediate result.

As a workaround, you can break the iterations into smaller ones and trigger 
them manually in sequence.
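
A rough sketch of what that could look like (the join condition, iteration count and starting DataFrame are assumptions, not taken from your code):

import org.apache.spark.sql.DataFrame
import sqlContext.implicits._   // for the $"..." column syntax

var result: DataFrame = initial
for (i <- 1 to numIterations) {
  val next = result.as("l").join(result.as("r"), $"l.key" === $"r.key").distinct()
  next.persist()
  next.count()   // force this iteration to materialize before the next self-join is planned
  result = next
}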

-Original Message-
From: Jan-Paul Bultmann [mailto:janpaulbultm...@me.com] 
Sent: Wednesday, June 17, 2015 6:17 PM
To: User
Subject: generateTreeString causes huge performance problems on dataframe 
persistence

Hey,
I noticed that my code spends hours with `generateTreeString` even though the 
actual dag/dataframe execution takes seconds.

I’m running a query that grows exponential in the number of iterations when 
evaluated without caching, but should be linear when caching previous results.

E.g.

result_i+1 = distinct(join(result_i, result_i))

Which evaluates exponentially like this this without caching.

Iteration | Dataframe Plan Tree
0|/\
1| /\/\
2|/\/\  /\/\
n|……….

But should be linear with caching.

Iteration | Dataframe Plan Tree
0| /\
  | \/
1| /\
  | \/
2| /\
  | \/
n| ……….


It seems that even though the DAG will have the later form, 
`generateTreeString` will walk the entire plan naively as if no caching was 
done.

The spark webui also shows no active jobs even though my CPU uses one core 
fully, calculating that string.

Below is the piece of stacktrace that starts the entire walk.

^
|
Thousands of calls to  `generateTreeString`.
|
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(int, 
StringBuilder) TreeNode.scala:431
org.apache.spark.sql.catalyst.trees.TreeNode.treeString() TreeNode.scala:400
org.apache.spark.sql.catalyst.trees.TreeNode.toString() TreeNode.scala:397
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() 
InMemoryColumnarTableScan.scala:164
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() 
InMemoryColumnarTableScan.scala:164
scala.Option.getOrElse(Function0) Option.scala:120
org.apache.spark.sql.columnar.InMemoryRelation.buildBuffers() 
InMemoryColumnarTableScan.scala:164
org.apache.spark.sql.columnar.InMemoryRelation.init(Seq, boolean, int, 
StorageLevel, SparkPlan, Option, RDD, Statistics, Accumulable) 
InMemoryColumnarTableScan.scala:112
org.apache.spark.sql.columnar.InMemoryRelation$.apply(boolean, int, 
StorageLevel, SparkPlan, Option) InMemoryColumnarTableScan.scala:45
org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply() 
CacheManager.scala:102
org.apache.spark.sql.execution.CacheManager.writeLock(Function0) 
CacheManager.scala:70 
org.apache.spark.sql.execution.CacheManager.cacheQuery(DataFrame, Option, 
StorageLevel) CacheManager.scala:94
org.apache.spark.sql.DataFrame.persist(StorageLevel) DataFrame.scala:1320 ^
|
Application logic.
|

Could someone confirm my suspicion?
And does somebody know why it’s called while caching, and why it walks the 
entire tree including cached results?

Cheers, Jan-Paul
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure whether Spark RDD will provide an API to fetch records one by one from the 
final result set, instead of pulling them all (or whole partitions of data) 
into driver memory.

Seems a big change.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 
1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said OOM occurred on slave node, because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian l...@databricks.com
To: 姜超才 jiangchao...@haiyisoft.com, Hester wang hester9...@gmail.com, user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying The OOM or lose heartbeat was occurred on slave node. Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: 姜超才
To: Cheng Lian , Hester wang ,
Subject: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set spark.sql.thriftServer.incrementalCollect and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.orgmailto:user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from OOM::GC overhead limit exceeded to  lost worker heartbeat after 120s.


  I will try  to set 
spark.sql.thriftServer.incrementalCollect and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From: 

RE: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng, Hao
Not sure whether Spark Core will provide an API to fetch records one by one from the 
block manager, instead of pulling them all into driver memory.

From: Cheng Lian [mailto:l...@databricks.com]
Sent: Friday, June 12, 2015 3:51 PM
To: 姜超才; Hester wang; user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 
1,000,000 rows.

Thanks for the extra details and explanations Chaocai, will try to reproduce 
this when I got chance.

Cheng
On 6/12/15 3:44 PM, 姜超才 wrote:
I said OOM occurred on slave node, because I monitored memory utilization 
during the query task, on driver, very few memory was ocupied. And i remember i 
have ever seen the OOM stderr log on slave node.

But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% repro on 
cluster mode.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian l...@databricks.com
To: 姜超才 jiangchao...@haiyisoft.com, Hester wang hester9...@gmail.com, user@spark.apache.org
Subject: Re: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your last comment 
saying The OOM or lose heartbeat was occurred on slave node. Because from the 
log files you attached at first, those OOM actually happens on driver side 
(Thrift server log only contains log lines from driver side). Did you see OOM 
from executor stderr output? I ask this because there are still a large portion 
of users are using 1.3, and we may want deliver a fix if there does exist bugs 
that causes unexpected OOM.

Cheng
On 6/12/15 3:14 PM, 姜超才 wrote:
Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
From: 姜超才
To: Cheng Lian , Hester wang ,
Subject: 回复: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: 回复: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't have 
access to a cluster for now) but couldn't reproduce this issue. Your program 
just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode works for you? 
 Will investigate this with a cluster when I get chance.

Cheng
On 6/10/15 5:19 PM, 姜超才 wrote:
When set spark.sql.thriftServer.incrementalCollect and set driver memory to 
7G, Things seems stable and simple:
It can quickly run through the query line, but when traversal the result set ( 
while rs.hasNext ), it can quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true --conf 
spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path /usr/local/hive/lib/classes12.jar 
--conf spark.sql.thriftServer.incrementalCollect=true

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: 姜超才 , Hester wang ,
Subject: Re: 回复: Re: Met OOM when fetching more than 1,000,000 rows.
Date: 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a compressed 
copy (don't cc user@spark.apache.orgmailto:user@spark.apache.org)?

Cheng
On 6/10/15 4:23 PM, 姜超才 wrote:
Hi Lian,

Thanks for your quick response.






I forgot mention that I have tuned driver memory from 2G to 4G, 
seems got minor improvement, The dead way when fetching 1,400,000 rows changed 
from OOM::GC overhead limit exceeded to  lost worker heartbeat after 120s.


  I will try  to set 
spark.sql.thriftServer.incrementalCollect and continue increase driver memory 
to 7G, and will send the result to you.

Thanks,

SuperJ


- Original Message -
From: Cheng Lian
To: Hester wang ,
Subject: Re: Met OOM when fetching 

RE: Spark SQL with Thrift Server is very very slow and finally failing

2015-06-09 Thread Cheng, Hao
Is it a large result set being returned from the Thrift Server? And can you paste 
the SQL and the physical plan?

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, June 9, 2015 12:01 PM
To: Sourav Mazumder
Cc: user
Subject: Re: Spark SQL with Thrift Server is very very slow and finally failing

Which Spark release are you using ?

Can you pastebin the stack trace w.r.t. ExecutorLostFailure ?

Thanks

On Mon, Jun 8, 2015 at 8:52 PM, Sourav Mazumder 
sourav.mazumde...@gmail.commailto:sourav.mazumde...@gmail.com wrote:
Hi,
I am trying to run a SQL form a JDBC driver using Spark's Thrift Server.
I'm doing a join between a Hive Table of size around 100 GB and another Hive 
Table with 10 KB, with a filter on a particular column
The query takes more than 45 minutes and then I get ExecutorLostFailure. That 
is because of memory as once I increase the memory the failure happens but 
after a long time.
I'm having executor memory 20 GB, Spark DRiver Memory 2 GB, Executor Instances 
2 and Executor Core 2.
Running the job using Yarn with master as 'yarn-client'.
Any idea if I'm missing any other configuration ?
Regards,
Sourav



RE: SparkSQL : using Hive UDF returning Map throws rror: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)

2015-06-05 Thread Cheng, Hao
Confirmed: with the latest master, we don't support complex data types for simple 
Hive UDFs. Do you mind filing an issue in JIRA?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Friday, June 5, 2015 12:35 PM
To: ogoh; user@spark.apache.org
Subject: RE: SparkSQL : using Hive UDF returning Map throws rror: 
scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)

Which version of Hive jar are you using? Hive 0.13.1 or Hive 0.12.0?

-Original Message-
From: ogoh [mailto:oke...@gmail.com] 
Sent: Friday, June 5, 2015 10:10 AM
To: user@spark.apache.org
Subject: SparkSQL : using Hive UDF returning Map throws rror: 
scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)


Hello,
I tested some custom udf on SparkSql's ThriftServer  Beeline (Spark 1.3.1).
Some udfs work fine (access array parameter and returning int or string type). 
But my udf returning map type throws an error:
Error: scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)

I converted the code into Hive's GenericUDF since I worried that using complex 
type parameter (array of map) and returning complex type (map) can be supported 
in Hive's GenericUDF instead of simple UDF.
But SparkSQL doesn't seem supporting GenericUDF.(error message : Error:
java.lang.IllegalAccessException: Class
org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..).

Below is my example udf code returning MAP type.
I appreciate any advice.
Thanks

--

public final class ArrayToMap extends UDF {

    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems

        Map<String, String> map = new HashMap<String, String>();

        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkSQL : using Hive UDF returning Map throws rror: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)

2015-06-04 Thread Cheng, Hao
Which version of Hive jar are you using? Hive 0.13.1 or Hive 0.12.0?

-Original Message-
From: ogoh [mailto:oke...@gmail.com] 
Sent: Friday, June 5, 2015 10:10 AM
To: user@spark.apache.org
Subject: SparkSQL : using Hive UDF returning Map throws rror: 
scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)


Hello,
I tested some custom udf on SparkSql's ThriftServer  Beeline (Spark 1.3.1).
Some udfs work fine (access array parameter and returning int or string type). 
But my udf returning map type throws an error:
Error: scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)

I converted the code into Hive's GenericUDF since I worried that using complex 
type parameter (array of map) and returning complex type (map) can be supported 
in Hive's GenericUDF instead of simple UDF.
But SparkSQL doesn't seem supporting GenericUDF.(error message : Error:
java.lang.IllegalAccessException: Class
org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..).

Below is my example udf code returning MAP type.
I appreciate any advice.
Thanks

--

public final class ArrayToMap extends UDF {

    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems

        Map<String, String> map = new HashMap<String, String>();

        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread Cheng, Hao
Yes, but be sure to put hive-site.xml on your classpath.

Did you run into any problem?

Cheng Hao

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: Thursday, May 28, 2015 8:53 AM
To: user
Subject: Pointing SparkSQL to existing Hive Metadata with data file locations 
in HDFS

hey guys

On the Hive/Hadoop ecosystem we have using Cloudera distribution CDH 5.2.x , 
there are about 300+ hive tables.
The data is stored an text (moving slowly to Parquet) on HDFS.
I want to use SparkSQL and point to the Hive metadata and be able to define 
JOINS etc using a programming structure like this

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")


Is that the way to go ? Some guidance will be great.

thanks

sanjay




RE: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

2015-05-24 Thread Cheng, Hao
Thanks for reporting this.

We intend to support multiple metastore versions in a single build (hive-0.13.1) 
by introducing the IsolatedClientLoader, but you're probably hitting a bug; 
please file a JIRA issue for this.

I will keep investigating this as well.
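
For reference, a hedged sketch of the two settings being discussed (the jar paths are placeholders for a Hive 0.12 client classpath):

val conf = new org.apache.spark.SparkConf()
  .set("spark.sql.hive.metastore.version", "0.12.0")
  .set("spark.sql.hive.metastore.jars", "/opt/hive-0.12.0/lib/*:/opt/hadoop/client/*")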

Hao


From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: Sunday, May 24, 2015 9:06 PM
To: Cheolsoo Park
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

This discussion belongs on the dev list.  Please post any replies there.

On Sat, May 23, 2015 at 10:19 PM, Cheolsoo Park 
piaozhe...@gmail.commailto:piaozhe...@gmail.com wrote:
Hi,

I've been testing SparkSQL in 1.4 rc and found two issues. I wanted to confirm 
whether these are bugs or not before opening a jira.

1) I can no longer compile SparkSQL with -Phive-0.12.0. I noticed that in 1.4, 
IsolatedClientLoader is introduced, and different versions of Hive metastore 
jars can be loaded at runtime. But instead, SparkSQL no longer compiles with 
Hive 0.12.0.

My question is, is this intended? If so, shouldn't the hive-0.12.0 profile in 
POM be removed?

2) After compiling SparkSQL with -Phive-0.13.1, I ran into my 2nd problem. 
Since I have Hive 0.12 metastore in production, I have to use it for now. But 
even if I set spark.sql.hive.metastore.version and 
spark.sql.hive.metastore.jars, SparkSQL cli throws an error as follows-

15/05/24 05:03:29 WARN RetryingMetaStoreClient: MetaStoreClient lost 
connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Invalid method name: 'get_functions'
at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_functions(ThriftHiveMetastore.java:2886)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_functions(ThriftHiveMetastore.java:2872)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunctions(HiveMetaStoreClient.java:1727)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy12.getFunctions(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getFunctions(Hive.java:2670)
at 
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:674)
at 
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:662)
at org.apache.hadoop.hive.cli.CliDriver.getCommandCompletor(CliDriver.java:540)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:175)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)

What's happening is that when SparkSQL Cli starts up, it tries to fetch 
permanent udfs from Hive metastore (due to 
HIVE-6330https://issues.apache.org/jira/browse/HIVE-6330, which was 
introduced in Hive 0.13). But then, it ends up invoking an incompatible thrift 
function that doesn't exist in Hive 0.12. To work around this error, I have to 
comment out the following line of code for now-
https://goo.gl/wcfnH1

My question is, is SparkSQL that is compiled against Hive 0.13 supposed to work 
with Hive 0.12 metastore (by setting spark.sql.hive.metastore.version and 
spark.sql.hive.metastore.jars)? It only works if I comment out the above line 
of code.

Thanks,
Cheolsoo



RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Forgot to import the implicit functions/classes?

import sqlContext.implicits._
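
For reference, a corrected sketch of that example (the file path is from your snippet; note it is toDF(), not toRD()):

case class Person(name: String, age: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._   // brings the toDF() conversion into scope

val people = sc
  .textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()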

From: Rajdeep Dua [mailto:rajdeep@gmail.com]
Sent: Monday, May 18, 2015 8:08 AM
To: user@spark.apache.org
Subject: InferredSchema Example in Spark-SQL

Hi All,
Was trying the Inferred Schema spart example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep




RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Typo? Should be .toDF(), not .toRD()

From: Ram Sriharsha [mailto:sriharsha@gmail.com]
Sent: Monday, May 18, 2015 8:31 AM
To: Rajdeep Dua
Cc: user
Subject: Re: InferredSchema Example in Spark-SQL

you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring 
schema from the case class)



On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
rajdeep@gmail.commailto:rajdeep@gmail.com wrote:
Hi All,
Was trying the Inferred Schema spart example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep





RE: What's the advantage features of Spark SQL(JDBC)

2015-05-15 Thread Cheng, Hao
Spark SQL just takes JDBC as a new data source, the same way we support loading 
data from a .csv or .json file.
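
For example, a minimal sketch of the JDBC data source through the DataFrame API (the URL, table name and credentials are placeholders):

val jdbcDF = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:postgresql://dbhost:5432/mydb?user=me&password=secret",
  "dbtable" -> "orders"))
jdbcDF.registerTempTable("orders")
// The JDBC table can now be joined or mixed with any other registered data source.
sqlContext.sql("SELECT count(*) FROM orders").show()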

From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID]
Sent: Friday, May 15, 2015 2:30 PM
To: User
Subject: What's the advantage features of Spark SQL(JDBC)

Hi All,

Comparing direct access via JDBC, what's the advantage features of Spark 
SQL(JDBC) to access external data source?

Any tips are welcome! Thanks.

Regards,
Yi



RE: question about sparksql caching

2015-05-15 Thread Cheng, Hao
You probably can try something like:
val df = sqlContext.sql("select c1, sum(c2) from T1, T2 where T1.key = T2.key group by c1")
df.cache() // Cache the result, but it's a lazy execution.
df.registerTempTable("my_result")

sqlContext.sql("select * from my_result where c1 = 1").collect   // the cache execution will be triggered here by the first query on it
sqlContext.sql("select * from my_result where c1 = 1").collect   // the cache is already there, so this will be very fast

And you can also cache the raw tables like:
sqlContext.cacheTable("T1")
sqlContext.cacheTable("T2")

They will also be cached when the first query comes, and we benefit from the 
in-memory columnar storage.

One thing you should know is that the cache here cannot be shared across processes 
(more precisely, it cannot cross SparkContext instances).

-Original Message-
From: sequoiadb [mailto:mailing-list-r...@sequoiadb.com] 
Sent: Friday, May 15, 2015 11:02 AM
To: user
Subject: question about sparksql caching

Hi all,

We are planing to use SparkSQL in a DW system. There's a question about the 
caching mechanism of SparkSQL.

For example, if I have a SQL like sqlContext.sql("select c1, sum(c2) from T1, 
T2 where T1.key=T2.key group by c1").cache()

Is it going to cache the final result or the raw data of each table that used 
in the SQL?

Since the user may have various of SQLs that use those tables, if the caching 
is for the final result only, it may still take very long time to scan the 
entire table if it's a brand new SQL.

If this is the case, is there any other better way to cache the base tables 
instead of final result?

Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: What's the advantage features of Spark SQL(JDBC)

2015-05-15 Thread Cheng, Hao
Yes.

From: Yi Zhang [mailto:zhangy...@yahoo.com]
Sent: Friday, May 15, 2015 2:51 PM
To: Cheng, Hao; User
Subject: Re: What's the advantage features of Spark SQL(JDBC)

@Hao,
As you said, there is no advantage feature for JDBC, it just provides unified 
api to support different data sources. Is it right?


On Friday, May 15, 2015 2:46 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:

Spark SQL just takes JDBC as a new data source, the same way we support loading 
data from a .csv or .json file.

From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID]
Sent: Friday, May 15, 2015 2:30 PM
To: User
Subject: What's the advantage features of Spark SQL(JDBC)

Hi All,

Comparing direct access via JDBC, what's the advantage features of Spark 
SQL(JDBC) to access external data source?

Any tips are welcome! Thanks.

Regards,
Yi




RE: 回复:Re: sparksql running slow while joining_2_tables.

2015-05-05 Thread Cheng, Hao
56 MB / 26 MB is a very small size. Do you observe data skew, i.e. many records 
with the same chrname / name? And can you also double-check the JVM settings for 
the executor process?


From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 5, 2015 7:50 PM
To: Cheng, Hao; Wang, Daoyuan; Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining_2_tables.


Hi guys,

  attache the pic of physical plan and logs.Thanks.



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Wang, Daoyuan daoyuan.w...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining_2_tables.
Date: 2015-05-05 13:18


I assume you’re using the DataFrame API within your application.



sql(“SELECT…”).explain(true)



From: Wang, Daoyuan
Sent: Tuesday, May 5, 2015 10:16 AM
To: luohui20...@sina.commailto:luohui20...@sina.com; Cheng, Hao; Olivier 
Girardot; user
Subject: RE: 回复:RE: 回复:Re: sparksql running slow while joining_2_tables.



You can use

Explain extended select ….



From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Tuesday, May 05, 2015 9:52 AM
To: Cheng, Hao; Olivier Girardot; user
Subject: 回复:RE: 回复:Re: sparksql running slow while joining_2_tables.



As I know broadcastjoin is automatically enabled by 
spark.sql.autoBroadcastJoinThreshold.

refer to 
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options



and how to check my app's physical plan,and others things like optimized 
plan,executable plan.etc



thanks







Thanks & Best regards!
罗辉 San.Luo



- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org
Subject: RE: 回复:Re: sparksql running slow while joining_2_tables.
Date: 2015-05-05 08:38



Or, have you ever try broadcast join?



From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.commailto:luohui20...@sina.com; Olivier Girardot; user
Subject: RE: 回复:Re: sparksql running slow while joining 2 tables.



Can you print out the physical plan?



EXPLAIN SELECT xxx…



From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining 2 tables.



hi Olivier

spark1.3.1, with java1.8.0.45

and add 2 pics .

it seems like a GC issue. I also tried with different parameters like memory 
size of driverexecutor, memory fraction, java opts...

but this issue still happens.







Thanks & Best regards!
罗辉 San.Luo



- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46



Hi,
What is you Spark version ?



Regards,



Olivier.



On Mon, 4 May 2015 at 11:03, luohui20...@sina.com 
wrote:

hi guys

when i am running a sql  like select 
a.namehttp://a.name/,a.startpoint,a.endpoint, a.piece from db a join sample b 
on (a.namehttp://a.name/ = b.namehttp://b.name/) where (b.startpoint  
a.startpoint + 25); I found sparksql running slow in minutes which may caused 
by very long GC and shuffle time.



   table db is created from a txt file size at 56mb while table sample 
sized at 26mb, both at small size.

   my spark cluster is a standalone  pseudo-distributed spark cluster with 
8g executor and 4g driver manager.

   any advises? thank you guys.









Thanks & Best regards!
罗辉 San.Luo

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


RE: 回复:Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Cheng, Hao
Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining 2 tables.


hi Olivier

spark1.3.1, with java1.8.0.45

and add 2 pics .

it seems like a GC issue. I also tried with different parameters like memory 
size of driverexecutor, memory fraction, java opts...

but this issue still happens.



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46

Hi,
What is you Spark version ?

Regards,

Olivier.

On Mon, 4 May 2015 at 11:03, luohui20...@sina.com 
wrote:

hi guys

when i am running a sql  like select 
a.namehttp://a.name,a.startpoint,a.endpoint, a.piece from db a join sample b 
on (a.namehttp://a.name = b.namehttp://b.name) where (b.startpoint  
a.startpoint + 25); I found sparksql running slow in minutes which may caused 
by very long GC and shuffle time.



   table db is created from a txt file size at 56mb while table sample 
sized at 26mb, both at small size.

   my spark cluster is a standalone  pseudo-distributed spark cluster with 
8g executor and 4g driver manager.

   any advises? thank you guys.





Thanks & Best regards!
罗辉 San.Luo

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


Re: sparksql running slow while joining_2_tables.

2015-05-04 Thread Cheng, Hao
I assume you’re using the DataFrame API within your application.

sql(“SELECT…”).explain(true)

From: Wang, Daoyuan
Sent: Tuesday, May 5, 2015 10:16 AM
To: luohui20...@sina.com; Cheng, Hao; Olivier Girardot; user
Subject: RE: 回复:RE: 回复:Re: sparksql running slow while joining_2_tables.

You can use
Explain extended select ….

From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Tuesday, May 05, 2015 9:52 AM
To: Cheng, Hao; Olivier Girardot; user
Subject: 回复:RE: 回复:Re: sparksql running slow while joining_2_tables.


As I know broadcastjoin is automatically enabled by 
spark.sql.autoBroadcastJoinThreshold.

refer to 
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options



and how to check my app's physical plan,and others things like optimized 
plan,executable plan.etc



thanks



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org
Subject: RE: 回复:Re: sparksql running slow while joining_2_tables.
Date: 2015-05-05 08:38

Or, have you ever try broadcast join?

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.commailto:luohui20...@sina.com; Olivier Girardot; user
Subject: RE: 回复:Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining 2 tables.


hi Olivier

spark1.3.1, with java1.8.0.45

and add 2 pics .

it seems like a GC issue. I also tried with different parameters like memory 
size of driverexecutor, memory fraction, java opts...

but this issue still happens.



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46

Hi,
What is you Spark version ?

Regards,

Olivier.

On Mon, 4 May 2015 at 11:03, luohui20...@sina.com 
wrote:

hi guys

when i am running a sql  like select 
a.namehttp://a.name,a.startpoint,a.endpoint, a.piece from db a join sample b 
on (a.namehttp://a.name = b.namehttp://b.name) where (b.startpoint  
a.startpoint + 25); I found sparksql running slow in minutes which may caused 
by very long GC and shuffle time.



   table db is created from a txt file size at 56mb while table sample 
sized at 26mb, both at small size.

   my spark cluster is a standalone  pseudo-distributed spark cluster with 
8g executor and 4g driver manager.

   any advises? thank you guys.





Thanks & Best regards!
罗辉 San.Luo

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


RE: 回复:Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Cheng, Hao
Or, have you ever tried a broadcast join?
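
For example, a hedged sketch (the threshold value is illustrative; the table and column names are from your query, and the original WHERE clause is omitted here):

// Raise the threshold above the ~26 MB "sample" table so that side gets broadcast,
// then check the physical plan for a BroadcastHashJoin.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (64 * 1024 * 1024).toString)
val joined = sqlContext.sql(
  "SELECT a.name, a.startpoint, a.endpoint, a.piece FROM db a JOIN sample b ON a.name = b.name")
joined.explain(true)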

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.com; Olivier Girardot; user
Subject: RE: 回复:Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining 2 tables.


hi Olivier

spark1.3.1, with java1.8.0.45

and add 2 pics .

it seems like a GC issue. I also tried with different parameters like memory 
size of driverexecutor, memory fraction, java opts...

but this issue still happens.



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46

Hi,
What is you Spark version ?

Regards,

Olivier.

On Mon, 4 May 2015 at 11:03, luohui20...@sina.com 
wrote:

hi guys

when i am running a sql  like select 
a.namehttp://a.name,a.startpoint,a.endpoint, a.piece from db a join sample b 
on (a.namehttp://a.name = b.namehttp://b.name) where (b.startpoint  
a.startpoint + 25); I found sparksql running slow in minutes which may caused 
by very long GC and shuffle time.



   table db is created from a txt file size at 56mb while table sample 
sized at 26mb, both at small size.

   my spark cluster is a standalone  pseudo-distributed spark cluster with 
8g executor and 4g driver manager.

   any advises? thank you guys.





Thanks & Best regards!
罗辉 San.Luo

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


RE: Re: problem with spark thrift server

2015-04-23 Thread Cheng, Hao
Hi, can you describe a little bit how the Thrift server crashed, or the steps to 
reproduce it? It's probably a bug in the Thrift server.

Thanks,

From: guoqing0...@yahoo.com.hk [mailto:guoqing0...@yahoo.com.hk]
Sent: Friday, April 24, 2015 9:55 AM
To: Arush Kharbanda
Cc: user
Subject: Re: Re: problem with spark thrift server

Thanks for your reply , i would like to use Spark Thriftserver as JDBC SQL 
interface and the Spark application running on YARN . but the application was 
FINISHED when the Thriftserver crashed , all the cached table was lost .

Thriftserver start command:
start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 
 --num-executors 20 --queue spark

My question is whether the Thriftserver has anyother more stable mode on YARN , 
like active standby in the Thriftserver .
Really appreciate for any suggestions and idea .
Thanks.

From: Arush Kharbandamailto:ar...@sigmoidanalytics.com
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hkmailto:guoqing0...@yahoo.com.hk
CC: usermailto:user@spark.apache.org
Subject: Re: problem with spark thrift server
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, 
guoqing0...@yahoo.com.hkmailto:guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hkmailto:guoqing0...@yahoo.com.hk wrote:
Hi ,
I have a question about the Spark thrift server. I deployed Spark on YARN and found that if the Spark driver is disabled, the Spark application crashes on YARN. I'd appreciate any suggestions and ideas.

Thank you!



--

[Sigmoid Analytics]http://htmlsig.com/www.sigmoidanalytics.com

Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com || 
www.sigmoidanalytics.comhttp://www.sigmoidanalytics.com/


RE: Spark Avarage

2015-04-06 Thread Cheng, Hao
The Dataframe API should be perfectly helpful in this case.  
https://spark.apache.org/docs/1.3.0/sql-programming-guide.html

Some code snippet will like:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
weathersRDD.toDF().registerTempTable("weathers")
val results = sqlContext.sql("SELECT avg(minDeg), avg(maxDeg), avg(meanDeg) FROM weathers GROUP BY dayToMonth(dayOfdate)")
results.collect.foreach(println)
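Note that dayToMonth is not a built-in function; a minimal sketch of registering it, assuming the month is simply the first seven characters of dayOfdate (e.g. 2014-03):

sqlContext.udf.register("dayToMonth", (day: String) => day.substring(0, 7))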


-Original Message-
From: barisak [mailto:baris.akg...@gmail.com] 
Sent: Monday, April 6, 2015 10:50 PM
To: user@spark.apache.org
Subject: Spark Avarage

Hi 

I have the following case class:

case class weatherCond(dayOfdate: String, minDeg: Int, maxDeg: Int, meanDeg: Int)

I am reading the data from a csv file and putting it into the weatherCond class with this code:

val weathersRDD = sc.textFile("weather.csv").map { line =>
  val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll("\"", "").trim.split(",")
  weatherCond(dayOfdate, minDeg.toInt, maxDeg.toInt, meanDeg.toInt)
}

The question is: how can I average the minDeg, maxDeg and meanDeg values for each month?

The data set example 

day, min, max , mean
2014-03-17,-3,5,5
2014-03-18,6,7,7
2014-03-19,6,14,10

result has to be (2014-03,   3,   8.6   ,7.3) -- (Average for 2014 - 03
)

Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Avarage-tp22391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark SQL. Memory consumption

2015-04-02 Thread Cheng, Hao
Spark SQL tries to load the entire partition data and organize it as in-memory HashMaps; this does eat a lot of memory if there are few duplicated group-by keys over a large number of records.

A couple of things you can try, case by case:

· Increase the number of partitions (the record count in each partition will drop).

· Use more memory for the executors (e.g. --executor-memory 120g).

· Reduce the number of Spark cores (to reduce the number of parallel running threads).
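A short sketch of what the first and third suggestions look like in practice (the numbers are only examples, not recommendations):

// more shuffle partitions => fewer records per partition for each aggregation task
sqlContext.sql("SET spark.sql.shuffle.partitions=2048")
// fewer concurrent task threads per executor, set at submit time, e.g.:
//   spark-submit --executor-memory 60g --executor-cores 2 ...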

We are trying to improve that by using sort-merge aggregation, which should reduce the memory utilization significantly, but that work is still ongoing.

Cheng Hao

From: Masf [mailto:masfwo...@gmail.com]
Sent: Thursday, April 2, 2015 11:47 PM
To: user@spark.apache.org
Subject: Spark SQL. Memory consumption


Hi.

I'm using Spark SQL 1.2. I have this query:

CREATE TABLE test_MA STORED AS PARQUET AS
 SELECT
field1
,field2
,field3
,field4
,field5
,COUNT(1) AS field6
,MAX(field7)
,MIN(field8)
,SUM(field9 / 100)
,COUNT(field10)
,SUM(IF(field11  -500, 1, 0))
,MAX(field12)
,SUM(IF(field13 = 1, 1, 0))
,SUM(IF(field13 in (3,4,5,6,10,104,105,107), 1, 0))
,SUM(IF(field13 = 2012 , 1, 0))
,SUM(IF(field13 in (0,100,101,102,103,106), 1, 0))

FROM table1 CL
   JOIN table2 netw
ON CL.field15 = netw.idhttp://netw.id
WHERE
AND field3 IS NOT NULL
AND field4 IS NOT NULL
AND field5 IS NOT NULL
GROUP BY field1,field2,field3,field4, netw.field5


spark-submit --master spark://master:7077 --driver-memory 20g --executor-memory 
60g --class GMain project_2.10-1.0.jar --driver-class-path 
'/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options 
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' 2 
./error


The input data is 8 GB in Parquet format. It often crashes with GC overhead errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with 128 GB RAM per node) still collapse.

Is this query too difficult for Spark SQL?
Would it be better to do it with the core Spark API?
Am I doing something wrong?


Thanks
--


Regards.
Miguel Ángel


RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread Cheng, Hao
This is a very interesting issue. The root reason for the lower performance is probably that, for a Scala UDF, Spark SQL converts the data from the internal representation to the Scala representation via Scala reflection, recursively.

Can you create a Jira issue for tracking this? I can start to work on the 
improvement soon.

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 nodes, each with 80 GB RAM / 20 cores
3. reading 250 GB of parquet files from HDFS

Test cases:

1. Register a floor UDF with sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300), then run the sql
"select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)"; it takes 17 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
  Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. Run the sql "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)"; it takes only 5 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
  Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. Use HiveContext with the sql "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))"; it also takes only 5 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
  Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why is the ScalaUdf so slow? How can it be improved?

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-udf-ScalaUdf-is-very-slow-tp22185.html
Sent from the Apache Spark User List mailing list archive (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com.


RE: Spark SQL Self join with agreegate

2015-03-19 Thread Cheng, Hao
Not so sure about your intention, but something like: SELECT sum(val1), sum(val2) FROM table GROUP BY src, dest ?
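A minimal sketch, assuming the data is registered as a temp table named records, with the column names from the question below:

val sums = sqlContext.sql(
  "SELECT src, dst, SUM(val1) AS sum_val1, SUM(val2) AS sum_val2 FROM records GROUP BY src, dst")
sums.collect().foreach(println)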


-Original Message-
From: Shailesh Birari [mailto:sbirar...@gmail.com] 
Sent: Friday, March 20, 2015 9:31 AM
To: user@spark.apache.org
Subject: Spark SQL Self join with agreegate

Hello,

I want to use Spark sql to aggregate some columns of the data.
e.g. I have huge data with some columns as:
 time, src, dst, val1, val2

I want to calculate sum(val1) and sum(val2) for all unique pairs of src and dst.

I tried by forming SQL query
  SELECT a.time, a.src, a.dst, sum(a.val1), sum(a.val2) from table a, table b 
where a.src = b.src and a.dst = b.dst

I know I am doing something wrong here.

Can you please let me know is it doable and how ?

Thanks,
  Shailesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Self-join-with-agreegate-tp22151.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SQL] Elasticsearch-hadoop, exception creating temporary table

2015-03-18 Thread Cheng, Hao
It seems the elasticsearch-hadoop project was built against an old version of Spark, and you then upgraded the Spark version in the execution environment. As far as I know, the definition of StructField changed in Spark 1.2; can you confirm the version mismatch first?
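For reference, a build.sbt sketch of keeping the two in sync (the artifact names and version numbers here are assumptions; please check the ones your cluster and the connector release actually use):

libraryDependencies ++= Seq(
  // must match the Spark version of the cluster the job runs on
  "org.apache.spark" %% "spark-sql" % "1.2.1" % "provided",
  // pick a connector release built against that same Spark branch
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta3"
)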

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose its data through Spark SQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at 
EsSparkSQL.scala:51, took 0.862184 s

Create Temporary Table for querying

Exception in thread main java.lang.NoSuchMethodError: 
org.apache.spark.sql.catalyst.types.StructField.init(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V

at 
org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)

at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)

at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at 
org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)

at 
org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.init(DefaultSource.scala:36)

at 
org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)

at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)

at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)

at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)

at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)

at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)

at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch 
cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 
'http://localhost:9200/bank/_mapping'

{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File



import scala.collection.JavaConversions._



import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD,SQLContext}



// ES imports

import org.elasticsearch.spark._

import org.elasticsearch.spark.sql._



import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}



object ElasticSearchReadWrite {



  /**

   * Spark specific configuration

   */

  def sparkInit(): SparkContext = {

val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)

conf.set("es.nodes", ElasticSearch.Nodes)

conf.set("es.port", ElasticSearch.HttpPort.toString())

conf.set("es.index.auto.create", "true")

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

conf.set("spark.executor.memory", "1g")

conf.set("spark.kryoserializer.buffer.mb", "256")



val sparkContext = new SparkContext(conf)



sparkContext

  }



  def main(args: Array[String]) {



val sc = sparkInit



val sqlContext = new SQLContext(sc)

import sqlContext._



val start = System.currentTimeMillis()



/*

 

RE: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-03-16 Thread Cheng, Hao
Or you need to specify the jars either in the configuration or on the command line:

bin/spark-sql --jars mysql-connector-xx.jar

From: fightf...@163.com [mailto:fightf...@163.com]
Sent: Monday, March 16, 2015 2:04 PM
To: sandeep vura; Ted Yu
Cc: user
Subject: Re: Re: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient

Hi, Sandeep

From your error log I can see that the JDBC driver was not found on your classpath. Do you have your MySQL JDBC jar correctly configured on that classpath? Can you establish a Hive JDBC connection using the url jdbc:hive2://localhost:1 ?

Thanks,
Sun.


fightf...@163.commailto:fightf...@163.com

From: sandeep vuramailto:sandeepv...@gmail.com
Date: 2015-03-16 14:13
To: Ted Yumailto:yuzhih...@gmail.com
CC: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi Ted,

Did you find any solution.

Thanks
Sandeep

On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura 
sandeepv...@gmail.commailto:sandeepv...@gmail.com wrote:
Hi Ted,

I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files 
attached below.


ERROR IN SPARK

n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:534)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
... 14 more
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
NestedThrowables:
java.lang.reflect.InvocationTargetException
at

RE: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-03-16 Thread Cheng, Hao
It doesn't take effect if you just put the jar files under the lib-managed/jars folder; you need to put them on the class path explicitly.
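For example (a sketch; the jar path and version are illustrative):

bin/spark-sql --driver-class-path /path/to/mysql-connector-java-5.1.34.jar \
  --jars /path/to/mysql-connector-java-5.1.34.jar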

From: sandeep vura [mailto:sandeepv...@gmail.com]
Sent: Monday, March 16, 2015 2:21 PM
To: Cheng, Hao
Cc: fightf...@163.com; Ted Yu; user
Subject: Re: Re: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient

I have already added mysql-connector-xx.jar file in spark/lib-managed/jars 
directory.

Regards,
Sandeep.v

On Mon, Mar 16, 2015 at 11:48 AM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Or you need to specify the jars either in configuration or

bin/spark-sql --jars  mysql-connector-xx.jar

From: fightf...@163.commailto:fightf...@163.com 
[mailto:fightf...@163.commailto:fightf...@163.com]
Sent: Monday, March 16, 2015 2:04 PM
To: sandeep vura; Ted Yu
Cc: user
Subject: Re: Re: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient

Hi, Sandeep

From your error log I can see that the JDBC driver was not found on your classpath. Do you have your MySQL JDBC jar correctly configured on that classpath? Can you establish a Hive JDBC connection using the url jdbc:hive2://localhost:1 ?

Thanks,
Sun.


fightf...@163.commailto:fightf...@163.com

From: sandeep vuramailto:sandeepv...@gmail.com
Date: 2015-03-16 14:13
To: Ted Yumailto:yuzhih...@gmail.com
CC: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi Ted,

Did you find any solution.

Thanks
Sandeep

On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura 
sandeepv...@gmail.commailto:sandeepv...@gmail.com wrote:
Hi Ted,

I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files 
attached below.


ERROR IN SPARK

n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC

RE: Spark SQL using Hive metastore

2015-03-11 Thread Cheng, Hao
Hi, Robert

Spark SQL currently only supports Hive 0.12.0 (which requires re-compiling the package) and 0.13.1 (by default); I am not so sure whether it supports the Hive 0.14 metastore service as a backend. Another thing you can try is to configure $SPARK_HOME/conf/hive-site.xml to access the remote metastore database directly ("javax.jdo.option.ConnectionURL" and "javax.jdo.option.ConnectionDriverName" are required), and then start Spark SQL like:
bin/spark-sql --jars lib/mysql-connector-xxx.jar
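A hive-site.xml sketch for the remote-metastore-database approach (host, database name and credentials are placeholders):

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://metastore-host:3306/hive_metastore</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hivepassword</value>
  </property>
</configuration>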

For the “SnappyError”, seems you didn’t configure the snappy native lib well 
for Spark, can you check the configuration file of 
$SPARK_HOME/conf/spark-xxx.conf ?

Cheng Hao

From: Grandl Robert [mailto:rgra...@yahoo.com.INVALID]
Sent: Thursday, March 12, 2015 5:07 AM
To: user@spark.apache.org
Subject: Spark SQL using Hive metastore

Hi guys,

I am a newbie in running Spark SQL / Spark. My goal is to run some TPC-H 
queries atop Spark SQL using Hive metastore.

It looks like spark 1.2.1 release has Spark SQL / Hive support. However, I am 
not able to fully connect all the dots.

I did the following:
1. Copied hive-site.xml from hive to spark/conf
2. Copied mysql connector to spark/lib
3. I have started hive metastore service: hive --service metastore
3. I have started ./bin/spark-sql
4. I typed at the spark-sql prompt: show tables;
However, the following error was thrown:
 Job 0 failed: collect at SparkPlan.scala:84, took 0.241788 s
15/03/11 15:02:35 ERROR SparkSQLDriver: Failed in [show tables]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 
serialization failed: org.xerial.snappy.SnappyError: 
[FAILED_TO_LOAD_NATIVE_LIBRARY] no native library is found for os.name=Linux 
and os.arch=aarch64

Do  you know what I am doing wrong ? I mention that I have hive-0.14 instead of 
hive-0.13.

And another question: What is the right command to run sql queries with spark 
sql using hive metastore ?

Thanks,
Robert



RE: Does any one know how to deploy a custom UDAF jar file in SparkSQL?

2015-03-10 Thread Cheng, Hao
You can add the additional jar when submitting your job, something like:

./bin/spark-submit --jars xx.jar …

More options can be listed by just typing ./bin/spark-submit

From: shahab [mailto:shahab.mok...@gmail.com]
Sent: Tuesday, March 10, 2015 8:48 PM
To: user@spark.apache.org
Subject: Does any one know how to deploy a custom UDAF jar file in SparkSQL?

Hi,

Does anyone know how to deploy a custom UDAF jar file in Spark SQL? Where should I put the jar file so Spark SQL can pick it up and make it accessible to Spark SQL applications?
I do not use spark-shell; instead I want to use it in a Spark application.

best,
/Shahab


RE: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

2015-03-10 Thread Cheng, Hao
Currently, Spark SQL doesn't provide an interface for developing custom UDTFs, but it can work seamlessly with Hive UDTFs.

I am working on the UDTF refactoring for Spark SQL, and hopefully it will provide a Hive-independent UDTF interface soon after that.

From: shahab [mailto:shahab.mok...@gmail.com]
Sent: Tuesday, March 10, 2015 5:44 PM
To: user@spark.apache.org
Subject: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

Hi,

I need to develop a couple of UDAFs and use them in Spark SQL. While UDFs can be registered as functions in HiveContext, I could not find any documentation on how UDAFs can be registered in the HiveContext. So far, what I have found is to build a JAR file out of the developed UDAF class and then deploy the JAR file to Spark SQL.

But is there any way to avoid deploying the jar file and to register it programmatically?


best,
/Shahab


RE: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

2015-03-10 Thread Cheng, Hao
Oh, sorry, my bad: currently Spark SQL doesn't provide a user interface for UDAFs, but it can work seamlessly with Hive UDAFs (via HiveContext).

I am also working on the UDAF interface refactoring; after that we can provide a custom interface for extension.

https://github.com/apache/spark/pull/3247
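In the meantime, a sketch of wiring a Hive UDAF in through HiveContext (the jar path, function name and class name are placeholders):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("ADD JAR /path/to/my-udafs.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_udaf AS 'com.example.hive.MyUDAF'")
hiveContext.sql("SELECT key, my_udaf(value) FROM my_table GROUP BY key").collect().foreach(println)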


From: shahab [mailto:shahab.mok...@gmail.com]
Sent: Wednesday, March 11, 2015 1:44 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

Thanks Hao,
But my question concerns UDAF (user defined aggregation function), not UDTF (user defined table-generating function).
I would appreciate it if you could point me to a starting point for UDAF development in Spark.

Thanks
Shahab

On Tuesday, March 10, 2015, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Currently, Spark SQL doesn’t provide interface for developing the custom UDTF, 
but it can work seamless with Hive UDTF.

I am working on the UDTF refactoring for Spark SQL, hopefully will provide an 
Hive independent UDTF soon after that.

From: shahab 
[mailto:shahab.mok...@gmail.comjavascript:_e(%7B%7D,'cvml','shahab.mok...@gmail.com');]
Sent: Tuesday, March 10, 2015 5:44 PM
To: user@spark.apache.orgjavascript:_e(%7B%7D,'cvml','user@spark.apache.org');
Subject: Registering custom UDAFs with HiveConetxt in SparkSQL, how?

Hi,

I need o develop couple of UDAFs and use them in the SparkSQL. While UDFs can 
be registered as a function in HiveContext, I could not find any documentation 
of how UDAFs can be registered in the HiveContext?? so far what I have found is 
to make a JAR file, out of developed UDAF class, and then deploy the JAR file 
to SparkSQL .

But is there any way to avoid deploying the jar file and register it 
programmatically?


best,
/Shahab


RE: [SparkSQL] Reuse HiveContext to different Hive warehouse?

2015-03-10 Thread Cheng, Hao
I am not so sure whether Hive supports changing the metastore location after it has been initialized; I guess not. Spark SQL relies entirely on the Hive metastore in HiveContext, and that's probably why it doesn't work as expected for Q1.

BTW, in most cases people configure the metastore settings in hive-site.xml and never change them afterwards; is there any reason you want to change that at runtime?

For Q2, probably something is wrong in the configuration; it seems HDFS has run into pseudo/single-node mode, can you double check that? Or can you run a DDL statement (like creating a table) from the spark shell with HiveContext?
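For that last check, something like this from the spark shell would do (a sketch; the table name is arbitrary):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.sql("CREATE TABLE IF NOT EXISTS ddl_smoke_test (key INT, value STRING)")
hc.sql("DESCRIBE ddl_smoke_test").collect().foreach(println)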

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, March 10, 2015 6:38 PM
To: user; d...@spark.apache.org
Subject: [SparkSQL] Reuse HiveContext to different Hive warehouse?


I'm using Spark 1.3.0 RC3 build with Hive support.



In Spark Shell, I want to reuse the HiveContext instance to different warehouse 
locations. Below are the steps for my test (Assume I have loaded a file into 
table src).



==

15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")

scala> sqlContext.sql("SELECT * FROM src").saveAsTable("table1")

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")

scala> sqlContext.sql("SELECT * FROM src").saveAsTable("table2")

==

After these steps, the tables are stored in /test/w only. I expect table2 
to be stored in /test/w2 folder.



Another question is: if I set hive.metastore.warehouse.dir to a HDFS folder, 
I cannot use saveAsTable()? Is this by design? Exception stack trace is below:

==

15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block 
broadcast_0_piece0

15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast at 
TableReader.scala:74

java.lang.IllegalArgumentException: Wrong FS: hdfs://server:8020/space/warehouse/table2, expected: file:///

at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)

at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)

at 
org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:118)

at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252)

at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at scala.collection.immutable.List.foreach(List.scala:318)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.AbstractTraversable.map(Traversable.scala:105)

at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251)

at 
org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:370)

at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96)

at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125)

at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)

at 
org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)

at 
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)

at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048)

at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:998)

at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:964)

at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:942)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:20)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:25)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:27)

at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:29)

at $iwC$$iwC$$iwC$$iwC.init(console:31)

at $iwC$$iwC$$iwC.init(console:33)

at $iwC$$iwC.init(console:35)

at $iwC.init(console:37)

at init(console:39)



Thank you very much!




RE: SQL with Spark Streaming

2015-03-10 Thread Cheng, Hao
Intel has a prototype for doing this, SaiSai and Jason are the authors. 
Probably you can ask them for some materials.
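Not that prototype, but a common hand-rolled pattern in the meantime is to run SQL per micro-batch (a sketch; it assumes eventStream and sqlContext already exist and the stream elements are case classes, so they can be converted to a DataFrame):

eventStream.foreachRDD { rdd =>
  import sqlContext.implicits._
  rdd.toDF().registerTempTable("events")
  sqlContext.sql("SELECT COUNT(*) FROM events").collect().foreach(println)
}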

From: Mohit Anchlia [mailto:mohitanch...@gmail.com]
Sent: Wednesday, March 11, 2015 8:12 AM
To: user@spark.apache.org
Subject: SQL with Spark Streaming

Does Spark Streaming also support SQL? Something like how Esper does CEP.


RE: Connection PHP application to Spark Sql thrift server

2015-03-05 Thread Cheng, Hao
Can you run the same query against Hive from your PHP code? Let's first confirm whether it's a bug in Spark SQL or in the PHP client.

-Original Message-
From: fanooos [mailto:dev.fano...@gmail.com] 
Sent: Thursday, March 5, 2015 4:57 PM
To: user@spark.apache.org
Subject: Connection PHP application to Spark Sql thrift server

We have two applications need to connect to Spark Sql thrift server. 

The first application is developed in java. Having spark sql thrift server 
running, we following the steps in  this link 
https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC
and the application connected smoothly without any problem. 


The second application is developed in PHP. We followed the steps provided in  
this link 
https://cwiki.apache.org/confluence/display/Hive/HiveClient#HiveClient-PHP
.  When hitting the php script, the spark sql thrift server throws this 
exception. 

15/03/05 11:53:19 ERROR TThreadPoolServer: Error occurred during processing of 
message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at 
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more


I searched a lot about this exception but I can not figure out what is the 
problem.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connection-PHP-application-to-Spark-Sql-thrift-server-tp21925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Does SparkSQL support ..... having count (fieldname) in SQL statement?

2015-03-04 Thread Cheng, Hao
I've tried with the latest code and it seems to work; which version are you using, Shahab?
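If it still fails on an older build, the usual workaround is a nested select instead of HAVING on the alias (a sketch; the comparison value is illustrative):

sqlContext.sql(
  "SELECT category, cnt FROM " +
  "(SELECT category, count(1) AS cnt FROM products GROUP BY category) t " +
  "WHERE cnt > 10").collect().foreach(println)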

From: yana [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, March 4, 2015 8:47 PM
To: shahab; user@spark.apache.org
Subject: RE: Does SparkSQL support . having count (fieldname) in SQL 
statement?

I think the problem is that you are using an alias in the HAVING clause. I am not able to try it just now, but see whether using HAVING count(*) directly works (i.e. don't use the cnt alias).


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message 
From: shahab
Date:03/04/2015 7:22 AM (GMT-05:00)
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Does SparkSQL support . having count (fieldname) in SQL 
statement?

Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL statements 
like :   SELECT category, count(1) AS cnt FROM products GROUP BY category 
HAVING cnt  10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: CAST(('cnt  2), BooleanType), tree:


I couldn't find anywhere in the documentation whether the HAVING keyword is supported or not.
If it is not supported, what would be the workaround? Using two nested select statements?

best,
/Shahab


RE: Supporting Hive features in Spark SQL Thrift JDBC server

2015-03-03 Thread Cheng, Hao
Can you provide the detailed failure call stack?

From: shahab [mailto:shahab.mok...@gmail.com]
Sent: Tuesday, March 3, 2015 3:52 PM
To: user@spark.apache.org
Subject: Supporting Hive features in Spark SQL Thrift JDBC server

Hi,

According to the Spark SQL documentation, Spark SQL supports the vast majority of Hive features, such as User Defined Functions (UDFs), and one of these UDFs is the current_date() function, which should therefore be supported.

However, I get an error when I use this UDF in my SQL query. There are a couple of other UDFs which cause a similar error.

Am I missing something in my JDBC server ?

/Shahab


RE: SparkSQL, executing an OR

2015-03-03 Thread Cheng, Hao
Combine the two predicates with || inside a single where, e.g. where('age >= 10 || 'age <= 4), instead.
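A sketch against the documentation example (the comparison directions are illustrative; adjust them to your actual predicate):

val result = people.where(('age >= 10) || ('age <= 4)).select('name)
result.collect().foreach(println)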

-Original Message-
From: Guillermo Ortiz [mailto:konstt2...@gmail.com] 
Sent: Tuesday, March 3, 2015 5:14 PM
To: user
Subject: SparkSQL, executing an OR

I'm trying to execute a query with Spark.

(Example from the Spark Documentation)
val teenagers = people.where('age >= 10).where('age <= 19).select('name)

Is it possible to execute an OR with this syntax?
val teenagers = people.where('age = 10 'or 'age = 4).where('age =
19).select('name)

I have tried different ways and I didn't get it.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Supporting Hive features in Spark SQL Thrift JDBC server

2015-03-03 Thread Cheng, Hao
Hive UDFs are only applicable to HiveContext and its subclass instances. Is the CassandraAwareSQLContext a direct subclass of HiveContext, or of SQLContext?
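A quick way to confirm (a sketch; the timestamp literal is arbitrary): the same expression should resolve once the context really is a HiveContext:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.sql("SELECT from_unixtime(floor(1425350000000 / 1000))").collect().foreach(println)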

From: shahab [mailto:shahab.mok...@gmail.com]
Sent: Tuesday, March 3, 2015 5:10 PM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: Supporting Hive features in Spark SQL Thrift JDBC server

val sc: SparkContext = new SparkContext(conf)
val sqlCassContext = new CassandraAwareSQLContext(sc)  // I used some Calliope Cassandra Spark connector
val rdd: SchemaRDD = sqlCassContext.sql("select * from db.profile")
rdd.cache
rdd.registerTempTable("profile")
rdd.first  // enforce caching
val q = "select from_unixtime(floor(createdAt/1000)) from profile where sampling_bucket=0"
val rdd2 = rdd.sqlContext.sql(q)
println("Result: " + rdd2.first)

And I get the following errors:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'from_unixtime('floor(('createdAt / 1000))) AS c0#7, tree:
Project ['from_unixtime('floor(('createdAt / 1000))) AS c0#7]
 Filter (sampling_bucket#10 = 0)
  Subquery profile
   Project 
[company#8,bucket#9,sampling_bucket#10,profileid#11,createdat#12L,modifiedat#13L,version#14]
CassandraRelation localhost, 9042, 9160, normaldb_sampling, profile, 
org.apache.spark.sql.CassandraAwareSQLContext@778b692dmailto:org.apache.spark.sql.CassandraAwareSQLContext@778b692d,
 None, None, false, Some(Configuration: core-default.xml, core-site.xml, 
mapred-default.xml, mapred-site.xml)

at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at 
scala.collection.TraversableOnce$class.tohttp://class.to(TraversableOnce.scala:273)
at 
scala.collection.AbstractIterator.tohttp://scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:402)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:402)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:403)
at 
org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:403)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:407)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:405)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:411)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at org.apache.spark.sql.SchemaRDD.take(SchemaRDD.scala:440)
at org.apache.spark.sql.SchemaRDD.take

RE: insert Hive table with RDD

2015-03-03 Thread Cheng, Hao
Use the SchemaRDD / DataFrame API via HiveContext.

Assuming you're using the latest code, something probably like:

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")

or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("INSERT INTO TABLE hivetable SELECT xxx FROM mydata")



-Original Message-
From: patcharee [mailto:patcharee.thong...@uni.no] 
Sent: Tuesday, March 3, 2015 7:09 PM
To: user@spark.apache.org
Subject: insert Hive table with RDD

Hi,

How can I insert an existing hive table with an RDD containing my data? 
Any examples?

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: java.lang.IncompatibleClassChangeError when using PrunedFilteredScan

2015-03-03 Thread Cheng, Hao
As the call stack shows, the MongoDB connector is not compatible with the Spark SQL Data Source interface. The Data Source API changed in Spark 1.2, so you probably need to confirm which Spark version the MongoDB connector was built against.

By the way, a well-formatted call stack would be more helpful for people reading it.

From: taoewang [mailto:taoew...@sequoiadb.com]
Sent: Tuesday, March 3, 2015 7:39 PM
To: user@spark.apache.org
Subject: java.lang.IncompatibleClassChangeError when using PrunedFilteredScan



Hi,



I’m trying to build the stratio spark-mongodb connector and got error 
java.lang.IncompatibleClassChangeError: class 
com.stratio.deep.mongodb.MongodbRelation has interface 
org.apache.spark.sql.sources.PrunedFilteredScan as super class” when trying to 
create a table using the driver:



scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@37050c15

scala> import com.stratio.deep.mongodb
import com.stratio.deep.mongodb

scala> sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.deep.mongodb OPTIONS (host 'host:port', database 'highschool', collection 'students')")
java.lang.IncompatibleClassChangeError: class com.stratio.deep.mongodb.MongodbRelation has interface org.apache.spark.sql.sources.PrunedFilteredScan as super class

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:800)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)

at java.net.URLClassLoader.access$100(URLClassLoader.java:71)

at java.net.URLClassLoader$1.run(URLClassLoader.java:361)

at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

at java.lang.Class.getDeclaredConstructors0(Native Method)

at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)

at java.lang.Class.getConstructor0(Class.java:2885)

at java.lang.Class.newInstance(Class.java:350)

at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:288)

at org.apache.spark.sql.sources.CreateTempTableUsing.run(ddl.scala:376)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)



The code failed at line 288 in ddl.scala:

  def apply(
      sqlContext: SQLContext,
      userSpecifiedSchema: Option[StructType],
      provider: String,
      options: Map[String, String]): ResolvedDataSource = {
    val clazz: Class[_] = lookupDataSource(provider)
    val relation = userSpecifiedSchema match {
      case Some(schema: StructType) => clazz.newInstance() match {
        case dataSource: SchemaRelationProvider =>
          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
          sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
      }

      case None => clazz.newInstance() match {   // <--- failed here
        case dataSource: RelationProvider =>
          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
          sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
      }
    }
    new ResolvedDataSource(clazz, relation)
  }



The “clazz” here is com.stratio.deep.mongodb.DefaultSource, which extends 
RelationProvider:

class DefaultSource extends RelationProvider {



  override def createRelation(

sqlContext: SQLContext,

parameters: Map[String, String]): BaseRelation = {



/** We will assume hosts are provided like 'host:port,host2:port2,...'*/

val host = parameters

  .getOrElse(Host, notFound[String](Host))

  .split(",").toList



val database = parameters.getOrElse(Database, notFound(Database))



val collection = parameters.getOrElse(Collection, notFound(Collection))



val samplingRatio = parameters

  .get(SamplingRatio)

  .map(_.toDouble).getOrElse(DefaultSamplingRatio)



MongodbRelation(

  MongodbConfigBuilder()

.set(Host,host)

.set(Database,database)


RE: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

2015-03-03 Thread Cheng, Hao
Which version / distribution are you using? Please refer to this blog post that Felix C posted if you're running on CDH:
http://eradiating.wordpress.com/2015/02/22/getting-hivecontext-to-work-in-cdh/

Otherwise, you may also need to download the datanucleus*.jar files and try adding them with the "--jars" option while starting the spark shell.
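For example (a sketch; the jar versions below are the ones bundled with the Apache builds and may differ in your distribution):

bin/spark-shell --driver-class-path lib/datanucleus-api-jdo-3.2.6.jar:lib/datanucleus-core-3.2.10.jar:lib/datanucleus-rdbms-3.2.9.jar \
  --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar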

From: Anusha Shamanur [mailto:anushas...@gmail.com]
Sent: Wednesday, March 4, 2015 5:07 AM
To: Cheng, Hao
Subject: Re: Spark SQL Thrift Server start exception : 
java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory

Hi,

I am getting the same error. There is no lib folder in my $SPARK_HOME. But I 
included these jars while calling spark-shell.

Now, I get this:

Caused by: org.datanucleus.exceptions.ClassNotResolvedException: Class 
org.datanucleus.store.rdbms.RDBMSStoreManager was not found in the CLASSPATH. 
Please check your specification and your CLASSPATH.

   at 
org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:218)



How do I solve this?

On Mon, Mar 2, 2015 at 11:04 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Copy those jars into the $SPARK_HOME/lib/

datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar

see https://github.com/apache/spark/blob/master/bin/compute-classpath.sh#L120


-Original Message-
From: fanooos [mailto:dev.fano...@gmail.commailto:dev.fano...@gmail.com]
Sent: Tuesday, March 3, 2015 2:50 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Spark SQL Thrift Server start exception : 
java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory

I have installed a hadoop cluster (version : 2.6.0), apache spark (version :
1.2.1 preBuilt for hadoop 2.4 and later), and hive (version 1.0.0).

When I try to start the spark sql thrift server I am getting the following 
exception.

Exception in thread main java.lang.RuntimeException:
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:248)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:91)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:90)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.SQLContext.init(SQLContext.scala:90)
at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:72)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:51)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 26 more

RE: Is SQLContext thread-safe?

2015-03-02 Thread Cheng, Hao
Currently, each SQLContext has its own configuration (e.g. shuffle partition number, codegen, etc.), and that configuration is shared among all the threads running against it.

We actually had some internal discussions on this; we will probably provide a thread-local configuration for a single SQLContext instance in the future.
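A sketch of the sharing pattern being discussed (it assumes a table named src is already registered; the pool size is arbitrary):

import java.util.concurrent.Executors
val pool = Executors.newFixedThreadPool(4)
(0 until 4).foreach { i =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      // all threads share the same SQLContext instance, and therefore its configuration
      val rows = sqlContext.sql(s"SELECT count(*) FROM src WHERE key % 4 = $i").collect()
      println(s"thread $i -> ${rows(0)}")
    }
  })
}
pool.shutdown()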

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, March 3, 2015 7:56 AM
To: Cheng, Hao; user
Subject: RE: Is SQLContext thread-safe?

Thanks for the response.

Then I have another question: when would we want to create multiple SQLContext instances from the same SparkContext? What's the benefit?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in different 
threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

2015-03-02 Thread Cheng, Hao
Copy those jars into the $SPARK_HOME/lib/

datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar

see https://github.com/apache/spark/blob/master/bin/compute-classpath.sh#L120


-Original Message-
From: fanooos [mailto:dev.fano...@gmail.com] 
Sent: Tuesday, March 3, 2015 2:50 PM
To: user@spark.apache.org
Subject: Spark SQL Thrift Server start exception : 
java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory

I have installed a hadoop cluster (version : 2.6.0), apache spark (version :
1.2.1 preBuilt for hadoop 2.4 and later), and hive (version 1.0.0). 

When I try to start the spark sql thrift server I am getting the following 
exception. 

Exception in thread main java.lang.RuntimeException:
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:248)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:91)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:90)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.SQLContext.init(SQLContext.scala:90)
at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:72)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:51)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 26 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
... 31 more
Caused by: javax.jdo.JDOFatalUserException: Class 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables:
java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
  

RE: Executing hive query from Spark code

2015-03-02 Thread Cheng, Hao
I am not so sure how Spark SQL is compiled in CDH, but if the -Phive and 
-Phive-thriftserver flags were not specified during the build, it most likely will 
not work just by providing the Hive lib jars later on. For example, does the 
HiveContext class exist in the assembly jar?

I am also quite curious about that; any hint will be appreciated.
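
(A quick check one might run, assuming a single assembly jar under lib/ with a
typical name; the exact file name depends on your build:

  jar tf $SPARK_HOME/lib/spark-assembly-*.jar | grep HiveContext

If nothing is printed, the assembly was built without -Phive.)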

From: Felix C [mailto:felixcheun...@hotmail.com]
Sent: Tuesday, March 3, 2015 12:59 PM
To: Ted Yu; nitinkak001
Cc: user
Subject: Re: Executing hive query from Spark code

It should work in CDH without having to recompile.

http://eradiating.wordpress.com/2015/02/22/getting-hivecontext-to-work-in-cdh/
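
For reference, a minimal HiveContext usage sketch (it assumes Hive support is
available on the classpath one way or another; the table name src is only an
illustration):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  object HiveQueryExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("HiveQueryExample"))
      val hiveContext = new HiveContext(sc)
      // The query result behaves like a normal RDD and can be reused in Spark code.
      val rows = hiveContext.sql("SELECT key, value FROM src LIMIT 10")
      rows.collect().foreach(println)
      sc.stop()
    }
  }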

--- Original Message ---

From: Ted Yu yuzhih...@gmail.com
Sent: March 2, 2015 1:35 PM
To: nitinkak001 nitinkak...@gmail.com
Cc: user user@spark.apache.org
Subject: Re: Executing hive query from Spark code
Here is a snippet of the dependency tree for the spark-hive module:

[INFO] org.apache.spark:spark-hive_2.10:jar:1.3.0-SNAPSHOT
...
[INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
[INFO] |  |  +- 
org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
[INFO] |  |  +- 
org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
[INFO] |  |  +- 
org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
[INFO] |  |  \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime
...
[INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-ant:jar:0.13.1a:compile
[INFO] |  |  \- org.apache.velocity:velocity:jar:1.5:compile
[INFO] |  | \- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.spark-project.hive:hive-common:jar:0.13.1a:compile
...
[INFO] +- org.spark-project.hive:hive-serde:jar:0.13.1a:compile

bq. is there a way to have the hive support without updating the assembly

I don't think so.
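
For completeness, the usual way to get Hive support is to rebuild the assembly
with the Hive profiles. A sketch of the Maven command (the Hadoop profile and
version here are assumptions; adjust to your environment):

  mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 \
      -Phive -Phive-thriftserver -DskipTests clean package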

On Mon, Mar 2, 2015 at 12:37 PM, nitinkak001 nitinkak...@gmail.com wrote:
I want to run a Hive query inside Spark and use the RDDs generated from it
inside Spark. I read in the documentation:

/Hive support is enabled by adding the -Phive and -Phive-thriftserver flags
to Spark’s build. This command builds a new assembly jar that includes Hive.
Note that this Hive assembly jar must also be present on all of the worker
nodes, as they will need access to the Hive serialization and
deserialization libraries (SerDes) in order to access data stored in Hive./

I just wanted to know what the -Phive and -Phive-thriftserver flags really do,
and whether there is a way to have Hive support without updating the assembly.
Do those flags add a Hive support jar or something?

The reason I am asking is that I will be using the Cloudera version of Spark in
the future, and I am not sure how to add Hive support to that Spark distribution.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executing-hive-query-from-Spark-code-tp21880.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Is SQLContext thread-safe?

2015-03-02 Thread Cheng, Hao
https://issues.apache.org/jira/browse/SPARK-2087
https://github.com/apache/spark/pull/4382

I am working on the prototype; it will be updated soon.
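
For illustration, a small sketch of the situation being discussed: a single
SQLContext shared by several threads. Submitting queries concurrently is fine, but
a setConf() call (e.g. changing spark.sql.shuffle.partitions) is visible to every
thread, because the configuration currently belongs to the SQLContext rather than
to the thread. The file path and table name below are illustrative only.

  import java.util.concurrent.{Executors, TimeUnit}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object SharedSQLContextSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("SharedSQLContextSketch"))
      val sqlContext = new SQLContext(sc)
      sqlContext.jsonFile("/tmp/events.json").registerTempTable("events")

      val pool = Executors.newFixedThreadPool(4)
      (0 until 4).foreach { i =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            // Safe to call concurrently, but the configuration is shared by all threads.
            val count = sqlContext
              .sql(s"SELECT COUNT(*) FROM events WHERE id % 4 = $i")
              .collect().head.getLong(0)
            println(s"group $i -> $count rows")
          }
        })
      }
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.MINUTES)
      sc.stop()
    }
  }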

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, March 3, 2015 8:32 AM
To: Cheng, Hao; user
Subject: RE: Is SQLContext thread-safe?

Hao, thank you so much for the reply!

Do you already have some JIRA for the discussion?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, March 03, 2015 8:23 AM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Currently, each SQLContext has its own configuration (e.g. shuffle partition 
number, codegen, etc.), and it is shared among the multiple threads running against it.

We have actually had some internal discussions on this, and will probably provide a 
thread-local configuration for a single SQLContext instance in the future.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, March 3, 2015 7:56 AM
To: Cheng, Hao; user
Subject: RE: Is SQLContext thread-safe?

Thanks for the response.

Then I have another question: when will we want to create multiple SQLContext 
instances from the same SparkContext? What's the benefit?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in different 
threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Performance tuning in Spark SQL.

2015-03-02 Thread Cheng, Hao
This is actually quite an open question. From my understanding, there are
several ways to tune, such as:

* SQL configurations like:

  Configuration Key                      Default Value
  spark.sql.autoBroadcastJoinThreshold   10 * 1024 * 1024
  spark.sql.defaultSizeInBytes           10 * 1024 * 1024 + 1
  spark.sql.planner.externalSort         false
  spark.sql.shuffle.partitions           200
  spark.sql.codegen                      false

* Spark cluster / application configuration (memory, GC, Spark core number, etc.)

* Try using cached tables / Parquet files as the storage.

* EXPLAIN [EXTENDED] query is your best friend for tuning the SQL itself (see the
  sketch below).

* ...

And a real use-case scenario would probably be more helpful in answering your
question.
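
A hedged sketch of applying the knobs above from Scala (the values, file path and
table name are illustrations only, not recommendations):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object SqlTuningSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("SqlTuningSketch"))
      val sqlContext = new SQLContext(sc)

      // SQL configurations from the table above
      sqlContext.setConf("spark.sql.shuffle.partitions", "400")
      sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
      sqlContext.setConf("spark.sql.codegen", "true")

      // Parquet file as the storage, plus an in-memory cached table
      sqlContext.parquetFile("/data/orders.parquet").registerTempTable("orders")
      sqlContext.cacheTable("orders")

      // EXPLAIN EXTENDED prints the logical and physical plans for tuning the SQL itself
      sqlContext.sql("EXPLAIN EXTENDED SELECT status, COUNT(*) FROM orders GROUP BY status")
        .collect().foreach(println)

      sc.stop()
    }
  }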



-Original Message-
From: dubey_a [mailto:abhishek.du...@xoriant.com]
Sent: Monday, March 2, 2015 6:02 PM
To: user@spark.apache.org
Subject: Performance tuning in Spark SQL.



What are the ways to tune query performance in Spark SQL?

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-tp21871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




RE: Is SQLContext thread-safe?

2015-03-02 Thread Cheng, Hao
Yes it is thread safe, at least it's supposed to be.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in different 
threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



JLine hangs under Windows8

2015-02-27 Thread Cheng, Hao
Hi, All
I was trying to run the Spark SQL CLI on Windows 8 for debugging purposes; 
however, it seems JLine hangs waiting for input after the ENTER key. I didn't see 
that under Linux. Has anybody met the same issue?

The call stack is as below:
"main" prio=6 tid=0x02548800 nid=0x17cc runnable [0x0253e000]
   java.lang.Thread.State: RUNNABLE
at jline.WindowsTerminal.readByte(Native Method)
at jline.WindowsTerminal.readCharacter(WindowsTerminal.java:233)
at jline.WindowsTerminal.readVirtualKey(WindowsTerminal.java:319)
at jline.ConsoleReader.readVirtualKey(ConsoleReader.java:1453)
at jline.ConsoleReader.readBinding(ConsoleReader.java:654)
at jline.ConsoleReader.readLine(ConsoleReader.java:494)
at jline.ConsoleReader.readLine(ConsoleReader.java:448)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:202)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)


Thanks,
Cheng Hao
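
(A possible workaround sketch to try, assuming the hang is in the native Windows
console read shown in the stack above: force JLine onto its dumb terminal through
its standard jline.terminal system property, at the cost of losing line editing,
by adding this JVM option to the process that runs SparkSQLCLIDriver, e.g. in the
IDE run configuration:

  -Djline.terminal=jline.UnsupportedTerminal
)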

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


