Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread ayan guha
You can use a custom partitioner to redistribute the data using partitionBy.
On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote:

 I'm currently trying to join two large tables (order 1B rows each) using
 Spark SQL (1.3.0) and am running into long GC pauses which bring the job to
 a halt.

 I'm reading in both tables using a HiveContext with the underlying files
 stored as Parquet files. I'm using something along the lines of
 HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1")
 to set up the join.

 When I execute this (with an action such as .count) I see the first few
 stages complete, but the job eventually stalls. The GC counts keep
 increasing for each executor.

 Running with 6 workers, each with 2T disk and 100GB RAM.

 Has anyone else run into this issue? I'm thinking I might be running into
 issues with the shuffling of the data, but I'm unsure of how to get around
 this? Is there a way to redistribute the rows based on the join key first,
 and then do the join?

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread Nick Travers
Could you be more specific in how this is done?

The DataFrame class doesn't have a partitionBy method.
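
One possible reading of the suggestion, offered as a sketch rather than the poster's exact method: in Spark 1.3 partitionBy is available on pair RDDs, not on DataFrame, so one would drop to the RDD level (for example via DataFrame.rdd), key each row by the join column, and pass a partitioner. The plain-Scala model below only illustrates what such a partitioner does. JoinKeyPartitioner and partition are hypothetical names that mirror the numPartitions/getPartition shape of org.apache.spark.Partitioner; they are not Spark classes.

```scala
object JoinKeyPartitioning {
  // Hypothetical stand-in for a Spark Partitioner: maps a key to a partition index.
  final class JoinKeyPartitioner(val numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")

    def getPartition(key: Any): Int = {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw // keep the index non-negative
    }
  }

  // Group (key, value) rows by the partition their key is routed to.
  def partition[K, V](rows: Seq[(K, V)], p: JoinKeyPartitioner): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => p.getPartition(k) }
}
```

Because both tables would be routed with the same partitioner, rows that share a join key land in the same partition index on both sides, which is the property the join relies on.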


spark log analyzer sample

2015-05-04 Thread anshu shukla
Exception in thread "main" java.lang.RuntimeException:
org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
communicate with client version 4


I am not using any Hadoop facility (not even HDFS), so why is it giving
this error?

-- 
Thanks & Regards,
Anshu Shukla


Spark job concurrency problem

2015-05-04 Thread Xi Shen
Hi,

I have two small RDDs, each with about 600 records. In my code, I did

val rdd1 = sc...cache()
val rdd2 = sc...cache()

val result = rdd1.cartesian(rdd2).repartition(num_cpu).map { case (a, b) =>
  some_expensive_job(a, b)
}

I ran my job on a YARN cluster with --master yarn-cluster. I have 6
executors, and each has a large amount of memory.

However, I noticed my job is very slow. I went to the RM page and found
there are two containers: one is the driver and one is the worker. I guess
this is correct?

I went to the worker's log and monitored the log details. My app prints some
information, so I can use it to estimate the progress of the map
operation. Looking at the log, it feels like the jobs are done one by one,
sequentially, rather than in batches of #cpu at a time.

I checked the worker nodes, and their CPUs are all busy.



Xi Shen
http://about.me/davidshen


Spark Mongodb connection

2015-05-04 Thread Yasemin Kaya
Hi!

I am new to Spark and I want to start with a simple wordCount example
in Java, but I want to read my input from a MongoDB database. I would like
to learn how to connect a MongoDB database to my project. Can anyone help
with this?

Have a nice day
yasemin

-- 
hiç ender hiç


Re: Problem in Standalone Mode

2015-05-04 Thread Akhil Das
Can you paste the complete stack trace? It looks like you have a version
incompatibility with Hadoop.

Thanks
Best Regards

On Sat, May 2, 2015 at 4:36 PM, drarse drarse.a...@gmail.com wrote:

 When I run my program with spark-submit, everything is OK. But when I try
 to run it in standalone mode, I get the following exceptions:

 ((This is with

 val df = sqlContext.jsonFile("./datos.json")

 ))
 java.io.EOFException
 [error] at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)


 This is the SparkConf:

 val sparkConf = new SparkConf().setAppName("myApp")
   .setMaster("spark://master:7077")
   .setSparkHome("/usr/local/spark/")
   .setJars(Seq("./target/scala-2.10/myApp.jar"))



 --
 View this message in context: Problem in Standalone Mode
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Standalone-Mode-tp22741.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



how to make sure data is partitioned across all workers?

2015-05-04 Thread shahab
Hi,

Is there any way to force Spark to partition cached data across all
worker nodes, so that the data is not cached on only one of them?

best,
/Shahab


Re: spark filestrea problem

2015-05-04 Thread Akhil Das
With fileStream you can actually pass a filter parameter to avoid loading
.tmp files/directories.

Also, when you move/rename a file, the file creation date doesn't change,
and hence Spark won't detect it, I believe.
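
A minimal sketch of such a filter, under stated assumptions: StreamingContext.fileStream has an overload that takes a `filter: Path => Boolean` argument, and the predicate below could be adapted to it as `path => StreamFileFilter.accept(path.getName)`. The object name, the method name, and the exact list of suffixes are illustrative choices; the `._COPYING_` suffix is assumed to be the temporary name used by the HDFS shell while a copy is in flight.

```scala
object StreamFileFilter {
  // Accept only files that look complete: skip hidden files and in-flight
  // copies such as "part-0001.tmp" or "data.csv._COPYING_".
  def accept(fileName: String): Boolean =
    !fileName.startsWith(".") &&
      !fileName.startsWith("_") &&
      !fileName.endsWith(".tmp") &&
      !fileName.endsWith("._COPYING_")
}
```

Keeping the predicate on the bare file name makes it easy to check on its own before wiring it into a stream.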

Thanks
Best Regards

On Sat, May 2, 2015 at 9:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 It seems that in Spark Streaming 1.2 the fileStream API may have a bug:
 it doesn't detect new files when they are moved or renamed on HDFS, only
 when they are copied. But copying leads to a well-known problem with .tmp
 files, which get removed and make Spark Streaming's fileStream throw an
 exception.



Re: Remoting warning when submitting to cluster

2015-05-04 Thread Akhil Das
Looks like a version incompatibility; just make sure you have the proper
version of Spark. Also look further in the stack trace to see what is causing
"Futures timed out" (it could also be a network issue if the ports aren't
opened properly).
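
To rule the network in or out, a quick reachability probe can be run from an executor machine against the driver address shown in the log (driver.host:51837 in the quoted output). This is only a sketch using plain JDK sockets; PortCheck and isReachable are hypothetical names, not part of Spark. Since the driver port is chosen at random unless spark.driver.port is set, pinning that property is one way to make the port predictable for firewall rules.

```scala
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, timeout, and unknown host are all IOExceptions.
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }
}
```

For example, `PortCheck.isReachable("driver.host", 51837)` returning false from a worker would point to a firewall or binding problem rather than a version mismatch.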

Thanks
Best Regards

On Sat, May 2, 2015 at 12:04 AM, javidelgadillo jdelgadi...@esri.com
wrote:

 Hello all!!

 We've been prototyping some Spark applications to read messages from Kafka
 topics.  The application is quite simple: we use KafkaUtils.createStream to
 receive a stream of CSV messages from a Kafka topic.  We parse the CSV and
 count the number of messages we get in each RDD. At a high level (removing
 the abstractions of our application), it looks like this:

 val sc = new SparkConf()
   .setAppName(appName)
   .set("spark.executor.memory", "1024m")
   .set("spark.cores.max", "3")
   .set("spark.app.name", appName)
   .set("spark.ui.port", sparkUIPort)

 val ssc = new StreamingContext(sc, Milliseconds(emitInterval.toInt))

 KafkaUtils
   .createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
   .map(_._2)
   .foreachRDD((rdd: RDD[String], time: Time) => {
     println("Time %s: (%s total records)".format(time, rdd.count()))
   })

 When I submit this with the Spark master set to local[3], everything behaves
 as I'd expect.  After some startup overhead, the count printed is the same
 as the count I'm simulating (1 every second, for example).

 When I submit this to a Spark master using spark://master.host:7077, the
 behavior is different.  The overhead to start receiving seems longer, and on
 some runs I don't see anything for 30 seconds even though my simulator is
 sending messages to the topic.  I also see the following error written to
 stderr by every executor assigned to the job:

 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/05/01 10:11:38 INFO SecurityManager: Changing view acls to: username
 15/05/01 10:11:38 INFO SecurityManager: Changing modify acls to: username
 15/05/01 10:11:38 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(javi4211);
 users with modify permissions: Set(username)
 15/05/01 10:11:38 INFO Slf4jLogger: Slf4jLogger started
 15/05/01 10:11:38 INFO Remoting: Starting remoting
 15/05/01 10:11:39 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://driverpropsfetc...@master.host:56534]
 15/05/01 10:11:39 INFO Utils: Successfully started service
 'driverPropsFetcher' on port 56534.
 15/05/01 10:11:40 WARN Remoting: Tried to associate with unreachable remote
 address [akka.tcp://sparkdri...@driver.host:51837]. Address is now gated
 for
 5000 ms, all messages to this address will be delivered to dead letters.
 Reason: Connection refused: no further information:
 driver.host/10.27.51.214:51837
 15/05/01 10:12:09 ERROR UserGroupInformation: PriviledgedActionException
 as:username cause:java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 Exception in thread "main" java.lang.reflect.UndeclaredThrowableException:
 Unknown exception in doAs
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
 at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
 at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
 at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
 at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
 Caused by: java.security.PrivilegedActionException:
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
 ... 4 more
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)


 Is there something else I need to configure to ensure Akka remoting will
 work correctly when running on a Spark cluster?  Or can I ignore this error?

 -Javier



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Remoting-warning-when-submitting-to-cluster-tp22733.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Hardware requirements

2015-05-04 Thread Akhil Das
500GB of data will have nearly 3900 partitions, and if you can have nearly
that many cores and around 500GB of memory, then things will be
lightning fast. :)

Thanks
Best Regards

On Sun, May 3, 2015 at 12:49 PM, sherine ahmed sherine.sha...@hotmail.com
wrote:

 I need to use Spark to load 500 GB of data from Hadoop on a standalone-mode
 cluster. What are the minimum hardware requirements, given that it
 will be used for advanced analysis (social network analysis)?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Hardware-requirements-tp22744.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Hello Dean & Others,
Thanks for the response.

I tried 100, 200, 400, 600 and 1200 repartitions with 100, 200, 400 and
800 executors. Each time, all the tasks of the join complete in less than a
minute except one, and that one task runs forever. I have a huge cluster at
my disposal.

The data for each of the 1199 tasks is around 40MB/30k records, and for the
one never-ending task it is 1.5G/98 million records. I see that there is data
skew among tasks. I had observed this a week earlier and have no clue how to
fix it. Someone suggested that repartition might make things more
parallel, but the problem still persists.

Please suggest how to get the task to complete.
All I want to do is join two datasets (dataset1 is in sequence file format
and dataset2 is in Avro format).



Ex:
Tasks
Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      3771  0        RUNNING  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44  7.3 min   19 s     1591.2 MB / 98931767         0.0 B                   0.0 B
1      3772  0        SUCCESS  PROCESS_LOCAL   226 / host2         2015/05/04 01:27:44  28 s      2 s      39.2 MB / 29754              0.0 B                   0.0 B
2      3773  0        SUCCESS  PROCESS_LOCAL   283 / host3         2015/05/04 01:27:44  26 s      2 s      39.0 MB / 29646              0.0 B                   0.0 B
5      3776  0        SUCCESS  PROCESS_LOCAL   320 / host4         2015/05/04 01:27:44  31 s      3 s      38.8 MB / 29512              0.0 B                   0.0 B
4      3775  0        SUCCESS  PROCESS_LOCAL   203 / host5         2015/05/04 01:27:44  41 s      3 s      38.4 MB / 29169              0.0 B                   0.0 B
3      3774  0        SUCCESS  PROCESS_LOCAL   84 / host6          2015/05/04 01:27:44  24 s      2 s      38.5 MB / 29258              0.0 B                   0.0 B
8      3779  0        SUCCESS  PROCESS_LOCAL   309 / host7         2015/05/04 01:27:44  31 s      4 s      39.5 MB / 30008              0.0 B                   0.0 B

There are 1200 tasks in total.
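
A task that reads 98 million records while its peers read about 30 thousand suggests that one or a few join keys dominate. One hedged way to confirm this is to count records per key and look at the heaviest ones. The sketch below does that on a plain Scala collection; SkewCheck and heaviestKeys are hypothetical names. On an RDD of keys the same count would typically be written as `keys.map(k => (k, 1L)).reduceByKey(_ + _)`.

```scala
object SkewCheck {
  // Count records per key and return the `top` heaviest keys, largest first.
  def heaviestKeys[K](keys: Seq[K], top: Int): Seq[(K, Int)] =
    keys.groupBy(identity)
      .map { case (k, vs) => (k, vs.size) }
      .toSeq
      .sortBy { case (_, n) => -n }
      .take(top)
}
```

Running this on a sample of the join column from each dataset would show whether a single value (often an empty or default key) accounts for the oversized task.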


On Sun, May 3, 2015 at 9:53 PM, Dean Wampler deanwamp...@gmail.com wrote:

 I don't know the full context of what you're doing, but serialization
 errors usually mean you're attempting to serialize something that can't be
 serialized, like the SparkContext. Kryo won't help there.

 The arguments to spark-submit you posted previously look good:

 2)  --num-executors 96 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

 I suspect you aren't getting the parallelism you need. For partitioning,
 if your data is in HDFS and your block size is 128MB, then you'll get ~195
 partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
 have some other serious bottleneck. You should examine the web console and
 the logs to determine where all the time is going. Questions you might
 pursue:

 - How long does each task take to complete?
 - How many of those 195 partitions/tasks are processed at the same
   time? That is, how many slots are available?  Maybe you need more nodes
   if the number of slots is too low. Based on your command arguments, you
   should be able to process 1/2 of them at a time, unless the cluster is
   busy.
 - Is the cluster swamped with other work?
 - How much data does each task process? Is the data roughly the same
   from one task to the next? If not, then you might have serious key skew.

 You may also need to research the details of how joins are implemented and
 some of the common tricks for organizing data to minimize having to shuffle
 all N by M records.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean,
 If I don't use the Kryo serializer I get a serialization error, and hence I
 am using it.
 If I don't use partitionBy/repartition then the simple join never
 completes, even after 7 hours, and in fact as the next step I need to run it
 against 250G, as that is my full dataset size. Someone here suggested that I
 use repartition.

 Assuming repartition is mandatory, how do I decide what the right number
 is? When I use 400 I do not get the NullPointerException that I talked
 about, which is strange. I never saw that exception with a small random
 dataset, but I see it with 25G; and again, with 400 partitions I do not see it.


 On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 IMHO, you are trying waaay too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
IMHO, if your data or your algorithm is prone to data skew, you have to fix
this at the application level; Spark itself cannot overcome this problem
(for example, when one key has a very large number of values). You may
change your algorithm to choose another shuffle key, or something similar,
to avoid shuffling on skewed keys.
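
One common application-level technique in this spirit is key salting; it is named here as an illustration and is not necessarily what the poster had in mind. Rows on the skewed side get a small salt appended to their key, and rows on the other side are replicated once per salt value, so the hot key is spread over several tasks while the join result stays the same. The sketch below models this on plain Scala collections; SaltedJoin and its methods are hypothetical names, and the salt is derived from the row index rather than a random number so that the example is deterministic.

```scala
object SaltedJoin {
  // Plain inner join on key, used as the reference result.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for ((k1, a) <- left; (k2, b) <- right if k1 == k2) yield (k1, (a, b))

  // Salted join: spread each left key over `salts` sub-keys and replicate
  // every right row once per salt so that matching rows still meet.
  def saltedJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], salts: Int): Seq[(K, (A, B))] = {
    val saltedLeft = left.zipWithIndex.map { case ((k, a), i) => ((k, i % salts), a) }
    val saltedRight = for ((k, b) <- right; s <- 0 until salts) yield ((k, s), b)
    join(saltedLeft, saltedRight).map { case ((k, _), ab) => (k, ab) }
  }
}
```

The cost of this design is that the replicated side grows by a factor of `salts`, so it fits best when that side is much smaller than the skewed side.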


Re: Hardware requirements

2015-05-04 Thread ayan guha
Hi

How do you get from 500 GB to ~3900 partitions? I am trying to do the math.
If I assume a 64 MB block size, then 1 GB ~ 16 blocks and 500 GB ~ 8000
blocks. If we assume split and block sizes are the same, shouldn't we end up
with 8k partitions?

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Hello Shao,
Can you say more about the shuffle key, or point me to APIs that allow me to
change it? I will try different keys and compare the performance.

What is the shuffle key by default?


Re: Hardware requirements

2015-05-04 Thread Akhil Das
Assume your block size is 128MB.
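
The arithmetic behind the two estimates in this thread can be checked with a short calculation. This is a sketch that assumes one partition per block and binary units (1 GB = 1024 MB); PartitionEstimate and partitions are hypothetical names.

```scala
object PartitionEstimate {
  val MiB: Long = 1024L * 1024L
  val GiB: Long = 1024L * MiB

  // Number of input splits if every block becomes one partition (ceiling division).
  def partitions(dataBytes: Long, blockBytes: Long): Long =
    (dataBytes + blockBytes - 1) / blockBytes

  def main(args: Array[String]): Unit = {
    println(partitions(500 * GiB, 128 * MiB)) // 4000
    println(partitions(500 * GiB, 64 * MiB))  // 8000
  }
}
```

With 128 MB blocks the result is 4000, in the same ballpark as the "nearly 3900" figure; with 64 MB blocks it is 8000, which matches the 8k estimate in the question.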

Thanks
Best Regards


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
The shuffle key depends on your implementation. I'm not sure if you are
familiar with MapReduce: the mapper output is a key-value pair, where the
key is the shuffle key used for shuffling. Spark works the same way.
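
To make the point concrete: in Spark's RDD API the key is simply the first element of each pair, for example as produced by `rdd.keyBy(f)` or `rdd.map(x => (f(x), x))`, so choosing a different key function changes how records are grouped in the shuffle. The plain-Scala sketch below compares the largest group under two key choices; ShuffleKeyDemo, Event, and largestGroup are hypothetical names used only for illustration.

```scala
object ShuffleKeyDemo {
  final case class Event(userId: Int, country: String)

  // Size of the largest group when records are keyed by `key`; in a shuffle,
  // all records sharing a key end up in the same task.
  def largestGroup[K](events: Seq[Event])(key: Event => K): Int =
    if (events.isEmpty) 0 else events.groupBy(key).values.map(_.size).max
}
```

A key whose largest group is close to the total record count will produce one long-running task, while a more evenly distributed key keeps task sizes similar.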

2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Hello Shao,
 Can you talk more about shuffle key or point me to APIs that allow me to
 change shuffle key. I will try with different keys and see the performance.

 What is the shuffle key by default ?

 On Mon, May 4, 2015 at 2:37 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 IMHO If your data or your algorithm is prone to data skew, I think you
 have to fix this from application level, Spark itself cannot overcome this
 problem (if one key has large amount of values), you may change your
 algorithm to choose another shuffle key, somethings like this to avoid
 shuffle on skewed keys.

 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Hello Dean  Others,
 Thanks for the response.

 I tried with 100,200, 400, 600 and 1200 repartitions with 100,200,400
 and 800 executors. Each time all the tasks of join complete in less than a
 minute except one and that one tasks runs forever. I have a huge cluster at
 my disposal.

 The data for each of 1199 tasks is around 40MB/30k records and for 1
 never ending task is 1.5G/98million records. I see that there is data skew
 among tasks. I had observed this a week earlier and i have no clue on how
 to fix it and when someone suggested that repartition might make things
 more parallel, but the problem is still persistent.

 Please suggest on how to get the task to complete.
 All i want to do is join two datasets. (dataset1 is in sequence file and
 dataset2 is in avro format).



 Ex: Tasks

 Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
 0      3771  0        RUNNING  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44  7.3 min   19 s     1591.2 MB / 98931767         0.0 B                   0.0 B
 1      3772  0        SUCCESS  PROCESS_LOCAL   226 / host2         2015/05/04 01:27:44  28 s      2 s      39.2 MB / 29754              0.0 B                   0.0 B
 2      3773  0        SUCCESS  PROCESS_LOCAL   283 / host3         2015/05/04 01:27:44  26 s      2 s      39.0 MB / 29646              0.0 B                   0.0 B
 5      3776  0        SUCCESS  PROCESS_LOCAL   320 / host4         2015/05/04 01:27:44  31 s      3 s      38.8 MB / 29512              0.0 B                   0.0 B
 4      3775  0        SUCCESS  PROCESS_LOCAL   203 / host5         2015/05/04 01:27:44  41 s      3 s      38.4 MB / 29169              0.0 B                   0.0 B
 3      3774  0        SUCCESS  PROCESS_LOCAL   84 / host6          2015/05/04 01:27:44  24 s      2 s      38.5 MB / 29258              0.0 B                   0.0 B
 8      3779  0        SUCCESS  PROCESS_LOCAL   309 / host7         2015/05/04 01:27:44  31 s      4 s      39.5 MB / 30008              0.0 B                   0.0 B

 There are 1200 tasks in total.


 On Sun, May 3, 2015 at 9:53 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 I don't know the full context of what you're doing, but serialization
 errors usually mean you're attempting to serialize something that can't be
 serialized, like the SparkContext. Kryo won't help there.

 The arguments to spark-submit you posted previously look good:

 2)  --num-executors 96 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

 I suspect you aren't getting the parallelism you need. For
 partitioning, if your data is in HDFS and your block size is 128MB, then
 you'll get ~195 partitions anyway. If it takes 7 hours to do a join over
 25GB of data, you have some other serious bottleneck. You should examine
 the web console and the logs to determine where all the time is going.
 Questions you might pursue:

 - How long does each task take to complete?
 - How many of those 195 partitions/tasks are processed at the same time?
   That is, how many slots are available? Maybe you need more nodes if the
   number of slots is too low. Based on your command arguments, you should
   be able to process 1/2 of them at a time, unless the cluster is busy.
 - Is the cluster swamped with other work?
 - How much data does each task process? Is the data roughly the same from
   one task to the next? If not, then you might have serious key skew.

 You may also need to research the details of how joins are implemented
 and some of the common tricks for organizing data to minimize having to
 shuffle all N by M records.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean,
 If I don't use the Kryo serializer I get a serialization error, hence I am
 using it.
 If I don't use partitionBy/repartition, the simple join never completes,
 even after 7 hours, and as a next step I need to run it against 250G,
 which is my full dataset size. Someone here suggested that I use
 repartition.

 Assuming repartition is mandatory, how do I decide what's the right
 number ?

Re: Map-Side Join in Spark

2015-05-04 Thread ๏̯͡๏
This is how I implemented a map-side join using broadcast.

    import org.apache.spark.rdd.RDD

    val listings = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
    val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

    // Collect the listings to the driver as a Map and broadcast it to the executors.
    val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
    val broadCastMap = sc.broadcast(lstgItemMap)

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      viEvents.mapPartitions { iter =>
        val lstgItemMap = broadCastMap.value
        for {
          (itemId, viDetail) <- iter
          if lstgItemMap.contains(itemId) // keep only events that have a listing
        } yield {
          val listing = lstgItemMap.get(itemId).get
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = 0
          viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)

          // Re-key the joined record by seller id.
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
        }
      }

Usage: collectAsMap, broadcast, Scala for/yield.

Learning: lstgItemMap is collected as a Map on the driver, so when its size
exceeds driver memory it throws an OOM error. As I had a 12G limit on
memory and my dataset is around 100G, I could not use the map-side join
and switched back to join().

Sharing this so others can use the code example in case they want to
implement a map-side join with Spark + Scala.
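
The same pattern, without the domain classes, can be sketched on plain Scala
collections. This is only an illustration under stated assumptions: Listing,
ViewEvent and mapSideJoin are hypothetical names, a Map stands in for the
broadcast variable and an Iterator for one RDD partition.

```scala
// Hypothetical stand-ins for the listing and view-event records.
final case class Listing(itemId: Long, sellerId: Long)
final case class ViewEvent(itemId: Long, userId: Long)

object MapSideJoinSketch {
  // Join the large side against an in-memory lookup table.
  // In Spark the Map would be the value of a broadcast variable and this
  // function would be the body passed to mapPartitions.
  def mapSideJoin(events: Iterator[ViewEvent],
                  listingsById: Map[Long, Listing]): Iterator[(Long, (ViewEvent, Listing))] =
    for {
      event   <- events
      listing <- listingsById.get(event.itemId).iterator // events without a listing are dropped
    } yield (listing.sellerId, (event, listing))
}
```

Looking the key up once with get replaces the separate contains and
get(...).get calls. The driver-memory limit described in the learning still
applies, since the whole lookup table has to fit in memory.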

On Tue, Apr 21, 2015 at 7:32 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Sorry was typing from mobile hence could not elaborate earlier.

 I presume you want to do map-side join and you mean you want to join 2 RDD
 without shuffle?

 Please have a quick look
 http://apache-spark-user-list.1001560.n3.nabble.com/Text-file-and-shuffle-td5973.html#none

 1) Co-partition your data for cogroup:

 val par = new HashPartitioner(128)
 val x = sc.textFile(..).map(...).partitionBy(par)
 val y = sc.textFile(...).map(...).partitionBy(par)
 ...

 This should enable the join with (much less) shuffle.

 Another option provided in the same thread - to broadcast in case one of
 the table is small(ish).

 Hope this helps.

 Best
 Ayan

 On Tue, Apr 21, 2015 at 3:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 These are pair RDDs (itemId, item)  (itemId, listing).

 What do you mean by re-partitioning of these RDDS ?
 Now what you mean by your partitioner

 Can you elaborate ?

 On Tue, Apr 21, 2015 at 11:18 AM, ayan guha guha.a...@gmail.com wrote:

 If you are using a pairrdd, then you can use partition by method to
 provide your partitioner
 On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 What is re-partition ?

 On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com
 wrote:

 In my understanding you need to create a key out of the data and
 repartition both datasets to achieve map side join.
 On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark +
 Scala. (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese
 to english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




 --
 Deepak




 --
 Deepak




 --
 Best Regards,
 Ayan Guha




-- 
Deepak


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
One dataset (pair RDD):

val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

Second dataset (pair RDD):

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

I want to join on item ID, which is the first element of the tuple in both
cases, and I think that is the shuffle key.

listings: contains all the unique item IDs that were ever listed on the
ecommerce site.

viEvents: the items viewed by users in the last day. This will always be a
subset of the total set.

So I do not understand where the data skew comes from. When my long-running
task is working on 1591.2 MB / 98,931,767 records, does that mean those 98
million records all contain the same item ID? How can millions of users look
at the same item in the last day?

Or does this task's data contain records across item IDs?


Regards,

Deepak





Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Four tasks are now failing with

Index  ID    Attempt  Status  Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      3771  0        FAILED  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44                     /                                                                          ExecutorLostFailure (executor 114 lost)
1007   4973  1        FAILED  PROCESS_LOCAL   420 / host2         2015/05/04 02:13:14                     /                                                                          FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, ...)
371    4972  1        FAILED  PROCESS_LOCAL   563 / host3         2015/05/04 02:13:14                     /                                                                          FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, ...)

Details for task 1007:

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
)

Details for task 371:

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 

Re: Unusual filter behaviour on RDD

2015-05-04 Thread fawadalam
I got it working. When I was persisting, it only persisted 85% of the RDD to
memory and the rest of the RDD gets recomputed every time. Because my
flagged RDD uses a random method to create the field, I was getting
unpredictable results. When I persist using:

flagged.persist(StorageLevel.MEMORY_AND_DISK)

it works perfectly well.
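
The effect can be sketched without Spark. With the default persist() level
(MEMORY_ONLY), partitions that do not fit in memory are recomputed when
needed, while MEMORY_AND_DISK spills them to disk instead. In the toy model
below (Flagged is a hypothetical class, not part of my job), a def plays the
role of a partition that gets recomputed and a lazy val the role of one that
was stored once.

```scala
import scala.util.Random

// Toy model: a "flag" column produced by a random generator.
final class Flagged(data: Seq[Int], rng: Random) {
  // Re-evaluated on every access, like a partition that was not kept in memory:
  // each evaluation draws fresh random flags.
  def recomputed: Seq[(Int, Boolean)] = data.map(x => (x, rng.nextBoolean()))

  // Evaluated once and then reused, like a partition stored in memory or on disk.
  lazy val materialized: Seq[(Int, Boolean)] = recomputed
}
```

Two reads of materialized always agree, whereas two reads of recomputed
almost never do, which is why filters on the flag looked unpredictable.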



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unusual-filter-behaviour-on-RDD-tp22749p22752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




mapping JavaRDD to jdbc DataFrame

2015-05-04 Thread Lior Chaga
Hi,

I'd like to use a JavaRDD containing parameters for an SQL query, and use
SparkSQL jdbc to load data from mySQL.

Consider the following pseudo code:

JavaRDD<String> namesRdd = ... ;
...
options.put("url", "jdbc:mysql://mysql?user=usr");
options.put("password", "pass");
options.put("dbtable", "(SELECT * FROM mytable WHERE userName = ?) sp_campaigns");
DataFrame myTableDF = m_sqlContext.load("jdbc", options);


I'm looking for a way to map namesRdd and get, for each name, the result of
the query, without losing the Spark context.

Using a mapping function doesn't seem like an option, because I don't have
the SQLContext inside it.
I can only think of using collect, then iterating over the strings in the
RDD and executing the query for each, but that would run in the driver program.

Any suggestions?

Thanks,
Lior


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread Saisai Shao
From the symptoms you mentioned, one task's shuffle write is much larger
than all the other tasks', which looks very much like ordinary data skew.
I am only giving advice based on your description; I think you need to
check whether the data is actually skewed or not.

The shuffle places records according to the partitioner (the default is the
hash partitioner), so all records with the same key go to the same task,
but a single task does not hold just one key.
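
To make that concrete, here is a rough sketch in plain Scala of how a hash
partitioner assigns keys: the key's hashCode modulo the number of
partitions, kept non-negative. HashPartitionSketch and its method names are
made up for illustration; this mirrors the idea rather than reproducing
Spark's own class, and it does not handle null keys.

```scala
object HashPartitionSketch {
  // Non-negative remainder, so negative hash codes still map into [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of records that would land in each partition for the given keys.
  def recordsPerPartition(keys: Seq[Long], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) => (p, ks.size) }
}
```

With 4 partitions, keys 5 and 9 both land in partition 1: the same key always
goes to the same partition, but one partition can hold many different keys.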

2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Attached image shows the Spark UI for the job.






Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Olivier Girardot
Hi,
What is your Spark version?

Regards,

Olivier.

On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote:

 Hi guys,

 When I run a SQL statement like select a.name, a.startpoint, a.endpoint,
 a.piece from db a join sample b on (a.name = b.name) where (b.startpoint
   a.startpoint + 25); I find Spark SQL takes minutes, which may be caused
 by very long GC and shuffle times.

 Table db is created from a 56 MB txt file, while table sample is 26 MB;
 both are small.

 My Spark cluster is a standalone pseudo-distributed cluster with an 8g
 executor and a 4g driver.

 Any advice? Thank you guys.

 Thanks & Best regards!
 罗辉 San.Luo



Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
I ran it against one file instead of 10 files, and one task is still
running after 33 minutes; its shuffle read size is 780 MB / 50 million
records.

I counted the records for each itemId in dataset-2 [one file], i.e. the
second dataset (pair RDD):

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

This is the dataset that contains the list of items viewed by users in one
day.

Item Id      Count
201335783004 537
111654496030 353
141640703798 287
191568402102 258
111654479898 217
231521843148 211
251931716094 201
111654493548 181
181503913062 181
121635453050 152
261798565828 140
151494617682 139
251927181728 127
231516683056 119
141640492864 117
161677270656 117
171771073616 113
111649942124 109
191516989450 97
231539161292 94
221555628408 88
131497785968 87
121632233872 84
131335379184 83
281531363490 83
131492727742 79
231174157820 79
161666914810 77
251699753072 77
161683664300 76
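
For reference, a count like the one above can be sketched on a plain Scala
collection as follows (KeyCountSketch and topKeys are illustrative names; on
an RDD the equivalent would be along the lines of map(k => (k, 1L)) followed
by reduceByKey(_ + _)). Note that this only covers one side of the join; the
same count may be worth running on the other dataset too.

```scala
object KeyCountSketch {
  // Count records per key and return the n most frequent keys, largest count first.
  // Ties are broken by the smaller key so the result is deterministic.
  def topKeys(keys: Seq[Long], n: Int): Seq[(Long, Int)] =
    keys.groupBy(identity)
      .map { case (k, ks) => (k, ks.size) }
      .toSeq
      .sortBy { case (k, count) => (-count, k) }
      .take(n)
}
```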


I was assuming that data skew would mean the top item (201335783004) had a
count of 1 million; however, it is only a few hundred. Why then is Spark
skewing the join? What should I do so that Spark distributes the records
evenly?

In M/R we can change the Partitioner between mapper and reducer; how can I
do that in Spark for a join?


Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      3618  0        RUNNING  PROCESS_LOCAL   4 / host1           2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322        4.6 GB                  47.5 MB
433    4051  0        SUCCESS  PROCESS_LOCAL   1 / host2           2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4505143           1282.3 MB               10.1 MB
218    3836  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2865143            879.6 MB                6.9 MB
113    3731  0        SUCCESS  PROCESS_LOCAL   2 / host4           2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5187                0.0 B                   0.0 B
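
One commonly used way to spread a heavy key over several tasks is to "salt"
the join key. The sketch below is only an assumption about what might help,
not something confirmed for this data: it works on plain Scala collections
standing in for the two pair RDDs, and SaltedJoinSketch with its method names
is hypothetical. It only helps when the skew comes from a few heavy keys, and
it costs `salts` copies of every record on the replicated side.

```scala
import scala.util.Random

object SaltedJoinSketch {
  // Large side: attach a random salt in [0, salts) to every key,
  // so one heavy key becomes up to `salts` distinct join keys.
  def saltLarge[V](records: Seq[(Long, V)], salts: Int, rng: Random): Seq[((Long, Int), V)] =
    records.map { case (k, v) => ((k, rng.nextInt(salts)), v) }

  // Other side: replicate every record once per salt value,
  // so each salted key on the large side still finds its match.
  def replicateSmall[W](records: Seq[(Long, W)], salts: Int): Seq[((Long, Int), W)] =
    for { (k, w) <- records; s <- 0 until salts } yield ((k, s), w)

  // Plain in-memory inner join on the salted key, standing in for a pair-RDD join.
  // The salt is dropped again in the output.
  def join[V, W](left: Seq[((Long, Int), V)], right: Seq[((Long, Int), W)]): Seq[(Long, (V, W))] = {
    val rightByKey = right.groupBy(_._1)
    for {
      (saltedKey, v) <- left
      (_, w)         <- rightByKey.getOrElse(saltedKey, Seq.empty)
    } yield (saltedKey._1, (v, w))
  }
}
```

The joined output has the same records as an unsalted join; only the
intermediate key changes, which is what lets a hash partitioner place the
pieces of one heavy key in different partitions.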


Re: Custom Partitioning Spark

2015-05-04 Thread ๏̯͡๏
Why do you use custom partitioner  ?
Are you doing join ?
And, can you share some code that shows how you implemented custom
partitioner.

On Tue, Apr 21, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote:

 Are you looking for this?

 mapPartitions(func): Similar to map, but runs separately on each partition
 (block) of the RDD, so func must be of type Iterator<T> => Iterator<U>
 when running on an RDD of type T.

 mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides
 func with an integer value representing the index of the partition, so
 func must be of type (Int, Iterator<T>) => Iterator<U> when running on an
 RDD of type T.

 On Wed, Apr 22, 2015 at 1:00 AM, MUHAMMAD AAMIR mas.ha...@gmail.com
 wrote:

 Hi Archit,

 Thanks a lot for your reply. I am using rdd.partitions.length to check
 the number of partitions; rdd.partitions returns the array of partitions.
 I would like to add one more question here: do you have any idea how to
 get the objects in each partition? Further, is there any way to figure out
 which particular partition an object belongs to?

 Thanks,

 On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur 
 archit279tha...@gmail.com wrote:

 Hi,

 This should work. How are you checking the no. of partitions.?

 Thanks and Regards,
 Archit Thakur.

 On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote:

 Hi,

 I aim to do custom partitioning on a text file. I first convert it into a
 pair RDD and then try to use my custom partitioner. However, somehow it is
 not working. My code snippet is given below.

 val file = sc.textFile(filePath)
 val locLines = file.map(line => line.split("\t")).map(line =>
   ((line(2).toDouble, line(3).toDouble), line(5).toLong))
 val ck = locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- neither way works here.

 When reading the file with the textFile method, Spark automatically
 partitions the file. However, when I explicitly partition the new RDD
 locLines, it doesn't appear to do anything, and the number of partitions
 stays the same as the one created by sc.textFile().

 Any help in this regard will be highly appreciated.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






 --
 Regards,
 Muhammad Aamir






 --
 Best Regards,
 Ayan Guha




-- 
Deepak


Re: Support for skewed joins in Spark

2015-05-04 Thread ๏̯͡๏
Hello Soila,
Can you share the code that shows the usage of RangePartitioner?
I am facing an issue with .join() where one task runs forever. I tried
repartition(100/200/300/1200) and it did not help. I cannot use a map-side
join because both datasets are huge and beyond driver memory size.
Regards,
Deepak

On Fri, Mar 13, 2015 at 9:54 AM, Soila Pertet Kavulya skavu...@gmail.com
wrote:

 Thanks Shixiong,

 I'll try out your PR. Do you know what the status of the PR is? Are
 there any plans to incorporate this change to the
 DataFrames/SchemaRDDs in Spark 1.3?

 Soila

 On Thu, Mar 12, 2015 at 7:52 PM, Shixiong Zhu zsxw...@gmail.com wrote:
  I sent a PR to add skewed join last year:
  https://github.com/apache/spark/pull/3505
  However, it does not split a key across multiple partitions. Instead, if
  a key has too many values to fit in memory, it stores the values on disk
  temporarily and uses the disk files to do the join.
 
  Best Regards,
 
  Shixiong Zhu
 
  2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com:
 
  Does Spark support skewed joins similar to Pig which distributes large
  keys over multiple partitions? I tried using the RangePartitioner but
  I am still experiencing failures because some keys are too large to
  fit in a single partition. I cannot use broadcast variables to
  work-around this because both RDDs are too large to fit in driver
  memory.
 
 
 





-- 
Deepak


spark 1.3.1

2015-05-04 Thread Saurabh Gupta
Hi,

I am trying to build the example code given at
https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds

The code is:

// Import factory methods provided by DataType.
import org.apache.spark.sql.types.DataType;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
}
StructType schema = DataType.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return Row.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

my pom file looks like:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.94.0</version>
    </dependency>
</dependencies>

When I run mvn package, I get this error:
cannot find symbol
[ERROR] symbol:   variable StringType
[ERROR] location: class org.apache.spark.sql.types.DataType

I have gone through
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/StringType.html

What is missing here?


Difference ?

2015-05-04 Thread ๏̯͡๏
*Datasets*

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

What is the difference between

1)

lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions =
1200, rdd = viEvents)).map { }


2)

lstgItem.join(viEvents).map { }

3)

lstgItem.join(viEvents,1200).map { }


4)

lstgItem.join(viEvents, new org.apache.spark.HashPartitioner(1200)).map { }

5)

viEvents.join(lstgItem).map { }


Which is better when I run the join and one task runs forever because it
gets 1000x more records than the others? Before running the join I call
repartition() and I still see this behavior. So which is best, or at least
which one completes? In my case I am not even able to get it running for
1G, 25G or 100G datasets.
Regards,
Deepak
-- 
Deepak
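
A common workaround for a join where one key carries most of the records is key salting: append a small salt to the key on the skewed side and replicate each key once per salt on the other side, so a hot key is spread over several partitions. This is not proposed anywhere in the thread; the sketch below is plain Java with no Spark dependency, and SaltedKeys, salt, replicate and partitionOf are hypothetical names. The partitionOf method follows the rule HashPartitioner applies (non-negative hashCode modulo the partition count).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: illustrates key salting on plain strings.
class SaltedKeys {

    // Skewed side: append one salt in [0, buckets) to the key. The salt is
    // derived from a per-record sequence number so the choice is deterministic.
    static String salt(String key, long recordNumber, int buckets) {
        return key + "#" + Math.floorMod(recordNumber, (long) buckets);
    }

    // Other side: emit one copy of the key per salt so every salted key can find its match.
    static List<String> replicate(String key, int buckets) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < buckets; i++) {
            out.add(key + "#" + i);
        }
        return out;
    }

    // Non-negative hashCode modulo the partition count, as a hash partitioner would do.
    static int partitionOf(String saltedKey, int numPartitions) {
        return Math.floorMod(saltedKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int buckets = 8;
        int partitions = 1200;
        // After salting, one hot key can spread over up to `buckets` partitions.
        for (long n = 0; n < buckets; n++) {
            String k = salt("hotItem", n, buckets);
            System.out.println(k + " -> partition " + partitionOf(k, partitions));
        }
        System.out.println(replicate("hotItem", 3));
    }
}
```

Replicating one side multiplies its size by the number of buckets, so when both datasets are huge it is usually only worth salting the few keys known to be hot. In a Spark job the salted strings would become the pair-RDD keys before calling join.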


MLLib SVM probability

2015-05-04 Thread Robert Musters
Hi all,

I am trying to understand the output of the SVM classifier.

Right now, my output looks like this:

-18.841544889249917 0.0 

168.32916035523283 1.0 

420.67763915879794 1.0 

-974.1942589201286 0.0 

71.73602841256813 1.0 

233.13636224524993 1.0 

-1000.5902168199027 0.0



The documentation is unclear about what these numbers mean.

I think it is the distance to the hyperplane with sign.



My main question is: How can I convert distances from hyperplanes to 
probabilities in a multi-class one-vs-all approach?

LIBSVM (http://www.csie.ntu.edu.tw/~cjlin/libsvm/) has this functionality and
refers to the process of obtaining the probabilities as “Platt scaling”
(http://www.researchgate.net/profile/John_Platt/publication/2594015_Probabilistic_Outputs_for_Support_Vector_Machines_and_Comparisons_to_Regularized_Likelihood_Methods/links/004635154cff5262d600.pdf).

I think this functionality should be in MLlib, but I can't find it.
Do you think Platt scaling makes sense?



Making clusters using Learning Vector Quantization, determining the spread 
function of a cluster with a Gaussian function and then retrieving the 
probability makes a lot more sense i.m.o. Using the distances from the 
hyperplanes from several SVM classifiers and then trying to determine some 
probability on these distance measures, does not make any sense, because the 
distribution property of the data-points belonging to a cluster is not taken 
into account. 
Does anyone see a fallacy in my reasoning?



With kind regards,

Robert
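
For reference, Platt scaling maps a raw margin f to a probability with a sigmoid, P(y = 1 | f) = 1 / (1 + exp(A f + B)), where A and B are fitted on held-out data. The sketch below is plain Java and only shows the mapping step; the class and method names are hypothetical, the A and B values in main are placeholders rather than fitted parameters, and normalizing the one-vs-all outputs so they sum to 1 is a simple assumption, not the coupling method LIBSVM uses.

```java
import java.util.Arrays;

// Hypothetical helper: maps raw SVM margins to probabilities with a Platt-style sigmoid.
class PlattScaling {

    // P(y = 1 | f) = 1 / (1 + exp(a * f + b)); a and b must be fitted on held-out data.
    static double probability(double margin, double a, double b) {
        double z = a * margin + b;
        // Two branches keep exp() from overflowing for large |z|.
        return z >= 0 ? Math.exp(-z) / (1.0 + Math.exp(-z)) : 1.0 / (1.0 + Math.exp(z));
    }

    // One-vs-all: scale each class's margin, then normalize so the values sum to 1.
    static double[] oneVsAll(double[] margins, double[] a, double[] b) {
        double[] p = new double[margins.length];
        double sum = 0.0;
        for (int i = 0; i < margins.length; i++) {
            p[i] = probability(margins[i], a[i], b[i]);
            sum += p[i];
        }
        for (int i = 0; i < p.length; i++) {
            // Fall back to a uniform distribution if every probability underflowed to 0.
            p[i] = sum > 0 ? p[i] / sum : 1.0 / p.length;
        }
        return p;
    }

    public static void main(String[] args) {
        double[] margins = {-18.84, 168.33, -974.19}; // one margin per one-vs-all classifier
        double[] a = {-0.01, -0.01, -0.01};           // placeholder slopes, not fitted
        double[] b = {0.0, 0.0, 0.0};                 // placeholder offsets, not fitted
        System.out.println(Arrays.toString(oneVsAll(margins, a, b)));
    }
}
```

With a negative A, larger positive margins map to probabilities closer to 1, which matches the sign convention of the output shown above.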



Re: spark 1.3.1

2015-05-04 Thread Driesprong, Fokko
Hi Saurabh,

Did you check the log of maven?

2015-05-04 15:17 GMT+02:00 Saurabh Gupta saurabh.gu...@semusi.com:

 HI,

 I am trying to build a example code given at

 https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds





Re: Exiting driver main() method...

2015-05-04 Thread James Carman
I think I figured it out.  I am playing around with the Cassandra connector
and I had a method that inserted some data into a locally-running Cassandra
instance, but I forgot to close the Cluster object.  I guess that left some
non-daemon thread running and kept the process from exiting.  Nothing to see
here, move along.  :)
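
A quick way to confirm this kind of hang is to list the live non-daemon threads just before main() returns, since the JVM only exits once all of them have finished. The sketch below is plain Java; NonDaemonThreads is a hypothetical helper, and the thread started in main merely stands in for a client thread that was never shut down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical diagnostic helper: lists live non-daemon threads other than the caller.
class NonDaemonThreads {

    static List<String> names() {
        List<String> out = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                out.add(t.getName());
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch closed = new CountDownLatch(1);
        // Stand-in for a client library thread that was never shut down.
        Thread leaked = new Thread(() -> {
            try {
                closed.await();
            } catch (InterruptedException e) {
                // fall through and let the thread end
            }
        }, "leaked-client-thread");
        leaked.setDaemon(false);
        leaked.start();

        System.out.println("Still keeping the JVM alive: " + names());

        closed.countDown(); // the equivalent of closing the client
        leaked.join();
    }
}
```

Calling names() at the end of the driver's main() shows which threads are still holding the process open.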


On Sat, May 2, 2015 at 2:44 PM Mohammed Guller moham...@glassbeam.com
wrote:

  No, you don’t need to do anything special. Perhaps, your application is
 getting stuck somewhere? If you can share your code, someone may be able to
 help.



 Mohammed



 *From:* James Carman [mailto:ja...@carmanconsulting.com]
 *Sent:* Friday, May 1, 2015 5:53 AM
 *To:* user@spark.apache.org
 *Subject:* Exiting driver main() method...



 In all the examples, it seems that the spark application doesn't really do
 anything special in order to exit.  When I run my application, however, the
 spark-submit script just hangs there at the end.  Is there something
 special I need to do to get that thing to exit normally?



Re: MLLib SVM probability

2015-05-04 Thread Driesprong, Fokko
Hi Robert,

I would say that the sign of each number represents the class of the
input vector. What kind of data are you using, and what kind of training
set? Fundamentally, an SVM can separate only two classes, but you can do
one-vs-rest as you mentioned.

I don't see how LVQ can benefit the SVM classifier. I would say that this
is more of an SVM question than a Spark one.

2015-05-04 15:22 GMT+02:00 Robert Musters robert.must...@openindex.io:

  Hi all,

 I am trying to understand the output of the SVM classifier.

 Right now, my output looks like this:

 -18.841544889249917 0.0

 168.32916035523283 1.0

 420.67763915879794 1.0

 -974.1942589201286 0.0

 71.73602841256813 1.0

 233.13636224524993 1.0

 -1000.5902168199027 0.0


  The documentation is unclear about what these numbers mean
 https://spark.apache.org/docs/0.9.2/api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint
 .

 I think it is the distance to the hyperplane with sign.


  My main question is: How can I convert distances from hyperplanes to
 probabilities in a multi-class one-vs-all approach?

 SVMLib http://www.csie.ntu.edu.tw/~cjlin/libsvm/ has this functionality
 and refers the process to get the probabilities as “Platt scaling”
 http://www.researchgate.net/profile/John_Platt/publication/2594015_Probabilistic_Outputs_for_Support_Vector_Machines_and_Comparisons_to_Regularized_Likelihood_Methods/links/004635154cff5262d600.pdf.


 I think this functionality should be in MLLib, but I can't find it?
 Do you think Platt scaling makes sense?


  Making clusters using Learning Vector Quantization, determining the
 spread function of a cluster with a Gaussian function and then retrieving
 the probability makes a lot more sense i.m.o. Using the distances from the
 hyperplanes from several SVM classifiers and then trying to determine some
 probability on these distance measures, does not make any sense, because
 the distribution property of the data-points belonging to a cluster is not
 taken into account.
 Does anyone see a fallacy in my reasoning?


  With kind regards,

 Robert



How to deal with code that runs before foreach block in Apache Spark?

2015-05-04 Thread Emre Sevinc
I'm trying to deal with some code that runs differently on Spark
stand-alone mode and Spark running on a cluster. Basically, for each item
in an RDD, I'm trying to add it to a list, and once this is done, I want to
send this list to Solr.

This works perfectly fine when I run the following code in stand-alone mode
of Spark, but does not work when the same code is run on a cluster. When I
run the same code on a cluster, it is as if the "send to Solr" part of the
code is executed before the list to be sent to Solr is filled with items. I
tried to force the execution by calling solrInputDocumentJavaRDD.collect();
after foreach, but it does not seem to have any effect.

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
    new Function<JavaRDD<SolrInputDocument>, Void>() {
      @Override
      public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {

        // For each item in a single RDD
        solrInputDocumentJavaRDD.foreach(
            new VoidFunction<SolrInputDocument>() {
              @Override
              public void call(SolrInputDocument solrInputDocument) {
                // Add the solrInputDocument to the list of SolrInputDocuments
                SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
              }
            });

        // Try to force execution
        solrInputDocumentJavaRDD.collect();

        // After having finished adding every SolrInputDocument to the list,
        // add it to the solrServer, and commit, waiting for the commit to be flushed
        try {
          // Seems like when run in cluster mode, the list size is zero,
          // therefore the following part is never executed
          if (SolrIndexerDriver.solrInputDocumentList != null
              && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
            SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
            SolrIndexerDriver.solrServer.commit(true, true);
            SolrIndexerDriver.solrInputDocumentList.clear();
          }
        } catch (SolrServerException | IOException e) {
          e.printStackTrace();
        }

        return null;
      }
    }
);


What should I do, so that sending-to-Solr part executes after the list of
SolrDocuments are added to solrInputDocumentList (and works also in cluster
mode)?


-- 
Emre Sevinç


Re: spark 1.3.1

2015-05-04 Thread Saurabh Gupta
I am really new to this. What should I look for in the Maven logs?

I have tried mvn package -X -e

Should I post the full trace?



On Mon, May 4, 2015 at 6:54 PM, Driesprong, Fokko fo...@driesprong.frl
wrote:

 Hi Saurabh,

 Did you check the log of maven?

 2015-05-04 15:17 GMT+02:00 Saurabh Gupta saurabh.gu...@semusi.com:

 HI,

 I am trying to build a example code given at

 https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds






Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-04 Thread Gerard Maas
I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
is a singleton, I guess that what's going on when running on a cluster is
that the call to:

 SolrIndexerDriver.solrInputDocumentList.add(elem)

is happening on different singleton instances of SolrIndexerDriver on
different JVMs (the executors), while

SolrIndexerDriver.solrServer.commit

is happening on the driver.

In practical terms, the lists on the executors are being filled in but
never committed, while on the driver the opposite happens: commit runs
against a list that is never filled.

-kr, Gerard
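
One way to act on this diagnosis is to do the indexing on the executors themselves, one batch per partition, instead of filling a static list. The sketch below is plain Java and shows only the batching step; PartitionBatcher and sendInBatches are hypothetical names, and the sink stands in for a call such as solrServer.add(batch) followed by a commit. The assumption is that it would be called from inside JavaRDD.foreachPartition, with the Solr client created on the executor rather than shipped from the driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: drains one partition's iterator and hands fixed-size batches to a sink.
class PartitionBatcher {

    // Returns the number of elements handed to the sink.
    static <T> int sendInBatches(Iterator<T> partition, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        int sent = 0;
        while (partition.hasNext()) {
            batch.add(partition.next());
            if (batch.size() == batchSize) {
                sink.accept(batch);
                sent += batch.size();
                batch = new ArrayList<>(batchSize);
            }
        }
        // Flush the final, possibly smaller, batch.
        if (!batch.isEmpty()) {
            sink.accept(batch);
            sent += batch.size();
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Integer> docs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // The sink only prints here; in a real job it would index the batch.
        int sent = sendInBatches(docs.iterator(), 3, b -> System.out.println("would index " + b));
        System.out.println("sent " + sent + " documents");
    }
}
```

Because each partition is processed where its data lives, nothing depends on a list shared between the driver and the executors.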

On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I'm trying to deal with some code that runs differently on Spark
 stand-alone mode and Spark running on a cluster. Basically, for each item
 in an RDD, I'm trying to add it to a list, and once this is done, I want to
 send this list to Solr.

 This works perfectly fine when I run the following code in stand-alone
 mode of Spark, but does not work when the same code is run on a cluster.
 When I run the same code on a cluster, it is as if the "send to Solr" part
 of the code is executed before the list to be sent to Solr is filled with
 items. I tried to force the execution by calling
 solrInputDocumentJavaRDD.collect(); after foreach, but it does not seem to
 have any effect.



 What should I do, so that sending-to-Solr part executes after the list of
 SolrDocuments are added to solrInputDocumentList (and works also in cluster
 mode)?


 --
 Emre Sevinç



Re: mapping JavaRDD to jdbc DataFrame

2015-05-04 Thread ayan guha
You can use applySchema

On Mon, May 4, 2015 at 10:16 PM, Lior Chaga lio...@taboola.com wrote:

 Hi,

 I'd like to use a JavaRDD containing parameters for an SQL query, and use
 SparkSQL jdbc to load data from mySQL.

 Consider the following pseudo code:

 JavaRDD<String> namesRdd = ... ;
 ...
 options.put("url", "jdbc:mysql://mysql?user=usr");
 options.put("password", "pass");
 options.put("dbtable", "(SELECT * FROM mytable WHERE userName = ?) sp_campaigns");
 DataFrame myTableDF = m_sqlContext.load("jdbc", options);


 I'm looking for a way to map namesRdd and get, for each name, the result of
 the query, without losing the Spark context.

 Using a mapping function doesn't seem like an option, because I don't have
 a SQLContext inside it.
 I can only think of using collect, and then iterating over the strings in
 the RDD and executing the query, but that would run in the driver program.

 Any suggestions?

 Thanks,
 Lior




-- 
Best Regards,
Ayan Guha


Troubling Logging w/Simple Example (spark-1.2.2-bin-hadoop2.4)...

2015-05-04 Thread James Carman
I have the following simple example program:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleCount {

    public static void main(String[] args) {
        // Use the spark.master system property if set, otherwise run locally.
        final String master = System.getProperty("spark.master", "local[*]");
        System.out.printf("Running job against spark master %s ...%n", master);

        final SparkConf conf = new SparkConf()
                .setAppName("simple-count")
                .setMaster(master)
                .set("spark.eventLog.enabled", "true");
        final JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

        long n = rdd.count();

        System.out.printf("I counted %d integers.%n", n);
    }
}

I start a local master:

export SPARK_MASTER_IP=localhost

sbin/start-master.sh


Then, I start a local worker:


bin/spark-class org.apache.spark.deploy.worker.Worker -h localhost spark://localhost:7077



When I run the example application:


bin/spark-submit --class com.cengage.analytics.SimpleCount \
  --master spark://localhost:7077 \
  ~/IdeaProjects/spark-analytics/target/spark-analytics-1.0-SNAPSHOT.jar


It finishes just fine (and even counts the right number :).  However, I get
the following log statements in the master's log file:


15/05/04 09:54:14 INFO Master: Registering app simple-count

15/05/04 09:54:14 INFO Master: Registered app simple-count with ID
app-20150504095414-0009

15/05/04 09:54:14 INFO Master: Launching executor app-20150504095414-0009/0
on worker worker-20150504095401-localhost-55806

15/05/04 09:54:17 INFO Master: akka.tcp://sparkDriver@jamess-mbp:55939 got
disassociated, removing it.

15/05/04 09:54:17 INFO Master: Removing app app-20150504095414-0009

15/05/04 09:54:17 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkDriver@jamess-mbp:55939] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].

15/05/04 09:54:17 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkMaster/deadLetters] to
Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A55948-17#800019242]
was not delivered. [18] dead letters encountered. This logging can be
turned off or adjusted with configuration settings 'akka.log-dead-letters'
and 'akka.log-dead-letters-during-shutdown'.

15/05/04 09:54:17 INFO SecurityManager: Changing view acls to: jcarman

15/05/04 09:54:17 INFO SecurityManager: Changing modify acls to: jcarman

15/05/04 09:54:17 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(jcarman);
users with modify permissions: Set(jcarman)

15/05/04 09:54:17 INFO Master: akka.tcp://sparkDriver@jamess-mbp:55939 got
disassociated, removing it.

15/05/04 09:54:17 WARN EndpointWriter: AssociationError
[akka.tcp://sparkMaster@localhost:7077] -
[akka.tcp://sparkWorker@localhost:51252]:
Error [Invalid address: akka.tcp://sparkWorker@localhost:51252] [

akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkWorker@localhost:51252

Caused by: akka.remote.transport.Transport$InvalidAssociationException:
Connection refused: localhost/127.0.0.1:51252

]

15/05/04 09:54:17 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkWorker@localhost:51252]. Address is now gated for
5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: localhost/127.0.0.1:51252

15/05/04 09:54:17 INFO Master: akka.tcp://sparkWorker@localhost:51252 got
disassociated, removing it.

15/05/04 09:54:17 WARN EndpointWriter: AssociationError
[akka.tcp://sparkMaster@localhost:7077] -
[akka.tcp://sparkWorker@jamess-mbp:50071]: Error [Invalid address:
akka.tcp://sparkWorker@jamess-mbp:50071] [

akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkWorker@jamess-mbp:50071

Caused by: akka.remote.transport.Transport$InvalidAssociationException:
Connection refused: jamess-mbp/192.168.1.45:50071

]

15/05/04 09:54:17 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkWorker@jamess-mbp:50071]. Address is now gated for
5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: jamess-mbp/192.168.1.45:50071

15/05/04 09:54:17 INFO Master: akka.tcp://sparkWorker@jamess-mbp:50071 got
disassociated, removing it.

15/05/04 09:54:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
Message [org.apache.spark.deploy.DeployMessages$ApplicationFinished] from
Actor[akka://sparkMaster/user/Master#-1247271270] to
Actor[akka://sparkMaster/deadLetters] was not delivered. [19] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

15/05/04 09:54:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
Message 

java.io.IOException: No space left on device while doing repartitioning in Spark

2015-05-04 Thread shahab
Hi,

I am getting a "No space left on device" exception when repartitioning
approx. 285 MB of data, while there is still 2 GB of space left.

Does this mean that repartitioning 285 MB of data needs more than 2 GB of
space?

best,
/Shahab

java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:51)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:331)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread ayan guha
Are you using only 1 executor?

On Mon, May 4, 2015 at 11:07 PM, luohui20...@sina.com wrote:

 hi Olivier

 Spark 1.3.1, with Java 1.8.0.45.

 I have also attached 2 pictures.

 It seems to be a GC issue. I also tried different parameters, such as the
 memory size of the driver and executor, the memory fraction, and Java opts,

 but the issue still happens.


 

 Thanks & Best regards!
 罗辉 San.Luo

 - Original Message -
 From: Olivier Girardot ssab...@gmail.com
 To: luohui20...@sina.com, user user@spark.apache.org
 Subject: Re: sparksql running slow while joining 2 tables.
 Date: May 4, 2015, 20:46

 Hi,
 What is your Spark version?

 Regards,

 Olivier.

 On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

 Hi guys,

 When I run a SQL query like select a.name,a.startpoint,a.endpoint,
 a.piece from db a join sample b on (a.name = b.name) where (b.startpoint
   a.startpoint + 25); I find Spark SQL runs slowly, taking minutes, which
 may be caused by very long GC and shuffle times.

 Table db is created from a 56 MB txt file and table sample from a 26 MB
 one, so both are small.

 My Spark cluster is a standalone pseudo-distributed cluster with an 8g
 executor and a 4g driver.

 Any advice? Thank you guys.



 

 Thanks & Best regards!
 罗辉 San.Luo





-- 
Best Regards,
Ayan Guha


Re: spark 1.3.1

2015-05-04 Thread Deng Ching-Mallete
Hi,

I think you need to import org.apache.spark.sql.types.DataTypes instead
of org.apache.spark.sql.types.DataType, and use DataTypes to access
StringType.

HTH,
Deng

On Mon, May 4, 2015 at 9:37 PM, Saurabh Gupta saurabh.gu...@semusi.com
wrote:

 I am really new to this but what should I look into maven logs?

 I have tried mvn package -X -e

 SHould I show the full trace?



 On Mon, May 4, 2015 at 6:54 PM, Driesprong, Fokko fo...@driesprong.frl
 wrote:

 Hi Saurabh,

 Did you check the log of maven?

 2015-05-04 15:17 GMT+02:00 Saurabh Gupta saurabh.gu...@semusi.com:

 HI,

 I am trying to build a example code given at

 https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds





Re: Spark Mongodb connection

2015-05-04 Thread Gaspar Muñoz
Hi Yasemin,

You can find here a MongoDB connector for Spark SQL:
http://github.com/Stratio/spark-mongodb

Best regards

2015-05-04 9:27 GMT+02:00 Yasemin Kaya godo...@gmail.com:

 Hi!

 I am new at Spark and I want to begin Spark with simple wordCount example
 in Java. But I want to give my input from Mongodb database. I want to learn
 how can I connect Mongodb database to my project. Any one can help for this
 issue.

 Have a nice day
 yasemin

 --
 hiç ender hiç




-- 

Gaspar Muñoz
@gmunozsoria


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*


Re: java.io.IOException: No space left on device while doing repartitioning in Spark

2015-05-04 Thread Ted Yu
See
https://wiki.gentoo.org/wiki/Knowledge_Base:No_space_left_on_device_while_there_is_plenty_of_space_available

What's the value for spark.local.dir property ?

Cheers

On Mon, May 4, 2015 at 6:57 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I am getting a "No space left on device" exception when repartitioning
 approx. 285 MB of data, even though there is still 2 GB of space left.

 Does this mean that repartitioning 285 MB of data needs more than
 2 GB of space?

 best,
 /Shahab

 java.io.IOException: No space left on device
   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
   at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
   at sun.nio.ch.IOUtil.write(IOUtil.java:51)
   at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
   at 
 sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473)
   at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569)
   at org.apache.spark.util.Utils$.copyStream(Utils.scala:331)
   at 
 org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
   at 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)




Re: empty jdbc RDD in spark

2015-05-04 Thread Cody Koeninger
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD

The arguments are: SQL string, lower bound, upper bound, number of
partitions.

Your call "SELECT *  FROM MEMBERS LIMIT ? OFFSET  ?", 0, 100, 1
would thus be run as

SELECT *  FROM MEMBERS LIMIT 0 OFFSET  100

Naturally, LIMIT 0 will yield 0 results.

JdbcRDD is designed to be used with multiple partitions, with some kind of
numeric index.

Try something more like

"SELECT * FROM MEMBERS WHERE ID >= ? AND ID <= ?", 0, howeverManyRowsYouHave, 8
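
To make the substitution concrete, here is a minimal sketch in plain Python (no Spark needed) of how I understand JdbcRDD to split the inclusive range between the lower and upper bound across partitions, and how each pair then fills the two "?" placeholders. The function names jdbc_partitions and bind are made up for illustration, and the arithmetic is my reading of the behaviour, not a copy of the Spark source.

```python
def jdbc_partitions(lower_bound, upper_bound, num_partitions):
    """Split the inclusive range [lower_bound, upper_bound] into
    num_partitions inclusive (start, end) pairs, one per partition.
    Assumption: this mirrors how JdbcRDD derives per-partition bounds."""
    length = 1 + upper_bound - lower_bound
    parts = []
    for i in range(num_partitions):
        start = lower_bound + (i * length) // num_partitions
        end = lower_bound + ((i + 1) * length) // num_partitions - 1
        parts.append((start, end))
    return parts


def bind(sql, start, end):
    """Fill the two '?' placeholders in order (illustration only;
    the real code uses a JDBC PreparedStatement)."""
    return sql.replace("?", str(start), 1).replace("?", str(end), 1)


# The call from the question: bounds 0 and 100, a single partition.
single = jdbc_partitions(0, 100, 1)
limit_query = bind("SELECT * FROM MEMBERS LIMIT ? OFFSET ?", *single[0])

# A range query over several partitions, each covering its own slice of IDs.
range_queries = [
    bind("SELECT * FROM MEMBERS WHERE ID >= ? AND ID <= ?", start, end)
    for start, end in jdbc_partitions(0, 99, 4)
]
```

With one partition the only pair is (0, 100), which is why the LIMIT/OFFSET form ends up as LIMIT 0 OFFSET 100. With a range predicate, each partition reads a disjoint slice of the ID column.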



On Fri, May 1, 2015 at 3:56 PM, Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi all!
 I am trying to read hana database using spark jdbc RDD
 here is my code
 def readFromHana() {
   val conf = new SparkConf()
   conf.setAppName("test").setMaster("local")
   val sc = new SparkContext(conf)
   val rdd = new JdbcRDD(sc, () => {
     Class.forName("com.sap.db.jdbc.Driver").newInstance()
     DriverManager.getConnection(
       "jdbc:sap://54.69.200.113:30015/?currentschema=LIVE2",
       "mujadid", "786Xyz123")
   },
     "SELECT *  FROM MEMBERS LIMIT ? OFFSET  ?",
     0, 100, 1,
     (r: ResultSet) => convert(r))
   println(rdd.count());
   sc.stop()
 }

 def convert(rs: ResultSet): String = {
   val rsmd = rs.getMetaData()
   val numberOfColumns = rsmd.getColumnCount()
   var i = 1
   val row = new StringBuilder
   while (i <= numberOfColumns) {
     row.append(rs.getString(i) + ",")
     i += 1
   }
   row.toString()
 }

 The resultant count is 0

 Any suggestion?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/empty-jdbc-RDD-in-spark-tp22736.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkStream saveAsTextFiles()

2015-05-04 Thread anavidad
Hi,

What kind of "cannot find symbol" error are you receiving?

Also, I would try changing the guava dependency version to 14.0.1.

In Spark 1.3.0, the guava version is 14.0.1, but it is not included in the Spark
artifact because it is marked as provided.
http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom

Spark and Guava have a long history. You just have to search a bit in
google.

Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkStream-saveAsTextFiles-tp22719p22754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




SparkSQL Nested structure

2015-05-04 Thread Giovanni Paolo Gibilisco
Hi, I'm trying to parse log files generated by Spark using SparkSQL.

In the JSON elements related to the StageCompleted event we have a nested
structure containing an array of elements with RDD Info (see the log below
as an example, with some parts omitted).

{
  "Event": "SparkListenerStageCompleted",
  "Stage Info": {
    "Stage ID": 1,
    ...
    "RDD Info": [
      {
        "RDD ID": 5,
        "Name": "5",
        "Storage Level": {
          "Use Disk": false,
          "Use Memory": false,
          "Use Tachyon": false,
          "Deserialized": false,
          "Replication": 1
        },
        "Number of Partitions": 2,
        "Number of Cached Partitions": 0,
        "Memory Size": 0,
        "Tachyon Size": 0,
        "Disk Size": 0
      },
      ...

When I register the log as a table, SparkSQL is able to generate the correct
schema, which for the RDD Info element looks like

 |-- RDD Info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Disk Size: long (nullable = true)
 |    |    |-- Memory Size: long (nullable = true)
 |    |    |-- Name: string (nullable = true)

My problem is that if I try to query the table I can only get array buffers
out of it:

SELECT `stageEndInfos.Stage Info.Stage ID`, `stageEndInfos.Stage Info.RDD
Info` FROM stageEndInfos

Stage ID  RDD Info
1         ArrayBuffer([0,0,...
0         ArrayBuffer([0,0,...
2         ArrayBuffer([0,0,...

or:

SELECT `stageEndInfos.Stage Info.RDD Info.RDD ID` FROM stageEndInfos

RDD ID
ArrayBuffer(5, 4, 3)
ArrayBuffer(2, 1, 0)
ArrayBuffer(9, 6,...

Is there a way to explode the arrays in the rows in order to build a single
table? (Knowing that the RDD ID is unique and can be used as primary key)?
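
To pin down what "explode" should produce here, the following is a minimal sketch in plain Python (no Spark) that turns one StageCompleted event into one flat row per RDD Info entry. The helper name explode_rdd_info and the sample values are made up; only the field names come from the log excerpt above. With a HiveContext, HiveQL's LATERAL VIEW explode(...) is the usual in-query equivalent, though I have not checked how it behaves with column names containing spaces.

```python
import json


def explode_rdd_info(event):
    """Return one flat dict per entry of the nested "RDD Info" array,
    carrying the parent Stage ID along on every row."""
    stage = event["Stage Info"]
    rows = []
    for rdd in stage.get("RDD Info", []):
        rows.append({
            "Stage ID": stage["Stage ID"],
            "RDD ID": rdd["RDD ID"],
            "Name": rdd["Name"],
            "Memory Size": rdd["Memory Size"],
            "Disk Size": rdd["Disk Size"],
        })
    return rows


# Hypothetical event line, shaped like the log excerpt (values invented).
sample_line = json.dumps({
    "Event": "SparkListenerStageCompleted",
    "Stage Info": {
        "Stage ID": 1,
        "RDD Info": [
            {"RDD ID": 5, "Name": "5", "Memory Size": 0, "Disk Size": 0},
            {"RDD ID": 4, "Name": "4", "Memory Size": 0, "Disk Size": 0},
        ],
    },
})

rows = explode_rdd_info(json.loads(sample_line))
```

Each resulting row has a scalar RDD ID, so the rows from all events can be concatenated into a single table keyed on it.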

Thanks!

How can I get


Is LIMIT n in Spark SQL useful?

2015-05-04 Thread Yi Zhang
I am trying to query PostgreSQL using LIMIT n to reduce memory usage and 
improve query performance, but I found it took as long as the same query 
without LIMIT. This confuses me. Does anybody know why?
Thanks.
Regards,
Yi

Re: SparkStream saveAsTextFiles()

2015-05-04 Thread anavidad
The structure seems fine. You only need to add the following at the end of your program:

ssc.start();
ssc.awaitTermination();

Check the method arguments. I advise you to consult the Spark Streaming Java API docs.

https://spark.apache.org/docs/1.3.0/api/java/

Regards.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkStream-saveAsTextFiles-tp22719p22755.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread Richard Marscher
Regarding the long GC pauses: assuming you allocated all 100GB of
memory per worker, you may consider running with less memory on your Worker
nodes, or splitting the available memory on each Worker node amongst
several worker instances. The JVM's garbage collection becomes
very slow as the heap grows large. At 100GB it
may not be unusual to see GC take minutes at a time. I believe with YARN or
Standalone clusters you should be able to run multiple smaller JVM
instances on your workers, so you can still use your cluster resources but
won't have a single JVM allocating an unwieldy amount of memory.
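
As a rough illustration of the split, here is a small Python sketch that works out per-instance settings for a standalone cluster. SPARK_WORKER_INSTANCES, SPARK_WORKER_MEMORY and SPARK_WORKER_CORES are the standalone-mode variables set in conf/spark-env.sh; the helper name split_worker, the 4-way split, the 4 GB reserve and the 16 cores are assumptions picked for the example, not recommendations.

```python
def split_worker(total_gb, total_cores, instances, reserve_gb=0):
    """Divide one node's memory and cores evenly across several worker
    instances, leaving reserve_gb for the OS and other daemons."""
    memory_per_instance = (total_gb - reserve_gb) // instances
    cores_per_instance = total_cores // instances
    return {
        "SPARK_WORKER_INSTANCES": str(instances),
        "SPARK_WORKER_MEMORY": "%dg" % memory_per_instance,
        "SPARK_WORKER_CORES": str(cores_per_instance),
    }


# Hypothetical node: 100 GB RAM, 16 cores, split into 4 workers.
settings = split_worker(total_gb=100, total_cores=16, instances=4, reserve_gb=4)

# Lines in the form they would take in conf/spark-env.sh.
env_lines = ["export %s=%s" % (k, v) for k, v in sorted(settings.items())]
```

Each worker JVM then manages a 24 GB heap at most instead of a single 100 GB one, which is the point of the suggestion above.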

On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com wrote:

 Could you be more specific in how this is done?

 A DataFrame class doesn't have that method.

 On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote:

 You can use custom partitioner to redistribution using partitionby
 On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote:

 I'm currently trying to join two large tables (order 1B rows each) using
 Spark SQL (1.3.0) and am running into long GC pauses which bring the job
 to
 a halt.

 I'm reading in both tables using a HiveContext with the underlying files
 stored as Parquet Files. I'm using  something along the lines of
 HiveContext.sql(SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 =
 b.col1) to
 set up the join.

 When I execute this (with an action such as .count) I see the first few
 stages complete, but the job eventually stalls. The GC counts keep
 increasing for each executor.

 Running with 6 workers, each with 2T disk and 100GB RAM.

 Has anyone else run into this issue? I'm thinking I might be running into
 issues with the shuffling of the data, but I'm unsure of how to get
 around
 this? Is there a way to redistribute the rows based on the join key
 first,
 and then do the join?

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Is LIMIT n in Spark SQL useful?

2015-05-04 Thread Robin East
What query are you running? It may be the case that your query requires 
PostgreSQL to do a large amount of work before identifying the first n rows.
 On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote:
 
 I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and 
 improve query performance, but I found it took long time as same as querying 
 not using LIMIT. It let me confused. Anybody know why?
 
 Thanks.
 
 Regards,
 Yi



Re: Is LIMIT n in Spark SQL useful?

2015-05-04 Thread Robin East
And a further question: have you tried running this query in psql? What's the 
performance like there?
 On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote:
 
 What query are you running? It may be the case that your query requires 
 PostgreSQL to do a large amount of work before identifying the first n rows.
 On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote:
 
 I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and 
 improve query performance, but I found it took long time as same as querying 
 not using LIMIT. It let me confused. Anybody know why?
 
 Thanks.
 
 Regards,
 Yi
 



Python Custom Partitioner

2015-05-04 Thread ayan guha
Hi

Can someone share some working code for custom partitioner in python?

I am trying to understand it better.

Here is documentation

partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x2c45140>)
https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

Return a copy of the RDD partitioned using the specified partitioner.


what I am trying to do -

1. Create a dataframe

2. Partition it using one specific column

3. create another dataframe

4. partition it on the same column

5. join (to enforce map-side join)

My questions:

a) Am I on the right path?

b) How can I do partitionBy? Specifically, when I call DF.rdd.partitionBy,
what gets passed to the custom function? A tuple? A row? How do I access (say) the 3rd
column of a tuple inside the partitioner function?
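
For question b), a minimal sketch, assuming PySpark's partitionBy behaves as I understand it: it works on an RDD of (key, value) pairs, calls partitionFunc with the key only (not the whole row), and uses the result modulo numPartitions as the target partition. So the column has to be pulled out as the key first (for example with keyBy on the row RDD). The code below simulates that in plain Python; simulate_partition_by, column_partitioner and the sample rows are made up for illustration.

```python
def column_partitioner(key):
    """Custom partition function: receives only the key and returns an int.
    A character sum is used here because it is deterministic across
    processes; any stable integer hash would do."""
    return sum(ord(c) for c in str(key))


def simulate_partition_by(pairs, num_partitions, partition_func):
    """Plain-Python stand-in for rdd.partitionBy(num_partitions, partition_func)."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


# Hypothetical rows; the 3rd column (index 2) is the one to partition on.
rows = [("a", 1, "US"), ("b", 2, "DE"), ("c", 3, "US")]

# Equivalent in spirit to df.rdd.keyBy(lambda row: row[2]).
keyed = [(row[2], row) for row in rows]

buckets = simulate_partition_by(keyed, 4, column_partitioner)
```

Both "US" rows land in the same bucket and the "DE" row in another. Two RDDs partitioned with the same function and the same number of partitions put equal keys at the same partition index, which is the precondition for the join in step 5.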

-- 
Best Regards,
Ayan Guha


Re: Python Custom Partitioner

2015-05-04 Thread ๏̯͡๏
I have implemented map-side join with broadcast variables and the code is
on mailing list (scala).


On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Can someone share some working code for custom partitioner in python?

 I am trying to understand it better.

 Here is documentation

 partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x2c45140>)
 https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

 Return a copy of the RDD partitioned using the specified partitioner.


 what I am trying to do -

 1. Create a dataframe

 2. Partition it using one specific column

 3. create another dataframe

 4. partition it on the same column

 5. join (to enforce map-side join)

 My question:

 a) Am I on right path?

 b) How can I do partitionby? Specifically, when I call DF.rdd.partitionBy,
 what gets passed to the custom function? tuple? row? how to access (say 3rd
 column of a tuple inside partitioner function)?

 --
 Best Regards,
 Ayan Guha




-- 
Deepak


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
1)
I tried this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))]
= lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions
= 1200, rdd = viEvents)).map {


It fired two jobs and I still have 1 task that never completes.

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      4818  0        RUNNING  PROCESS_LOCAL   5 / host1           2015/05/04 07:24:25  1.1 h     13 min   778.0 MB / 50314161          4.5 GB                  47.4 MB
955    5773  0        SUCCESS  PROCESS_LOCAL   5 / host2           2015/05/04 07:47:16  2.2 min   1.5 min  106.3 MB / 4197539           0.0 B                   0.0 B
1199   6017  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 07:51:51  48 s      2 s      94.2 MB / 3618335            2.8 GB                  8.6 MB
216



2)
I tried reversing the datasets in join

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))]
=viEvents.join(lstgItem)

This led to same problem of a long running task.
3)
Next, i am trying this

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))]
= lstgItem.join(viEvents, 1200).map {


I have exhausted all my options.


Regards,

Deepak


On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I ran it against one file instead of 10 files, and I see one task is still
 running after 33 mins; its shuffle read size is 780 MB / 50 million records.

 I did a count of records for each itemId from dataset-2 [one file] (second
 dataset (RDDPair): val viEvents = viEventsRaw.map { vi => (vi.get(14
 ).asInstanceOf[Long], vi) } ). This is the dataset that contains the list
 of items viewed by user in one day.

 Item Id      Count
 201335783004 537
 111654496030 353
 141640703798 287
 191568402102 258
 111654479898 217
 231521843148 211
 251931716094 201
 111654493548 181
 181503913062 181
 121635453050 152
 261798565828 140
 151494617682 139
 251927181728 127
 231516683056 119
 141640492864 117
 161677270656 117
 171771073616 113
 111649942124 109
 191516989450 97
 231539161292 94
 221555628408 88
 131497785968 87
 121632233872 84
 131335379184 83
 281531363490 83
 131492727742 79
 231174157820 79
 161666914810 77
 251699753072 77
 161683664300 76


 I was assuming that there would be data skew only if the top item (201335783004) had
 a count of 1 million; however, it is only a few hundred. Why, then, is Spark
 skewing it in the join? What should I do so that Spark distributes the records
 evenly?

 In M/R we can change the Partitioner between mapper and reducer; how can I
 do that in Spark for a join?


 Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
 0      3618  0        RUNNING  PROCESS_LOCAL   4 / host1           2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322        4.6 GB                  47.5 MB
 433    4051  0        SUCCESS  PROCESS_LOCAL   1 / host2           2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4505143           1282.3 MB               10.1 MB
 218    3836  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2865143            879.6 MB                6.9 MB
 113    3731  0        SUCCESS  PROCESS_LOCAL   2 / host4           2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5187                0.0 B                   0.0 B

 On Mon, May 4, 2015 at 6:00 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 From the symptoms you mentioned (one task's shuffle write is much
 larger than all the other tasks'), this looks quite similar to normal data skew
 behavior. I am only giving advice based on your description; I think you
 need to check whether the data is actually skewed or not.

 The shuffle puts data into tasks according to the partitioner strategy (the default
 is the hash partitioner), so all data with the same key goes to the same
 task, but a task does not hold just one key.
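
The last point can be sketched in plain Python. This is only an illustration under a simplifying assumption: the partition index is taken as key % numPartitions for integer keys, standing in for a non-negative modulo of the key's hash code. The names hash_partition and records_per_partition and the sample keys are made up.

```python
from collections import Counter


def hash_partition(key, num_partitions):
    """Simplified hash partitioner for integer keys (assumption: stands in
    for a non-negative modulo of the key's hash code)."""
    return key % num_partitions


def records_per_partition(keys, num_partitions):
    """Count how many records each partition (and so each task) receives."""
    counts = Counter()
    for key in keys:
        counts[hash_partition(key, num_partitions)] += 1
    return counts


# Hypothetical key column: no single key is dominant.
keys = [1] * 5 + [5] * 3 + [2] * 2

per_key = Counter(keys)
per_partition = records_per_partition(keys, 4)
```

Keys 1 and 5 both map to partition 1, so that partition receives 8 of the 10 records even though the most frequent key accounts for only 5. Comparing per-key counts with per-partition counts in this way is one means of checking whether an oversized task comes from one hot key or from several keys colliding.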

 2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Attached image shows the Spark UI for the job.





 On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Four tasks are now failing with

 Index  ID    Attempt  Status  Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
 0      3771  0        FAILED  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44                     /                                                                          ExecutorLostFailure (executor 114 lost)
 1007   4973  1        FAILED  PROCESS_LOCAL   420 / host2         2015/05/04 02:13:14                     /                                                                          FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message= +details

 FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
 location for shuffle 1
at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
at 
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
 

Parallelize foreach in PySpark with Spark Standalone

2015-05-04 Thread kdunn
Full disclosure, I am *brand* new to Spark. 

I am trying to use [Py]SparkSQL standalone to pre-process a bunch of *local*
(non HDFS) Parquet files. I have several thousand files and want to dispatch
as many workers as my machine can handle to process the data in parallel;
either at the per-file or per-record (or batch of records) within a single
file. 

My question is, how can this be achieved in a standalone scenario? I have
plenty of cores and RAM, yet when I do `sc = SparkContext("local[8]")` in my
standalone script I see no speedup compared to, say, local[1]. I've also
tried something like `distData = sc.parallelize(data)` then
`distData.foreach(myFunction)` after starting with local[N], yet that seems to
return immediately without producing the expected side effects from
myFunction (file output).

I realize parallelizing Python code on a single node cluster is not what
Spark was designed for but it seems to integrate Parquet and Python so well
that it's my only option. :)


Thanks,
Kyle



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parallelize-foreach-in-PySpark-with-Spark-Standalone-tp22756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




AJAX with Apache Spark

2015-05-04 Thread Sergio Jiménez Barrio
Hi,

 I am trying to create a dashboard for an Apache Spark job. I need to run Spark
Streaming 24/7 and, when it receives an AJAX request, have it answer with the current
state of the job. I have created the client and the Spark program. I
tried to create the response service with Play, but that runs the program
on each request. I want to send the Spark program's accumulator in response to a request.

Sorry for my explanation. Any ideas? Maybe with Play?

Thanks


com.datastax.spark % spark-streaming_2.10 % 1.1.0 in my build.sbt ??

2015-05-04 Thread Eric Ho
Can I specify this in my build file ?

Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/com-datastax-spark-spark-streaming-2-10-1-1-0-in-my-build-sbt-tp22758.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




No logs from my cluster / worker ... (running DSE 4.6.1)

2015-05-04 Thread Eric Ho
I'm submitting this via 'dse spark-submit' but somehow I don't see any
logs on my cluster or worker machines...

How can I find out where they go?

My cluster is running DSE 4.6.1 with Spark enabled.
My source is running Kafka 0.8.2.0

I'm launching my program on one of my DSE machines.

Any insights much appreciated.

Thanks.

-
cas1.dev% dse spark-submit --verbose --deploy-mode cluster --master
spark://cas1.dev.kno.com:7077 --class
com.kno.highlights.counting.service.HighlightConsumer --driver-class-path
/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
--driver-library-path
/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
--properties-file /tmp/highlights-counting.properties
/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar
--name HighlightConsumer
Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property:
checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property:
default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property:
kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property: app_name=HighlightConsumer
Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
Warning: Ignoring non-spark config property: scheduler_mode=FIFO
Warning: Ignoring non-spark config property: highlight_topic=highlight_topic
Warning: Ignoring non-spark config property: cassandra_host=cas1.dev.kno.com
Warning: Ignoring non-spark config property: checkpoint_interval=3
Warning: Ignoring non-spark config property: zookeeper_host=cas1.dev.kno.com
Adding default property: spark_master=spark://cas1.dev.kno.com:7077
Warning: Ignoring non-spark config property: streaming_window=10
Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property:
checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property:
default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property:
kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property: app_name=HighlightConsumer
Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
Warning: Ignoring non-spark config property: scheduler_mode=FIFO
Warning: Ignoring non-spark config property: highlight_topic=highlight_topic
Warning: Ignoring non-spark config property: cassandra_host=cas1.dev.kno.com
Warning: Ignoring non-spark config property: checkpoint_interval=3
Warning: Ignoring non-spark config property: zookeeper_host=cas1.dev.kno.com
Adding default property: spark_master=spark://cas1.dev.kno.com:7077
Warning: Ignoring non-spark config property: streaming_window=10
Parsed arguments:
  master                  spark://cas1.dev.kno.com:7077
  deployMode              cluster
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          /tmp/highlights-counting.properties
  extraSparkProperties    Map()
  driverMemory            null
  driverCores             null
  driverExtraClassPath    /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
  driverExtraLibraryPath  /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               com.kno.highlights.counting.service.HighlightConsumer
  primaryResource         file:/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar
  name                    com.kno.highlights.counting.service.HighlightConsumer
  childArgs               [--name HighlightConsumer]
  jars                    null
  verbose                 true

Default properties from /tmp/highlights-counting.properties:
  spark_master - spark://cas1.dev.kno.com:7077


Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property:
checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property:
default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property:
kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property: 

OOM error with GMMs on 4GB dataset

2015-05-04 Thread Vinay Muttineni
Hi, I am training a GMM with 10 gaussians on a 4 GB dataset(720,000 * 760).
The spark (1.3.1) job is allocated 120 executors with 6GB each and the
driver also has 6GB.
Spark Config Params:

.set("spark.hadoop.validateOutputSpecs", "false")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.driver.maxResultSize", "4g")
.set("spark.default.parallelism", "300")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.mb", "500")
.set("spark.akka.frameSize", "256")
.set("spark.akka.timeout", "300")

However, at the aggregate step (Line 168)
val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)

I get an OOM error and the application hangs indefinitely. Is this an issue or
am I missing something?
java.lang.OutOfMemoryError: Java heap space
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:180)
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/05/04 16:23:38 ERROR util.Utils: Uncaught exception in thread
task-result-getter-2
java.lang.OutOfMemoryError: Java heap space
Exception in thread task-result-getter-2 java.lang.OutOfMemoryError: Java
heap space
15/05/04 16:23:45 INFO scheduler.TaskSetManager: Finished task 1070.0 in
stage 6.0 (TID 8276) in 382069 ms on [] (160/3600)
15/05/04 16:23:54 WARN channel.DefaultChannelPipeline: An exception was
thrown by a user handler while handling an exception event ([id:
0xc57da871, ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
15/05/04 16:23:55 WARN channel.DefaultChannelPipeline: An exception was
thrown by a user handler while handling an exception event ([id:
0x3c3dbb0c, ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
15/05/04 16:24:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
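
For scale, a back-of-envelope estimate of what each task sends back to the driver in that aggregate step. This is a sketch under an assumption I have not verified against the 1.3.1 source: that every partial ExpectationSum holds k weights, k mean vectors of length d, and k dense d x d covariance matrices, all as 8-byte doubles. The function name partial_sum_bytes is made up.

```python
def partial_sum_bytes(k, d, bytes_per_double=8):
    """Approximate size of one partial sum: k weights, k means of length d,
    and k dense d x d covariance matrices (assumed layout)."""
    weights = k
    means = k * d
    covariances = k * d * d
    return (weights + means + covariances) * bytes_per_double


# Values from the message: 10 gaussians, 760 features.
per_task = partial_sum_bytes(k=10, d=760)
per_task_mib = per_task / (1024.0 * 1024.0)

# The log shows 3600 tasks in the stage; total if every result were held at once.
all_tasks_gib = 3600 * per_task / (1024.0 ** 3)
```

Under that assumption each task result is roughly 44 MiB and the 3600 results together come to about 155 GiB, so a 6GB driver can only hold a small fraction of them in memory at the same time.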



Thanks!
Vinay


spark Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-05-04 Thread ??
Hi all,
   When I submit a Spark SQL program to select data from my Hive 
database, I get an error like this:
User class threw exception: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
What's wrong with my Spark configuration? Thanks for any help!

RE: Re: RE: Re: Re: sparksql running slow while joining_2_tables.

2015-05-04 Thread Wang, Daoyuan
You can use
Explain extended select ….

From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 05, 2015 9:52 AM
To: Cheng, Hao; Olivier Girardot; user
Subject: Re: RE: Re: Re: sparksql running slow while joining_2_tables.


As far as I know, broadcast join is automatically enabled by 
spark.sql.autoBroadcastJoinThreshold.

Refer to 
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

And how can I check my app's physical plan, and other things like the optimized 
plan, executable plan, etc.?

Thanks


Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot 
ssab...@gmail.com, user user@spark.apache.org
Subject: RE: Re: Re: sparksql running slow while joining_2_tables.
Date: 2015-05-05 08:38

Or, have you ever tried a broadcast join?

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.com; Olivier Girardot; user
Subject: RE: Re: Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.commailto:luohui20...@sina.com 
[mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: 回复:Re: sparksql running slow while joining 2 tables.


Hi Olivier,

Spark 1.3.1, with Java 1.8.0.45.

I have also attached 2 pictures.

It seems like a GC issue. I also tried different parameters, such as the memory
size of the driver and executor, the memory fraction, Java opts...

but the issue still happens.

Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: 2015-05-04 20:46

Hi,
What is your Spark version?

Regards,

Olivier.

On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote:

Hi guys,

When I run a SQL query like: select
a.name, a.startpoint, a.endpoint, a.piece from db a join sample b
on (a.name = b.name) where (b.startpoint  
a.startpoint + 25); I find that Spark SQL runs slowly, taking minutes, which
may be caused by very long GC and shuffle times.

   Table db is created from a txt file of 56 MB, while table sample is
26 MB; both are small.

   My Spark cluster is a standalone, pseudo-distributed Spark cluster with
an 8g executor and a 4g driver.

   Any advice? Thank you, guys.

Thanks & Best regards!
罗辉 San.Luo



Re: Nightly builds/releases?

2015-05-04 Thread Guru Medasani
I see a Jira for this one, but unresolved. 

https://issues.apache.org/jira/browse/SPARK-1517 




 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com wrote:
 
 Hi,
 
 Does anyone know if spark has any nightly builds or equivalent that provides 
 binaries that have passed a CI build so that one could try out the bleeding 
 edge without having to compile.
 
 -- Ankur



RE: Reply: Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Cheng, Hao
Can you print out the physical plan?

EXPLAIN SELECT xxx…



Re: Kryo serialization of classes in additional jars

2015-05-04 Thread Akshat Aranya
Actually, after some digging, I did find a JIRA for it: SPARK-5470.
The fix for this has gone into master, but it isn't in 1.2.

On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote:
 Oh, this seems like a real pain.  You should file a jira, I didn't see an
 open issue -- if nothing else just to document the issue.

 As you've noted, the problem is that the serializer is created immediately
 in the executors, right when the SparkEnv is created, but the other jars
 aren't downloaded until later.  I think you could work around this with some
 combination of pushing the jars to the cluster manually, and then using
 spark.executor.extraClassPath

 On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 Is it possible to register kryo serialization for classes contained in
 jars that are added with spark.jars?  In my experiment it doesn't seem to
 work, likely because the class registration happens before the jar is
 shipped to the executor and added to the classloader.  Here's the general
 idea of what I want to do:

 import org.apache.spark.SparkConf

 val sparkConf = new SparkConf(true)
   .set("spark.jars", "foo.jar")
   .setAppName("foo")
   .set("spark.serializer",
     "org.apache.spark.serializer.KryoSerializer")

 // register classes contained in foo.jar
 sparkConf.registerKryoClasses(Array(
   classOf[com.foo.Foo],
   classOf[com.foo.Bar]))






sparksql support hive view

2015-05-04 Thread luohui20001
Guys, just to confirm: Spark SQL supports the Hive "view" feature. Is that the
LateralView described in the Hive language manual?
Thanks

 
Thanks & Best regards!
罗辉 San.Luo


Re: Nightly builds/releases?

2015-05-04 Thread Ted Yu
See this related thread:
http://search-hadoop.com/m/JW1q5bnnyT1

Cheers

On Mon, May 4, 2015 at 7:58 PM, Guru Medasani gdm...@gmail.com wrote:

 I see a Jira for this one, but unresolved.

 https://issues.apache.org/jira/browse/SPARK-1517




 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com
 wrote:

 Hi,

 Does anyone know if spark has any nightly builds or equivalent that
 provides binaries that have passed a CI build so that one could try out the
 bleeding edge without having to compile.

 -- Ankur





Re: Nightly builds/releases?

2015-05-04 Thread Ankur Chauhan
Hi,

There is also a make-distribution.sh file in the repository root. If someone 
with jenkins access can create a simple builder that would be awesome.
But I am guessing besides the spark binary one would also probably want the 
maven artifacts (lower priority though) to work with it.

-- Ankur
 On 4 May 2015, at 20:11, Ted Yu yuzhih...@gmail.com wrote:
 
 See this related thread:
 http://search-hadoop.com/m/JW1q5bnnyT1
 
 Cheers
 
 On Mon, May 4, 2015 at 7:58 PM, Guru Medasani gdm...@gmail.com wrote:
 I see a Jira for this one, but unresolved.
 
 https://issues.apache.org/jira/browse/SPARK-1517
 
 
 
 
 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com wrote:
 
 Hi,
 
 Does anyone know if spark has any nightly builds or equivalent that provides 
 binaries that have passed a CI build so that one could try out the bleeding 
 edge without having to compile.
 
 -- Ankur
 
 





Help with Spark SQL Hash Distribution

2015-05-04 Thread Mani
I am trying to distribute a table using a particular column which is the key 
that I’ll be using to perform join operations on the table. Is it possible to 
do this with Spark SQL?
I checked the method partitionBy() for rdds. But not sure how to specify which 
column is the key? Can anyone give an example?

Thanks
Mani
Graduate Student, Department of Computer Science
Virginia Tech








Re: sparksql support hive view

2015-05-04 Thread Michael Armbrust
We support both LATERAL VIEWs (a query language feature that lets you turn
a single row into many rows, for example with an explode) and virtual views
(a table that is really just a query that is run on demand).

On Mon, May 4, 2015 at 7:12 PM, luohui20...@sina.com wrote:

 guys,

  just to confirm, sparksql support hive feature view, is that the one
 LateralView
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView 
 in
 hive language manual?


 thanks

 

 Thanks & Best regards!
 罗辉 San.Luo



Re: Help with Spark SQL Hash Distribution

2015-05-04 Thread Michael Armbrust
If you do a join with at least one equality relationship between the two
tables, Spark SQL will automatically hash partition the data and perform
the join.

If you are looking to prepartition the data, that information is not yet
propagated from the in memory cached representation so won't help avoid an
extra shuffle, but Kai (cc-ed) was hoping to add that feature.
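
To make the hash-partitioned join described above concrete, here is a minimal
pure-Python sketch (no Spark involved). It only illustrates the idea: rows
from both tables are routed by a hash of the join key, so matching keys meet
in the same partition and each partition can be joined on its own. The names
hash_partition and partitioned_join, the sample rows, and the partition count
are all made up for illustration; Spark SQL's real implementation differs.

```python
from collections import defaultdict


def hash_partition(rows, key_index, num_partitions):
    """Route each row to a partition chosen by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key_index]) % num_partitions].append(row)
    return partitions


def partitioned_join(left, right, left_key, right_key, num_partitions):
    """Equi-join two row lists partition by partition (illustration only)."""
    left_parts = hash_partition(left, left_key, num_partitions)
    right_parts = hash_partition(right, right_key, num_partitions)
    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        # Build a lookup table for the right side of this partition only.
        lookup = defaultdict(list)
        for row in right_part:
            lookup[row[right_key]].append(row)
        for row in left_part:
            for match in lookup.get(row[left_key], []):
                joined.append(row + match)
    return joined


# Hypothetical sample data: (id, name) and (id, amount), joined on column 0.
table_a = [(1, "x"), (2, "y"), (3, "z")]
table_b = [(1, 10), (3, 30), (3, 31), (4, 40)]
result = sorted(partitioned_join(table_a, table_b, 0, 0, num_partitions=4))
print(result)
```

Because both sides use the same hash function and partition count, a key never
has to be looked up outside its own partition, which is what makes the
per-partition join valid.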

On Mon, May 4, 2015 at 9:05 PM, Mani man...@vt.edu wrote:

 I am trying to distribute a table using a particular column which is the
 key that I’ll be using to perform join operations on the table. Is it
 possible to do this with Spark SQL?
 I checked the method partitionBy() for rdds. But not sure how to specify
 which column is the key? Can anyone give an example?

 Thanks
 Mani
 Graduate Student, Department of Computer Science
 Virginia Tech









Re: Is LIMIT n in Spark SQL useful?

2015-05-04 Thread Yi Zhang
Robin,
My query statement is as below:
select id, name, trans_date, gender, hobby, job, country from Employees LIMIT 100
In PostgreSQL, it works very well. For 10M records in the DB, it took less
than 20ms, but in Spark SQL, it took a long time.
Michael,
Got it. For me, it is not good news. Anyway, thanks.
Regards,
Yi



 On Tuesday, May 5, 2015 5:59 AM, Michael Armbrust mich...@databricks.com 
wrote:
   

 The JDBC interface for Spark SQL does not support pushing down limits today.
On Mon, May 4, 2015 at 8:06 AM, Robin East robin.e...@xense.co.uk wrote:

and a further question - have you tried running this query in psql? what’s the 
performance like there?

On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote:
What query are you running? It may be the case that your query requires 
PostgreSQL to do a large amount of work before identifying the first n rows.

On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote:
I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and 
improve query performance, but I found it took as long as the same query 
without LIMIT. This confuses me. Does anybody know why?
Thanks.
Regards,
Yi







  

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-04 Thread ๏̯͡๏
Data Set 1 : viEvents : Is the event activity data of 1 day. I took 10
files out of it and 10 records

*Item ID Count*
201335783004 3419
191568402102 1793
111654479898 1362
181503913062 1310
261798565828 1028
111654493548 994
231516683056 862
131497785968 746
161666914810 633
221749455474 432
201324502754 410
201334042634 402
191562605592 380
271841178238 362
161663339210 344
251615941886 313
261855748678 309
271821726658 255
111657099518 224
261868369938 218
181725710132 216
171766164072 215
221757076934 213
171763906872 212
111650132368 206
181629904282 204
261867932788 198
161668475280 194
191398227282 194





Data set 2:
ItemID Count
2217305702 1
3842604614 1
4463421160 1
4581260446 1
4632783223 1
4645316947 1
4760829454 1
4786989430 1
5530758430 1
5610056107 1
5661929425 1
5953801612 1
6141607456 1
6197204992 1
6220704442 1
6271022614 1
6282402871 1
6525123621 1
6554834772 1
6566297541 1
This data set will always have only one record for each item, as it
contains metadata information.

Given the nature of these two datasets, if there is any skew at all, it
must be in dataset 1. In dataset 1, the top 20-30 records do not have a record
count for a given itemID (shuffle key) greater than 3000, and that is very
small.

Why am I still *not* able to join these two datasets, given that I have
unlimited capacity and repartitioning, but a 12G memory limit on each node?
Each time, I get one task that runs forever and processes roughly 1.5G of
data while the others process a few MBs. Even 1.5G of data (shuffle read size)
is very small.

Please suggest.
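
One way to check whether a slow task comes from a few hot keys or from many
keys landing in the same partition is to count records per key and per
partition before the join. Below is a minimal pure-Python sketch of that
bookkeeping, not Spark code. The helper names heaviest_keys and
partition_sizes are hypothetical, the sample keys are invented, and the
partitioner argument is only a stand-in for whatever partitioner the job
really uses.

```python
from collections import Counter


def heaviest_keys(keys, top_n=3):
    """Return the top_n most frequent keys with their record counts."""
    return Counter(keys).most_common(top_n)


def partition_sizes(keys, num_partitions, partitioner=hash):
    """Count how many records each partition would receive."""
    sizes = [0] * num_partitions
    for key in keys:
        sizes[partitioner(key) % num_partitions] += 1
    return sizes


# Invented sample: key 7 is mildly hot, and a placeholder key (0) dominates.
sample_keys = [7] * 5 + [0] * 20 + list(range(1, 6))
print(heaviest_keys(sample_keys, top_n=2))
sizes = partition_sizes(sample_keys, num_partitions=4)
print(sizes, "max/mean =", max(sizes) / (sum(sizes) / float(len(sizes))))
```

Running the same counts over the full input rather than a sample would show
whether a single key, such as a default or missing ID, accounts for the
roughly 50 million records that one task reads.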


On Mon, May 4, 2015 at 9:08 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I tried this

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents, new
 org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map
 {


 It fired two jobs and still i have 1 task that never completes.
 Index | ID   | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
 0     | 4818 | 0       | RUNNING | PROCESS_LOCAL  | 5 / host1          | 2015/05/04 07:24:25 | 1.1 h    | 13 min  | 778.0 MB / 50314161         | 4.5 GB                 | 47.4 MB              |
 955   | 5773 | 0       | SUCCESS | PROCESS_LOCAL  | 5 / host2          | 2015/05/04 07:47:16 | 2.2 min  | 1.5 min | 106.3 MB / 4197539          | 0.0 B                  | 0.0 B                |
 1199  | 6017 | 0       | SUCCESS | PROCESS_LOCAL  | 3 / host3          | 2015/05/04 07:51:51 | 48 s     | 2 s     | 94.2 MB / 3618335           | 2.8 GB                 | 8.6 MB               |
 216



 2)
 I tried reversing the datasets in join

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] =viEvents.join(lstgItem)

 This led to same problem of a long running task.
 3)
 Next, i am trying this

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents, 1200).map {


 I have exhausted all my options.


 Regards,

 Deepak


 On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I ran it against one file instead of 10 files, and I see one task is still
 running after 33 mins; its shuffle read size is 780MB/50 mil records.

 I did a count of records for each itemId from dataset-2 [One FILE] (Second
 Dataset (RDDPair) val viEvents = viEventsRaw.map { vi => (vi.get(14
 ).asInstanceOf[Long], vi) } ). This is the dataset that contains the
 list of items viewed by user in one day.

 *Item IdCount*
 201335783004 537
 111654496030 353
 141640703798 287
 191568402102 258
 111654479898 217
 231521843148 211
 251931716094 201
 111654493548 181
 181503913062 181
 121635453050 152
 261798565828 140
 151494617682 139
 251927181728 127
 231516683056 119
 141640492864 117
 161677270656 117
 171771073616 113
 111649942124 109
 191516989450 97
 231539161292 94
 221555628408 88
 131497785968 87
 121632233872 84
 131335379184 83
 281531363490 83
 131492727742 79
 231174157820 79
 161666914810 77
 251699753072 77
 161683664300 76


 I was assuming that there would be data skew only if the top item
 (201335783004) had a count of 1 million; however, it is only a few hundred,
 so why is Spark skewing it in the join? What should I do so that Spark
 distributes the records evenly?

 In M/R we can change the Partitioner between mapper and reducer; how can
 I do that in Spark for a join?


 Index | ID   | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
 0     | 3618 | 0       | RUNNING | PROCESS_LOCAL  | 4 / host1          | 2015/05/04 05:09:53 | 33 min   | 8.5 min | 783.9 MB / 50,761,322       | 4.6 GB                 | 47.5 MB              |
 433   | 4051 | 0       | SUCCESS | PROCESS_LOCAL  | 1 / host2          | 2015/05/04 05:16:27 | 1.1 min  | 20 s    | 116.0 MB / 4505143          | 1282.3 MB              | 10.1 MB              |
 218   | 3836 | 0       | SUCCESS | PROCESS_LOCAL  | 3 / host3          | 2015/05/04 05:13:01 | 53 s     | 11 s    | 76.4 MB / 2865143           | 879.6 MB               | 6.9 MB               |
 113   | 3731 | 0       | SUCCESS | PROCESS_LOCAL  | 2 / host4          | 2015/05/04 05:11:30 | 31 s     | 8 s     | 6.9 MB / 5187               | 0.0 B                  | 0.0 B                |

 On Mon, May 4, 2015 at 6:00 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 From the symptoms you mentioned that one 

Re: sparksql running slow while joining_2_tables.

2015-05-04 Thread Cheng, Hao
I assume you’re using the DataFrame API within your application.

sql("SELECT…").explain(true)

From: Wang, Daoyuan
Sent: Tuesday, May 5, 2015 10:16 AM
To: luohui20...@sina.com; Cheng, Hao; Olivier Girardot; user
Subject: RE: Reply: RE: Reply: Re: sparksql running slow while joining_2_tables.

You can use:
EXPLAIN EXTENDED SELECT ….



Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
Starting the master with /sbin/start-master.sh creates a JVM with only
512MB of memory. How can I change this default amount of memory?

Thanks,
Vijay


Re: No logs from my cluster / worker ... (running DSE 4.6.1)

2015-05-04 Thread Ted Yu
bq. its Spark libs are all at 2.10

Clarification: 2.10 is version of Scala
Your Spark version is 1.1.0

You can use earlier release of Kafka.

Cheers

On Mon, May 4, 2015 at 2:39 PM, Eric Ho eric...@intel.com wrote:

 I still prefer to use Spark core / streaming /... at 2.10 because my DSE
 is at 4.6.1 and its Spark libs are all at 2.10 ...  My Scala code will run
 on DSE machines which have Spark enabled.

 So, should I grab my Kafka server from here?
 https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz

 On Mon, May 4, 2015 at 1:07 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like you're using Spark 1.1.0

 Support for Kafka 0.8.2 was added by:
 https://issues.apache.org/jira/browse/SPARK-2808

 which would come in Spark 1.4.0

 FYI

 On Mon, May 4, 2015 at 12:22 PM, Eric Ho eric...@intel.com wrote:

 I'm submitting this via 'dse spark-submit' but somehow, I don't see any
 loggings in my cluster or worker machines...

 How can I find out ?

 My cluster is running DSE 4.6.1 with Spark enabled.
 My source is running Kafka 0.8.2.0

 I'm launching my program on one of my DSE machines.

 Any insights much appreciated.

 Thanks.

 -
 cas1.dev% dse spark-submit --verbose --deploy-mode cluster --master
 spark://cas1.dev.kno.com:7077 --class
 com.kno.highlights.counting.service.HighlightConsumer --driver-class-path

 /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
 --driver-library-path

 /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
 --properties-file /tmp/highlights-counting.properties

 /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar
 --name HighlightConsumer
 Using properties file: /tmp/highlights-counting.properties
 Warning: Ignoring non-spark config property:
 checkpoint_directory=checkpointForHighlights
 Warning: Ignoring non-spark config property: zookeeper_port=2181
 Warning: Ignoring non-spark config property:
 default_num_of_cores_per_topic=1
 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
 Warning: Ignoring non-spark config property:
 kafka_consumer_group=highlight_consumer_group
 Warning: Ignoring non-spark config property: app_name=HighlightConsumer
 Warning: Ignoring non-spark config property:
 cassandra_keyspace=bookevents
 Warning: Ignoring non-spark config property: scheduler_mode=FIFO
 Warning: Ignoring non-spark config property:
 highlight_topic=highlight_topic
 Warning: Ignoring non-spark config property: cassandra_host=
 cas1.dev.kno.com
 Warning: Ignoring non-spark config property: checkpoint_interval=3
 Warning: Ignoring non-spark config property: zookeeper_host=
 cas1.dev.kno.com
 Adding default property: spark_master=spark://cas1.dev.kno.com:7077
 Warning: Ignoring non-spark config property: streaming_window=10
 Using properties file: /tmp/highlights-counting.properties
 Warning: Ignoring non-spark config property:
 checkpoint_directory=checkpointForHighlights
 Warning: Ignoring non-spark config property: zookeeper_port=2181
 Warning: Ignoring non-spark config property:
 default_num_of_cores_per_topic=1
 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
 Warning: Ignoring non-spark config property:
 kafka_consumer_group=highlight_consumer_group
 Warning: Ignoring non-spark config property: app_name=HighlightConsumer
 Warning: Ignoring non-spark config property:
 cassandra_keyspace=bookevents
 Warning: Ignoring non-spark config property: scheduler_mode=FIFO
 Warning: Ignoring non-spark config property:
 highlight_topic=highlight_topic
 Warning: Ignoring non-spark config property: cassandra_host=
 cas1.dev.kno.com
 Warning: Ignoring non-spark config property: checkpoint_interval=3
 Warning: Ignoring non-spark config property: zookeeper_host=
 cas1.dev.kno.com
 Adding default property: spark_master=spark://cas1.dev.kno.com:7077
 Warning: Ignoring non-spark config property: streaming_window=10
 Parsed arguments:
   master                  spark://cas1.dev.kno.com:7077
   deployMode              cluster
   executorMemory          null
   executorCores           null
   totalExecutorCores      null
   propertiesFile          /tmp/highlights-counting.properties
   extraSparkProperties    Map()
   driverMemory            null
   driverCores             null
   driverExtraClassPath    /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
   driverExtraLibraryPath  /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
   driverExtraJavaOptions  null
   supervise               false
   queue                   null
   numExecutors            null
   files                   null
   pyFiles                 null
   archives                null
   mainClass               com.kno.highlights.counting.service.HighlightConsumer
   primaryResource

 

Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Koert Kuipers
shoot me an email if you need any help with spark-sorted. it does not
(yet?) have a java api, so you will have to work in scala

On Mon, May 4, 2015 at 4:05 PM, Burak Yavuz brk...@gmail.com wrote:

 I think this Spark Package may be what you're looking for!
 http://spark-packages.org/package/tresata/spark-sorted

 Best,
 Burak

 On Mon, May 4, 2015 at 12:56 PM, Imran Rashid iras...@cloudera.com
 wrote:

 oh wow, that is a really interesting observation, Marco & Jerry.
 I wonder if this is worth exposing in combineByKey()?  I think Jerry's
 proposed workaround is all you can do for now -- use reflection to
 side-step the fact that the methods you need are private.

 On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Marco,

 As I know, current combineByKey() does not expose the related argument
 where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is
 package private, if you can get the ShuffledRDD through reflection or other
 way, the keyOrdering you set will be pushed down to shuffle. If you use a
 combination of transformations to do it, the result will be same but the
 efficiency may be different, some transformations will separate into
 different stages, which will introduce additional shuffle.
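
 The ordering being asked for (ranges across partitions, sorted keys inside
 each partition) can be pictured with a small pure-Python sketch. It does the
 reduce and the sort as two separate steps, so it shows only the target
 layout, not the single-shuffle efficiency discussed above. The function names
 reduce_by_key and range_partition_sorted and the sample bounds are made up
 for illustration.

```python
from bisect import bisect_left


def reduce_by_key(pairs, combine):
    """Merge all values that share a key using the combine function."""
    reduced = {}
    for key, value in pairs:
        reduced[key] = combine(reduced[key], value) if key in reduced else value
    return reduced


def range_partition_sorted(reduced, upper_bounds):
    """Place keys into range partitions, then sort inside each partition.

    upper_bounds holds the inclusive upper key bound of every partition
    except the last one, in ascending order.
    """
    partitions = [[] for _ in range(len(upper_bounds) + 1)]
    for key, value in reduced.items():
        partitions[bisect_left(upper_bounds, key)].append((key, value))
    return [sorted(part) for part in partitions]


pairs = [(5, 1), (1, 1), (9, 1), (5, 1), (3, 1), (1, 1)]
parts = range_partition_sorted(reduce_by_key(pairs, lambda a, b: a + b), [3, 6])
print(parts)
```

 Concatenating the partitions in order yields a globally sorted result, which
 is the property a range partitioner plus a per-partition sort provides.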

 Thanks
 Jerry


 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartitions), pushing the sorting down to the
 shuffle machinery of the reducing phase.

 I think, but maybe I'm wrong, that the correct way to do that is for
 combineByKey to call the setKeyOrdering function on the ShuffledRDD that it
 returns.

 Am I wrong? Can it be done by a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco








Re: Python Custom Partitioner

2015-05-04 Thread ayan guha
Thanks, but is there a non-broadcast solution?
On 5 May 2015 01:34, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have implemented map-side join with broadcast variables and the code is
 on mailing list (scala).


 On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Can someone share some working code for custom partitioner in python?

 I am trying to understand it better.

 Here is documentation

 partitionBy(numPartitions, partitionFunc=portable_hash)
 https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy

 Return a copy of the RDD partitioned using the specified partitioner.


 what I am trying to do -

 1. Create a dataframe

 2. Partition it using one specific column

 3. create another dataframe

 4. partition it on the same column

 5. join (to enforce map-side join)

 My question:

 a) Am I on right path?

 b) How can I do partitionby? Specifically, when I call
 DF.rdd.partitionBy, what gets passed to the custom function? tuple? row?
 how to access (say 3rd column of a tuple inside partitioner function)?
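
 On question (b): as far as the PySpark API docs indicate, RDD.partitionBy
 works on an RDD of (key, value) pairs and calls the partition function with
 the key only, then takes the result modulo numPartitions. Under that
 assumption, partitioning by a given column means first keying each row by
 that column. The pure-Python sketch below mimics that bucketing without
 Spark. The names partition_by and country_partitioner and the sample rows
 are hypothetical, and the described behaviour is an assumption that has not
 been confirmed in this thread.

```python
def partition_by(pairs, num_partitions, partition_func):
    """Mimic the bucketing step: partition_func sees only the key."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


def country_partitioner(key):
    """Hypothetical custom partitioner: one bucket per known country."""
    known = {"AU": 0, "IN": 1, "US": 2}
    return known.get(key, 3)  # every other key shares the last bucket


# Invented rows; the third column (index 2) is the intended partition key.
rows = [(1, "ann", "AU"), (2, "bob", "US"), (3, "cy", "IN"), (4, "di", "FR")]
keyed = [(row[2], row) for row in rows]  # key each row by its third column
buckets = partition_by(keyed, 4, country_partitioner)
for index, bucket in enumerate(buckets):
    print(index, bucket)
```

 The whole row stays available as the value, so nothing is lost by keying on
 one column first.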

 --
 Best Regards,
 Ayan Guha




 --
 Deepak




Re: Is LIMIT n in Spark SQL useful?

2015-05-04 Thread Michael Armbrust
The JDBC interface for Spark SQL does not support pushing down limits today.
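
A possible workaround, sketched under the assumption that the JDBC data source
accepts a parenthesized subquery in its dbtable option (the SQL programming
guide describes this): put the LIMIT inside the subquery so that PostgreSQL
applies it before any rows reach Spark. The helper name limited_dbtable and
the alias are made up; the resulting string would be passed as dbtable when
loading the JDBC source.

```python
def limited_dbtable(query, limit, alias="limited_rows"):
    """Wrap a query in a parenthesized subquery that carries its own LIMIT."""
    if not isinstance(limit, int) or limit < 0:
        raise ValueError("limit must be a non-negative integer")
    # Strip a trailing semicolon so the query can be nested safely.
    return "({0} LIMIT {1}) AS {2}".format(query.rstrip("; \n"), limit, alias)


dbtable = limited_dbtable(
    "SELECT id, name, trans_date, gender, hobby, job, country FROM Employees",
    100,
)
print(dbtable)
```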

On Mon, May 4, 2015 at 8:06 AM, Robin East robin.e...@xense.co.uk wrote:

 and a further question - have you tried running this query in psql? what’s
 the performance like there?

 On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote:

 What query are you running? It may be the case that your query requires
 PostgreSQL to do a large amount of work before identifying the first n rows.

 On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote:

 I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and
 improve query performance, but I found it took as long as the same query
 without LIMIT. This confuses me. Does anybody know why?

 Thanks.

 Regards,
 Yi






Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread Michael Armbrust
If your data is evenly distributed (i.e. no skewed datapoints in your join
keys), it can also help to increase spark.sql.shuffle.partitions (default
is 200).
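
As a rough back-of-the-envelope illustration of why the partition count
matters for billion-row tables, the sketch below computes the average rows
per shuffle partition and the partition count needed to stay under a chosen
target. The function names and the 500,000-row target are arbitrary
assumptions. The setting itself can be changed with, for example,
SET spark.sql.shuffle.partitions=2000 in SQL.

```python
def rows_per_partition(total_rows, num_partitions):
    """Average rows each shuffle partition holds if keys are evenly spread."""
    return total_rows / float(num_partitions)


def partitions_for_target(total_rows, target_rows_per_partition):
    """Smallest partition count that keeps the average at or below target."""
    return -(-total_rows // target_rows_per_partition)  # ceiling division


one_billion = 10 ** 9
print(rows_per_partition(one_billion, 200))         # average with the default
print(partitions_for_target(one_billion, 500000))   # count for the target
```

With the default of 200, each partition of a 1B-row table averages 5 million
rows per side of the join, which is a lot to hold and sort in one task.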

On Mon, May 4, 2015 at 8:03 AM, Richard Marscher rmarsc...@localytics.com
wrote:

 In regards to the large GC pauses, assuming you allocated all 100GB of
 memory per worker you may consider running with less memory on your Worker
 nodes, or splitting up the available memory on the Worker nodes amongst
 several worker instances. The JVM's garbage collection starts to become
 very slow as the memory allocation for the heap becomes large. At 100GB it
 may not be unusual to see GC take minutes at a time. I believe with Yarn or
 Standalone clusters you should be able to run multiple smaller JVM
 instances on your workers so you can still use your cluster resources but
 also won't have a single JVM allocating an unwieldy amount of memory.

 On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com
 wrote:

 Could you be more specific in how this is done?

 A DataFrame class doesn't have that method.

 On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote:

 You can use custom partitioner to redistribution using partitionby
 On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote:

 I'm currently trying to join two large tables (order 1B rows each) using
 Spark SQL (1.3.0) and am running into long GC pauses which bring the
 job to
 a halt.

 I'm reading in both tables using a HiveContext with the underlying files
 stored as Parquet Files. I'm using  something along the lines of
 HiveContext.sql(SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 =
 b.col1) to
 set up the join.

 When I execute this (with an action such as .count) I see the first few
 stages complete, but the job eventually stalls. The GC counts keep
 increasing for each executor.

 Running with 6 workers, each with 2T disk and 100GB RAM.

 Has anyone else run into this issue? I'm thinking I might be running
 into
 issues with the shuffling of the data, but I'm unsure of how to get
 around
 this? Is there a way to redistribute the rows based on the join key
 first,
 and then do the join?

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: MLLib SVM probability

2015-05-04 Thread Joseph Bradley
Currently, SVMs don't have built-in multiclass support.  Logistic
Regression supports multiclass, as do trees and random forests.  It would
be great to add multiclass support for SVMs as well.

There is some ongoing work on generic multiclass-to-binary reductions:
https://issues.apache.org/jira/browse/SPARK-7015

I agree that naive one-vs-all reductions might not work that well, but that
the raw scores could be calibrated using the scaling you mentioned, or
other methods.
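
For reference, a minimal pure-Python sketch of the kind of calibration being
discussed: fitting a sigmoid to raw SVM scores in the spirit of Platt
scaling. It is not MLlib code. The function names, the plain gradient-descent
fit, the learning rate, and the iteration count are all assumptions chosen
for brevity. The scores and labels are the ones quoted in this thread. The
sign convention here is p = sigmoid(a * score + b), so a and b are the
negatives of Platt's A and B.

```python
import math


def stable_sigmoid(z):
    """Logistic function written to avoid overflow for large |z|."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def fit_platt(scores, labels, iterations=2000, learning_rate=0.1):
    """Fit p = sigmoid(a * score + b) by gradient descent on cross-entropy."""
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = len(labels) - n_pos
    # Smoothed targets, as in Platt's method, instead of hard 0/1 labels.
    t_pos = (n_pos + 1.0) / (n_pos + 2.0)
    t_neg = 1.0 / (n_neg + 2.0)
    targets = [t_pos if y == 1 else t_neg for y in labels]
    # Rescale scores to [-1, 1] so a fixed learning rate behaves well.
    scale = max(abs(s) for s in scores) or 1.0
    xs = [s / scale for s in scores]
    a, b = 0.0, 0.0
    for _ in range(iterations):
        grad_a = grad_b = 0.0
        for x, t in zip(xs, targets):
            p = stable_sigmoid(a * x + b)
            grad_a += (p - t) * x
            grad_b += p - t
        a -= learning_rate * grad_a / len(xs)
        b -= learning_rate * grad_b / len(xs)
    return a / scale, b  # undo the rescaling for the slope


def platt_probability(score, a, b):
    """Map a raw SVM score to a calibrated probability estimate."""
    return stable_sigmoid(a * score + b)


scores = [-18.841544889249917, 168.32916035523283, 420.67763915879794,
          -974.1942589201286, 71.73602841256813, 233.13636224524993,
          -1000.5902168199027]
labels = [0, 1, 1, 0, 1, 1, 0]
a, b = fit_platt(scores, labels)
for s in scores:
    print(round(s, 2), round(platt_probability(s, a, b), 3))
```

In a one-vs-all setup, one such sigmoid would be fitted per binary classifier,
ideally on held-out data, and the per-class probabilities could then be
normalized to sum to one.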

Joseph

On Mon, May 4, 2015 at 6:29 AM, Driesprong, Fokko fo...@driesprong.frl
wrote:

 Hi Robert,

 I would say the sign of each number represents the class of the
 input vector. What kind of data are you using, and what kind of training set
 do you use? Fundamentally, an SVM is able to separate only two classes; you
 can do one vs the rest, as you mentioned.

 I don't see how LVQ can benefit the SVM classifier. I would say that this
 is more an SVM problem than a Spark one.

 2015-05-04 15:22 GMT+02:00 Robert Musters robert.must...@openindex.io:

  Hi all,

 I am trying to understand the output of the SVM classifier.

 Right now, my output looks like this:

 -18.841544889249917 0.0

 168.32916035523283 1.0

 420.67763915879794 1.0

 -974.1942589201286 0.0

 71.73602841256813 1.0

 233.13636224524993 1.0

 -1000.5902168199027 0.0


  The documentation is unclear about what these numbers mean
 https://spark.apache.org/docs/0.9.2/api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint
 .

 I think it is the distance to the hyperplane with sign.


  My main question is: How can I convert distances from hyperplanes to
 probabilities in a multi-class one-vs-all approach?

 LIBSVM http://www.csie.ntu.edu.tw/~cjlin/libsvm/ has this
 functionality and refers to the process of obtaining the probabilities as
 “Platt scaling”
 http://www.researchgate.net/profile/John_Platt/publication/2594015_Probabilistic_Outputs_for_Support_Vector_Machines_and_Comparisons_to_Regularized_Likelihood_Methods/links/004635154cff5262d600.pdf.


 I think this functionality should be in MLlib, but I can't find it.
 Do you think Platt scaling makes sense?


  Making clusters using Learning Vector Quantization, determining the
 spread function of a cluster with a Gaussian function and then retrieving
 the probability makes a lot more sense i.m.o. Using the distances from the
 hyperplanes from several SVM classifiers and then trying to determine some
 probability on these distance measures, does not make any sense, because
 the distribution property of the data-points belonging to a cluster is not
 taken into account.
 Does anyone see a fallacy in my reasoning?


  With kind regards,

 Robert





RE: Spark JVM default memory

2015-05-04 Thread Mohammed Guller
Did you confirm through the Spark UI how much memory is getting allocated to 
your application on each worker?

Mohammed

From: Vijayasarathy Kannan [mailto:kvi...@vt.edu]
Sent: Monday, May 4, 2015 3:36 PM
To: Andrew Ash
Cc: user@spark.apache.org
Subject: Re: Spark JVM default memory

I am trying to read in a file (4GB file). I tried setting both 
spark.driver.memory and spark.executor.memory to large values (say 16GB) 
but I still get a GC limit exceeded error. Any idea what I am missing?

On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:
It's unlikely you need to increase the amount of memory on your master node 
since it does simple bookkeeping.  The majority of the memory pressure across a 
cluster is on executor nodes.

See the conf/spark-env.sh file for configuring heap sizes, and this section in 
the docs for more information on how to make these changes: 
http://spark.apache.org/docs/latest/configuration.html

On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:
Starting the master with /sbin/start-master.sh creates a JVM with only 512MB 
of memory. How to change this default amount of memory?

Thanks,
Vijay




Re: SparkSQL Nested structure

2015-05-04 Thread Michael Armbrust
You are looking for LATERAL VIEW explode
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode
in HiveQL.
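
A rough sketch of what that could look like for the table in this thread. The query text is untested: the table name stageEndInfos and the column names come from the original post, while the aliases rdd_table, rdd, stage_id and rdd_id are made up for illustration. The plain-Python function below it only mimics what explode does (one output row per array element) so the expected shape of the result is easy to see; it does not call Spark.

```python
# Hypothetical HiveQL text; run it through HiveContext.sql(...) to try it.
query = """
SELECT `Stage Info`.`Stage ID` AS stage_id, rdd.`RDD ID` AS rdd_id
FROM stageEndInfos
LATERAL VIEW explode(`Stage Info`.`RDD Info`) rdd_table AS rdd
"""


def explode_rdd_info(events):
    """Plain-Python equivalent of the explode: one row per element of RDD Info."""
    rows = []
    for event in events:
        stage = event["Stage Info"]
        for rdd in stage["RDD Info"]:
            rows.append((stage["Stage ID"], rdd["RDD ID"]))
    return rows


# Two stage events shaped like the log excerpt in the question.
events = [
    {"Stage Info": {"Stage ID": 1,
                    "RDD Info": [{"RDD ID": 5}, {"RDD ID": 4}, {"RDD ID": 3}]}},
    {"Stage Info": {"Stage ID": 0,
                    "RDD Info": [{"RDD ID": 2}, {"RDD ID": 1}, {"RDD ID": 0}]}},
]
rows = explode_rdd_info(events)
```

Each (stage_id, rdd_id) pair ends up on its own row, which is the single flat table asked for in the question.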

On Mon, May 4, 2015 at 7:49 AM, Giovanni Paolo Gibilisco gibb...@gmail.com
wrote:

 Hi, I'm trying to parse log files generated by Spark using SparkSQL.

 In the JSON elements related to the StageCompleted event we have a nested
 structure containing an array of elements with RDD Info (see the log below
 as an example, with some parts omitted).

 {
   "Event": "SparkListenerStageCompleted",
   "Stage Info": {
     "Stage ID": 1,
     ...
     "RDD Info": [
       {
         "RDD ID": 5,
         "Name": "5",
         "Storage Level": {
           "Use Disk": false,
           "Use Memory": false,
           "Use Tachyon": false,
           "Deserialized": false,
           "Replication": 1
         },
         "Number of Partitions": 2,
         "Number of Cached Partitions": 0,
         "Memory Size": 0,
         "Tachyon Size": 0,
         "Disk Size": 0
       },
       ...

 When I register the log as a table, SparkSQL is able to generate the
 correct schema, which for the RDD Info element looks like

  |-- RDD Info: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- Disk Size: long (nullable = true)
  |    |    |-- Memory Size: long (nullable = true)
  |    |    |-- Name: string (nullable = true)

 My problem is that if I try to query the table I can only get array
 buffers out of it:

 SELECT `stageEndInfos.Stage Info.Stage ID`, `stageEndInfos.Stage Info.RDD Info` FROM stageEndInfos

 Stage ID  RDD Info
 1         ArrayBuffer([0,0,...
 0         ArrayBuffer([0,0,...
 2         ArrayBuffer([0,0,...

 or:

 SELECT `stageEndInfos.Stage Info.RDD Info.RDD ID` FROM stageEndInfos
 RDD ID
 ArrayBuffer(5, 4, 3)
 ArrayBuffer(2, 1, 0)
 ArrayBuffer(9, 6,...

 Is there a way to explode the arrays in the rows in order to build a
 single table? (Knowing that the RDD ID is unique and can be used as primary
 key)?

 Thanks!

 How can I get



Re: Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
I am not able to access the web UI for some reason. But the logs (written
while my application is running) show that only 385 MB is being allocated
for each executor (slave node), while the executor memory I set is 16 GB.
The amount also differs from run to run; it looks random (sometimes 1 GB,
sometimes 512 MB, etc.).

On Mon, May 4, 2015 at 6:57 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Did you confirm through the Spark UI how much memory is getting
 allocated to your application on each worker?



 Mohammed



 *From:* Vijayasarathy Kannan [mailto:kvi...@vt.edu]
 *Sent:* Monday, May 4, 2015 3:36 PM
 *To:* Andrew Ash
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark JVM default memory



 I am trying to read in a file (4GB file). I tried setting both
 spark.driver.memory and spark.executor.memory to large values (say
 16GB) but I still get a GC limit exceeded error. Any idea what I am missing?



 On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

 It's unlikely you need to increase the amount of memory on your master
 node since it does simple bookkeeping.  The majority of the memory pressure
 across a cluster is on executor nodes.



 See the conf/spark-env.sh file for configuring heap sizes, and this
 section in the docs for more information on how to make these changes:
 http://spark.apache.org/docs/latest/configuration.html



 On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Starting the master with /sbin/start-master.sh creates a JVM with only
 512MB of memory. How to change this default amount of memory?



 Thanks,

 Vijay







Re: Spark JVM default memory

2015-05-04 Thread Vijayasarathy Kannan
I am trying to read in a file (4GB file). I tried setting both
spark.driver.memory and spark.executor.memory to large values (say
16GB) but I still get a GC limit exceeded error. Any idea what I am missing?

On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote:

 It's unlikely you need to increase the amount of memory on your master
 node since it does simple bookkeeping.  The majority of the memory pressure
 across a cluster is on executor nodes.

 See the conf/spark-env.sh file for configuring heap sizes, and this
 section in the docs for more information on how to make these changes:
 http://spark.apache.org/docs/latest/configuration.html

 On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:

 Starting the master with /sbin/start-master.sh creates a JVM with only
 512MB of memory. How to change this default amount of memory?

 Thanks,
 Vijay





Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Imran Rashid
oh wow, that is a really interesting observation, Marco & Jerry.
I wonder if this is worth exposing in combineByKey()?  I think Jerry's
proposed workaround is all you can do for now -- use reflection to
side-step the fact that the methods you need are private.

On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Hi Marco,

 As far as I know, the current combineByKey() does not expose an argument
 for setting keyOrdering on the ShuffledRDD, since ShuffledRDD is package
 private. If you can get hold of the ShuffledRDD through reflection or some
 other way, the keyOrdering you set will be pushed down to the shuffle. If
 you use a combination of transformations instead, the result will be the
 same but the efficiency may differ: some transformations will be split
 into different stages, which introduces an additional shuffle.

 Thanks
 Jerry


 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartitions), pushing the sorting down to the
 shuffle machinery of the reducing phase.

 I think, but maybe I'm wrong, that the correct way to do that is for
 combineByKey to call the setKeyOrdering function on the ShuffledRDD that it
 returns.

 Am I wrong? Can it be done by a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco






Re: spark kryo serialization question

2015-05-04 Thread Imran Rashid
yes, you should register all three.

The truth is, you only *need* to register classes that will get serialized
-- either via RDD caching or in a shuffle.  So if a type is only used as an
intermediate inside a stage, you don't need to register it.  But the
overhead of registering extra classes is pretty minimal, so as long as you
do this within reason, I think you're OK.


Imran

On Thu, Apr 30, 2015 at 12:34 AM, 邓刚[技术中心] triones.d...@vipshop.com wrote:

 Hi all

  We know that Spark supports Kryo serialization. Suppose there is a
 map function that maps C to (K, V) (here C, K, V are instances of classes
 C, K, V). When we register classes for Kryo serialization, should I register
 all three of these classes?



 Best Wishes



 Triones Deng








Re: No logs from my cluster / worker ... (running DSE 4.6.1)

2015-05-04 Thread Ted Yu
Looks like you're using Spark 1.1.0

Support for Kafka 0.8.2 was added by:
https://issues.apache.org/jira/browse/SPARK-2808

which would come in Spark 1.4.0

FYI

On Mon, May 4, 2015 at 12:22 PM, Eric Ho eric...@intel.com wrote:

 I'm submitting this via 'dse spark-submit' but somehow I don't see any
 logs on my cluster or worker machines...

 How can I find out ?

 My cluster is running DSE 4.6.1 with Spark enabled.
 My source is running Kafka 0.8.2.0

 I'm launching my program on one of my DSE machines.

 Any insights much appreciated.

 Thanks.

 -
 cas1.dev% dse spark-submit --verbose --deploy-mode cluster \
   --master spark://cas1.dev.kno.com:7077 \
   --class com.kno.highlights.counting.service.HighlightConsumer \
   --driver-class-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib \
   --driver-library-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib \
   --properties-file /tmp/highlights-counting.properties \
   /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar \
   --name HighlightConsumer
 Using properties file: /tmp/highlights-counting.properties
 Warning: Ignoring non-spark config property:
 checkpoint_directory=checkpointForHighlights
 Warning: Ignoring non-spark config property: zookeeper_port=2181
 Warning: Ignoring non-spark config property:
 default_num_of_cores_per_topic=1
 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
 Warning: Ignoring non-spark config property:
 kafka_consumer_group=highlight_consumer_group
 Warning: Ignoring non-spark config property: app_name=HighlightConsumer
 Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
 Warning: Ignoring non-spark config property: scheduler_mode=FIFO
 Warning: Ignoring non-spark config property:
 highlight_topic=highlight_topic
 Warning: Ignoring non-spark config property: cassandra_host=
 cas1.dev.kno.com
 Warning: Ignoring non-spark config property: checkpoint_interval=3
 Warning: Ignoring non-spark config property: zookeeper_host=
 cas1.dev.kno.com
 Adding default property: spark_master=spark://cas1.dev.kno.com:7077
 Warning: Ignoring non-spark config property: streaming_window=10
 Using properties file: /tmp/highlights-counting.properties
 Warning: Ignoring non-spark config property:
 checkpoint_directory=checkpointForHighlights
 Warning: Ignoring non-spark config property: zookeeper_port=2181
 Warning: Ignoring non-spark config property:
 default_num_of_cores_per_topic=1
 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
 Warning: Ignoring non-spark config property:
 kafka_consumer_group=highlight_consumer_group
 Warning: Ignoring non-spark config property: app_name=HighlightConsumer
 Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
 Warning: Ignoring non-spark config property: scheduler_mode=FIFO
 Warning: Ignoring non-spark config property:
 highlight_topic=highlight_topic
 Warning: Ignoring non-spark config property: cassandra_host=
 cas1.dev.kno.com
 Warning: Ignoring non-spark config property: checkpoint_interval=3
 Warning: Ignoring non-spark config property: zookeeper_host=
 cas1.dev.kno.com
 Adding default property: spark_master=spark://cas1.dev.kno.com:7077
 Warning: Ignoring non-spark config property: streaming_window=10
 Parsed arguments:
   master                  spark://cas1.dev.kno.com:7077
   deployMode              cluster
   executorMemory          null
   executorCores           null
   totalExecutorCores      null
   propertiesFile          /tmp/highlights-counting.properties
   extraSparkProperties    Map()
   driverMemory            null
   driverCores             null
   driverExtraClassPath    /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
   driverExtraLibraryPath  /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
   driverExtraJavaOptions  null
   supervise               false
   queue                   null
   numExecutors            null
   files                   null
   pyFiles                 null
   archives                null
   mainClass               com.kno.highlights.counting.service.HighlightConsumer
   primaryResource         file:/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar
   name                    com.kno.highlights.counting.service.HighlightConsumer
   childArgs               [--name HighlightConsumer]
   jars                    null
   verbose                 true

 Default properties from /tmp/highlights-counting.properties:
   spark_master - spark://cas1.dev.kno.com:7077


 Using properties file: /tmp/highlights-counting.properties
 Warning: Ignoring non-spark config property:
 checkpoint_directory=checkpointForHighlights
 Warning: Ignoring non-spark config property: zookeeper_port=2181
 

Re: Spark JVM default memory

2015-05-04 Thread Andrew Ash
It's unlikely you need to increase the amount of memory on your master node
since it does simple bookkeeping.  The majority of the memory pressure
across a cluster is on executor nodes.

See the conf/spark-env.sh file for configuring heap sizes, and this section
in the docs for more information on how to make these changes:
http://spark.apache.org/docs/latest/configuration.html

On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Starting the master with /sbin/start-master.sh creates a JVM with only
 512MB of memory. How to change this default amount of memory?

 Thanks,
 Vijay



Re: Spark partitioning question

2015-05-04 Thread Imran Rashid
Hi Marius,

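I am also a little confused -- are you saying that myPartitioner is
basically something like:

import org.apache.spark.Partitioner

class MyPartitioner extends Partitioner {
  // a single partition; every key maps to partition 0
  def numPartitions: Int = 1
  def getPartition(key: Any): Int = 0
}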

??

If so, I don't understand how you'd ever end up with data in two partitions.
Indeed, then everything before the call to partitionBy(myPartitioner) is
somewhat irrelevant.  The important point is that partitionBy should put
all the data in one partition, and then the operations after that do not
move data between partitions.  So if you're really observing data in two
partitions, then it would be good to know more about what version of Spark
you are on, your data, etc., as it sounds like a bug.
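
To make the expectation concrete, here is a small plain-Python simulation of what a partitioner does: route each (key, value) pair to the bucket chosen from its key. It is only a sketch; partition_by and single_partition are hypothetical helper names, not Spark APIs, and the two-bucket setup is an assumption chosen to mirror the two containers in the question.

```python
def partition_by(records, num_partitions, partition_func):
    """Route each (key, value) pair to the bucket picked by partition_func(key)."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions


def single_partition(key):
    # Same idea as a partitioner whose getPartition always returns 0.
    return 0


records = [("b", 2), ("a", 1), ("c", 3)]
parts = partition_by(sorted(records), num_partitions=2,
                     partition_func=single_partition)
```

With a constant partition function every record lands in bucket 0 and the other bucket stays empty. One thing this simulation does not capture: it keeps the input order, whereas in a real shuffle the records of one partition are fetched from several map outputs, so I would not rely on the arrival order unless the sort happens after (or as part of) the repartitioning.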

But I have a feeling there is some misunderstanding about what your
partitioner is doing.  E.g., I think doing groupByKey followed by sortByKey
doesn't make a lot of sense -- in general one sortByKey is all you need
(it's not exactly the same, but most probably close enough, and avoids doing
another expensive shuffle).  If you can share a bit more information on
your partitioner, and what properties you need for your f, that might
help.

thanks,
Imran


On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com
wrote:

 Hello all,

 I have the following Spark (pseudo)code:

 rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
 .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

 The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is correct,
 so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However, from the yarn logs I see that both yarn containers are
 processing records from the same partition ... and *sometimes* the overall
 job fails (due to the code in f, which expects a certain order of
 records): yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain, and f fails.

 The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


 If any of this makes any sense to you, please let me know what I am
 missing.



 Best,
 Marius



Re: Extra stage that executes before triggering computation with an action

2015-05-04 Thread Imran Rashid
sortByKey() runs one job to sample the data, to determine what range of
keys to put in each partition.

There is a jira to change it to defer launching the job until the
subsequent action, but it will still execute another stage:

https://issues.apache.org/jira/browse/SPARK-1021
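
As an illustration of why that sampling pass is needed, here is a plain-Python sketch of range partitioning: draw a sample of the keys, pick cut points from the sorted sample, then assign every key to the range it falls into. This is not Spark's implementation; range_bounds, partition_for, the sample size and the seed are all assumptions made for the example.

```python
import bisect
import random


def range_bounds(keys, num_partitions, sample_size=100, seed=0):
    """Pick num_partitions - 1 cut points from a sorted random sample of the keys."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    return [sample[len(sample) * i // num_partitions]
            for i in range(1, num_partitions)]


def partition_for(key, bounds):
    """Index of the key range that `key` falls into."""
    return bisect.bisect_left(bounds, key)


keys = list(range(1000))
random.Random(1).shuffle(keys)

# Step 1 (the extra job): sample the data to choose the range boundaries.
bounds = range_bounds(keys, num_partitions=4)

# Step 2 (the actual sort): route keys by range, then sort each partition locally.
partitions = [[] for _ in range(4)]
for k in keys:
    partitions[partition_for(k, bounds)].append(k)

globally_sorted = [k for part in partitions for k in sorted(part)]
```

Because the boundaries have to be known before any record can be routed, the sample must be computed up front, which is the stage that shows up before the action is triggered.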

On Wed, Apr 29, 2015 at 5:57 PM, Tom Hubregtsen thubregt...@gmail.com
wrote:

 I'm not sure, but I wonder if because you are using the Spark REPL that it
 may not be representing what a normal runtime execution would look like and
 is possibly eagerly running a partial DAG once you define an operation that
 would cause a shuffle.

 What happens if you setup your same set of commands [a-e] in a file and use
 the Spark REPL's `load` or `paste` command to load them all at once? From
 Richard

 I have also packaged it in a jar file (without [e], the debug string), and
 still see the extra stage before the other two that I would expect. Even
 when I remove [d], the action, I still see stage 0 being executed (and do
 not see stage 1 and 2).

 Again a shortened log of the Stage 0:
 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at
 sortByKey, which has no missing parents
 INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s








Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Burak Yavuz
I think this Spark Package may be what you're looking for!
http://spark-packages.org/package/tresata/spark-sorted

Best,
Burak

On Mon, May 4, 2015 at 12:56 PM, Imran Rashid iras...@cloudera.com wrote:

 oh wow, that is a really interesting observation, Marco & Jerry.
 I wonder if this is worth exposing in combineByKey()?  I think Jerry's
 proposed workaround is all you can do for now -- use reflection to
 side-step the fact that the methods you need are private.

 On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Marco,

 As far as I know, the current combineByKey() does not expose an argument
 for setting keyOrdering on the ShuffledRDD, since ShuffledRDD is package
 private. If you can get hold of the ShuffledRDD through reflection or some
 other way, the keyOrdering you set will be pushed down to the shuffle. If
 you use a combination of transformations instead, the result will be the
 same but the efficiency may differ: some transformations will be split
 into different stages, which introduces an additional shuffle.

 Thanks
 Jerry


 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartitions), pushing the sorting down to the
 shuffle machinery of the reducing phase.

 I think, but maybe I'm wrong, that the correct way to do that is for
 combineByKey to call the setKeyOrdering function on the ShuffledRDD that it
 returns.

 Am I wrong? Can it be done by a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco







Building DAG from log

2015-05-04 Thread Giovanni Paolo Gibilisco
Hi,
I'm trying to build the DAG of an application from the logs.
I've had a look at SparkReplayDebugger but it doesn't operate offline on
logs. I also looked at the one in this pull request:
https://github.com/apache/spark/pull/2077 which seems to operate only on
logs, but it doesn't clearly show the dependencies between the stages.
Is there some other tool to do this?

In the log files I could not find the information needed to define
dependencies between the stages. Is there any other way to derive this
information offline?
Thanks,


Re: AJAX with Apache Spark

2015-05-04 Thread Olivier Girardot
Hi Sergio,
you shouldn't architect it this way; rather, have Spark Streaming update a
storage layer that your Play app will query.
For example a Cassandra table, or Redis, or anything that will be able to
answer you in milliseconds, rather than querying the Spark Streaming
program.
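
A rough plain-Python sketch of that split, with an in-memory class standing in for Cassandra or Redis. Everything here (StateStore, process_batch, handle_ajax_request, the events_seen key) is a hypothetical name for illustration; a real setup would write from the streaming job's per-batch output step and read from a Play controller.

```python
import threading


class StateStore:
    """Stand-in for Cassandra/Redis: a small thread-safe key-value map."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def put(self, key, value):
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)


def process_batch(store, batch):
    """What the streaming job would do per micro-batch: push its state to the store."""
    # Read-then-write is fine here only because there is a single writer.
    store.put("events_seen", store.get("events_seen", 0) + len(batch))


def handle_ajax_request(store):
    """What the web app would do: read the store, never call into the job."""
    return {"events_seen": store.get("events_seen", 0)}


store = StateStore()
process_batch(store, ["e1", "e2", "e3"])
process_batch(store, ["e4", "e5"])
response = handle_ajax_request(store)
```

The point of the design is that the dashboard request never triggers or waits on Spark work; it only reads the latest value the job has already written.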

Regards,

Olivier.

On Mon, May 4, 2015 at 20:08, Sergio Jiménez Barrio drarse.a...@gmail.com
wrote:

 Hi,

  I am trying to create a dashboard for an Apache Spark job. I need to run
 Spark Streaming 24/7 and, when it receives an AJAX request, have it answer
 with the current state of the job. I have created the client and the Spark
 program. I tried to create the response service with Play, but this runs the
 program on each request. I want to send the accumulator of the Spark program
 in response to a request.

 Sorry for my explanation. Any ideas? Maybe with Play?

 Thanks



RE: Re: Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Cheng, Hao
Or have you tried a broadcast join?
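
For readers unfamiliar with the term, here is a plain-Python sketch of the idea behind a broadcast (map-side) hash join: the small table is turned into a hash table that every partition of the larger table can probe locally, so the larger table never has to be shuffled. It is not Spark code; broadcast_hash_join and the sample rows are made up for illustration, with column names borrowed from the query in this thread.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Join each partition of the large table against a shared lookup table."""
    # Build the lookup once from the small table (the "broadcast" side).
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    # Every partition probes the lookup locally; no data moves between partitions.
    joined = []
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append((row, match))
    return joined


db_partitions = [
    [{"name": "x", "startpoint": 10}, {"name": "y", "startpoint": 50}],
    [{"name": "x", "startpoint": 90}],
]
sample_rows = [{"name": "x", "startpoint": 40}]
pairs = broadcast_hash_join(db_partitions, sample_rows, "name")
```

With tables of 56 MB and 26 MB, either side would be small enough to ship to every executor, assuming the broadcast threshold setting (spark.sql.autoBroadcastJoinThreshold) is raised accordingly.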

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.com; Olivier Girardot; user
Subject: RE: Re: Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: Re: Re: sparksql running slow while joining 2 tables.


hi Olivier

Spark 1.3.1, with Java 1.8.0.45.

I have attached 2 pictures.

It seems like a GC issue. I also tried different parameters, such as the
memory size of the driver & executor, memory fraction, Java opts...

but this issue still happens.



Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 4, 2015, 20:46

Hi,
What is your Spark version?

Regards,

Olivier.

On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

hi guys

when I run a SQL query like select
a.name, a.startpoint, a.endpoint, a.piece from db a join sample b
on (a.name = b.name) where (b.startpoint  a.startpoint + 25);
I find that Spark SQL runs slowly, taking minutes, which may be caused by
very long GC and shuffle times.

   Table db is created from a 56 MB txt file, while table sample is 26 MB;
both are small.

   My Spark cluster is a standalone pseudo-distributed cluster with an
8g executor and a 4g driver manager.

   Any advice? Thank you guys.





Thanks & Best regards!
罗辉 San.Luo



Re: RE: Re: Re: sparksql running slow while joining_2_tables.

2015-05-04 Thread luohui20001
As far as I know, broadcast join is enabled automatically via
spark.sql.autoBroadcastJoinThreshold; see
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
Also, how do I check my app's physical plan, and other things such as the
optimized plan, executed plan, etc.?
thanks


 
Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Cheng, Hao hao.ch...@intel.com
To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com
luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user
user@spark.apache.org
Subject: RE: Re: Re: sparksql running slow while joining_2_tables.
Date: May 5, 2015, 08:38

Or have you tried a broadcast join?

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.com; Olivier Girardot; user
Subject: RE: Re: Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan?

EXPLAIN SELECT xxx…

From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Monday, May 4, 2015 9:08 PM
To: Olivier Girardot; user
Subject: Re: Re: sparksql running slow while joining 2 tables.

hi Olivier
Spark 1.3.1, with Java 1.8.0.45.
I have attached 2 pictures.
It seems like a GC issue. I also tried different parameters, such as the
memory size of the driver & executor, memory fraction, Java opts...
but this issue still happens.

Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: Olivier Girardot ssab...@gmail.com
To: luohui20...@sina.com, user user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 4, 2015, 20:46

Hi,
What is your Spark version?

Regards,

Olivier.

On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

hi guys
when I run a SQL query like select
a.name, a.startpoint, a.endpoint, a.piece from db a join sample b
on (a.name = b.name) where (b.startpoint  a.startpoint + 25);
I find that Spark SQL runs slowly, taking minutes, which may be caused by
very long GC and shuffle times.

   Table db is created from a 56 MB txt file, while table sample is 26 MB;
both are small.
   My Spark cluster is a standalone pseudo-distributed cluster with an
8g executor and a 4g driver manager.
   Any advice? Thank you guys.

Thanks & Best regards!
罗辉 San.Luo










Re: spark Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-05-04 Thread luohui20001
You may need to copy hive-site.xml to your Spark conf directory and check your
Hive metastore warehouse setting; also check whether you are authorized to
access the Hive metastore warehouse.



 
Thanks & Best regards!
罗辉 San.Luo

- Original Message -
From: 鹰 980548...@qq.com
To: user user@spark.apache.org
Subject: spark Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Date: May 5, 2015, 08:49

hi all,
   when I submit a spark-sql program to select data from my Hive
database I get an error like this:
User class threw exception: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
What's wrong with my Spark configuration? Thanks for any help!