Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
You can use a custom partitioner to redistribute the data using partitionBy.

On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote:

I'm currently trying to join two large tables (on the order of 1B rows each) using Spark SQL (1.3.0) and am running into long GC pauses which bring the job to a halt. I'm reading in both tables using a HiveContext, with the underlying files stored as Parquet files. I'm using something along the lines of HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1") to set up the join. When I execute this (with an action such as .count) I see the first few stages complete, but the job eventually stalls. The GC counts keep increasing for each executor. Running with 6 workers, each with 2T disk and 100GB RAM. Has anyone else run into this issue? I'm thinking I might be running into issues with the shuffling of the data, but I'm unsure of how to get around this. Is there a way to redistribute the rows based on the join key first, and then do the join? Thanks in advance.
Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
Could you be more specific about how this is done? The DataFrame class doesn't have that method.

On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote:

You can use a custom partitioner to redistribute the data using partitionBy.
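In Spark 1.3, partitionBy lives on pair RDDs rather than DataFrames, so one way to pre-distribute by the join key is to drop to the RDD API first. A minimal sketch, assuming the tables and columns from the original question, that the join column is a Long, and an illustrative partition count:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(1000)  // tune to the cluster

// Key each side by the join column, then co-partition both sides.
val aByKey = hiveContext.sql("SELECT col1 FROM a").rdd
  .map(r => (r.getLong(0), r)).partitionBy(part)
val bByKey = hiveContext.sql("SELECT col1, col2 FROM b").rdd
  .map(r => (r.getLong(0), r)).partitionBy(part)

// Both sides now share a partitioner, so the join itself needs no extra shuffle.
val joined = aByKey.join(bByKey)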
spark log analyzer sample
Exception in thread main java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

I am not using any Hadoop facility (not even HDFS), so why is it giving this error?

--
Thanks & Regards,
Anshu Shukla
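A hedged reading of the message (not stated in the thread): IPC version 9 is spoken by Hadoop 2.x daemons and client version 4 by Hadoop 1.x libraries, so a Spark build compiled against Hadoop 1.x is most likely talking to a Hadoop 2.x service somewhere; even an hdfs:// or namenode URL in a path is enough to pull in the HDFS client. A sketch of pinning a matching client in sbt; the versions below are illustrative and must match your cluster:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0"  // match the cluster's Hadoop version
)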
Spark job concurrency problem
Hi, I have two small RDDs, each with about 600 records. In my code, I did

val rdd1 = sc...cache()
val rdd2 = sc...cache()
val result = rdd1.cartesian(rdd2).repartition(num_cpu).map { case (a, b) => some_expensive_job(a, b) }

I ran my job in a YARN cluster with --master yarn-cluster. I have 6 executors, and each has a large memory volume. However, I noticed my job is very slow. I went to the RM page, and found there are two containers: one is the driver, one is the worker. I guess this is correct? I went to the worker's log, and monitored the log detail. My app prints some information, so I can use it to estimate the progress of the map operation. Looking at the log, it feels like the jobs are done one by one sequentially, rather than #cpu at a time. I checked the worker node, and its CPUs are all busy.

Xi Shen
about.me/davidshen
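A hedged observation: two containers (driver plus a single executor) suggests the executor count was never requested; on YARN, Spark 1.x does not scale out on its own, so the instances have to be asked for explicitly. A sketch via SparkConf (property names are the real Spark ones; the numbers are illustrative):

val conf = new SparkConf()
  .set("spark.executor.instances", "6")  // executor count must be requested explicitly on YARN
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")

With a single executor, repartition(num_cpu) still runs every partition on that one container's cores, which can look close to sequential when some_expensive_job is CPU-bound.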
Spark Mongodb connection
Hi! I am new to Spark and I want to begin with the simple wordCount example in Java, but I want to read my input from a MongoDB database. I want to learn how I can connect a MongoDB database to my project. Can anyone help with this issue? Have a nice day.

yasemin
--
hiç ender hiç
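One common route is to read the collection as a Hadoop input format via the mongo-hadoop connector. A sketch under the assumption that the mongo-hadoop artifact is on the classpath (the class and property names below come from that connector; the URI and field name are illustrative). Scala shown for brevity; the Java API goes through JavaSparkContext.newAPIHadoopRDD analogously:

import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

val mongoConf = new Configuration()
mongoConf.set("mongo.input.uri", "mongodb://localhost:27017/mydb.mycollection")

val docs = sc.newAPIHadoopRDD(mongoConf,
  classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])

// Word count over an assumed "text" field of each document.
val counts = docs
  .flatMap { case (_, doc) => Option(doc.get("text")).toSeq.flatMap(_.toString.split("\\s+")) }
  .map(w => (w, 1))
  .reduceByKey(_ + _)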
Re: Problem in Standalone Mode
Can you paste the complete stacktrace? It looks like you are having a version incompatibility with Hadoop.

Thanks
Best Regards

On Sat, May 2, 2015 at 4:36 PM, drarse drarse.a...@gmail.com wrote:

When I run my program with spark-submit everything is OK, but when I try to run it in standalone mode I obtain the following exception (this is with val df = sqlContext.jsonFile("./datos.json")):

java.io.EOFException
[error] at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)

This is the SparkConf:

val sparkConf = new SparkConf().setAppName("myApp")
  .setMaster("spark://master:7077")
  .setSparkHome("/usr/local/spark/")
  .setJars(Seq("./target/scala-2.10/myApp.jar"))
how to make sure data is partitioned across all workers?
Hi, is there any way to force Spark to partition cached data across all worker nodes, so that all the data is not cached on only one of the worker nodes?

best,
/Shahab
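A minimal sketch of one way to do this (the path and partition count are illustrative): repartition before caching so the blocks have a chance to spread across executors, then check the Storage tab in the web UI:

val data = sc.textFile("hdfs:///some/input")   // few input splits can mean few partitions
  .repartition(sc.defaultParallelism)          // spread across the available cores
  .cache()
data.count()                                   // materialize the cache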
Re: spark filestream problem
With fileStream you can actually pass a filter parameter to avoid loading up .tmp files/directories. Also, when you move/rename a file, the file's creation date doesn't change, and hence Spark won't detect it, I believe.

Thanks
Best Regards

On Sat, May 2, 2015 at 9:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

It seems that on Spark Streaming 1.2 the fileStream API may have a bug: it doesn't detect new files when moving or renaming them on HDFS, only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming fileStream throw an exception.
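A sketch of the filter overload being referred to (the directory and suffix are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///incoming",
  (path: Path) => !path.getName.endsWith(".tmp"),  // skip temp files
  newFilesOnly = true
).map(_._2.toString)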
Re: Remoting warning when submitting to cluster
Looks like a version incompatibility; just make sure you have the proper version of Spark. Also look further in the stacktrace for what is causing "Futures timed out" (it could also be a network issue if the ports aren't opened properly).

Thanks
Best Regards

On Sat, May 2, 2015 at 12:04 AM, javidelgadillo jdelgadi...@esri.com wrote:

Hello all!! We've been prototyping some Spark applications to read messages from Kafka topics. The application is quite simple: we use KafkaUtils.createStream to receive a stream of CSV messages from a Kafka topic. We parse the CSV and count the number of messages we get in each RDD. At a high level (removing the abstractions of our application), it looks like this:

val sc = new SparkConf()
  .setAppName(appName)
  .set("spark.executor.memory", "1024m")
  .set("spark.cores.max", "3")
  .set("spark.app.name", appName)
  .set("spark.ui.port", sparkUIPort)
val ssc = new StreamingContext(sc, Milliseconds(emitInterval.toInt))

KafkaUtils
  .createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
  .map(_._2)
  .foreachRDD((rdd: RDD[String], time: Time) => {
    println("Time %s: (%s total records)".format(time, rdd.count()))
  })

When I submit this with the Spark master as local[3], everything behaves as I'd expect. After some startup overhead, I'm seeing the count printed to be the same as the count I'm simulating (1 every second, for example). When I submit this to a Spark master using spark://master.host:7077, the behavior is different. The overhead to start receiving seems longer, and in some runs I don't see anything for 30 seconds even though my simulator is sending messages to the topic. I also see the following error written to stderr by every executor assigned to the job:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/01 10:11:38 INFO SecurityManager: Changing view acls to: username
15/05/01 10:11:38 INFO SecurityManager: Changing modify acls to: username
15/05/01 10:11:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(javi4211); users with modify permissions: Set(username)
15/05/01 10:11:38 INFO Slf4jLogger: Slf4jLogger started
15/05/01 10:11:38 INFO Remoting: Starting remoting
15/05/01 10:11:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverpropsfetc...@master.host:56534]
15/05/01 10:11:39 INFO Utils: Successfully started service 'driverPropsFetcher' on port 56534.
15/05/01 10:11:40 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkdri...@driver.host:51837]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: no further information: driver.host/10.27.51.214:51837
15/05/01 10:12:09 ERROR UserGroupInformation: PriviledgedActionException as:username cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Exception in thread main java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
  at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
  at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
  at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
  at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
  ... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

Is there something else I need to configure to ensure akka remoting will work correctly when running a Spark cluster? Or can I ignore this error?

-Javier
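A hedged reading of the log (not from the thread itself): the executors are failing to connect back to the driver at driver.host:51837, which then surfaces as the 30-second future timeout. If the driver's hostname doesn't resolve from the workers, or the randomly chosen driver port is firewalled, pinning both explicitly sometimes helps; the values below are illustrative:

val conf = new SparkConf()
  .set("spark.driver.host", "10.27.51.214")  // an address the workers can actually reach
  .set("spark.driver.port", "51837")         // fixed so the firewall can allow it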
Re: Hardware requirements
500GB of data will have nearly 3900 partitions, and if you can have nearly that many cores and around 500GB of memory, then things will be lightning fast. :)

Thanks
Best Regards

On Sun, May 3, 2015 at 12:49 PM, sherine ahmed sherine.sha...@hotmail.com wrote:

I need to use Spark to upload 500 GB of data from Hadoop on a standalone mode cluster. What are the minimum hardware requirements, if it's known that it will be used for advanced analysis (social network analysis)?
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean & others,

Thanks for the response. I tried with 100, 200, 400, 600 and 1200 repartitions and with 100, 200, 400 and 800 executors. Each time, all the tasks of the join complete in less than a minute except one, and that one task runs forever. I have a huge cluster at my disposal.

The data for each of the 1199 tasks is around 40MB/30k records, and for the one never-ending task it is 1.5G/98 million records. I see that there is data skew among tasks. I had observed this a week earlier and I have no clue how to fix it; someone suggested that repartitioning might make things more parallel, but the problem is still persistent.

Please suggest how to get the task to complete. All I want to do is join two datasets (dataset1 is in a sequence file and dataset2 is in Avro format).

Ex: Tasks

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 3771 | 0 | RUNNING | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | 7.3 min | 19 s | 1591.2 MB / 98931767 | 0.0 B | 0.0 B
1 | 3772 | 0 | SUCCESS | PROCESS_LOCAL | 226 / host2 | 2015/05/04 01:27:44 | 28 s | 2 s | 39.2 MB / 29754 | 0.0 B | 0.0 B
2 | 3773 | 0 | SUCCESS | PROCESS_LOCAL | 283 / host3 | 2015/05/04 01:27:44 | 26 s | 2 s | 39.0 MB / 29646 | 0.0 B | 0.0 B
5 | 3776 | 0 | SUCCESS | PROCESS_LOCAL | 320 / host4 | 2015/05/04 01:27:44 | 31 s | 3 s | 38.8 MB / 29512 | 0.0 B | 0.0 B
4 | 3775 | 0 | SUCCESS | PROCESS_LOCAL | 203 / host5 | 2015/05/04 01:27:44 | 41 s | 3 s | 38.4 MB / 29169 | 0.0 B | 0.0 B
3 | 3774 | 0 | SUCCESS | PROCESS_LOCAL | 84 / host6 | 2015/05/04 01:27:44 | 24 s | 2 s | 38.5 MB / 29258 | 0.0 B | 0.0 B
8 | 3779 | 0 | SUCCESS | PROCESS_LOCAL | 309 / host7 | 2015/05/04 01:27:44 | 31 s | 4 s | 39.5 MB / 30008 | 0.0 B | 0.0 B

There are 1200 tasks in total.

On Sun, May 3, 2015 at 9:53 PM, Dean Wampler deanwamp...@gmail.com wrote:

I don't know the full context of what you're doing, but serialization errors usually mean you're attempting to serialize something that can't be serialized, like the SparkContext. Kryo won't help there. The arguments to spark-submit you posted previously look good:

2) --num-executors 96 --driver-memory 12g --driver-java-options -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if your data is in HDFS and your block size is 128MB, then you'll get ~195 partitions anyway. If it takes 7 hours to do a join over 25GB of data, you have some other serious bottleneck. You should examine the web console and the logs to determine where all the time is going. Questions you might pursue:

- How long does each task take to complete?
- How many of those 195 partitions/tasks are processed at the same time? That is, how many slots are available? Maybe you need more nodes if the number of slots is too low. Based on your command arguments, you should be able to process 1/2 of them at a time, unless the cluster is busy.
- Is the cluster swamped with other work?
- How much data does each task process? Is the data roughly the same from one task to the next? If not, then you might have serious key skew.

You may also need to research the details of how joins are implemented and some of the common tricks for organizing data to minimize having to shuffle all N by M records.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

Hello Dean, if I don't use the Kryo serializer I get a serialization error, and hence am using it. If I don't use partitionBy/repartition, then the simple join never completed even after 7 hours, and in fact as a next step I need to run it against 250G, as that is my full dataset size. Someone here suggested I use repartition. Assuming repartition is mandatory, how do I decide what's the right number? When I use 400 I do not get the NullPointerException that I talked about, which is strange. I never saw that exception against a small random dataset but see it with 25G, and again with 400 partitions I do not see it.

On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com wrote:

IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts. Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.) So. I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest
Re: Spark - Timeout Issues - OutOfMemoryError
IMHO, if your data or your algorithm is prone to data skew, I think you have to fix this at the application level; Spark itself cannot overcome this problem (if one key has a large number of values). You may change your algorithm to choose another shuffle key, something like that, to avoid shuffling on skewed keys.

2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
Re: Hardware requirements
Hi, how do you figure 500 GB ≈ 3900 partitions? I am trying to do the math: if I assume a 64 MB block size, then 1 GB ≈ 16 blocks and 500 GB ≈ 8000 blocks. If we assume split and block sizes are the same, shouldn't we end up with ~8k partitions?

On 4 May 2015 17:49, Akhil Das ak...@sigmoidanalytics.com wrote:
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Shao, can you talk more about the shuffle key, or point me to APIs that allow me to change the shuffle key? I will try different keys and see the performance. What is the shuffle key by default?

On Mon, May 4, 2015 at 2:37 PM, Saisai Shao sai.sai.s...@gmail.com wrote:
Re: Hardware requirements
Assume your block size is 128MB: 500 GB / 128 MB = (500 × 1024) / 128 = 4000 blocks, i.e. roughly the ~3900 partitions quoted.

Thanks
Best Regards

On Mon, May 4, 2015 at 2:38 PM, ayan guha guha.a...@gmail.com wrote:
Re: Spark - Timeout Issues - OutOfMemoryError
The shuffle key depends on your implementation. I'm not sure if you are familiar with MapReduce: the mapper output is a key-value pair, where the key is the shuffle key used for shuffling. Spark is the same.

2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
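To make "choose another shuffle key" concrete, one standard trick is key salting. This is not from the thread, just a hedged sketch using the RDD names that appear later in it; the salt factor is illustrative, and note that it multiplies one side of the join, so it is only practical when that side is small enough or when only the hot keys are salted:

val SALT = 32

// Replicate each listing across all salt values...
val saltedListings = lstgItem.flatMap { case (id, lstg) =>
  (0 until SALT).map(s => ((id, s), lstg))
}
// ...and scatter the skewed view events randomly over the salt values.
val saltedEvents = viEvents.map { case (id, vi) =>
  ((id, scala.util.Random.nextInt(SALT)), vi)
}

// The hot key's records are now spread over up to 32 tasks.
val joined = saltedEvents.join(saltedListings)
  .map { case ((id, _), (vi, lstg)) => (id, (vi, lstg)) }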
Re: Map-Side Join in Spark
This is how I implemented a map-side join using broadcast:

val listings = DataUtil.getDwLstgItem(sc, DateUtil.addDaysToDate(startDate, -89))
val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({ iter =>
    val lstgItemMap = broadCastMap.value
    for {
      (itemId, viDetail) <- iter
      if lstgItemMap.contains(itemId)
    } yield {
      val listing = lstgItemMap.get(itemId).get
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = 0
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
    }
  })

Usage: collectAsMap, broadcast, Scala yield.

Learning: as lstgItemMap is collected as a Map on the driver, when its size exceeds driver memory it throws an OOM error. As I had a limit of 12G on memory and my dataset size is around 100G, I could not use the map-side join and switched back to join(). Sharing this so others can use the code example in case they want to implement a map-side join with Spark + Scala.

On Tue, Apr 21, 2015 at 7:32 PM, ayan guha guha.a...@gmail.com wrote:

Hi, sorry, was typing from mobile hence could not elaborate earlier. I presume you want to do a map-side join, and you mean you want to join 2 RDDs without a shuffle? Please have a quick look: http://apache-spark-user-list.1001560.n3.nabble.com/Text-file-and-shuffle-td5973.html#none

1) Co-partition your data for cogroup:

val par = new HashPartitioner(128)
val x = sc.textFile(..).map(...).partitionBy(par)
val y = sc.textFile(...).map(...).partitionBy(par)

This should enable the join with (much less) shuffle. Another option, provided in the same thread: broadcast, in case one of the tables is small(ish). Hope this helps.

Best
Ayan

On Tue, Apr 21, 2015 at 3:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

These are pair RDDs: (itemId, item), (itemId, listing). What do you mean by re-partitioning of these RDDs? And what do you mean by "your partitioner"? Can you elaborate?

On Tue, Apr 21, 2015 at 11:18 AM, ayan guha guha.a...@gmail.com wrote:

If you are using a pair RDD, then you can use the partitionBy method to provide your partitioner.

On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

What is re-partition?

On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote:

In my understanding you need to create a key out of the data and repartition both datasets to achieve a map-side join.

On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open in Chrome with a Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/

--
Deepak

--
Best Regards,
Ayan Guha

--
Deepak
Re: Spark - Timeout Issues - OutOfMemoryError
One dataset (RDD pair):
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

Second dataset (RDD pair):
val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

I want to join based on item ID, which is used as the first element in the tuple in both cases, and I think that is the shuffle key.

listings == dataset containing all the unique item IDs that were ever listed on the e-commerce site.
viEvents == list of items viewed by users in the last day. This will always be a subset of the total set.

So I do not understand what the data skewness is. When my long-running task is working on 1591.2 MB / 98,931,767 records, does that mean all 98 million records contain the same item ID? How can millions of users look at the same item in the last day? Or does this dataset contain records across item IDs?

Regards,
Deepak

On Mon, May 4, 2015 at 3:08 PM, Saisai Shao sai.sai.s...@gmail.com wrote:
Re: Spark - Timeout Issues - OutOfMemoryError
Four tasks are now failing with:

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Errors
0 | 3771 | 0 | FAILED | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | ExecutorLostFailure (executor 114 lost)
1007 | 4973 | 1 | FAILED | PROCESS_LOCAL | 420 / host2 | 2015/05/04 02:13:14 | FetchFailed (trace below)
371 | 4972 | 1 | FAILED | PROCESS_LOCAL | 563 / host3 | 2015/05/04 02:13:14 | FetchFailed (trace below)

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
  at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
  at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
)

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
  at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
Re: Unusual filter behaviour on RDD
I got it working. When I was persisting, only 85% of the RDD was persisted to memory, and the rest of the RDD got recomputed every time. Because my flagged RDD uses a random method to create the field, I was getting unpredictable results. When I persist using flagged.persist(StorageLevel.MEMORY_AND_DISK) it works perfectly well.
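A hedged sketch of the pitfall being described (the names are illustrative, not from the original post): a nondeterministic transformation plus a cache that doesn't fully fit means evicted partitions are recomputed with new random values:

import org.apache.spark.storage.StorageLevel

val flagged = data.map(x => (x, scala.util.Random.nextDouble() < 0.5))

// MEMORY_ONLY silently recomputes partitions that didn't fit, re-rolling the dice;
// MEMORY_AND_DISK spills them instead, so every read sees the same flags.
flagged.persist(StorageLevel.MEMORY_AND_DISK)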
mapping JavaRDD to jdbc DataFrame
Hi, I'd like to use a JavaRDD containing parameters for an SQL query, and use Spark SQL's jdbc source to load data from MySQL. Consider the following pseudo code:

JavaRDD<String> namesRdd = ... ;
...
options.put("url", "jdbc:mysql://mysql?user=usr");
options.put("password", "pass");
options.put("dbtable", "(SELECT * FROM mytable WHERE userName = ?) sp_campaigns");
DataFrame myTableDF = m_sqlContext.load("jdbc", options);

I'm looking for a way to map namesRdd and get for each name the result of the query, without losing the Spark context. Using a mapping function doesn't seem like an option, because I don't have an SQLContext inside it. I can only think of using collect, and then iterating over the strings in the RDD and executing the query, but that would run in the driver program. Any suggestions?

Thanks,
Lior
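One hedged alternative (a sketch, not from the thread; identifiers are illustrative, and the Java API is analogous): load the table once through the jdbc source and do the per-name filtering as a join inside Spark, so nothing runs on the driver:

val options = Map(
  "url"      -> "jdbc:mysql://mysql?user=usr",
  "password" -> "pass",
  "dbtable"  -> "mytable")
val table = sqlContext.load("jdbc", options)          // DataFrame over the whole table

import sqlContext.implicits._
val names = namesRdd.map(Tuple1(_)).toDF("userName")  // RDD[String] to a one-column DataFrame

// The join is planned by Spark SQL and executed on the cluster.
val result = table.join(names, table("userName") === names("userName"))

If mytable is too large to scan, pushing the name list down as an IN (...) clause inside the dbtable subquery is the other common route, at the cost of collecting the names to the driver first.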
Re: Spark - Timeout Issues - OutOfMemoryError
From the symptoms you mentioned, that one task's shuffle data is much larger than all the other tasks', it is quite similar to normal data-skew behavior. I just gave some advice based on your description; I think you need to detect whether the data is actually skewed or not. The shuffle puts data with the same partitioning strategy (the default is the hash partitioner) into one task, so data with the same key will be put into the same task, but one task does not have only one key.

2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

Attached image shows the Spark UI for the job.

On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
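To make "detect whether data is actually skewed" concrete, a hedged sketch (using the RDD names from the thread) that lists the heaviest join keys on the skewed side:

val keyCounts = viEvents.mapValues(_ => 1L).reduceByKey(_ + _)
keyCounts.map(_.swap).sortByKey(ascending = false).take(20).foreach(println)
// If the top count is in the tens of millions while the rest are in the
// hundreds, the key is genuinely hot and its join task will dominate.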
Re: sparksql running slow while joining 2 tables.
Hi, what is your Spark version?

Regards,
Olivier.

Le lun. 4 mai 2015 à 11:03, luohui20...@sina.com a écrit :

Hi guys, when I am running a SQL like

select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint a.startpoint + 25);

I found Spark SQL running slow, taking minutes, which may be caused by very long GC and shuffle time. Table db is created from a txt file sized at 56MB, while table sample is sized at 26MB, both quite small. My Spark cluster is a standalone pseudo-distributed cluster with an 8g executor and a 4g driver. Any advice? Thank you guys.

Thanks & Best regards!
罗辉 San.Luo
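Given both tables are well under 100 MB, one thing worth trying (a hedged suggestion, not from the thread) is letting Spark SQL broadcast the smaller table so the equi-join on name needs no shuffle at all. The threshold below is illustrative; it just needs to exceed the smaller table's size:

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=67108864")  // 64 MB

Whether the planner actually picks a broadcast join depends on it knowing the table sizes (statistics), so this works best with tables whose size Spark can see.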
Re: Spark - Timeout Issues - OutOfMemoryError
I ran it against one file instead of 10 files, and I see one task is still running after 33 mins; its shuffle read size is 780MB / 50 million records. I did a count of records for each itemId from dataset-2 [one FILE] (the second dataset (RDD pair): val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }). This is the dataset that contains the list of items viewed by users in one day.

Item Id      | Count
201335783004 | 537
111654496030 | 353
141640703798 | 287
191568402102 | 258
111654479898 | 217
231521843148 | 211
251931716094 | 201
111654493548 | 181
181503913062 | 181
121635453050 | 152
261798565828 | 140
151494617682 | 139
251927181728 | 127
231516683056 | 119
141640492864 | 117
161677270656 | 117
171771073616 | 113
111649942124 | 109
191516989450 | 97
231539161292 | 94
221555628408 | 88
131497785968 | 87
121632233872 | 84
131335379184 | 83
281531363490 | 83
131492727742 | 79
231174157820 | 79
161666914810 | 77
251699753072 | 77
161683664300 | 76

I was assuming that data skew would mean the top item (201335783004) had a count of 1 million; however, it is only a few hundred, so why is Spark skewing it in the join? What should I do so that Spark distributes the records evenly? In M/R we can change the partitioner between mapper and reducer; how can I do that in Spark for a join?

Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 3618 | 0 | RUNNING | PROCESS_LOCAL | 4 / host1 | 2015/05/04 05:09:53 | 33 min | 8.5 min | 783.9 MB / 50,761,322 | 4.6 GB | 47.5 MB
433 | 4051 | 0 | SUCCESS | PROCESS_LOCAL | 1 / host2 | 2015/05/04 05:16:27 | 1.1 min | 20 s | 116.0 MB / 4505143 | 1282.3 MB | 10.1 MB
218 | 3836 | 0 | SUCCESS | PROCESS_LOCAL | 3 / host3 | 2015/05/04 05:13:01 | 53 s | 11 s | 76.4 MB / 2865143 | 879.6 MB | 6.9 MB
113 | 3731 | 0 | SUCCESS | PROCESS_LOCAL | 2 / host4 | 2015/05/04 05:11:30 | 31 s | 8 s | 6.9 MB / 5187 | 0.0 B | 0.0 B

On Mon, May 4, 2015 at 6:00 PM, Saisai Shao sai.sai.s...@gmail.com wrote:
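On the "change the partitioner" question: join does accept an explicit Partitioner, which is the closest Spark analogue to swapping the partitioner between mapper and reducer in M/R. A hedged sketch with the thread's RDD names (the partition count is illustrative):

import org.apache.spark.HashPartitioner

val joined = viEvents.join(lstgItem, new HashPartitioner(1200))

The caveat: a partitioner decides which task each key goes to, but every record for a given key still lands in the same task, so a single hot key stays hot. That is what the salting sketch earlier in this thread works around.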
Re: Custom Partitioning Spark
Why do you use a custom partitioner? Are you doing a join? And can you share some code that shows how you implemented the custom partitioner? On Tue, Apr 21, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote: Are you looking for?

mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

On Wed, Apr 22, 2015 at 1:00 AM, MUHAMMAD AAMIR mas.ha...@gmail.com wrote: Hi Archit, Thanks a lot for your reply. I am using rdd.partitions.length to check the number of partitions; rdd.partitions returns the array of partitions. I would like to add one more question: do you have any idea how to get the objects in each partition? Further, is there any way to figure out which particular partition an object belongs to? Thanks, On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, This should work. How are you checking the no. of partitions? Thanks and Regards, Archit Thakur. On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote: Hi, I aim to do custom partitioning on a text file. I first convert it into a pair RDD and then try to use my custom partitioner. However, somehow it is not working. My code snippet is given below.

val file = sc.textFile(filePath)
val locLines = file.map(line => line.split("\t"))
  .map(line => ((line(2).toDouble, line(3).toDouble), line(5).toLong))
val ck = locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- neither way is working here

While reading the file using the textFile method, Spark automatically partitions the file. However, when I explicitly want to partition the new RDD locLines, it doesn't appear to do anything, and even the number of partitions is the same as the one created by sc.textFile(). Any help in this regard will be highly appreciated. -- Regards, Muhammad Aamir -- Best Regards, Ayan Guha -- Deepak
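Since the thread never shows a working example, here is a minimal custom Partitioner sketch; it is illustrative only (not Muhammad's unseen CustomPartitioner) and buckets the (Double, Double) keys used above by a hypothetical cell width:

import org.apache.spark.Partitioner

// Illustrative grid partitioner for keys of type (Double, Double):
// points close together on the x-axis land in the same partition.
class GridPartitioner(partitions: Int, cellWidth: Double) extends Partitioner {
  require(partitions > 0)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (x: Double, _) => math.abs((x / cellWidth).toInt) % numPartitions
    case _              => 0
  }
}

// Usage sketch: val ck = locLines.partitionBy(new GridPartitioner(50, 10.0))

Note that partitionBy returns a new RDD and leaves its receiver untouched, so if the partition count looks unchanged, make sure you are inspecting the returned RDD (ck above) rather than the original locLines.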
Re: Support for skewed joins in Spark
Hello Soila, Can you share the code that shows usage of RangePartitioner? I am facing an issue with .join() where one task runs forever. I tried repartition(100/200/300/1200) and it did not help. I cannot use a map-side join because both datasets are huge and beyond the driver's memory size. Regards, Deepak On Fri, Mar 13, 2015 at 9:54 AM, Soila Pertet Kavulya skavu...@gmail.com wrote: Thanks Shixiong, I'll try out your PR. Do you know what the status of the PR is? Are there any plans to incorporate this change into the DataFrames/SchemaRDDs in Spark 1.3? Soila On Thu, Mar 12, 2015 at 7:52 PM, Shixiong Zhu zsxw...@gmail.com wrote: I sent a PR to add skewed join last year: https://github.com/apache/spark/pull/3505 However, it does not split a key into multiple partitions. Instead, if a key has too many values to fit in memory, it will store the values on disk temporarily and use the disk files to do the join. Best Regards, Shixiong Zhu 2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com: Does Spark support skewed joins similar to Pig, which distributes large keys over multiple partitions? I tried using the RangePartitioner but I am still experiencing failures because some keys are too large to fit in a single partition. I cannot use broadcast variables to work around this because both RDDs are too large to fit in driver memory. -- Deepak
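Since the requested RangePartitioner code never appears in the thread, a minimal sketch of joining with one (data and names are illustrative, assuming an existing SparkContext sc):

import org.apache.spark.RangePartitioner

val left  = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val right = sc.parallelize(Seq((1L, "x"), (3L, "y")))

// RangePartitioner samples `left` to compute key-range bounds, then the join
// shuffles both sides onto those ranges. Note: all values of a single key
// still land in one partition, so this evens out skewed key *ranges*, not a
// single extremely hot key.
val partitioner = new RangePartitioner(2, left)
val joined = left.join(right, partitioner)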
spark 1.3.1
Hi, I am trying to build an example code given at https://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds The code is:

// Import factory methods provided by DataType.
import org.apache.spark.sql.types.DataType;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : schemaString.split(" ")) {
  fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
}
StructType schema = DataType.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return Row.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

My pom file looks like:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.0</version>
  </dependency>
</dependencies>

When I try to mvn package I am getting this issue:

cannot find symbol
[ERROR] symbol: variable StringType
[ERROR] location: class org.apache.spark.sql.types.DataType

I have gone through https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/StringType.html What is missing here?
Difference ?
*Datasets*

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }

What is the difference between

1) lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map { }
2) lstgItem.join(viEvents).map { }
3) lstgItem.join(viEvents, 1200).map { }
4) lstgItem.join(viEvents, new org.apache.spark.HashPartitioner(1200)).map { }
5) viEvents.join(lstgItem).map { }

Which is better in the case where I run the join and one task runs forever because it gets ~1000x more records than the others? Before running the join I do repartition() and I still see this behavior. So which is best, or at least which one completes? In my case I am not even able to get it running for 1G, 25G or 100G datasets. Regards, Deepak -- Deepak
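For reference, the overloads above differ only in how the partitioner is chosen; a sketch of what each resolves to, based on the standard PairRDDFunctions API (general API behavior, not advice specific to this job):

// (2) uses the default partitioner: the partitioner of an already-partitioned
//     input if one exists, else a HashPartitioner sized from
//     spark.default.parallelism (or the largest input partition count).
lstgItem.join(viEvents)

// (3) is shorthand for a HashPartitioner with the given count, so it
//     behaves like (4).
lstgItem.join(viEvents, 1200)
lstgItem.join(viEvents, new org.apache.spark.HashPartitioner(1200))

// (1) samples viEvents to compute key ranges; partitions hold contiguous
//     key ranges rather than hash buckets.
lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(1200, viEvents))

// (5) performs the same shuffle as (2), with the value order in the
//     output pairs swapped.
viEvents.join(lstgItem)

Note that none of these splits a single hot key across partitions: all records sharing one key always land in one task, so a key with ~1000x the records will produce a straggler under every variant.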
MLLib SVM probability
Hi all, I am trying to understand the output of the SVM classifier. Right now, my output looks like this:

-18.841544889249917 0.0
168.32916035523283 1.0
420.67763915879794 1.0
-974.1942589201286 0.0
71.73602841256813 1.0
233.13636224524993 1.0
-1000.5902168199027 0.0

The documentation is unclear about what these numbers mean. I think it is the signed distance to the hyperplane. My main question is: how can I convert distances from hyperplanes to probabilities in a multi-class one-vs-all approach? LIBSVM http://www.csie.ntu.edu.tw/~cjlin/libsvm/ has this functionality and refers to the process of obtaining the probabilities as “Platt scaling” http://www.researchgate.net/profile/John_Platt/publication/2594015_Probabilistic_Outputs_for_Support_Vector_Machines_and_Comparisons_to_Regularized_Likelihood_Methods/links/004635154cff5262d600.pdf . I think this functionality should be in MLlib, but I can't find it. Do you think Platt scaling makes sense? Making clusters using Learning Vector Quantization, determining the spread function of a cluster with a Gaussian function and then retrieving the probability makes a lot more sense i.m.o. Using the distances from the hyperplanes of several SVM classifiers and then trying to derive a probability from those distance measures does not make sense, because the distribution of the data points belonging to a cluster is not taken into account. Does anyone see a fallacy in my reasoning? With kind regards, Robert
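For readers who want to experiment: a minimal sketch of applying a Platt sigmoid to MLlib's raw margins. The parameters a and b are placeholders that would normally be fitted on held-out margins (Platt's paper fits them by maximum likelihood), and that fitting step is exactly what MLlib does not provide here:

import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vector

// clearThreshold() makes predict() return the raw signed margin
// (the kind of number shown above) instead of a 0/1 label.
def plattProbability(model: SVMModel, features: Vector,
                     a: Double, b: Double): Double = {
  model.clearThreshold()
  val margin = model.predict(features)
  1.0 / (1.0 + math.exp(a * margin + b)) // Platt's sigmoid: P(y = 1 | margin)
}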
Re: spark 1.3.1
Hi Saurabh, Did you check the log of maven? 2015-05-04 15:17 GMT+02:00 Saurabh Gupta saurabh.gu...@semusi.com:
Re: Exiting driver main() method...
I think I figured it out. I am playing around with the Cassandra connector and I had a method that inserted some data into a locally-running Cassandra instance, but I forgot to close the Cluster object. I guess that left some non-daemon thread running and kept the process from exiting. Nothing to see here, move along. :) On Sat, May 2, 2015 at 2:44 PM Mohammed Guller moham...@glassbeam.com wrote: No, you don’t need to do anything special. Perhaps your application is getting stuck somewhere? If you can share your code, someone may be able to help. Mohammed *From:* James Carman [mailto:ja...@carmanconsulting.com] *Sent:* Friday, May 1, 2015 5:53 AM *To:* user@spark.apache.org *Subject:* Exiting driver main() method... In all the examples, it seems that the spark application doesn't really do anything special in order to exit. When I run my application, however, the spark-submit script just hangs there at the end. Is there something special I need to do to get it to exit normally?
Re: MLLib SVM probability
Hi Robert, I would say that taking the sign of the number represents the class of the input vector. What kind of data are you using, and what kind of training set do you use? Fundamentally, an SVM is only able to separate two classes; you can do one-vs-the-rest as you mentioned. I don't see how LVQ can benefit the SVM classifier. I would say that this is more an SVM question than a Spark one. 2015-05-04 15:22 GMT+02:00 Robert Musters robert.must...@openindex.io:
How to deal with code that runs before foreach block in Apache Spark?
I'm trying to deal with some code that runs differently in Spark stand-alone mode and on Spark running on a cluster. Basically, for each item in an RDD, I'm trying to add it to a list, and once this is done, I want to send this list to Solr. This works perfectly fine when I run the following code in stand-alone mode of Spark, but does not work when the same code is run on a cluster. When I run the same code on a cluster, it is as if the "send to Solr" part of the code is executed before the list to be sent to Solr is filled with items. I try to force the execution with solrInputDocumentJavaRDD.collect(); after the foreach, but it seems to have no effect.

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
  new Function<JavaRDD<SolrInputDocument>, Void>() {
    @Override
    public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {

      // For each item in a single RDD
      solrInputDocumentJavaRDD.foreach(
        new VoidFunction<SolrInputDocument>() {
          @Override
          public void call(SolrInputDocument solrInputDocument) {
            // Add the solrInputDocument to the list of SolrInputDocuments
            SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
          }
        });

      // Try to force execution
      solrInputDocumentJavaRDD.collect();

      // After having finished adding every SolrInputDocument to the list,
      // add it to the solrServer, and commit, waiting for the commit to be flushed
      try {
        // Seems like when run in cluster mode, the list size is zero,
        // therefore the following part is never executed
        if (SolrIndexerDriver.solrInputDocumentList != null
            && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
          SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
          SolrIndexerDriver.solrServer.commit(true, true);
          SolrIndexerDriver.solrInputDocumentList.clear();
        }
      } catch (SolrServerException | IOException e) {
        e.printStackTrace();
      }
      return null;
    }
  }
);

What should I do so that the sending-to-Solr part executes after the list of SolrDocuments is added to solrInputDocumentList (and also works in cluster mode)? -- Emre Sevinç
Re: spark 1.3.1
I am really new to this, but what should I look for in the maven logs? I have tried mvn package -X -e. Should I show the full trace? On Mon, May 4, 2015 at 6:54 PM, Driesprong, Fokko fo...@driesprong.frl wrote: Hi Saurabh, Did you check the log of maven?
Re: How to deal with code that runs before foreach block in Apache Spark?
I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver' is a singleton, I guess that what's going on when running on a cluster is that the call to

SolrIndexerDriver.solrInputDocumentList.add(elem)

is happening on different singleton instances of SolrIndexerDriver on different JVMs, while SolrIndexerDriver.solrServer.commit is happening on the driver. In practical terms, the lists on the executors are being filled in but are never committed, and on the driver the opposite is happening. -kr, Gerard On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:
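The usual fix for this pattern is to do the Solr work on the executors themselves rather than through driver-side static state. A minimal sketch in Scala, assuming the SolrJ 4.x client API; solrUrl and solrInputDocumentStream are hypothetical names, not from Emre's code:

import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument

solrInputDocumentStream.foreachRDD { rdd =>
  rdd.foreachPartition { docs =>
    // One client per partition, created on the executor itself,
    // so the add and the commit happen on the same JVM.
    val server = new HttpSolrServer(solrUrl)
    val batch = new java.util.ArrayList[SolrInputDocument]()
    docs.foreach(batch.add)
    if (!batch.isEmpty) {
      server.add(batch)
      server.commit()
    }
    server.shutdown()
  }
}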
Re: mapping JavaRDD to jdbc DataFrame
You can use applySchema. On Mon, May 4, 2015 at 10:16 PM, Lior Chaga lio...@taboola.com wrote: Hi, I'd like to use a JavaRDD containing parameters for an SQL query, and use Spark SQL JDBC to load data from MySQL. Consider the following pseudo-code:

JavaRDD<String> namesRdd = ...;
...
options.put("url", "jdbc:mysql://mysql?user=usr");
options.put("password", "pass");
options.put("dbtable", "(SELECT * FROM mytable WHERE userName = ?) sp_campaigns");
DataFrame myTableDF = m_sqlContext.load("jdbc", options);

I'm looking for a way to map namesRdd and get for each name the result of the query, without losing the Spark context. Using a mapping function doesn't seem like an option, because I don't have an SQLContext inside it. I can only think of using collect, then iterating over the strings in the RDD and executing the query, but that would run in the driver program. Any suggestions? Thanks, Lior -- Best Regards, Ayan Guha
Troubling Logging w/Simple Example (spark-1.2.2-bin-hadoop2.4)...
I have the following simple example program:

public class SimpleCount {
  public static void main(String[] args) {
    final String master = System.getProperty("spark.master", "local[*]");
    System.out.printf("Running job against spark master %s ...%n", master);

    final SparkConf conf = new SparkConf()
      .setAppName("simple-count")
      .setMaster(master)
      .set("spark.eventLog.enabled", "true");

    final JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    long n = rdd.count();
    System.out.printf("I counted %d integers.%n", n);
  }
}

I start a local master:

export SPARK_MASTER_IP=localhost
sbin/start-master.sh

Then, I start a local worker:

bin/spark-class org.apache.spark.deploy.worker.Worker -h localhost spark://localhost:7077

When I run the example application:

bin/spark-submit --class com.cengage.analytics.SimpleCount --master spark://localhost:7077 ~/IdeaProjects/spark-analytics/target/spark-analytics-1.0-SNAPSHOT.jar

It finishes just fine (and even counts the right number :). However, I get the following log statements in the master's log file:

15/05/04 09:54:14 INFO Master: Registering app simple-count
15/05/04 09:54:14 INFO Master: Registered app simple-count with ID app-20150504095414-0009
15/05/04 09:54:14 INFO Master: Launching executor app-20150504095414-0009/0 on worker worker-20150504095401-localhost-55806
15/05/04 09:54:17 INFO Master: akka.tcp://sparkDriver@jamess-mbp:55939 got disassociated, removing it.
15/05/04 09:54:17 INFO Master: Removing app app-20150504095414-0009
15/05/04 09:54:17 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@jamess-mbp:55939] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/04 09:54:17 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A55948-17#800019242] was not delivered. [18] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/04 09:54:17 INFO SecurityManager: Changing view acls to: jcarman
15/05/04 09:54:17 INFO SecurityManager: Changing modify acls to: jcarman
15/05/04 09:54:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jcarman); users with modify permissions: Set(jcarman)
15/05/04 09:54:17 INFO Master: akka.tcp://sparkDriver@jamess-mbp:55939 got disassociated, removing it.
15/05/04 09:54:17 WARN EndpointWriter: AssociationError [akka.tcp://sparkMaster@localhost:7077] -> [akka.tcp://sparkWorker@localhost:51252]: Error [Invalid address: akka.tcp://sparkWorker@localhost:51252] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkWorker@localhost:51252
Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: localhost/127.0.0.1:51252
]
15/05/04 09:54:17 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkWorker@localhost:51252]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: localhost/127.0.0.1:51252
15/05/04 09:54:17 INFO Master: akka.tcp://sparkWorker@localhost:51252 got disassociated, removing it.
15/05/04 09:54:17 WARN EndpointWriter: AssociationError [akka.tcp://sparkMaster@localhost:7077] -> [akka.tcp://sparkWorker@jamess-mbp:50071]: Error [Invalid address: akka.tcp://sparkWorker@jamess-mbp:50071] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkWorker@jamess-mbp:50071
Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: jamess-mbp/192.168.1.45:50071
]
15/05/04 09:54:17 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkWorker@jamess-mbp:50071]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: jamess-mbp/192.168.1.45:50071
15/05/04 09:54:17 INFO Master: akka.tcp://sparkWorker@jamess-mbp:50071 got disassociated, removing it.
15/05/04 09:54:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef: Message [org.apache.spark.deploy.DeployMessages$ApplicationFinished] from Actor[akka://sparkMaster/user/Master#-1247271270] to Actor[akka://sparkMaster/deadLetters] was not delivered. [19] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/04 09:54:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef: Message
java.io.IOException: No space left on device while doing repartitioning in Spark
Hi, I am getting a "No space left on device" exception when repartitioning approx. 285 MB of data, while there is still 2 GB of space left. Does that mean repartitioning 285 MB of data needs more than 2 GB of space? best, /Shahab

java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:51)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
    at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569)
    at org.apache.spark.util.Utils$.copyStream(Utils.scala:331)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Re: Re: sparksql running slow while joining 2 tables.
Are you using only 1 executor? On Mon, May 4, 2015 at 11:07 PM, luohui20...@sina.com wrote: hi Olivier, spark 1.3.1, with java 1.8.0.45, and I attached 2 pics. It seems like a GC issue. I also tried different parameters like the memory size of driver & executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo ----- Original Message ----- From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: 2015-05-04 20:46 Hi, What is your Spark version? Regards, Olivier. On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote: hi guys, when I am running a sql like

select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint > a.startpoint + 25);

I found Spark SQL running slow, taking minutes, which may be caused by very long GC and shuffle times. Table db is created from a 56 MB txt file while table sample is 26 MB, both small. My Spark cluster is a standalone pseudo-distributed cluster with an 8g executor and a 4g driver. Any advice? Thank you guys. Thanks & Best regards! 罗辉 San.Luo -- Best Regards, Ayan Guha
Re: spark 1.3.1
Hi, I think you need to import org.apache.spark.sql.types.DataTypes instead of org.apache.spark.sql.types.DataType and access StringType through it, i.e. DataTypes.createStructField(fieldName, DataTypes.StringType, true) and DataTypes.createStructType(fields). HTH, Deng On Mon, May 4, 2015 at 9:37 PM, Saurabh Gupta saurabh.gu...@semusi.com wrote: I am really new to this, but what should I look for in the maven logs? I have tried mvn package -X -e. Should I show the full trace?
Re: Spark Mongodb connection
Hi Yasemin, You can find a MongoDB connector for Spark SQL here: http://github.com/Stratio/spark-mongodb Best regards 2015-05-04 9:27 GMT+02:00 Yasemin Kaya godo...@gmail.com: Hi! I am new to Spark and I want to begin Spark with a simple wordCount example in Java. But I want to take my input from a MongoDB database. I want to learn how I can connect a MongoDB database to my project. Can anyone help with this issue? Have a nice day. yasemin -- hiç ender hiç -- Gaspar Muñoz @gmunozsoria http://www.stratio.com/ Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*
Re: java.io.IOException: No space left on device while doing repartitioning in Spark
See https://wiki.gentoo.org/wiki/Knowledge_Base:No_space_left_on_device_while_there_is_plenty_of_space_available What's the value of the spark.local.dir property? Shuffle and spill files are written there (/tmp by default), so the device that fills up may not be the one you are checking. Cheers On Mon, May 4, 2015 at 6:57 AM, shahab shahab.mok...@gmail.com wrote:
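If the default scratch location turns out to be the culprit, pointing spark.local.dir at roomier volumes is the usual remedy; a minimal spark-defaults.conf sketch (the paths are examples, not from this thread):

# spark-defaults.conf: put shuffle/spill scratch space on volumes with room;
# a comma-separated list spreads the I/O across devices.
spark.local.dir  /mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp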
Re: empty jdbc RDD in spark
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD The arguments are: SQL string, lower bound, upper bound, number of partitions. Your call

"SELECT * FROM MEMBERS LIMIT ? OFFSET ?", 0, 100, 1

would thus be run as

SELECT * FROM MEMBERS LIMIT 0 OFFSET 100

Naturally LIMIT 0 will yield 0 results. JdbcRDD is designed to be used with multiple partitions over some kind of numeric index. Try something more like

"SELECT * FROM MEMBERS WHERE ID >= ? AND ID <= ?", 0, howeverManyRowsYouHave, 8

On Fri, May 1, 2015 at 3:56 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi all! I am trying to read a HANA database using Spark's JdbcRDD. Here is my code:

def readFromHana() {
  val conf = new SparkConf()
  conf.setAppName("test").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = new JdbcRDD(sc,
    () => {
      Class.forName("com.sap.db.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:sap://54.69.200.113:30015/?currentschema=LIVE2",
        "mujadid", "786Xyz123")
    },
    "SELECT * FROM MEMBERS LIMIT ? OFFSET ?",
    0, 100, 1,
    (r: ResultSet) => convert(r))
  println(rdd.count())
  sc.stop()
}

def convert(rs: ResultSet): String = {
  val rsmd = rs.getMetaData()
  val numberOfColumns = rsmd.getColumnCount()
  var i = 1
  val row = new StringBuilder
  while (i <= numberOfColumns) {
    row.append(rs.getString(i) + ",")
    i += 1
  }
  row.toString()
}

The resulting count is 0. Any suggestion? Thanks
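Putting that advice into the code above, a minimal sketch (maxId is hypothetical, and MEMBERS is assumed to have a numeric ID column; JdbcRDD binds each partition's inclusive [lower, upper] ID range to the two ?s):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val maxId = 1000000L // hypothetical: the max value of MEMBERS.ID

// Each of the 8 partitions queries its own slice of the ID range,
// so no LIMIT/OFFSET is needed and no partition sees zero rows by construction.
val members = new JdbcRDD(sc,
  () => {
    Class.forName("com.sap.db.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:sap://54.69.200.113:30015/?currentschema=LIVE2",
      "mujadid", "786Xyz123")
  },
  "SELECT * FROM MEMBERS WHERE ID >= ? AND ID <= ?",
  1, maxId, 8,
  (r: ResultSet) => convert(r))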
Re: SparkStream saveAsTextFiles()
Hi, What kind of "cannot find symbol" error are you receiving? On the other hand, I would try changing the guava dependency version to 14.0.1. In Spark 1.3.0 the guava version is 14.0.1, but it is not included inside the spark artifact because it is marked as provided. http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom Spark and Guava have a long history; you just have to search a bit on Google. Regards.
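If you need to pin that version explicitly, a minimal Maven fragment (assuming a Maven build like the one earlier in this digest):

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>14.0.1</version>
</dependency>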
SparkSQL Nested structure
Hi, I'm trying to parse log files generated by Spark using Spark SQL. In the JSON elements related to the StageCompleted event we have a nested structure containing an array of elements with RDD info (see the log below as an example, omitting some parts):

{
  "Event": "SparkListenerStageCompleted",
  "Stage Info": {
    "Stage ID": 1,
    ...
    "RDD Info": [
      {
        "RDD ID": 5,
        "Name": "5",
        "Storage Level": {
          "Use Disk": false,
          "Use Memory": false,
          "Use Tachyon": false,
          "Deserialized": false,
          "Replication": 1
        },
        "Number of Partitions": 2,
        "Number of Cached Partitions": 0,
        "Memory Size": 0,
        "Tachyon Size": 0,
        "Disk Size": 0
      },
      ...

When I register the log as a table, Spark SQL is able to generate the correct schema, which for the RDD Info element looks like:

|-- RDD Info: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Disk Size: long (nullable = true)
|    |    |-- Memory Size: long (nullable = true)
|    |    |-- Name: string (nullable = true)

My problem is that if I try to query the table I can only get array buffers out of it:

SELECT `stageEndInfos.Stage Info.Stage ID`, `stageEndInfos.Stage Info.RDD Info` FROM stageEndInfos

Stage ID | RDD Info
1        | ArrayBuffer([0,0,...
0        | ArrayBuffer([0,0,...
2        | ArrayBuffer([0,0,...

or:

SELECT `stageEndInfos.Stage Info.RDD Info.RDD ID` FROM stageEndInfos

RDD ID
ArrayBuffer(5, 4, 3)
ArrayBuffer(2, 1, 0)
ArrayBuffer(9, 6,...

Is there a way to explode the arrays in the rows in order to build a single table, knowing that the RDD ID is unique and can be used as a primary key? Thanks!
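One possible direction, a sketch only: HiveQL's LATERAL VIEW explode can flatten the array, assuming the table is registered through a HiveContext (hiveCtx below is hypothetical, and the backticked column names with spaces may need adjusting to the actual schema):

// hiveCtx is a HiveContext; stageEndInfos is registered as a temp table on it
val rddRows = hiveCtx.sql("""
  SELECT `Stage Info`.`Stage ID` AS stageId, rddInfo.`RDD ID` AS rddId
  FROM stageEndInfos
  LATERAL VIEW explode(`Stage Info`.`RDD Info`) t AS rddInfo
""")
// rddRows now has one row per (stage, RDD) pair instead of one ArrayBuffer per stage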
Is LIMIT n in Spark SQL useful?
I am trying to query PostgreSQL using LIMIT n to reduce memory usage and improve query performance, but I found it took as long as the same query without LIMIT. That left me confused. Does anybody know why? Thanks. Regards, Yi
Re: SparkStream saveAsTextFiles()
The structure seems fine. You only need to add, at the end of your program:

ssc.start();
ssc.awaitTermination();

Check the method arguments. I advise you to check the Spark Java streaming API: https://spark.apache.org/docs/1.3.0/api/java/ Regards.
Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
In regards to the large GC pauses: assuming you allocated all 100GB of memory per worker, you may consider running with less memory on your Worker nodes, or splitting the available memory on the Worker nodes amongst several worker instances. The JVM's garbage collection becomes very slow as the heap allocation grows large; at 100GB it would not be unusual to see GC take minutes at a time. With Yarn or standalone clusters you should be able to run multiple smaller JVM instances on your workers, so you can still use your cluster resources but won't have a single JVM managing an unwieldy amount of memory. On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com wrote:
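For a standalone cluster, one way to split each 100GB box into several smaller-heap workers is via conf/spark-env.sh; the values below are illustrative, not tuned for this workload:

# conf/spark-env.sh on each worker machine
export SPARK_WORKER_INSTANCES=4   # four worker JVMs per box
export SPARK_WORKER_MEMORY=24g    # memory each worker may hand to executors
export SPARK_WORKER_CORES=8       # cores per worker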
Re: Is LIMIT n in Spark SQL useful?
What query are you running? It may be the case that your query requires PostgreSQL to do a large amount of work before identifying the first n rows. On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote: I am trying to query PostgreSQL using LIMIT n to reduce memory usage and improve query performance, but I found it took as long as the same query without LIMIT. That left me confused. Does anybody know why? Thanks. Regards, Yi
Re: Is LIMIT n in Spark SQL useful?
And a further question: have you tried running this query in psql? What's the performance like there? On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote: What query are you running? It may be the case that your query requires PostgreSQL to do a large amount of work before identifying the first n rows.
Python Custom Partitioner
Hi, Can someone share some working code for a custom partitioner in Python? I am trying to understand it better. Here is the documentation:

partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x2c45140>)
https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy
Return a copy of the RDD partitioned using the specified partitioner.

What I am trying to do:
1. Create a dataframe
2. Partition it using one specific column
3. Create another dataframe
4. Partition it on the same column
5. Join (to enforce a map-side join)

My questions:
a) Am I on the right path?
b) How can I do the partitionBy? Specifically, when I call DF.rdd.partitionBy, what gets passed to the custom function: a tuple? a row? And how do I access, say, the 3rd column of a tuple inside the partitioner function? -- Best Regards, Ayan Guha
Re: Python Custom Partitioner
I have implemented a map-side join with broadcast variables, and the code is on the mailing list (Scala). On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote: Hi, Can someone share some working code for a custom partitioner in Python? I am trying to understand it better. -- Deepak
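A minimal sketch of that broadcast-join pattern in Scala (names echo the earlier threads but the code is illustrative, not Deepak's actual implementation; it only works when one side fits in driver and executor memory):

// lstgItem: RDD[(Long, Listing)] (the small side), viEvents: RDD[(Long, Event)]
val lstgMap = sc.broadcast(lstgItem.collectAsMap())

// No shuffle: each partition of the large RDD is joined locally against
// the broadcast map.
val joined = viEvents.mapPartitions { iter =>
  iter.flatMap { case (id, event) =>
    lstgMap.value.get(id).map(listing => (id, (listing, event)))
  }
}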
Re: Spark - Timeout Issues - OutOfMemoryError
I tried this:

1) val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map {

It fired two jobs and I still have 1 task that never completes:

Index | ID   | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0     | 4818 | 0       | RUNNING | PROCESS_LOCAL  | 5 / host1          | 2015/05/04 07:24:25 | 1.1 h    | 13 min  | 778.0 MB / 50,314,161       | 4.5 GB                 | 47.4 MB
955   | 5773 | 0       | SUCCESS | PROCESS_LOCAL  | 5 / host2          | 2015/05/04 07:47:16 | 2.2 min  | 1.5 min | 106.3 MB / 4,197,539        | 0.0 B                  | 0.0 B
1199  | 6017 | 0       | SUCCESS | PROCESS_LOCAL  | 3 / host3          | 2015/05/04 07:51:51 | 48 s     | 2 s     | 94.2 MB / 3,618,335         | 2.8 GB                 | 8.6 MB

2) I tried reversing the datasets in the join:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.join(lstgItem)

This led to the same problem of a long-running task.

3) Next, I am trying this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents, 1200).map {

I have exhausted all my options. Regards, Deepak On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Parallelize foreach in PySpark with Spark Standalone
Full disclosure: I am *brand* new to Spark. I am trying to use [Py]SparkSQL standalone to pre-process a bunch of *local* (non-HDFS) Parquet files. I have several thousand files and want to dispatch as many workers as my machine can handle to process the data in parallel, either per file or per record (or batch of records) within a single file. My question is: how can this be achieved in a standalone scenario? I have plenty of cores and RAM, yet when I do sc = SparkContext("local[8]") in my standalone script I see no speedup compared to, say, local[1]. I've also tried something like distData = sc.parallelize(data) and then distData.foreach(myFunction) after starting with local[N], yet that seems to return immediately without producing the expected side effects from myFunction (file output). I realize parallelizing Python code on a single-node cluster is not what Spark was designed for, but it seems to integrate Parquet and Python so well that it's my only option. :) Thanks, Kyle
AJAX with Apache Spark
Hi, I am trying to create a dashboard for an Apache Spark job. I need to run Spark Streaming 24/7 and, when an AJAX request arrives, answer it with the current state of the job. I have created the client and the Spark program. I tried creating the response service with Play, but that launches the program on each request; what I want is to return the Spark program's accumulator in response to a request. Sorry for my explanation. Any ideas? Maybe with Play? Thanks
com.datastax.spark % spark-streaming_2.10 % 1.1.0 in my build.sbt ??
Can I specify this in my build file? Thanks.
No logs from my cluster / worker ... (running DSE 4.6.1)
I'm submitting this via 'dse spark-submit' but somehow I don't see any logging on my cluster or worker machines... How can I find out? My cluster is running DSE 4.6.1 with Spark enabled. My source is running Kafka 0.8.2.0. I'm launching my program on one of my DSE machines. Any insights much appreciated. Thanks.

cas1.dev% dse spark-submit --verbose --deploy-mode cluster --master spark://cas1.dev.kno.com:7077 --class com.kno.highlights.counting.service.HighlightConsumer --driver-class-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --driver-library-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --properties-file /tmp/highlights-counting.properties /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar --name HighlightConsumer

Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property: app_name=HighlightConsumer
Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
Warning: Ignoring non-spark config property: scheduler_mode=FIFO
Warning: Ignoring non-spark config property: highlight_topic=highlight_topic
Warning: Ignoring non-spark config property: cassandra_host=cas1.dev.kno.com
Warning: Ignoring non-spark config property: checkpoint_interval=3
Warning: Ignoring non-spark config property: zookeeper_host=cas1.dev.kno.com
Adding default property: spark_master=spark://cas1.dev.kno.com:7077
Warning: Ignoring non-spark config property: streaming_window=10
Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property: app_name=HighlightConsumer
Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents
Warning: Ignoring non-spark config property: scheduler_mode=FIFO
Warning: Ignoring non-spark config property: highlight_topic=highlight_topic
Warning: Ignoring non-spark config property: cassandra_host=cas1.dev.kno.com
Warning: Ignoring non-spark config property: checkpoint_interval=3
Warning: Ignoring non-spark config property: zookeeper_host=cas1.dev.kno.com
Adding default property: spark_master=spark://cas1.dev.kno.com:7077
Warning: Ignoring non-spark config property: streaming_window=10

Parsed arguments:
  master                  spark://cas1.dev.kno.com:7077
  deployMode              cluster
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          /tmp/highlights-counting.properties
  extraSparkProperties    Map()
  driverMemory            null
  driverCores             null
  driverExtraClassPath    /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
  driverExtraLibraryPath  /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               com.kno.highlights.counting.service.HighlightConsumer
  primaryResource         file:/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar
  name                    com.kno.highlights.counting.service.HighlightConsumer
  childArgs               [--name HighlightConsumer]
  jars                    null
  verbose                 true

Default properties from /tmp/highlights-counting.properties:
  spark_master -> spark://cas1.dev.kno.com:7077

Using properties file: /tmp/highlights-counting.properties
Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights
Warning: Ignoring non-spark config property: zookeeper_port=2181
Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1
Warning: Ignoring non-spark config property: num_of_concurrent_streams=2
Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group
Warning: Ignoring non-spark config property:
OOM error with GMMs on 4GB dataset
Hi, I am training a GMM with 10 gaussians on a 4 GB dataset(720,000 * 760). The spark (1.3.1) job is allocated 120 executors with 6GB each and the driver also has 6GB. Spark Config Params: .set(spark.hadoop.validateOutputSpecs, false).set(spark.dynamicAllocation.enabled, false).set(spark.driver.maxResultSize, 4g).set(spark.default.parallelism, 300).set(spark.serializer, org.apache.spark.serializer.KryoSerializer).set(spark.kryoserializer.buffer.mb, 500).set(spark.akka.frameSize, 256).set(spark.akka.timeout, 300) However, at the aggregate step (Line 168) val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _) I get OOM error and the application hangs indefinitely. Is this an issue or am I missing something? java.lang.OutOfMemoryError: Java heap space at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:180) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/05/04 16:23:38 ERROR util.Utils: Uncaught exception in thread task-result-getter-2 java.lang.OutOfMemoryError: Java heap space Exception in thread task-result-getter-2 java.lang.OutOfMemoryError: Java heap space 15/05/04 16:23:45 INFO scheduler.TaskSetManager: Finished task 1070.0 in stage 6.0 (TID 8276) in 382069 ms on [] (160/3600) 15/05/04 16:23:54 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xc57da871, ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space 15/05/04 16:23:55 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x3c3dbb0c, ] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) 15/05/04 16:24:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver] Thanks! Vinay
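For reference, a minimal sketch of this kind of training run (Spark 1.3 MLlib API). The input path, parsing, and partition count are assumptions for illustration, not taken from the job above. Note that each E-step task result carries k full d x d covariance matrices (here 10 x 760 x 760 doubles, roughly 46 MB per task), which is consistent with the task-result-getter OOM in the trace: the pressure is on the driver collecting aggregate results, not only on the executors.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

val conf = new SparkConf()
  .setAppName("gmm-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.driver.maxResultSize", "4g")
val sc = new SparkContext(conf)

// 720,000 rows x 760 features; parse each CSV line into a dense vector.
val data = sc.textFile("hdfs:///path/to/features")   // hypothetical path
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .repartition(300)                                  // match spark.default.parallelism
  .cache()

// k = 10 Gaussians, as in the job above.
val model = new GaussianMixture().setK(10).run(data)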
spark Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
hi all, when I submit a spark-sql program to select data from my hive database I get an error like this: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient. What's wrong with my spark configuration? Thanks for any help!
RE: Re: RE: Re: Re: sparksql running slow while joining 2 tables.
You can use EXPLAIN EXTENDED SELECT …. From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Tuesday, May 05, 2015 9:52 AM To: Cheng, Hao; Olivier Girardot; user Subject: Re: RE: Re: Re: sparksql running slow while joining 2 tables. As I know, broadcast join is automatically enabled by spark.sql.autoBroadcastJoinThreshold; refer to http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options And how can I check my app's physical plan, and other things like the optimized plan, executable plan, etc.? thanks Thanks & Best regards! 罗辉 San.Luo - Original Message - From: Cheng, Hao hao.ch...@intel.com To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org Subject: RE: Re: Re: sparksql running slow while joining 2 tables. Date: May 5, 2015, 08:38 Or, have you ever tried a broadcast join? From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, May 5, 2015 8:33 AM To: luohui20...@sina.com; Olivier Girardot; user Subject: RE: Re: sparksql running slow while joining 2 tables. Can you print out the physical plan? EXPLAIN SELECT xxx… From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: Re: sparksql running slow while joining 2 tables. hi Olivier spark1.3.1, with java1.8.0.45, and I added 2 pics. It seems like a GC issue. I also tried with different parameters like memory size of driver & executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo - Original Message - From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 20:46 Hi, What is your Spark version? Regards, Olivier. On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote: hi guys when i am running a sql like select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint > a.startpoint + 25); I found sparksql running slow for minutes, which may be caused by very long GC and shuffle time. table db is created from a txt file sized at 56mb while table sample is sized at 26mb, both small. my spark cluster is a standalone pseudo-distributed spark cluster with an 8g executor and a 4g driver manager. any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
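For concreteness, a minimal sketch of both suggestions together (Spark 1.3 API, assuming a HiveContext named sqlContext with the two tables already registered; the 64 MB threshold is just an example, and the threshold is only consulted when size statistics for the table are available):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (64 * 1024 * 1024).toString)

val df = sqlContext.sql(
  "SELECT a.name, a.startpoint, a.endpoint, a.piece " +
  "FROM db a JOIN sample b ON (a.name = b.name) " +
  "WHERE (b.startpoint > a.startpoint + 25)")

df.explain(true)   // prints parsed, analyzed, optimized and physical plans

If the broadcast kicks in, the physical plan printed by explain(true) should show a broadcast join operator rather than a shuffled one.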
Re: Nightly builds/releases?
I see a Jira for this one, but unresolved. https://issues.apache.org/jira/browse/SPARK-1517 https://issues.apache.org/jira/browse/SPARK-1517 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, Does anyone know if spark has any nightly builds or equivalent that provides binaries that have passed a CI build so that one could try out the bleeding edge without having to compile. -- Ankur
RE: Re: Re: sparksql running slow while joining 2 tables.
Can you print out the physical plan? EXPLAIN SELECT xxx… From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: Re: sparksql running slow while joining 2 tables. hi Olivier spark1.3.1, with java1.8.0.45, and I added 2 pics. It seems like a GC issue. I also tried with different parameters like memory size of driver & executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo - Original Message - From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 20:46 Hi, What is your Spark version? Regards, Olivier. On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote: hi guys when i am running a sql like select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint > a.startpoint + 25); I found sparksql running slow for minutes, which may be caused by very long GC and shuffle time. table db is created from a txt file sized at 56mb while table sample is sized at 26mb, both small. my spark cluster is a standalone pseudo-distributed spark cluster with an 8g executor and a 4g driver manager. any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kryo serialization of classes in additional jars
Actually, after some digging, I did find a JIRA for it: SPARK-5470. The fix for this has gone into master, but it isn't in 1.2. On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote: Oh, this seems like a real pain. You should file a jira, I didn't see an open issue -- if nothing else just to document the issue. As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded later. I think you could workaround with some combination of pushing the jars to the cluster manually, and then using spark.executor.extraClassPath On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote: Hi, Is it possible to register kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do: val sparkConf = new SparkConf(true) .set(spark.jars, foo.jar) .setAppName(foo) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) // register classes contained in foo.jar sparkConf.registerKryoClasses(Array( classOf[com.foo.Foo], classOf[com.foo.Bar])) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
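A sketch of the workaround Imran describes, not a tested recipe: foo.jar is assumed to have been copied to the same local path on every executor node beforehand, so its classes are visible when the serializer is created at SparkEnv startup.

import org.apache.spark.SparkConf

val sparkConf = new SparkConf(true)
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Put the pre-distributed jar on the executor classpath at JVM startup,
  // instead of shipping it later via spark.jars:
  .set("spark.executor.extraClassPath", "/opt/jars/foo.jar")   // hypothetical path

// register classes contained in foo.jar, as in the original snippet
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))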
sparksql support hive view
guys, just to confirm, does sparksql support the hive view feature? Is that the LateralView in the hive language manual? thanks Thanks & Best regards! 罗辉 San.Luo
Re: Nightly builds/releases?
See this related thread: http://search-hadoop.com/m/JW1q5bnnyT1 Cheers On Mon, May 4, 2015 at 7:58 PM, Guru Medasani gdm...@gmail.com wrote: I see a Jira for this one, but unresolved. https://issues.apache.org/jira/browse/SPARK-1517 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, Does anyone know if spark has any nightly builds or equivalent that provides binaries that have passed a CI build so that one could try out the bleeding edge without having to compile. -- Ankur
Re: Nightly builds/releases?
Hi, There is also a make-distribution.sh file in the repository root. If someone with jenkins access can create a simple builder that would be awesome. But I am guessing besides the spark binary one would also probably want the maven artifacts (lower priority though) to work with it. -- Ankur On 4 May 2015, at 20:11, Ted Yu yuzhih...@gmail.com wrote: See this related thread: http://search-hadoop.com/m/JW1q5bnnyT1 Cheers On Mon, May 4, 2015 at 7:58 PM, Guru Medasani gdm...@gmail.com wrote: I see a Jira for this one, but unresolved. https://issues.apache.org/jira/browse/SPARK-1517 On May 4, 2015, at 10:25 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, Does anyone know if spark has any nightly builds or equivalent that provides binaries that have passed a CI build so that one could try out the bleeding edge without having to compile. -- Ankur
Help with Spark SQL Hash Distribution
I am trying to distribute a table using a particular column which is the key that I’ll be using to perform join operations on the table. Is it possible to do this with Spark SQL? I checked the method partitionBy() for rdds. But not sure how to specify which column is the key? Can anyone give an example? Thanks Mani Graduate Student, Department of Computer Science Virginia Tech
Re: sparksql support hive view
We support both LATERAL VIEWs (a query language feature that lets you turn a single row into many rows, for example with an explode) and virtual views (a table that is really just a query that is run on demand). On Mon, May 4, 2015 at 7:12 PM, luohui20...@sina.com wrote: guys, just to confirm, does sparksql support the hive view feature? Is that the LateralView https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView in the hive language manual? thanks Thanks & Best regards! 罗辉 San.Luo
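To make the LATERAL VIEW case concrete, a small sketch (the sc, the orders table, and its items array column are all made-up names for illustration):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Each order row is expanded into one row per element of its items array.
hiveContext.sql(
  "SELECT order_id, item FROM orders LATERAL VIEW explode(items) t AS item"
).show()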
Re: Help with Spark SQL Hash Distribution
If you do a join with at least one equality relationship between the two tables, Spark SQL will automatically hash partition the data and perform the join. If you are looking to prepartition the data, that information is not yet propagated from the in memory cached representation so won't help avoid an extra shuffle, but Kai (cc-ed) was hoping to add that feature. On Mon, May 4, 2015 at 9:05 PM, Mani man...@vt.edu wrote: I am trying to distribute a table using a particular column which is the key that I’ll be using to perform join operations on the table. Is it possible to do this with Spark SQL? I checked the method partitionBy() for rdds. But not sure how to specify which column is the key? Can anyone give an example? Thanks Mani Graduate Student, Department of Computer Science Virginia Tech
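At the RDD level, a minimal sketch of what keying by the join column and hash-partitioning looks like (the column position and partition count are assumptions; as Michael notes, for a plain equi-join Spark SQL will already do this for you):

import org.apache.spark.HashPartitioner

// The "key" of a pair RDD is just the first element of each tuple.
val pairs = df.rdd.map(row => (row.getString(0), row))   // key = join column at position 0
val partitioned = pairs.partitionBy(new HashPartitioner(200)).cache()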
Re: Is LIMIT n in Spark SQL useful?
Robin, my query statement is as below: select id, name, trans_date, gender, hobby, job, country from Employees LIMIT 100 In PostgreSQL, it works very well. For 10M records in the DB, it took less than 20ms, but in SparkSQL, it took a long time. Michael, Got it. For me, it is not good news. Anyway, thanks. Regards, Yi On Tuesday, May 5, 2015 5:59 AM, Michael Armbrust mich...@databricks.com wrote: The JDBC interface for Spark SQL does not support pushing down limits today. On Mon, May 4, 2015 at 8:06 AM, Robin East robin.e...@xense.co.uk wrote: and a further question - have you tried running this query in psql? what’s the performance like there? On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote: What query are you running? It may be the case that your query requires PostgreSQL to do a large amount of work before identifying the first n rows On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote: I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and improve query performance, but I found it took as long as querying without LIMIT. It left me confused. Anybody know why? Thanks. Regards, Yi
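One workaround worth noting: with the Spark 1.3 JDBC data source you can push the LIMIT into PostgreSQL yourself by handing the source a subquery instead of a bare table name. A sketch; the URL and credentials are placeholders:

val limited = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:postgresql://host:5432/mydb?user=me&password=secret",
  "dbtable" -> "(SELECT id, name, trans_date, gender, hobby, job, country FROM Employees LIMIT 100) AS t"))
limited.show()   // PostgreSQL evaluates the LIMIT, so only 100 rows cross the wire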
Re: Spark - Timeout Issues - OutOfMemoryError
Data Set 1 (viEvents): the event activity data of 1 day. I took 10 files out of it; the top item IDs by record count:

Item ID        Count
201335783004   3419
191568402102   1793
111654479898   1362
181503913062   1310
261798565828   1028
111654493548   994
231516683056   862
131497785968   746
161666914810   633
221749455474   432
201324502754   410
201334042634   402
191562605592   380
271841178238   362
161663339210   344
251615941886   313
261855748678   309
271821726658   255
111657099518   224
261868369938   218
181725710132   216
171766164072   215
221757076934   213
171763906872   212
111650132368   206
181629904282   204
261867932788   198
161668475280   194
191398227282   194

Data set 2:

Item ID        Count
2217305702     1
3842604614     1
4463421160     1
4581260446     1
4632783223     1
4645316947     1
4760829454     1
4786989430     1
5530758430     1
5610056107     1
5661929425     1
5953801612     1
6141607456     1
6197204992     1
6220704442     1
6271022614     1
6282402871     1
6525123621     1
6554834772     1
6566297541     1

This data set will always have only one element for each item as it contains metadata information. Given the nature of these two datasets, if at all there is skewness then it must be with dataset 1. In dataset 1 the top 20-30 records do not have a record count for a given itemID (shuffle key) greater than 3000, and that is very small. Why am i still *not* able to do a join of these two datasets given i have unlimited capacity and repartitioning, but a 12G memory limit on each node? Each time i get a task that runs forever, and it processes roughly around 1.5G of data when others are processing a few MBs. Also 1.5G of data (shuffle read size) is very small. Please suggest. On Mon, May 4, 2015 at 9:08 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I tried this val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map { It fired two jobs and still i have 1 task that never completes.

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0      4818  0        RUNNING  PROCESS_LOCAL   5 / host1           2015/05/04 07:24:25  1.1 h     13 min   778.0 MB / 50314161          4.5 GB                  47.4 MB
955    5773  0        SUCCESS  PROCESS_LOCAL   5 / host2           2015/05/04 07:47:16  2.2 min   1.5 min  106.3 MB / 4197539           0.0 B                   0.0 B
1199   6017  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 07:51:51  48 s      2 s      94.2 MB / 3618335            2.8 GB                  8.6 MB
216

2) I tried reversing the datasets in the join val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.join(lstgItem) This led to the same problem of a long running task. 3) Next, i am trying this val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents, 1200).map { I have exhausted all my options. Regards, Deepak On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I ran it against one file instead of 10 files and i see one task is still running after 33 mins; its shuffle read size is 780MB/50 mil records. I did a count of records for each itemId from dataset-2 [One FILE] (Second Dataset (RDDPair) val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) } ). This is the dataset that contains the list of items viewed by a user in one day.
Item Id        Count
201335783004   537
111654496030   353
141640703798   287
191568402102   258
111654479898   217
231521843148   211
251931716094   201
111654493548   181
181503913062   181
121635453050   152
261798565828   140
151494617682   139
251927181728   127
231516683056   119
141640492864   117
161677270656   117
171771073616   113
111649942124   109
191516989450   97
231539161292   94
221555628408   88
131497785968   87
121632233872   84
131335379184   83
281531363490   83
131492727742   79
231174157820   79
161666914810   77
251699753072   77
161683664300   76

I was assuming that data skew would be if the top item (201335783004) had a count of 1 million; however it's only a few hundred, so why is Spark skewing it in the join? What should i do so that Spark distributes the records evenly? In M/R we can change the Partitioner between mapper and reducer; how can i do that in Spark for a join?

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0      3618  0        RUNNING  PROCESS_LOCAL   4 / host1           2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322        4.6 GB                  47.5 MB
433    4051  0        SUCCESS  PROCESS_LOCAL   1 / host2           2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4505143           1282.3 MB               10.1 MB
218    3836  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2865143            879.6 MB                6.9 MB
113    3731  0        SUCCESS  PROCESS_LOCAL   2 / host4           2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5187                0.0 B                   0.0 B

On Mon, May 4, 2015 at 6:00 PM, Saisai Shao sai.sai.s...@gmail.com wrote: From the symptoms you mentioned that one
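One common mitigation for a skewed join key, sketched by hand rather than taken from this thread: salt the large side's key with a random suffix, replicate the one-row-per-item metadata side once per suffix, join, then strip the salt. SALTS = 10 is an assumption; pick enough to break up the hottest keys.

import scala.util.Random

val SALTS = 10

val saltedEvents = viEvents.map { case (itemId, vi) =>
  ((itemId, Random.nextInt(SALTS)), vi)
}
// The metadata side has exactly one row per item, so replication is cheap.
val saltedItems = lstgItem.flatMap { case (itemId, item) =>
  (0 until SALTS).map(s => ((itemId, s), item))
}
val joined = saltedEvents.join(saltedItems).map {
  case ((itemId, _), (vi, item)) => (itemId, (vi, item))
}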
Re: sparksql running slow while joining 2 tables.
I assume you’re using the DataFrame API within your application. sql(“SELECT…”).explain(true) From: Wang, Daoyuan Sent: Tuesday, May 05, 2015 10:16 AM To: luohui20...@sina.com; Cheng, Hao; Olivier Girardot; user Subject: RE: Re: RE: Re: Re: sparksql running slow while joining 2 tables. You can use EXPLAIN EXTENDED SELECT …. From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Tuesday, May 05, 2015 9:52 AM To: Cheng, Hao; Olivier Girardot; user Subject: Re: RE: Re: Re: sparksql running slow while joining 2 tables. As I know, broadcast join is automatically enabled by spark.sql.autoBroadcastJoinThreshold; refer to http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options And how can I check my app's physical plan, and other things like the optimized plan, executable plan, etc.? thanks Thanks & Best regards! 罗辉 San.Luo - Original Message - From: Cheng, Hao hao.ch...@intel.com To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org Subject: RE: Re: Re: sparksql running slow while joining 2 tables. Date: May 5, 2015, 08:38 Or, have you ever tried a broadcast join? From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, May 5, 2015 8:33 AM To: luohui20...@sina.com; Olivier Girardot; user Subject: RE: Re: sparksql running slow while joining 2 tables. Can you print out the physical plan? EXPLAIN SELECT xxx… From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: Re: sparksql running slow while joining 2 tables. hi Olivier spark1.3.1, with java1.8.0.45, and I added 2 pics. It seems like a GC issue. I also tried with different parameters like memory size of driver & executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo - Original Message - From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 20:46 Hi, What is your Spark version? Regards, Olivier. On Mon, 4 May 2015 at 11:03, luohui20...@sina.com wrote: hi guys when i am running a sql like select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint > a.startpoint + 25); I found sparksql running slow for minutes, which may be caused by very long GC and shuffle time. table db is created from a txt file sized at 56mb while table sample is sized at 26mb, both small. my spark cluster is a standalone pseudo-distributed spark cluster with an 8g executor and a 4g driver manager. any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark JVM default memory
Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory? Thanks, Vijay
Re: No logs from my cluster / worker ... (running DSE 4.6.1)
bq. its Spark libs are all at 2.10 Clarification: 2.10 is version of Scala Your Spark version is 1.1.0 You can use earlier release of Kafka. Cheers On Mon, May 4, 2015 at 2:39 PM, Eric Ho eric...@intel.com wrote: I still prefer to use Spark core / streaming /... at 2.10 becuase my DSE is at 4.6.1 and its Spark libs are all at 2.10 ... My Scala code will run on DSE machines which have Spark enabled. So, should I grab my Kakfa server at here ? https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz On Mon, May 4, 2015 at 1:07 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like you're using Spark 1.1.0 Support for Kafka 0.8.2 was added by: https://issues.apache.org/jira/browse/SPARK-2808 which would come in Spark 1.4.0 FYI On Mon, May 4, 2015 at 12:22 PM, Eric Ho eric...@intel.com wrote: I'm submitting this via 'dse spark-submit' but somehow, I don't see any loggings in my cluster or worker machines... How can I find out ? My cluster is running DSE 4.6.1 with Spark enabled. My source is running Kafka 0.8.2.0 I'm launching my program on one of my DSE machines. Any insights much appreciated. Thanks. - cas1.dev% dse spark-submit --verbose --deploy-mode cluster --master spark://cas1.dev.kno.com:7077 --class com.kno.highlights.counting.service.HighlightConsumer --driver-class-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --driver-library-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --properties-file /tmp/highlights-counting.properties /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar --name HighlightConsumer Using properties file: /tmp/highlights-counting.properties Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights Warning: Ignoring non-spark config property: zookeeper_port=2181 Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2 Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group Warning: Ignoring non-spark config property: app_name=HighlightConsumer Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents Warning: Ignoring non-spark config property: scheduler_mode=FIFO Warning: Ignoring non-spark config property: highlight_topic=highlight_topic Warning: Ignoring non-spark config property: cassandra_host= cas1.dev.kno.com Warning: Ignoring non-spark config property: checkpoint_interval=3 Warning: Ignoring non-spark config property: zookeeper_host= cas1.dev.kno.com Adding default property: spark_master=spark://cas1.dev.kno.com:7077 Warning: Ignoring non-spark config property: streaming_window=10 Using properties file: /tmp/highlights-counting.properties Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights Warning: Ignoring non-spark config property: zookeeper_port=2181 Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2 Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group Warning: Ignoring non-spark config property: app_name=HighlightConsumer Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents Warning: Ignoring non-spark config property: scheduler_mode=FIFO Warning: Ignoring non-spark config property: 
highlight_topic=highlight_topic Warning: Ignoring non-spark config property: cassandra_host= cas1.dev.kno.com Warning: Ignoring non-spark config property: checkpoint_interval=3 Warning: Ignoring non-spark config property: zookeeper_host= cas1.dev.kno.com Adding default property: spark_master=spark://cas1.dev.kno.com:7077 Warning: Ignoring non-spark config property: streaming_window=10 Parsed arguments: master spark://cas1.dev.kno.com:7077 deployMode cluster executorMemory null executorCores null totalExecutorCores null propertiesFile /tmp/highlights-counting.properties extraSparkPropertiesMap() driverMemorynull driverCores null driverExtraClassPath /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib driverExtraLibraryPath /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib driverExtraJavaOptions null supervise false queue null numExecutorsnull files null pyFiles null archivesnull mainClass com.kno.highlights.counting.service.HighlightConsumer primaryResource
Re: ReduceByKey and sorting within partitions
shoot me an email if you need any help with spark-sorted. it does not (yet?) have a java api, so you will have to work in scala On Mon, May 4, 2015 at 4:05 PM, Burak Yavuz brk...@gmail.com wrote: I think this Spark Package may be what you're looking for! http://spark-packages.org/package/tresata/spark-sorted Best, Burak On Mon, May 4, 2015 at 12:56 PM, Imran Rashid iras...@cloudera.com wrote: oh wow, that is a really interesting observation, Marco Jerry. I wonder if this is worth exposing in combineByKey()? I think Jerry's proposed workaround is all you can do for now -- use reflection to side-step the fact that the methods you need are private. On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Marco, As I know, current combineByKey() does not expose the related argument where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is package private, if you can get the ShuffledRDD through reflection or other way, the keyOrdering you set will be pushed down to shuffle. If you use a combination of transformations to do it, the result will be same but the efficiency may be different, some transformations will separate into different stages, which will introduce additional shuffle. Thanks Jerry 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com: Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartition) pushing the sorting down to the shuffles machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is that combineByKey call setKeyOrdering function on the ShuflleRDD that it returns. Am I wrong? Can be done by a combination of other transformations with the same efficiency? Thanks, Marco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Python Custom Partitioner
Thanks, but is there a non-broadcast solution? On 5 May 2015 01:34, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have implemented a map-side join with broadcast variables and the code is on the mailing list (scala). On Mon, May 4, 2015 at 8:38 PM, ayan guha guha.a...@gmail.com wrote: Hi Can someone share some working code for a custom partitioner in python? I am trying to understand it better. Here is the documentation: partitionBy(*numPartitions*, *partitionFunc=function portable_hash at 0x2c45140*) https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy Return a copy of the RDD partitioned using the specified partitioner. What I am trying to do: 1. Create a dataframe 2. Partition it using one specific column 3. Create another dataframe 4. Partition it on the same column 5. Join (to enforce map-side join) My questions: a) Am I on the right path? b) How can I do partitionBy? Specifically, when I call DF.rdd.partitionBy, what gets passed to the custom function? A tuple? A row? How do I access (say) the 3rd column of a tuple inside the partitioner function? -- Best Regards, Ayan Guha -- Deepak
Re: Is LIMIT n in Spark SQL useful?
The JDBC interface for Spark SQL does not support pushing down limits today. On Mon, May 4, 2015 at 8:06 AM, Robin East robin.e...@xense.co.uk wrote: and a further question - have you tried running this query in psql? what’s the performance like there? On 4 May 2015, at 16:04, Robin East robin.e...@xense.co.uk wrote: What query are you running? It may be the case that your query requires PostgreSQL to do a large amount of work before identifying the first n rows On 4 May 2015, at 15:52, Yi Zhang zhangy...@yahoo.com.INVALID wrote: I am trying to query PostgreSQL using LIMIT(n) to reduce memory size and improve query performance, but I found it took as long as querying without LIMIT. It left me confused. Anybody know why? Thanks. Regards, Yi
Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
If your data is evenly distributed (i.e. no skewed datapoints in your join keys), it can also help to increase spark.sql.shuffle.partitions (default is 200). On Mon, May 4, 2015 at 8:03 AM, Richard Marscher rmarsc...@localytics.com wrote: In regards to the large GC pauses, assuming you allocated all 100GB of memory per worker you may consider running with less memory on your Worker nodes, or splitting up the available memory on the Worker nodes amongst several worker instances. The JVM's garbage collection starts to become very slow as the memory allocation for the heap becomes large. At 100GB it may not be unusual to see GC take minutes at a time. I believe with Yarn or Standalone clusters you should be able to run multiple smaller JVM instances on your workers so you can still use your cluster resources but also won't have a single JVM allocating an unwieldy amount of memory. On Mon, May 4, 2015 at 2:23 AM, Nick Travers n.e.trav...@gmail.com wrote: Could you be more specific in how this is done? A DataFrame class doesn't have that method. On Sun, May 3, 2015 at 11:07 PM, ayan guha guha.a...@gmail.com wrote: You can use custom partitioner to redistribution using partitionby On 4 May 2015 15:37, Nick Travers n.e.trav...@gmail.com wrote: I'm currently trying to join two large tables (order 1B rows each) using Spark SQL (1.3.0) and am running into long GC pauses which bring the job to a halt. I'm reading in both tables using a HiveContext with the underlying files stored as Parquet Files. I'm using something along the lines of HiveContext.sql(SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1) to set up the join. When I execute this (with an action such as .count) I see the first few stages complete, but the job eventually stalls. The GC counts keep increasing for each executor. Running with 6 workers, each with 2T disk and 100GB RAM. Has anyone else run into this issue? I'm thinking I might be running into issues with the shuffling of the data, but I'm unsure of how to get around this? Is there a way to redistribute the rows based on the join key first, and then do the join? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
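Put together, a small sketch of this suggestion (assuming a HiveContext named hc; 800 is just an example value):

hc.setConf("spark.sql.shuffle.partitions", "800")  // up from the default of 200
val joined = hc.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1")
joined.count()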
Re: MLLib SVM probability
Currently, SVMs don't have built-in multiclass support. Logistic Regression supports multiclass, as do trees and random forests. It would be great to add multiclass support for SVMs as well. There is some ongoing work on generic multiclass-to-binary reductions: https://issues.apache.org/jira/browse/SPARK-7015 I agree that naive one-vs-all reductions might not work that well, but that the raw scores could be calibrated using the scaling you mentioned, or other methods. Joseph On Mon, May 4, 2015 at 6:29 AM, Driesprong, Fokko fo...@driesprong.frl wrote: Hi Robert, I would say, taking the sign of the numbers represent the class of the input-vector. What kind of data are you using, and what kind of traning-set do you use. Fundamentally a SVM is able to separate only two classes, you can do one vs the rest as you mentioned. I don't see how LVQ can benefit the SVM classifier. I would say that this is more a SVM problem, than a Spark. 2015-05-04 15:22 GMT+02:00 Robert Musters robert.must...@openindex.io: Hi all, I am trying to understand the output of the SVM classifier. Right now, my output looks like this: -18.841544889249917 0.0 168.32916035523283 1.0 420.67763915879794 1.0 -974.1942589201286 0.0 71.73602841256813 1.0 233.13636224524993 1.0 -1000.5902168199027 0.0 The documentation is unclear about what these numbers mean https://spark.apache.org/docs/0.9.2/api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint . I think it is the distance to the hyperplane with sign. My main question is: How can I convert distances from hyperplanes to probabilities in a multi-class one-vs-all approach? SVMLib http://www.csie.ntu.edu.tw/~cjlin/libsvm/ has this functionality and refers the process to get the probabilities as “Platt scaling” http://www.researchgate.net/profile/John_Platt/publication/2594015_Probabilistic_Outputs_for_Support_Vector_Machines_and_Comparisons_to_Regularized_Likelihood_Methods/links/004635154cff5262d600.pdf. I think this functionality should be in MLLib, but I can't find it? Do you think Platt scaling makes sense? Making clusters using Learning Vector Quantization, determining the spread function of a cluster with a Gaussian function and then retrieving the probability makes a lot more sense i.m.o. Using the distances from the hyperplanes from several SVM classifiers and then trying to determine some probability on these distance measures, does not make any sense, because the distribution property of the data-points belonging to a cluster is not taken into account. Does anyone see a fallacy in my reasoning? With kind regards, Robert
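A rough sketch of the calibration idea on top of MLlib's binary SVM: clearThreshold() makes predict() return the raw margin, and a sigmoid maps it to (0, 1). Real Platt scaling would fit the A and B parameters on held-out data; A = -1.0 and B = 0.0 here are placeholders, as are the training and test RDDs of LabeledPoint.

import org.apache.spark.mllib.classification.SVMWithSGD

val model = SVMWithSGD.train(training, 100)   // 100 iterations, for illustration
model.clearThreshold()                        // predict() now returns the raw margin

val (a, b) = (-1.0, 0.0)                      // placeholders; fit these for real Platt scaling
val scored = test.map { p =>
  val margin = model.predict(p.features)
  val prob = 1.0 / (1.0 + math.exp(a * margin + b))
  (p.label, prob)
}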
RE: Spark JVM default memory
Did you confirm through the Spark UI how much memory is getting allocated to your application on each worker? Mohammed From: Vijayasarathy Kannan [mailto:kvi...@vt.edu] Sent: Monday, May 4, 2015 3:36 PM To: Andrew Ash Cc: user@spark.apache.org Subject: Re: Spark JVM default memory I am trying to read in a file (4GB file). I tried setting both spark.driver.memory and spark.executor.memory to large values (say 16GB) but I still get a GC limit exceeded error. Any idea what I am missing? On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote: It's unlikely you need to increase the amount of memory on your master node since it does simple bookkeeping. The majority of the memory pressure across a cluster is on executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory? Thanks, Vijay
Re: SparkSQL Nested structure
You are looking for LATERAL VIEW explode https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode in HiveQL. On Mon, May 4, 2015 at 7:49 AM, Giovanni Paolo Gibilisco gibb...@gmail.com wrote: Hi, I'm trying to parse log files generated by Spark using SparkSQL. In the JSON elements related to the StageCompleted event we have a nested structure containing an array of elements with RDD Info. (See the log below as an example, omitting some parts.)

{
  "Event": "SparkListenerStageCompleted",
  "Stage Info": {
    "Stage ID": 1,
    ...
    "RDD Info": [
      {
        "RDD ID": 5,
        "Name": "5",
        "Storage Level": {
          "Use Disk": false,
          "Use Memory": false,
          "Use Tachyon": false,
          "Deserialized": false,
          "Replication": 1
        },
        "Number of Partitions": 2,
        "Number of Cached Partitions": 0,
        "Memory Size": 0,
        "Tachyon Size": 0,
        "Disk Size": 0
      },
      ...

When i register the log as a table SparkSQL is able to generate the correct schema, which for the RDD Info element looks like

 |-- RDD Info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Disk Size: long (nullable = true)
 |    |    |-- Memory Size: long (nullable = true)
 |    |    |-- Name: string (nullable = true)

My problem is that if I try to query the table I can only get array buffers out of it:

SELECT `stageEndInfos.Stage Info.Stage ID`, `stageEndInfos.Stage Info.RDD Info` FROM stageEndInfos

Stage ID  RDD Info
1         ArrayBuffer([0,0,...
0         ArrayBuffer([0,0,...
2         ArrayBuffer([0,0,...

or:

SELECT `stageEndInfos.Stage Info.RDD Info.RDD ID` FROM stageEndInfos

RDD ID
ArrayBuffer(5, 4, 3)
ArrayBuffer(2, 1, 0)
ArrayBuffer(9, 6,...

Is there a way to explode the arrays in the rows in order to build a single table? (Knowing that the RDD ID is unique and can be used as a primary key.) Thanks!
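A sketch of the HiveQL route for this particular schema, assuming the log was registered through a HiveContext (LATERAL VIEW is not available on a plain SQLContext); backticks escape the spaces in the field names, and the exact nested-field syntax may need adjusting:

val flat = hiveContext.sql("""
  SELECT `Stage Info`.`Stage ID` AS stageId,
         rddInfo.`RDD ID`        AS rddId,
         rddInfo.`Memory Size`   AS memorySize
  FROM stageEndInfos
  LATERAL VIEW explode(`Stage Info`.`RDD Info`) t AS rddInfo
""")
flat.registerTempTable("stageRddInfos")  // one row per (stage, RDD) pair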
Re: Spark JVM default memory
I am not able to access the web UI for some reason. But the logs (being written while running my application) show that only 385Mb are being allocated for each executor (or slave nodes) while the executor memory I set is 16Gb. This 385Mb is not the same for each run either. It looks random (sometimes 1G, sometimes 512M, etc.) On Mon, May 4, 2015 at 6:57 PM, Mohammed Guller moham...@glassbeam.com wrote: Did you confirm through the Spark UI how much memory is getting allocated to your application on each worker? Mohammed *From:* Vijayasarathy Kannan [mailto:kvi...@vt.edu] *Sent:* Monday, May 4, 2015 3:36 PM *To:* Andrew Ash *Cc:* user@spark.apache.org *Subject:* Re: Spark JVM default memory I am trying to read in a file (4GB file). I tried setting both spark.driver.memory and spark.executor.memory to large values (say 16GB) but I still get a GC limit exceeded error. Any idea what I am missing? On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote: It's unlikely you need to increase the amount of memory on your master node since it does simple bookkeeping. The majority of the memory pressure across a cluster is on executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory? Thanks, Vijay
Re: Spark JVM default memory
I am trying to read in a file (4GB file). I tried setting both spark.driver.memory and spark.executor.memory to large values (say 16GB) but I still get a GC limit exceeded error. Any idea what I am missing? On Mon, May 4, 2015 at 5:30 PM, Andrew Ash and...@andrewash.com wrote: It's unlikely you need to increase the amount of memory on your master node since it does simple bookkeeping. The majority of the memory pressure across a cluster is on executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory? Thanks, Vijay
Re: ReduceByKey and sorting within partitions
oh wow, that is a really interesting observation, Marco Jerry. I wonder if this is worth exposing in combineByKey()? I think Jerry's proposed workaround is all you can do for now -- use reflection to side-step the fact that the methods you need are private. On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Marco, As I know, current combineByKey() does not expose the related argument where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is package private, if you can get the ShuffledRDD through reflection or other way, the keyOrdering you set will be pushed down to shuffle. If you use a combination of transformations to do it, the result will be same but the efficiency may be different, some transformations will separate into different stages, which will introduce additional shuffle. Thanks Jerry 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com: Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartition) pushing the sorting down to the shuffles machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is that combineByKey call setKeyOrdering function on the ShuflleRDD that it returns. Am I wrong? Can be done by a combination of other transformations with the same efficiency? Thanks, Marco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
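A reflection-free workaround sketch that stays on the public API: it costs one extra shuffle compared to pushing keyOrdering into combineByKey's own shuffle, which is exactly the inefficiency discussed above, but it gets the same ordering guarantees. The rdd and the partition count are placeholders.

import org.apache.spark.RangePartitioner

val reduced = rdd.reduceByKey(_ + _)              // rdd: a pair RDD, as in the thread
val byRange = new RangePartitioner(24, reduced)   // 24 partitions: an example value
// Ordered across partitions (by the range partitioner) and within each partition:
val sorted = reduced.repartitionAndSortWithinPartitions(byRange)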
Re: spark kryo serialization question
yes, you should register all three. The truth is, you only *need* to register classes that will get serialized -- either via RDD caching or in a shuffle. So if a type is only used as an intermediate inside a stage, you don't need to register it. But the overhead of registering extra classes is pretty minimal, so as long as you do this within reason, I think you're OK. Imran On Thu, Apr 30, 2015 at 12:34 AM, 邓刚 [Technology Center] triones.d...@vipshop.com wrote: Hi all, We know that spark supports Kryo serialization. Suppose there is a map function which maps C to (K, V) (here C, K, V are instances of classes C, K, V); when we register kryo serialization, should I register all three of these classes? Best Wishes Triones Deng This communication is intended only for the addressee(s) and may contain information that is privileged and confidential. You are hereby notified that, if you are not an intended recipient listed above, or an authorized employee or agent of an addressee of this communication responsible for delivering e-mail messages to an intended recipient, any dissemination, distribution or reproduction of this communication (including any attachments hereto) is strictly prohibited. If you have received this communication in error, please notify us immediately by a reply e-mail addressed to the sender and permanently delete the original e-mail communication and any attachments from all storage devices without making or otherwise retaining a copy.
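A sketch for the map C => (K, V) case; registering all three is harmless, and registrationRequired is an optional setting that makes Kryo fail fast on anything left unregistered:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optionally fail fast instead of silently falling back to writing class names:
  .set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[C], classOf[K], classOf[V]))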
Re: No logs from my cluster / worker ... (running DSE 4.6.1)
Looks like you're using Spark 1.1.0 Support for Kafka 0.8.2 was added by: https://issues.apache.org/jira/browse/SPARK-2808 which would come in Spark 1.4.0 FYI On Mon, May 4, 2015 at 12:22 PM, Eric Ho eric...@intel.com wrote: I'm submitting this via 'dse spark-submit' but somehow, I don't see any loggings in my cluster or worker machines... How can I find out ? My cluster is running DSE 4.6.1 with Spark enabled. My source is running Kafka 0.8.2.0 I'm launching my program on one of my DSE machines. Any insights much appreciated. Thanks. - cas1.dev% dse spark-submit --verbose --deploy-mode cluster --master spark://cas1.dev.kno.com:7077 --class com.kno.highlights.counting.service.HighlightConsumer --driver-class-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --driver-library-path /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib --properties-file /tmp/highlights-counting.properties /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar --name HighlightConsumer Using properties file: /tmp/highlights-counting.properties Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights Warning: Ignoring non-spark config property: zookeeper_port=2181 Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2 Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group Warning: Ignoring non-spark config property: app_name=HighlightConsumer Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents Warning: Ignoring non-spark config property: scheduler_mode=FIFO Warning: Ignoring non-spark config property: highlight_topic=highlight_topic Warning: Ignoring non-spark config property: cassandra_host= cas1.dev.kno.com Warning: Ignoring non-spark config property: checkpoint_interval=3 Warning: Ignoring non-spark config property: zookeeper_host= cas1.dev.kno.com Adding default property: spark_master=spark://cas1.dev.kno.com:7077 Warning: Ignoring non-spark config property: streaming_window=10 Using properties file: /tmp/highlights-counting.properties Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights Warning: Ignoring non-spark config property: zookeeper_port=2181 Warning: Ignoring non-spark config property: default_num_of_cores_per_topic=1 Warning: Ignoring non-spark config property: num_of_concurrent_streams=2 Warning: Ignoring non-spark config property: kafka_consumer_group=highlight_consumer_group Warning: Ignoring non-spark config property: app_name=HighlightConsumer Warning: Ignoring non-spark config property: cassandra_keyspace=bookevents Warning: Ignoring non-spark config property: scheduler_mode=FIFO Warning: Ignoring non-spark config property: highlight_topic=highlight_topic Warning: Ignoring non-spark config property: cassandra_host= cas1.dev.kno.com Warning: Ignoring non-spark config property: checkpoint_interval=3 Warning: Ignoring non-spark config property: zookeeper_host= cas1.dev.kno.com Adding default property: spark_master=spark://cas1.dev.kno.com:7077 Warning: Ignoring non-spark config property: streaming_window=10 Parsed arguments: master spark://cas1.dev.kno.com:7077 deployMode cluster executorMemory null executorCores null totalExecutorCores null propertiesFile /tmp/highlights-counting.properties extraSparkPropertiesMap() 
driverMemorynull driverCores null driverExtraClassPath /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib driverExtraLibraryPath /opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib driverExtraJavaOptions null supervise false queue null numExecutorsnull files null pyFiles null archivesnull mainClass com.kno.highlights.counting.service.HighlightConsumer primaryResource file:/opt/kno/kno-highlights-counting-service/kno-highlights-counting-service-0.1/lib/kno-highlights-counting-service.kno-highlights-counting-service-0.1.jar name com.kno.highlights.counting.service.HighlightConsumer childArgs [--name HighlightConsumer] jarsnull verbose true Default properties from /tmp/highlights-counting.properties: spark_master - spark://cas1.dev.kno.com:7077 Using properties file: /tmp/highlights-counting.properties Warning: Ignoring non-spark config property: checkpoint_directory=checkpointForHighlights Warning: Ignoring non-spark config property: zookeeper_port=2181
Re: Spark JVM default memory
It's unlikely you need to increase the amount of memory on your master node since it does simple bookkeeping. The majority of the memory pressure across a cluster is on executor nodes. See the conf/spark-env.sh file for configuring heap sizes, and this section in the docs for more information on how to make these changes: http://spark.apache.org/docs/latest/configuration.html On Mon, May 4, 2015 at 2:24 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: Starting the master with /sbin/start-master.sh creates a JVM with only 512MB of memory. How to change this default amount of memory? Thanks, Vijay
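For the standalone daemons specifically, a sketch of the relevant conf/spark-env.sh settings; the sizes are examples only:

# Heap for the master and worker daemon JVMs themselves (the 512MB seen above):
SPARK_DAEMON_MEMORY=1g
# Total memory a worker may hand out to executors on that machine:
SPARK_WORKER_MEMORY=32g

Per-application executor memory is still set separately, via spark.executor.memory or the --executor-memory flag of spark-submit.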
Re: Spark partitioning question
Hi Marius, I am also a little confused -- are you saying that myPartitions is basically something like: class MyPartitioner extends Partitioner { def numPartitions = 1; def getPartition(key: Any) = 0 } ?? If so, I don't understand how you'd ever end up with data in two partitions. Indeed, then everything before the call to partitionBy(myPartitioner) is somewhat irrelevant. The important point is that partitionBy should put all the data in one partition, and then the operations after that do not move data between partitions. So if you're really observing data in two partitions, then it would be good to know more about what version of spark you are on, your data etc., as it sounds like a bug. But, I have a feeling there is some misunderstanding about what your partitioner is doing. E.g., I think doing groupByKey followed by sortByKey doesn't make a lot of sense -- in general one sortByKey is all you need (it's not exactly the same, but most probably close enough, and avoids doing another expensive shuffle). If you can share a bit more information on your partitioner, and what properties you need for your f, that might help. thanks, Imran On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com wrote: Hello all, I have the following Spark (pseudo)code: rdd = mapPartitionsWithIndex(...) .mapPartitionsToPair(...) .groupByKey() .sortByKey(comparator) .partitionBy(myPartitioner) .mapPartitionsWithIndex(...) .mapPartitionsToPair( *f* ) The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However from yarn logs I do see that both yarn containers are processing records from the same partition ... and *sometimes* the over all job fails (due to the code in f which expects a certain order of records) and yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records ... for a reason I cannot explain and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of sorted records to yarn containers. If any of this makes any sense to you, please let me know what I am missing. Best, Marius
Re: Extra stage that executes before triggering computation with an action
sortByKey() runs one job to sample the data, to determine what range of keys to put in each partition. There is a jira to change it to defer launching the job until the subsequent action, but it will still execute another stage: https://issues.apache.org/jira/browse/SPARK-1021 On Wed, Apr 29, 2015 at 5:57 PM, Tom Hubregtsen thubregt...@gmail.com wrote: I'm not sure, but I wonder if because you are using the Spark REPL that it may not be representing what a normal runtime execution would look like and is possibly eagerly running a partial DAG once you define an operation that would cause a shuffle. What happens if you setup your same set of commands [a-e] in a file and use the Spark REPL's `load` or `paste` command to load them all at once? From Richard I have also packaged it in a jar file (without [e], the debug string), and still see the extra stage before the other two that I would expect. Even when I remove [d], the action, I still see stage 0 being executed (and do not see stage 1 and 2). Again a shortened log of the Stage 0: INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey, which has no missing parents INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22713.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
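In miniature, what SPARK-1021 describes (the path and data are placeholders): the second line alone launches a small job, visible as an extra stage, to sample key ranges for the RangePartitioner, before any action is called.

val pairs = sc.textFile("data.txt").map(line => (line, 1))
val sorted = pairs.sortByKey()   // triggers the sampling job in Spark 1.x
sorted.count()                   // the "real" action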
Re: ReduceByKey and sorting within partitions
I think this Spark Package may be what you're looking for! http://spark-packages.org/package/tresata/spark-sorted Best, Burak On Mon, May 4, 2015 at 12:56 PM, Imran Rashid iras...@cloudera.com wrote: oh wow, that is a really interesting observation, Marco Jerry. I wonder if this is worth exposing in combineByKey()? I think Jerry's proposed workaround is all you can do for now -- use reflection to side-step the fact that the methods you need are private. On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Marco, As I know, current combineByKey() does not expose the related argument where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is package private, if you can get the ShuffledRDD through reflection or other way, the keyOrdering you set will be pushed down to shuffle. If you use a combination of transformations to do it, the result will be same but the efficiency may be different, some transformations will separate into different stages, which will introduce additional shuffle. Thanks Jerry 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com: Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartition) pushing the sorting down to the shuffles machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is that combineByKey call setKeyOrdering function on the ShuflleRDD that it returns. Am I wrong? Can be done by a combination of other transformations with the same efficiency? Thanks, Marco - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Building DAG from log
Hi, I'm trying to build the DAG of an application from the logs. I've had a look at SparkReplayDebugger, but it doesn't operate offline on logs. I also looked at the one in this pull request: https://github.com/apache/spark/pull/2077, which seems to operate only on logs but doesn't clearly show the dependencies between the stages. Is there some other tool to do this? In the log files I could not find the information needed to derive the dependencies between stages; is there any other way to recover this information offline? Thanks,
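One thing worth checking first, as a sketch (the log directory is a hypothetical path): the JSON event logs Spark writes when spark.eventLog.enabled is set are what replay tooling consumes, and they carry structured per-job and per-stage events rather than plain driver console output.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("dag-from-logs")
      .set("spark.eventLog.enabled", "true")                // write replayable JSON events
      .set("spark.eventLog.dir", "hdfs:///tmp/spark-events") // hypothetical location
    val sc = new SparkContext(conf)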
Re: AJAX with Apache Spark
Hi Sergio, you shouldn't architect it this way. Rather, have Spark Streaming update a store that your Play app will query: for example a Cassandra table, or Redis, or anything that can answer you in milliseconds, rather than querying the Spark Streaming program itself. Regards, Olivier.

On Mon, May 4, 2015 at 20:08, Sergio Jiménez Barrio drarse.a...@gmail.com wrote:

Hi, I am trying to create a dashboard for an Apache Spark job. I need to run Spark Streaming 24/7, and when it receives an AJAX request, answer with the current state of the job. I have created the client and the Spark program. I tried to create the response service with Play, but that runs the program on each request; I want to send the Spark program's accumulator in response to a request. Sorry for my explanation. Any idea? Maybe with Play? Thanks
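A minimal sketch of Olivier's suggestion, assuming Redis via the Jedis client and a socket source (both hypothetical stand-ins for Sergio's actual setup):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import redis.clients.jedis.Jedis

    val ssc = new StreamingContext(new SparkConf().setAppName("dashboard"), Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)  // hypothetical source

    lines.foreachRDD { rdd =>
      val state = rdd.count()              // stand-in for the accumulator value
      val jedis = new Jedis("localhost")   // hypothetical Redis endpoint
      jedis.set("job:state", state.toString)
      jedis.close()
    }

    ssc.start()
    ssc.awaitTermination()

The Play controller then answers each AJAX call by reading "job:state" from Redis in milliseconds, without ever touching the streaming job.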
RE: Re: Re: sparksql running slow while joining 2 tables.
Or, have you ever tried a broadcast join?

From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, May 5, 2015 8:33 AM To: luohui20...@sina.com; Olivier Girardot; user Subject: RE: Re: Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan? EXPLAIN SELECT xxx…

From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: Re: sparksql running slow while joining 2 tables.

hi Olivier, spark 1.3.1, with java 1.8.0.45, and I attached 2 pics. It seems like a GC issue. I also tried different parameters like memory size of driver/executor, memory fraction, java opts... but this issue still happens.

Thanks & Best regards! 罗辉 San.Luo

----- Original Mail ----- From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 20:46

Hi, what is your Spark version? Regards, Olivier.

On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

hi guys, when I am running a sql like: select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint a.startpoint + 25); I found sparksql running slow, taking minutes, which may be caused by very long GC and shuffle times. Table db is created from a txt file sized at 56 MB, while table sample is sized at 26 MB -- both small. My spark cluster is a standalone pseudo-distributed cluster with an 8g executor and a 4g driver. Any advice? Thank you guys.

Thanks & Best regards! 罗辉 San.Luo
Re: RE: Re: sparksql running slow while joining 2 tables.
As far as I know, broadcast join is enabled automatically via spark.sql.autoBroadcastJoinThreshold; refer to http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

And how can I check my app's physical plan, and other things like the optimized plan, executable plan, etc.? Thanks

Thanks & Best regards! 罗辉 San.Luo

----- Original Mail ----- From: Cheng, Hao hao.ch...@intel.com To: Cheng, Hao hao.ch...@intel.com, luohui20...@sina.com, Olivier Girardot ssab...@gmail.com, user user@spark.apache.org Subject: RE: Re: Re: sparksql running slow while joining 2 tables. Date: May 5, 2015, 08:38

Or, have you ever tried a broadcast join?

From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, May 5, 2015 8:33 AM To: luohui20...@sina.com; Olivier Girardot; user Subject: RE: Re: Re: sparksql running slow while joining 2 tables.

Can you print out the physical plan? EXPLAIN SELECT xxx…

From: luohui20...@sina.com [mailto:luohui20...@sina.com] Sent: Monday, May 4, 2015 9:08 PM To: Olivier Girardot; user Subject: Re: Re: sparksql running slow while joining 2 tables.

hi Olivier, spark 1.3.1, with java 1.8.0.45, and I attached 2 pics. It seems like a GC issue. I also tried different parameters like memory size of driver/executor, memory fraction, java opts... but this issue still happens.

Thanks & Best regards! 罗辉 San.Luo

----- Original Mail ----- From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 20:46

Hi, what is your Spark version? Regards, Olivier.

On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote:

hi guys, when I am running a sql like: select a.name, a.startpoint, a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint a.startpoint + 25); I found sparksql running slow, taking minutes, which may be caused by very long GC and shuffle times. Table db is created from a txt file sized at 56 MB, while table sample is sized at 26 MB -- both small. My spark cluster is a standalone pseudo-distributed cluster with an 8g executor and a 4g driver. Any advice? Thank you guys.

Thanks & Best regards! 罗辉 San.Luo
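A minimal sketch of both suggestions in a Spark 1.3 HiveContext session (the query is abbreviated from the thread; the threshold value is illustrative):

    // Print the parsed, analyzed, optimized, and physical plans:
    val df = sqlContext.sql(
      "SELECT a.name, a.startpoint FROM db a JOIN sample b ON a.name = b.name")
    df.explain(true)

    // Or directly in SQL (substitute the real query for the ellipsis):
    sqlContext.sql("EXPLAIN EXTENDED SELECT ...").collect().foreach(println)

    // Raise the broadcast threshold (in bytes) so a ~26 MB table qualifies
    // for a broadcast join:
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=52428800")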
Re: spark Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
You may need to copy hive-site.xml to your Spark conf directory, check your hive metastore warehouse setting, and also check that you are authorized to access the hive metastore warehouse.

Thanks & Best regards! 罗辉 San.Luo

----- Original Mail ----- From: 鹰 980548...@qq.com To: user user@spark.apache.org Subject: spark Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Date: May 5, 2015, 08:49

hi all, when I submit a spark-sql program to select data from my hive database, I get an error like this: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient. What's wrong with my Spark configuration? Thanks for any help!
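A minimal sanity check after copying the file, assuming a standard layout ($SPARK_HOME and the source path are illustrative):

    // Shell: cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/
    // Then, from spark-shell, confirm the metastore is reachable:
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.sql("SHOW TABLES").collect().foreach(println)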