spark table to hive table
Hi all, I'm trying to compare functions available in Spark 1.0 hql to original HiveQL. But when I tested functions such as 'rank', Spark didn't support some HiveQL functions. Shark supports functions as well as Hive does, so I want to convert a Parquet file (Spark SQL table) to a Hive table and analyze it with Shark. Is there any way to do this? Thanks, Kevin _ Kevin Jung Assistant Engineer / BDA Lab T +82-2-6155-8349 M +82-10-9288-1984 F +82-2-6155-0251 E itsjb.j...@samsung.com www.sds.samsung.com
Re: maprfs and spark libraries
As simple as that. Indeed, the Spark jar I was linking to wasn't the MapR version. I just added spark-assembly-0.9.1-hadoop1.0.3-mapr-3.0.3.jar to the lib directory of my project as an unmanaged dependency for sbt. Thank you, Cafe au Lait, and all of you guys. Regards, Nelson. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6414.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Map failed [duplicate 1] error
Hi, I am getting the following error but I don't understand what the problem is.

14/05/27 17:44:29 INFO TaskSetManager: Loss was due to java.io.IOException: Map failed [duplicate 15]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:43 as TID 60281 on executor 0: cm07 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:43 as 2132 bytes in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60235 (task 47.0:3)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException: Map failed [duplicate 16]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:3 as TID 60282 on executor 3: cm04 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:3 as 2132 bytes in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60273 (task 47.0:29)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException: Map failed [duplicate 17]
14/05/27 17:44:30 ERROR TaskSetManager: Task 47.0:29 failed 4 times; aborting job
14/05/27 17:44:30 INFO DAGScheduler: Failed to run count at reasoner1.scala:144
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 47.0:29 failed 4 times (most recent failure: Exception failure: java.io.IOException: Map failed)
org.apache.spark.SparkException: Job aborted: Task 47.0:29 failed 4 times (most recent failure: Exception failure: java.io.IOException: Map failed)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/05/27 17:44:30 INFO ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
 at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1 [error] Total time: 172 s, completed 2014-05-27 5:44:30 PM -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-failed-dupliacte-1-error-tp6415.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to set task number?
When I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001 limit 40; there will be 4 files created on Tachyon, but when I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001; there will be 35 files created on Tachyon. So I think Spark/Shark knows how to split files when creating a table. Could I control the splitting by setting some configuration, such as map.split.size=64M? 2014-05-26 12:14 GMT+08:00 qingyang li liqingyang1...@gmail.com: I used create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001 to create table bigtable002; bigtable001 is loaded from HDFS, and its format is text file, so I think bigtable002's is text too. 2014-05-26 11:14 GMT+08:00 Aaron Davidson ilike...@gmail.com: What is the format of your input data, prior to insertion into Tachyon? On Sun, May 25, 2014 at 7:52 PM, qingyang li liqingyang1...@gmail.com wrote: I tried set mapred.map.tasks=30; it does not work, it seems Shark does not support this setting. I also tried SET mapred.max.split.size=6400; it does not work either. Is there another way to control task number in the Shark CLI? 2014-05-26 10:38 GMT+08:00 Aaron Davidson ilike...@gmail.com: You can try setting mapred.map.tasks to get Hive to do the right thing. On Sun, May 25, 2014 at 7:27 PM, qingyang li liqingyang1...@gmail.com wrote: Hi Aaron, thanks for sharing. I am using Shark to execute queries, and the table is created on Tachyon. I think I cannot use RDD#repartition() in the Shark CLI. Does Shark support SET mapred.max.split.size to control file size? If yes, after I create the table I can control the file count, and then the task number. If not, does anyone know another way to control task number in the Shark CLI? 2014-05-26 9:36 GMT+08:00 Aaron Davidson ilike...@gmail.com: How many partitions are in your input data set?
A possibility is that your input data has 10 unsplittable files, so you end up with 10 partitions. You could improve this by using RDD#repartition(). Note that mapPartitionsWithIndex is sort of the main processing loop for many Spark functions. It iterates through all the elements of the partition and does some computation (probably running your user code) on it. You can see the number of partitions in your RDD by visiting the Spark driver web interface. To access this, visit port 8080 on the host running your Standalone Master (assuming you're running standalone mode), which will have a link to the application web interface. The Tachyon master also has a useful web interface, available at port 1. On Sun, May 25, 2014 at 5:43 PM, qingyang li liqingyang1...@gmail.com wrote: Hi Mayur, thanks for replying. I know a Spark application should take all cores by default. My question is how to set the task number on each core? If one slice means one task, how can I set the slice file size? 2014-05-23 16:37 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com: How many cores do you see on your Spark master (port 8080)? By default a Spark application should take all cores when you launch it, unless you have set a max cores configuration. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, May 22, 2014 at 4:07 PM, qingyang li liqingyang1...@gmail.com wrote: My aim in setting the task number is to increase query speed, and I have also found that mapPartitionsWithIndex at Operator.scala:333 http://192.168.1.101:4040/stages/stage?id=17 is costing much time. So my other question is: how can I tune mapPartitionsWithIndex to bring the cost down? 2014-05-22 18:09 GMT+08:00 qingyang li liqingyang1...@gmail.com: I have added SPARK_JAVA_OPTS+=-Dspark.default.parallelism=40 in shark-env.sh, but I find there are only 10 tasks on the cluster and 2 tasks on each machine.
2014-05-22 18:07 GMT+08:00 qingyang li liqingyang1...@gmail.com: I have added SPARK_JAVA_OPTS+=-Dspark.default.parallelism=40 in shark-env.sh. 2014-05-22 17:50 GMT+08:00 qingyang li liqingyang1...@gmail.com: I am using Tachyon as the storage system and using Shark to query a table which is a big table. I have 5 machines as a Spark cluster, with 4 cores on each machine. My questions are: 1. How do I set the task number on each core? 2. Where can I see how many partitions one RDD has?
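The arithmetic behind the 64M suggestion above can be sketched in a few lines. This is only an illustration of how Hadoop-style input formats derive split (and hence map-task) counts from a target split size; it is not necessarily what Shark does for a Tachyon-cached table:

```python
import math

def num_splits(total_bytes, split_bytes=64 * 1024 * 1024):
    """Rough number of input splits (and hence map tasks) a Hadoop-style
    input format would produce, assuming the input is splittable and the
    split size is the only factor (a simplification)."""
    return max(1, math.ceil(total_bytes / split_bytes))

# e.g. a hypothetical 2.2 GB table at a 64 MB split size -> 36 splits,
# on the order of the 35 files observed above
print(num_splits(int(2.2 * 1024**3)))
```

Under this model, shrinking mapred.max.split.size raises the file/task count and raising it lowers the count, which is why the thread keeps circling back to that setting.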
Re: how to control task number?
When I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001 limit 40; there will be 4 files created on Tachyon, but when I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001; there will be 35 files created on Tachyon. So I think Spark/Shark knows how to split files when creating a table. Will Spark/Shark partition the table into many parts on Tachyon? How does Spark/Shark split a table into many parts? Could I control the splitting by setting some configuration, such as map.split.size=64M? 2014-05-27 16:59 GMT+08:00 qingyang li liqingyang1...@gmail.com: When I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001 limit 40; there will be 4 files created on Tachyon, but when I use create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001; there will be 35 files created on Tachyon. So I think Spark/Shark knows how to split files when creating a table. Could I control the splitting by setting some configuration, such as map.split.size=64M? 2014-05-26 12:14 GMT+08:00 qingyang li liqingyang1...@gmail.com: I used create table bigtable002 tblproperties('shark.cache'='tachyon') as select * from bigtable001 to create table bigtable002; bigtable001 is loaded from HDFS, and its format is text file, so I think bigtable002's is text too. 2014-05-26 11:14 GMT+08:00 Aaron Davidson ilike...@gmail.com: What is the format of your input data, prior to insertion into Tachyon? On Sun, May 25, 2014 at 7:52 PM, qingyang li liqingyang1...@gmail.com wrote: I tried set mapred.map.tasks=30; it does not work, it seems Shark does not support this setting. I also tried SET mapred.max.split.size=6400; it does not work either. Is there another way to control task number in the Shark CLI?
2014-05-26 10:38 GMT+08:00 Aaron Davidson ilike...@gmail.com: You can try setting mapred.map.tasks to get Hive to do the right thing. On Sun, May 25, 2014 at 7:27 PM, qingyang li liqingyang1...@gmail.com wrote: Hi Aaron, thanks for sharing. I am using Shark to execute queries, and the table is created on Tachyon. I think I cannot use RDD#repartition() in the Shark CLI. Does Shark support SET mapred.max.split.size to control file size? If yes, after I create the table I can control the file count, and then the task number. If not, does anyone know another way to control task number in the Shark CLI? 2014-05-26 9:36 GMT+08:00 Aaron Davidson ilike...@gmail.com: How many partitions are in your input data set? A possibility is that your input data has 10 unsplittable files, so you end up with 10 partitions. You could improve this by using RDD#repartition(). Note that mapPartitionsWithIndex is sort of the main processing loop for many Spark functions. It iterates through all the elements of the partition and does some computation (probably running your user code) on it. You can see the number of partitions in your RDD by visiting the Spark driver web interface. To access this, visit port 8080 on the host running your Standalone Master (assuming you're running standalone mode), which will have a link to the application web interface. The Tachyon master also has a useful web interface, available at port 1. On Sun, May 25, 2014 at 5:43 PM, qingyang li liqingyang1...@gmail.com wrote: Hi Mayur, thanks for replying. I know a Spark application should take all cores by default. My question is how to set the task number on each core? If one slice means one task, how can I set the slice file size? 2014-05-23 16:37 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com: How many cores do you see on your Spark master (port 8080)? By default a Spark application should take all cores when you launch it, unless you have set a max cores configuration.
Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, May 22, 2014 at 4:07 PM, qingyang li liqingyang1...@gmail.com wrote: My aim in setting the task number is to increase query speed, and I have also found that mapPartitionsWithIndex at Operator.scala:333 http://192.168.1.101:4040/stages/stage?id=17 is costing much time. So my other question is: how can I tune mapPartitionsWithIndex to bring the cost down? 2014-05-22 18:09 GMT+08:00 qingyang li liqingyang1...@gmail.com: I have added SPARK_JAVA_OPTS+=-Dspark.default.parallelism=40 in shark-env.sh, but I find there are only 10 tasks on the cluster and 2 tasks on each machine. 2014-05-22 18:07 GMT+08:00 qingyang li liqingyang1...@gmail.com: I have added SPARK_JAVA_OPTS+=-Dspark.default.parallelism=40 in shark-env.sh. 2014-05-22 17:50 GMT+08:00 qingyang li liqingyang1...@gmail.com: I am using Tachyon as the storage system and using Shark
too many temporary app files left after app finished
Hi, We use Spark 0.9.1 in standalone mode. We found lots of app temporary files didn't get removed from each worker's local file system even after the job finished. These folders have names such as app-20140516120842-0203. These files occupied so much disk storage that we have to run a daemon script to remove them periodically. I think this method is ugly. Did anybody run into this issue as well? Is there any conf to delete the app temporary files automatically once the job finishes? Thanks, Cheney
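For reference, the kind of periodic-deletion script described above can be a one-liner around find. This is only a sketch: the work-directory location and the 7-day cutoff are assumptions, and you should verify no running application still owns a directory before deleting it.

```shell
# Sketch of the daemon/cron cleanup described above. Assumptions:
# the worker's work directory location and a 7-day retention window.
clean_spark_work() {
  work_dir="$1"   # e.g. /opt/spark/work on each worker (hypothetical path)
  # remove per-application dirs (app-YYYYMMDDHHMMSS-NNNN) older than 7 days
  find "$work_dir" -maxdepth 1 -type d -name 'app-*' -mtime +7 -exec rm -rf {} +
}
```

Run it from cron on each worker, e.g. daily against the worker's work dir.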
Re: spark table to hive table
Did you try the Hive Context? Look under Hive Support here: http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html On Tue, May 27, 2014 at 2:09 AM, 정재부 itsjb.j...@samsung.com wrote: Hi all, I'm trying to compare functions available in Spark 1.0 hql to original HiveQL. But when I tested functions such as 'rank', Spark didn't support some HiveQL functions. Shark supports functions as well as Hive does, so I want to convert a Parquet file (Spark SQL table) to a Hive table and analyze it with Shark. Is there any way to do this? Thanks, Kevin _ *Kevin Jung* Assistant Engineer / BDA Lab *T* +82-2-6155-8349 *M* +82-10-9288-1984 *F* +82-2-6155-0251 *E* itsjb.j...@samsung.com www.sds.samsung.com
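A minimal sketch of that suggestion, assuming the Spark 1.0 API described in that guide (hql was the HiveQL entry point at the time; the file path and table name below are made up, and whether a given HiveQL UDF like rank works this way still needs to be tested):

```scala
// Sketch only: requires a Spark build with Hive support and runs inside
// spark-shell, where `sc` is the existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// Register a Parquet file as a table, then query it with HiveQL
val parquetData = hiveContext.parquetFile("/path/to/data.parquet") // hypothetical path
parquetData.registerAsTable("events")
hql("SELECT * FROM events LIMIT 10").collect().foreach(println)
```

This keeps the data where it is rather than converting it to a separate Hive table for Shark.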
Re: K-nearest neighbors search in Spark
Any suggestion is very much appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark streaming issue
Hi, I am facing a weird issue. I am using Spark 0.9 and running a streaming application. In the UI, the duration shows on the order of seconds, but if I dig into that particular stage's details, it shows the total time taken across all tasks for the stage is much less (in milliseconds). I am using the Fair scheduling policy and the pool name is counter-metric-persistor. What could the reason for this be?

Stage screenshot: Stage 97
97 counter-metric-persistor http://204.77.213.186:4040/stages/pool?poolname=counter-metric-persistor foreach at RealTimeAnalyticsApplication.scala:33 http://204.77.213.186:4040/stages/stage?id=97 2014/05/27 07:22:23 14.5 s 6/6

Stage details screenshot: Stage 97
Details for Stage 97 - Total task time across all tasks: 154 ms

Summary Metrics for 6 Completed Tasks
Metric                            Min    25th percentile  Median  75th percentile  Max
Result serialization time         0 ms   0 ms             0 ms    0 ms             0 ms
Duration                          12 ms  13 ms            23 ms   30 ms            54 ms
Time spent fetching task results  0 ms   0 ms             0 ms    0 ms             0 ms
Scheduler delay                   7 ms   7 ms             8 ms    8 ms             8 ms

Aggregated Metrics by Executor
Executor ID  Address                          Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0            ls230-127-p.nyc0.ls.local:53463  199 ms     6            0             6                0.0 B         0.0 B          0.0 B                   0.0 B

Tasks
Task Index  Task ID  Status   Locality Level  Executor                   Launch Time          Duration
0           408      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  30 ms
1           411      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  22 ms
2           412      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  23 ms
3           414      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  13 ms
4           415      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  12 ms
5           416      SUCCESS  PROCESS_LOCAL   ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  54 ms

Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: Computing cosine similiarity using pyspark
Hi Jamal, One nice feature of PySpark is that you can easily use existing functions from NumPy and SciPy inside your Spark code. For a simple example, the following uses Spark's cartesian operation (which combines pairs of vectors into tuples), followed by NumPy's corrcoef, to compute the Pearson correlation coefficient between every pair of a set of vectors. The vectors are an RDD of NumPy arrays.

>>> from numpy import array, corrcoef
>>> data = sc.parallelize([array([1,2,3]), array([2,4,6.1]), array([3,2,1.1])])
>>> corrs = data.cartesian(data).map(lambda (x,y): corrcoef(x,y)[0,1]).collect()
>>> corrs
[1.0, 0.0086740991746, -0.99953863896044948 ...

This just returns a list of the correlation coefficients. You could also add a key to each array, to keep track of which pair is which:

>>> data_with_keys = sc.parallelize([(0,array([1,2,3])), (1,array([2,4,6.1])), (2,array([3,2,1.1]))])
>>> corrs_with_keys = data_with_keys.cartesian(data_with_keys).map(lambda ((k1,v1),(k2,v2)): ((k1,k2),corrcoef(v1,v2)[0,1])).collect()
>>> corrs_with_keys
[((0, 0), 1.0), ((0, 1), 0.0086740991746), ((0, 2), -0.99953863896044948) ...

Finally, you could just replace corrcoef in either of the above with scipy.spatial.distance.cosine to get your cosine similarity. Hope that's useful; as Andrei said, the answer partly depends on exactly what you're trying to do. -- Jeremy On Fri, May 23, 2014 at 2:41 PM, Andrei faithlessfri...@gmail.com wrote: Do you need cosine distance and correlation between vectors or between variables (elements of vector)? It would be helpful if you could tell us details of your task. On Thu, May 22, 2014 at 5:49 PM, jamal sasha jamalsha...@gmail.com wrote: Hi, I have a bunch of vectors like [0.1234,-0.231,0.23131] and so on, and I want to compute cosine similarity and Pearson correlation using pyspark. How do I do this? Any ideas? Thanks
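For the cosine piece specifically, note that scipy.spatial.distance.cosine returns a distance, not a similarity. A pure-NumPy helper (equivalent to 1 - cosine distance, assuming non-zero vectors) can be dropped into the same cartesian/map pattern shown in the reply above:

```python
import numpy as np

def cosine_similarity(x, y):
    # Equivalent to 1 - scipy.spatial.distance.cosine(x, y) for non-zero vectors
    x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
    return float(np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y)))

print(cosine_similarity([1, 2, 3], [2, 4, 6]))  # parallel vectors: ~1.0

# In PySpark it would slot in as (sketch, assuming `data` is an RDD of arrays):
#   data.cartesian(data).map(lambda (x, y): cosine_similarity(x, y)).collect()
```

Keeping the helper pure NumPy avoids shipping SciPy to the workers if it isn't installed there.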
Re: Spark Summit 2014 (Hotel suggestions)
Hi everyone! Any recommendation anyone? Pierre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark On Mesos
Hello, I've installed a Spark cluster, spark-0.9.0-incubating-bin-hadoop1, which works fine. On the same cluster I've also installed a Mesos cluster, using mesos_0.18.2_x86_64.rpm, which works fine as well. Now, I was trying to follow the instructions from https://spark.apache.org/docs/0.9.0/running-on-mesos.html and, while trying to get Jetty from the web, it can't find it. I have checked the URL, and obviously I got file not found. How can I overcome/bypass this issue? Here is how the errors look:

[info] Loading project definition from /usr/spark-0.9.0-incubating-bin-hadoop1/project/project
[info] Loading project definition from /usr/spark-0.9.0-incubating-bin-hadoop1/project
[info] Set current project to root (in build file:/usr/spark-0.9.0-incubating-bin-hadoop1/)
[info] Updating {file:/usr/spark-0.9.0-incubating-bin-hadoop1/}core...
[info] Resolving org.eclipse.jetty#jetty-http;7.6.8.v20121106 ...
[error] Server access Error: Connection timed out url=https://oss.sonatype.org/content/repositories/snapshots/org/eclipse/jetty/jetty-http/7.6.8.v20121106/jetty-http-7.6.8.v20121106.pom
...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-On-Mesos-tp6425.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Working with Avro Generic Records in the interactive scala shell
I was able to work around this by switching to the SpecificDatum interface and following this example: https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java As in the example, I defined a subclass of my Avro type which implemented the Serializable interface using Avro serialization methods. I also defined a copy constructor which converted from the actual Avro type to my subclass. In Spark, after reading the Avro file, I ran a map operation to convert from the Avro type to my serializable subclass. This worked, although I'm not sure it's the most efficient solution. Here's a gist of what I run in the console: https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific I haven't gotten Kryo registration to work yet, but it seems like setting the registrator before launching the console using the environment variable SPARK_JAVA_OPTS might be better than shutting down and restarting the Spark context in the console. J On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi jer...@lewi.us wrote: Hi Josh, Thanks for the help. The class should be on the path on all nodes. Here's what I did: 1) I built a jar from my Scala code. 2) I copied that jar to a location on all nodes in my cluster (/usr/local/spark). 3) I edited bin/compute-classpath.sh to add my jar to the class path. 4) I repeated the process with the avro mapreduce jar to provide AvroKey. I doubt this is the best way to set the classpath, but it seems to work. J On Sat, May 24, 2014 at 9:26 AM, Josh Marcus jmar...@meetup.com wrote: Jeremy, Just to be clear, are you assembling a jar with that class compiled (with its dependencies) and including the path to that jar on the command line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)? --j On Saturday, May 24, 2014, Jeremy Lewi jer...@lewi.us wrote: Hi Spark Users, I'm trying to read and process an Avro dataset using the interactive Spark Scala shell.
When my pipeline executes I get the ClassNotFoundException pasted at the end of this email. I'm trying to use the Generic Avro API (not the Specific API). Here's a gist of the commands I'm running in the spark console: https://gist.github.com/jlewi/2c853e0ceee5f00c Here's my registrator for kryo: https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala Any help or suggestions would be greatly appreciated. Thanks Jeremy Here's the log message that is spewed out:

14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
Re: KryoSerializer Exception
I am experiencing the same issue (I tried both using Kryo as the serializer and increasing the buffer size up to 256M; my objects are much smaller, though). I share my registrator class just in case: https://gist.github.com/JordiAranda/5cc16cf102290c413c82 Any hints would be highly appreciated. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KryoSerializer-Exception-tp5435p6428.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Summit 2014 (Hotel suggestions)
Go to Expedia/Orbitz and look for hotels in the Union Square neighborhood. In my humble opinion, having visited San Francisco, it is worth any extra cost to be as close as possible to the conference versus having to travel from other parts of the city. On Tue, May 27, 2014 at 9:36 AM, Gerard Maas gerard.m...@gmail.com wrote: +1 On Tue, May 27, 2014 at 3:22 PM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: Hi everyone! Any recommendation anyone? Pierre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Summit 2014 (Hotel suggestions)
Hi guys, I ended up reserving a room at the Phoenix (Hotel: http://www.jdvhotels.com/hotels/california/san-francisco-hotels/phoenix-hotel), recommended by my friend who has been to SF. According to Google, it takes 11 min to walk to the conference, which is not too bad. Hope this helps! Jerry On Tue, May 27, 2014 at 10:35 AM, Gary Malouf malouf.g...@gmail.com wrote: Go to Expedia/Orbitz and look for hotels in the Union Square neighborhood. In my humble opinion, having visited San Francisco, it is worth any extra cost to be as close as possible to the conference versus having to travel from other parts of the city. On Tue, May 27, 2014 at 9:36 AM, Gerard Maas gerard.m...@gmail.com wrote: +1 On Tue, May 27, 2014 at 3:22 PM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: Hi everyone! Any recommendation anyone? Pierre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Running a spark-submit compatible app in spark-shell
Thanks, Andrew. I'll give it a try. On Mon, May 26, 2014 at 2:22 PM, Andrew Or and...@databricks.com wrote: Hi Roger, This was due to a bug in the Spark shell code, and is fixed in the latest master (and RC11). Here is the commit that fixed it: https://github.com/apache/spark/commit/8edbee7d1b4afc192d97ba192a5526affc464205. Try it now and it should work. :) Andrew 2014-05-26 10:35 GMT+02:00 Perttu Ranta-aho ranta...@iki.fi: Hi Roger, Were you able to solve this? -Perttu On Tue, Apr 29, 2014 at 8:11 AM, Roger Hoover roger.hoo...@gmail.com wrote: Patrick, Thank you for replying. That didn't seem to work either. I see the option parsed using verbose mode. Parsed arguments: ... driverExtraClassPath /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar But the jar still doesn't show up if I run :cp in the REPL, and the import still fails.

scala> import etl._
<console>:7: error: not found: value etl
       import etl._

Not sure if this helps, but I noticed with Spark 0.9.1 that the import only seems to work when I add the -usejavacp option to the spark-shell command. I don't really understand why. With the latest code, I tried adding these options to the spark-shell command without success: -usejavacp -Dscala.usejavacp=true On Mon, Apr 28, 2014 at 6:30 PM, Patrick Wendell pwend...@gmail.com wrote: What about if you run ./bin/spark-shell --driver-class-path=/path/to/your/jar.jar I think either this or the --jars flag should work, but it's possible there is a bug with the --jars flag when calling the REPL. On Mon, Apr 28, 2014 at 4:30 PM, Roger Hoover roger.hoo...@gmail.com wrote: A couple of issues: 1) The jar doesn't show up on the classpath even though SparkSubmit had it in the --jars options. I tested this by running :cp in spark-shell. 2) After adding it to the classpath using (:cp /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar), it still fails. When I do that in the Scala REPL, it works.
BTW, I'm using the latest code from the master branch (8421034e793c0960373a0a1d694ce334ad36e747). On Mon, Apr 28, 2014 at 3:40 PM, Roger Hoover roger.hoo...@gmail.com wrote: Matei, thank you. That seemed to work, but I'm not able to import a class from my jar. Using the verbose options, I can see that my jar should be included: Parsed arguments: ... jars /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar And I see the class I want to load in the jar:

jar -tf /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar | grep IP2IncomeJob
etl/IP2IncomeJob$$anonfun$1.class
etl/IP2IncomeJob$$anonfun$4.class
etl/IP2IncomeJob$.class
etl/IP2IncomeJob$$anonfun$splitOverlappingRange$1.class
etl/IP2IncomeJob.class
etl/IP2IncomeJob$$anonfun$3.class
etl/IP2IncomeJob$$anonfun$2.class

But the import fails:

scala> import etl.IP2IncomeJob
<console>:10: error: not found: value etl
       import etl.IP2IncomeJob

Any ideas? On Sun, Apr 27, 2014 at 3:46 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi Roger, You should be able to use the --jars argument of spark-shell to add JARs onto the classpath and then work with those classes in the shell. (A recent patch, https://github.com/apache/spark/pull/542, made spark-shell use the same command-line arguments as spark-submit.) But this is a great question, we should test it out and see whether anything else would make development easier. SBT also has an interactive shell where you can run classes in your project, but unfortunately Spark can't deal with closures typed directly into it the right way. However, if you write your Spark logic in a method and just call that method from the SBT shell, that should work. Matei On Apr 27, 2014, at 3:14 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi, From the meetup talk about the 1.0 release, I saw that spark-submit will be the preferred way to launch apps going forward. How do you recommend launching such jobs in a development cycle?
For example, how can I load an app that's expecting to be given to spark-submit into spark-shell? Also, can anyone recommend other tricks for rapid development? I'm new to Scala, sbt, etc. I think sbt can watch for changes in source files and compile them automatically. I want to be able to make code changes and quickly get into a spark-shell to play around with them. I appreciate any advice. Thanks, Roger
Re: Working with Avro Generic Records in the interactive scala shell
Thanks, that's super helpful. J On Tue, May 27, 2014 at 8:01 AM, Matt Massie mas...@berkeley.edu wrote: I really should update that blog post. I created a gist (see https://gist.github.com/massie/7224868) which explains a cleaner, more efficient approach. -- Matt Massie (http://www.linkedin.com/in/mattmassie/ | http://www.twitter.com/matt_massie) UC Berkeley AMPLab (https://twitter.com/amplab) On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi jer...@lewi.us wrote: I was able to work around this by switching to the SpecificDatum interface and following this example: https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java As in the example, I defined a subclass of my Avro type which implemented the Serializable interface using Avro serialization methods. I also defined a copy constructor which converted from the actual Avro type to my subclass. In Spark, after reading the Avro file, I ran a map operation to convert from the Avro type to my serializable subclass. This worked, although I'm not sure it's the most efficient solution. Here's a gist of what I run in the console: https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific I haven't gotten Kryo registration to work yet, but it seems like setting the registrator before launching the console using the environment variable SPARK_JAVA_OPTS might be better than shutting down and restarting the Spark context in the console. J On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi jer...@lewi.us wrote: Hi Josh, Thanks for the help. The class should be on the path on all nodes. Here's what I did: 1) I built a jar from my Scala code. 2) I copied that jar to a location on all nodes in my cluster (/usr/local/spark) 3) I edited bin/compute-classpath.sh to add my jar to the classpath. 4) I repeated the process with the avro mapreduce jar to provide AvroKey. I doubt this is the best way to set the classpath but it seems to work.
J On Sat, May 24, 2014 at 9:26 AM, Josh Marcus jmar...@meetup.com wrote: Jeremy, Just to be clear, are you assembling a jar with that class compiled (with its dependencies) and including the path to that jar on the command line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)? --j On Saturday, May 24, 2014, Jeremy Lewi jer...@lewi.us wrote: Hi Spark Users, I'm trying to read and process an Avro dataset using the interactive spark scala shell. When my pipeline executes I get the ClassNotFoundException pasted at the end of this email. I'm trying to use the Generic Avro API (not the Specific API). Here's a gist of the commands I'm running in the spark console: https://gist.github.com/jlewi/2c853e0ceee5f00c Here's my registrator for kryo. https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala Any help or suggestions would be greatly appreciated. Thanks Jeremy Here's the log message that is spewed out. 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
Re: Working with Avro Generic Records in the interactive scala shell
Also see this context from February. We started working with Chill to get Avro records automatically registered with Kryo. I'm not sure the final status, but from the Chill PR #172 it looks like this might be much less friction than before. Issue we filed: https://github.com/twitter/chill/issues/171 Pull request that adds an AvroSerializer to Chill: https://github.com/twitter/chill/pull/172 Issue on the old Spark tracker: https://spark-project.atlassian.net/browse/SPARK-746 Matt can you comment if this change helps you streamline that gist even further? Andrew On Tue, May 27, 2014 at 8:49 AM, Jeremy Lewi jer...@lewi.us wrote: Thanks that's super helpful. J On Tue, May 27, 2014 at 8:01 AM, Matt Massie mas...@berkeley.edu wrote: I really should update that blog post. I created a gist (see https://gist.github.com/massie/7224868) which explains a cleaner, more efficient approach. -- Matt http://www.linkedin.com/in/mattmassie/ Massiehttp://www.twitter.com/matt_massie UC, Berkeley AMPLab https://twitter.com/amplab On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi jer...@lewi.us wrote: I was able to work around this by switching to the SpecificDatum interface and following this example: https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java As in the example, I defined a subclass of my Avro type which implemented the Serializable interface using Avro serialization methods. I also defined a copy constructor which converted from the actual avro type to my subclass. In spark, after reading the Avro file, I ran a map operation to convert from the avro type to my serializable subclass. This worked although I'm not sure its the most efficient solution. 
Here's a gist of what I run in the console: https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific I haven't gotten Kryo registration to work yet but it seems like setting the registrator before launching the console using the environment variable SPARK_JAVA_OPTS might be better than shutting down and restarting the spark context in the console. J On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi jer...@lewi.us wrote: Hi Josh, Thanks for the help. The class should be on the path on all nodes. Here's what I did: 1) I built a jar from my scala code. 2) I copied that jar to a location on all nodes in my cluster (/usr/local/spark) 3) I edited bin/compute-classpath.sh to add my jar to the class path. 4) I repeated the process with the avro mapreduce jar to provide AvroKey. I doubt this is the best way to set the classpath but it seems to work. J On Sat, May 24, 2014 at 9:26 AM, Josh Marcus jmar...@meetup.comwrote: Jeremy, Just to be clear, are you assembling a jar with that class compiled (with its dependencies) and including the path to that jar on the command line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)? --j On Saturday, May 24, 2014, Jeremy Lewi jer...@lewi.us wrote: Hi Spark Users, I'm trying to read and process an Avro dataset using the interactive spark scala shell. When my pipeline executes I get the ClassNotFoundException pasted at the end of this email. I'm trying to use the Generic Avro API (not the Specific API). Here's a gist of the commands I'm running in the spark console: https://gist.github.com/jlewi/2c853e0ceee5f00c Here's my registrator for kryo. https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala Any help or suggestions would be greatly appreciated. Thanks Jeremy Here's the log message that is spewed out. 
14/05/24 02:00:48 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
Re: K-nearest neighbors search in Spark
Hi Carter, In Spark 1.0 there will be an implementation of k-means available as part of MLlib. You can see the documentation for that below (until 1.0 is fully released). https://people.apache.org/~pwendell/spark-1.0.0-rc9-docs/mllib-clustering.html Maybe diving into the source here will help get you started? https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala Cheers, Andrew On Tue, May 27, 2014 at 4:10 AM, Carter gyz...@hotmail.com wrote: Any suggestion is very much appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
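For the k-nearest-neighbours search actually being asked about (as opposed to k-means clustering), the brute-force version is easy to express and to parallelise: each partition scores its points against the query. A plain-Scala sketch, where the 2-D point type and squared-Euclidean metric are my own assumptions:

```scala
// Brute-force k-NN: rank every point by squared Euclidean distance to the
// query and keep the k closest. O(n log n) per query.
def knn(points: Seq[(Double, Double)], query: (Double, Double), k: Int): Seq[(Double, Double)] = {
  def sqDist(p: (Double, Double)): Double = {
    val dx = p._1 - query._1
    val dy = p._2 - query._2
    dx * dx + dy * dy
  }
  points.sortBy(sqDist).take(k)
}

val neighbours = knn(Seq((0.0, 0.0), (1.0, 1.0), (5.0, 5.0), (0.5, 0.0)), (0.0, 0.0), 2)
```

On an RDD the same ranking could be done distributedly, e.g. with `rdd.takeOrdered(k)(Ordering.by(sqDist))`, though performance tuning is left to the reader.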
Persist and unpersist
I keep bumping into a problem with persisting RDDs. Consider this (silly) example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
    return input.filter(_ % 2 == 1)
  } else {
    return input.filter(_ % 2 == 0)
  }
}

The situation is that we want to do two things with an RDD (a count and a filter in the example). The input RDD may represent a very expensive calculation. So it would make sense to add an input.cache() line at the beginning. But where do we put input.unpersist()?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result

input.filter() is lazy, so this does not work as expected. We only want to release input from the cache once nothing depends on it anymore. Maybe result was garbage collected. Maybe result itself has been cached. But there is no way to detect such conditions. Our current approach is to just leave the RDD cached, and it will get dumped at some point anyway. Is there a better solution? Thanks for any tips.
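One common workaround (my own suggestion, not something stated in the thread) is to force materialization of the derived RDD before unpersisting its parent, e.g. result.cache(); result.count(); input.unpersist(), so the filter never has to recompute input later. The ordering issue can be sketched with a plain Scala lazy view standing in for the RDD, so the snippet needs no Spark:

```scala
// A lazy Scala view plays the role of the lazy RDD.filter result.
val input = (1 to 10).toList                 // stands in for the expensive, cached RDD
val count = input.size                       // like input.count (an action)
val result = input.view.filter(_ % 2 == 1)   // lazy, like RDD.filter -- nothing computed yet
// Materialize the derived result *before* releasing the input; in Spark terms:
// result.cache(); result.count(); input.unpersist()
val materialized = result.toList
```

After materialized exists (and, in Spark, is itself cached), releasing the input is safe because nothing lazy still points back to it.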
Re: file not found
Thanks for the heads up, I also experienced this issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/file-not-found-tp1854p6438.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Persist and unpersist
Daniel, Is SPARK-1103 (https://issues.apache.org/jira/browse/SPARK-1103) related to your example? Automatic unpersist()-ing of unreferenced RDDs would be nice. Nick On Tue, May 27, 2014 at 12:28 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: I keep bumping into a problem with persisting RDDs. Consider this (silly) example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
    return input.filter(_ % 2 == 1)
  } else {
    return input.filter(_ % 2 == 0)
  }
}

The situation is that we want to do two things with an RDD (a count and a filter in the example). The input RDD may represent a very expensive calculation. So it would make sense to add an input.cache() line at the beginning. But where do we put input.unpersist()?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result

input.filter() is lazy, so this does not work as expected. We only want to release input from the cache once nothing depends on it anymore. Maybe result was garbage collected. Maybe result itself has been cached. But there is no way to detect such conditions. Our current approach is to just leave the RDD cached, and it will get dumped at some point anyway. Is there a better solution? Thanks for any tips.
Re: Akka disassociation on Java SE Embedded
Sorry, to clarify: Spark *does* effectively turn Akka's failure detector off. On Tue, May 27, 2014 at 10:47 AM, Aaron Davidson ilike...@gmail.com wrote: Spark should effectively turn Akka's failure detector off, because we historically had problems with GCs and other issues causing disassociations. The only thing that should cause these messages nowadays is if the TCP connection (which Akka sustains between Actor Systems on different machines) actually drops. TCP connections are pretty resilient, so one common cause of this is actual Executor failure -- recently, I have experienced a similar-sounding problem due to my machine's OOM killer terminating my Executors, such that they didn't produce any error output. On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi chan...@gmail.comwrote: Hi all, On an ARM cluster, I have been testing a wordcount program with JRE 7 and everything is OK. But when changing to the embedded version of Java SE (Oracle's eJRE), the same program cannot complete all computing stages. It is failed by many Akka's disassociation. - I've been trying to increase Akka's timeout but still stuck. I am not sure what is the right way to do so? (I suspected that GC pausing the world is causing this). - Another question is that how could I properly turn on Akka's logging to see what's the root cause of this disassociation problem? (If my guess about GC is wrong). Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
Re: Akka disassociation on Java SE Embedded
Spark should effectively turn Akka's failure detector off, because we historically had problems with GCs and other issues causing disassociations. The only thing that should cause these messages nowadays is if the TCP connection (which Akka sustains between Actor Systems on different machines) actually drops. TCP connections are pretty resilient, so one common cause of this is actual Executor failure -- recently, I have experienced a similar-sounding problem due to my machine's OOM killer terminating my Executors, such that they didn't produce any error output. On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, On an ARM cluster, I have been testing a wordcount program with JRE 7 and everything is OK. But when changing to the embedded version of Java SE (Oracle's eJRE), the same program cannot complete all computing stages. It fails with many Akka disassociations. - I've been trying to increase Akka's timeout but am still stuck. I am not sure what the right way to do this is. (I suspect that GC pausing the world is causing this.) - Another question: how can I properly turn on Akka's logging to see the root cause of this disassociation problem? (If my guess about GC is wrong.) Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
proximity of events within the next group of events instead of time
Hi, Spark newbie here with a general question. In a stream consisting of several types of events, how can I detect whether event X happened within Z transactions of event Y? Is it just a matter of iterating through all the RDDs, and when an event of type Y is found, taking the next Z transactions and checking whether there is an event of type X? What if the next Z transactions cross into the next RDD? Thanks. This email is intended solely for the recipient. It may contain privileged, proprietary or confidential information or material. If you are not the intended recipient, please delete this email and any attachments and notify the sender of the error.
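Within one batch, the check itself is a small windowed scan; here is a plain-Scala sketch (event types reduced to single characters, all names mine). The boundary case raised in the question would additionally require carrying the last Z-1 events of each RDD over and prepending them to the next one before running the scan:

```scala
// True if some event 'X' occurs within z events after some event 'Y'.
def xFollowsYWithin(events: Seq[Char], z: Int): Boolean =
  events.indices.exists { i =>
    events(i) == 'Y' && events.slice(i + 1, i + 1 + z).contains('X')
  }

val hit  = xFollowsYWithin("AYBBXC".toSeq, 3) // 'X' arrives 3 events after 'Y'
val miss = xFollowsYWithin("AYBBBX".toSeq, 3) // 'X' arrives 4 events after 'Y'
```

The same logic runs unchanged on each carried-over window, so only the stitching of batch boundaries is stream-specific.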
Re: Akka disassociation on Java SE Embedded
May be that's explaining mine too. Thank you very much, Aaron !! Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit On Wed, May 28, 2014 at 12:47 AM, Aaron Davidson ilike...@gmail.com wrote: Spark should effectively turn Akka's failure detector off, because we historically had problems with GCs and other issues causing disassociations. The only thing that should cause these messages nowadays is if the TCP connection (which Akka sustains between Actor Systems on different machines) actually drops. TCP connections are pretty resilient, so one common cause of this is actual Executor failure -- recently, I have experienced a similar-sounding problem due to my machine's OOM killer terminating my Executors, such that they didn't produce any error output. On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, On an ARM cluster, I have been testing a wordcount program with JRE 7 and everything is OK. But when changing to the embedded version of Java SE (Oracle's eJRE), the same program cannot complete all computing stages. It is failed by many Akka's disassociation. - I've been trying to increase Akka's timeout but still stuck. I am not sure what is the right way to do so? (I suspected that GC pausing the world is causing this). - Another question is that how could I properly turn on Akka's logging to see what's the root cause of this disassociation problem? (If my guess about GC is wrong). Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
Running Jars on Spark, program just hanging there
Hi all, I've a single machine with 8 cores and 8g mem. I've deployed standalone Spark on the machine and successfully run the examples. Now I'm trying to write some simple Java code. I read a local file (23M) into a list of strings and used the JavaRDD<String> rdds = sparkContext.parallelize(...) method to get the corresponding RDD. Then I called rdds.count(). But the program just stopped on the count(). The last log info is: 14/05/27 14:13:16 INFO SparkContext: Starting job: count at RDDTest.java:40 14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at RDDTest.java:40) with 2 output partitions (allowLocal=false) 14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at RDDTest.java:40) 14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List() 14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List() 14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which has no missing parents 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140527141316-0003 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added: app-20140527141316-0003/0 on worker-20140526221107-spark-35303 (spark:35303) with 8 cores 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores, 1024.0 MB RAM 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated: app-20140527141316-0003/0 is now RUNNING 14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37) 14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168] with ID 0 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: spark (PROCESS_LOCAL) 14/05/27 14:13:17 INFO 
TaskSetManager: Serialized task 0.0:0 as 12993529 bytes in 127 ms 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: spark (PROCESS_LOCAL) 14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as 13006417 bytes in 74 ms 14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager spark:37617 with 589.2 MB RAM I tried to figure out what's going on, but just can't. Could anyone please give me some suggestions and point out some possible issues? Best Regards, Min
Re: Broadcast Variables
To answer my own question, that does seem to be the right way. I was concerned about whether the data that a broadcast variable holds would end up getting serialized if I used it as an instance variable of the function. I realized that doesn't happen, because the broadcast variable's value is marked as transient. 1. Http - https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala 2. Torrent - https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala On Thu, May 22, 2014 at 6:58 PM, Puneet Lakhina puneet.lakh...@gmail.com wrote: Hi, I'm confused about the right way to use broadcast variables from Java. My code looks something like this:

Map val = // build Map to be broadcast
Broadcast<Map> broadcastVar = sc.broadcast(val);
sc.textFile(...).map(new SomeFunction()) {
  // Do something here using broadcastVar
}

My question is: should I pass the broadcastVar to SomeFunction as a constructor parameter that it can keep around as an instance variable, i.e.

sc.textFile(...).map(new SomeFunction(broadcastVar)) {
  // Do something here using broadcastVar
}

class SomeFunction extends Function<T> {
  public SomeFunction(Broadcast<Map> var) {
    this.var = var;
  }
  public T call() {
    // Do something
  }
}

Is the above the right way to utilize broadcast variables when not using anonymous inner classes as functions? -- Regards, Puneet -- Regards, Puneet
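The transient behaviour described above is plain Java serialization, so it can be checked without Spark at all. A minimal sketch (FakeBroadcast is a made-up stand-in, not Spark's class): the @transient payload is dropped on the wire while the small non-transient handle survives, which is why a closure that captures a Broadcast stays cheap to ship:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for a Broadcast handle: the big payload is @transient, the small id is not.
class FakeBroadcast(@transient val value: Array[Int], val id: Int) extends Serializable

val out = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(out)
oos.writeObject(new FakeBroadcast(Array.fill(1000)(7), 42))
oos.close()

val copy = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  .readObject().asInstanceOf[FakeBroadcast]
// The id survives the round trip; the transient payload comes back as null,
// because Java serialization never wrote it.
```

On a real executor, Broadcast.value then fetches the payload through the broadcast mechanism (HTTP or torrent) rather than through task serialization.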
Re: Invalid Class Exception
I am running this on a Solaris machine with logical partitions. All the partitions (workers) access the same Spark folder. Thanks, Suman. On 5/23/2014 9:44 PM, Andrew Or wrote: That means not all of your driver and executors have the same version of Spark. Are you on a standalone EC2 cluster? If so, one way to fix this is to run the following on the master node: /root/spark-ec2/copy-dir --delete /root/spark This syncs all of Spark across your cluster, configs, jars and everything. 2014-05-23 15:20 GMT-07:00 Suman Somasundar suman.somasun...@oracle.com mailto:suman.somasun...@oracle.com: Hi, I get the following exception when using Spark to run various programs. java.io.InvalidClassException: org.apache.spark.SerializableWritable; local class incompatible: stream classdesc serialVersionUID = 6301214776158303468, local class serialVersionUID = -7785455416944904980 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Re: Invalid Class Exception
On Tue, May 27, 2014 at 1:05 PM, Suman Somasundar suman.somasun...@oracle.com wrote: I am running this on a Solaris machine with logical partitions. All the partitions (workers) access the same Spark folder. Can you check whether you have multiple versions of the offending class (org.apache.spark.SerializableWritable) in the classpath of your apps? Maybe you do and different nodes are loading jars in different order. On 5/23/2014 9:44 PM, Andrew Or wrote: That means not all of your driver and executors have the same version of Spark. Are you on a standalone EC2 cluster? If so, one way to fix this is to run the following on the master node: /root/spark-ec2/copy-dir --delete /root/spark This syncs all of Spark across your cluster, configs, jars and everything. 2014-05-23 15:20 GMT-07:00 Suman Somasundar suman.somasun...@oracle.com: Hi, I get the following exception when using Spark to run various programs. java.io.InvalidClassException: org.apache.spark.SerializableWritable; local class incompatible: stream classdesc serialVersionUID = 6301214776158303468, local class serialVersionUID = -7785455416944904980 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
Re: Running Jars on Spark, program just hanging there
Does the spark UI show your program running? (http://spark-masterIP:8118). If the program is listed as running you should be able to see details via the UI. In my experience there are 3 sets of logs -- the log where you're running your program (the driver), the log on the master node, and the log on each executor. The master log often has very useful details when one of your slave executors has an issue. Then you can go and read the logs on that machine. Of course, if you have a small number of workers in your cluster you can just read all the logs. That's just general debugging advice... (I also find it useful to do rdd.partitions.size before anything else to check how many partitions the RDD is actually partitioned to...) On Tue, May 27, 2014 at 2:48 PM, Min Li limin...@gmail.com wrote: Hi all, I've a single machine with 8 cores and 8g mem. I've deployed the standalone spark on the machine and successfully run the examples. Now I'm trying to write some simple java codes. I just read a local file (23M) into string list and use JavaRDDString rdds = sparkContext.paralellize() method to get the corresponding rdd. And I asked to run rdds.count(). But the program just stopped on the count(). 
The last log info is:
14/05/27 14:13:16 INFO SparkContext: Starting job: count at RDDTest.java:40
14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at RDDTest.java:40) with 2 output partitions (allowLocal=false)
14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at RDDTest.java:40)
14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List()
14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List()
14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which has no missing parents
14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140527141316-0003
14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added: app-20140527141316-0003/0 on worker-20140526221107-spark-35303 (spark:35303) with 8 cores
14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores, 1024.0 MB RAM
14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated: app-20140527141316-0003/0 is now RUNNING
14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37)
14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168] with ID 0
14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: spark (PROCESS_LOCAL)
14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:0 as 12993529 bytes in 127 ms
14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: spark (PROCESS_LOCAL)
14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as 13006417 bytes in 74 ms
14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager spark:37617 with 589.2 MB RAM
I tried to figure out what's going on, but just can't. Could anyone please give me some suggestions and point out possible issues? Best Regards, Min
Spark 1.0: slf4j version conflicts with pig
I use both Pig and Spark. All my code is built with Maven into a giant *-jar-with-dependencies.jar. I recently upgraded to Spark 1.0 and now all my pig scripts fail with: Caused by: java.lang.RuntimeException: Could not resolve error that occured when launching map reduce job: java.lang.NoSuchMethodError: org.slf4j.spi.LocationAwareLogger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher$JobControlThreadExceptionHandler.uncaughtException(MapReduceLauncher.java:598) at java.lang.Thread.dispatchUncaughtException(Thread.java:1874) Did Spark 1.0 change the version of slf4j? I can't seem to find it via mvn dependency:tree
Re: Spark 1.0: slf4j version conflicts with pig
Spark uses 1.7.5, and you should probably see 1.7.{4,5} in use through Hadoop. But those are compatible. That method appears to have been around since 1.3. What version does Pig want? I usually do mvn -Dverbose dependency:tree to see both what the final dependencies are, and what got overwritten, to diagnose things like this. My hunch is that something is depending on an old slf4j in your build and it's overwriting Spark et al. On Tue, May 27, 2014 at 10:45 PM, Ryan Compton compton.r...@gmail.com wrote: I use both Pig and Spark. All my code is built with Maven into a giant *-jar-with-dependencies.jar. I recently upgraded to Spark 1.0 and now all my pig scripts fail with: Caused by: java.lang.RuntimeException: Could not resolve error that occured when launching map reduce job: java.lang.NoSuchMethodError: org.slf4j.spi.LocationAwareLogger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher$JobControlThreadExceptionHandler.uncaughtException(MapReduceLauncher.java:598) at java.lang.Thread.dispatchUncaughtException(Thread.java:1874) Did Spark 1.0 change the version of slf4j? I can't seem to find it via mvn dependency:tree
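If the dependency tree does show an older slf4j-api winning mediation, one common fix is an exclusion on the offending dependency. This is only a sketch: the coordinates below ("some.group:old-lib") are placeholders standing in for whatever mvn dependency:tree actually points at, not anything named in the thread:

```xml
<!-- Hypothetical pom.xml fragment: evict an old transitive slf4j-api.
     Replace some.group:old-lib with the real culprit from dependency:tree. -->
<dependency>
  <groupId>some.group</groupId>
  <artifactId>old-lib</artifactId>
  <version>1.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

Alternatively, declaring slf4j-api 1.7.5 in a <dependencyManagement> section pins the version for the whole build without per-dependency exclusions.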
Re: Persist and unpersist
I think what's desired here is for `input` to be unpersisted automatically as soon as `result` is materialized. I don't think there's currently a way to do this, but the usual workaround is to force `result` to be materialized immediately and then unpersist `input`:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize `result`
input.unpersist()               // safe because `result` is materialized
                                // and is the only RDD that depends on `input`
return result

Ankur http://www.ankurdave.com/
Java RDD structure for Matrix predict?
I've got a trained MatrixFactorizationModel via ALS.train(...) and now I'm trying to use it to predict some ratings like so: JavaRDD<Rating> predictions = model.predict(usersProducts.rdd()) Where usersProducts is built from an existing Ratings dataset like so:

JavaPairRDD<Integer, Integer> usersProducts = testRatings.map(
  new PairFunction<Rating, Integer, Integer>() {
    public Tuple2<Integer, Integer> call(Rating r) throws Exception {
      return new Tuple2<Integer, Integer>(r.user(), r.product());
    }
  }
);

The problem is that model.predict(...) doesn't like usersProducts, claiming that the method doesn't accept an RDD of type Tuple2; however the docs show the method signature as follows: def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] Am I missing something? The JavaRDD is just a list of Tuple2 elements, which would match the method signature, but the compiler is complaining. Thanks!
Re: Java RDD structure for Matrix predict?
Hi Sandeep, I think you should use testRatings.mapToPair instead of testRatings.map. So the code should be:

JavaPairRDD<Integer, Integer> usersProducts = training.mapToPair(
  new PairFunction<Rating, Integer, Integer>() {
    public Tuple2<Integer, Integer> call(Rating r) throws Exception {
      return new Tuple2<Integer, Integer>(r.user(), r.product());
    }
  }
);

It works on my side. Wisely Chen On Wed, May 28, 2014 at 6:27 AM, Sandeep Parikh sand...@clusterbeep.org wrote: I've got a trained MatrixFactorizationModel via ALS.train(...) and now I'm trying to use it to predict some ratings like so: JavaRDD<Rating> predictions = model.predict(usersProducts.rdd()) Where usersProducts is built from an existing Ratings dataset like so: JavaPairRDD<Integer, Integer> usersProducts = testRatings.map( new PairFunction<Rating, Integer, Integer>() { public Tuple2<Integer, Integer> call(Rating r) throws Exception { return new Tuple2<Integer, Integer>(r.user(), r.product()); } } ); The problem is that model.predict(...) doesn't like usersProducts, claiming that the method doesn't accept an RDD of type Tuple2; however the docs show the method signature as follows: def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] Am I missing something? The JavaRDD is just a list of Tuple2 elements, which would match the method signature, but the compiler is complaining. Thanks!
Re: K-nearest neighbors search in Spark
Carter, Just as a quick simple starting point for Spark (caveats - lots of improvements reqd for scaling, graceful and efficient handling of RDDs et al):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.collection.immutable.ListMap
import scala.collection.immutable.SortedMap

object TopK {
  // def getCurrentDirectory = new java.io.File(".").getCanonicalPath

  def distance(x1: List[Int], x2: List[Int]): Double = {
    val dist: Double = math.sqrt(math.pow(x1(1) - x2(1), 2) + math.pow(x1(2) - x2(2), 2))
    dist
  }

  def main(args: Array[String]): Unit = {
    // println(getCurrentDirectory)
    val sc = new SparkContext("local", "TopK") // or point at a cluster, e.g. "spark://USS-Defiant.local:7077"
    println(s"Running Spark Version ${sc.version}")
    val file = sc.textFile("data01.csv")

    val data = file
      .map(line => line.split(","))
      .map(x1 => List(x1(0).toInt, x1(1).toInt, x1(2).toInt))
    // val data1 = data.collect
    println(data)
    for (d <- data) {
      println(d)
      println(d(0))
    }

    val distList = for (d <- data) yield { d(0) }
    // for (d <- distList) (println(d))
    val zipList = for (a <- distList.collect; b <- distList.collect) yield { List(a, b) }
    zipList.foreach(println(_))

    val dist = for (l <- zipList) yield {
      println(s"${l(0)} => ${l(1)}")
      val x1a: Array[List[Int]] = data.filter(d => d(0) == l(0)).collect
      val x2a: Array[List[Int]] = data.filter(d => d(0) == l(1)).collect
      val x1: List[Int] = x1a(0)
      val x2: List[Int] = x2a(0)
      val dist = distance(x1, x2)
      Map(dist -> l)
    }
    dist.foreach(println(_))
    // sort this for topK
  }
}

data01.csv:
1,68,93
2,12,90
3,45,76
4,86,54

HTH. Cheers k/ On Tue, May 27, 2014 at 4:10 AM, Carter gyz...@hotmail.com wrote: Any suggestion is very much appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
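For reference, the "sort this for topK" step left as a TODO in the code above can be finished without a cluster at all; below is a minimal plain-Scala sketch of the same brute-force idea over the thread's toy data. The object and method names here are my own illustration, not from the original post:

```scala
// Brute-force k-nearest-neighbors over (id, x, y) triples, as in data01.csv.
object TopKNeighbors {
  // Euclidean distance on the (x, y) coordinates; the id at index 0 is ignored.
  def distance(a: List[Int], b: List[Int]): Double =
    math.sqrt(math.pow(a(1) - b(1), 2) + math.pow(a(2) - b(2), 2))

  // The k nearest neighbors of `query` among `points`, excluding itself.
  def kNearest(query: List[Int], points: Seq[List[Int]], k: Int): Seq[List[Int]] =
    points.filter(_(0) != query(0)).sortBy(p => distance(query, p)).take(k)

  def main(args: Array[String]): Unit = {
    // Same toy data as data01.csv in the thread.
    val data = Seq(List(1, 68, 93), List(2, 12, 90), List(3, 45, 76), List(4, 86, 54))
    // ids of the two points closest to point 1
    println(kNearest(data.head, data, 2).map(_(0))) // prints List(3, 4)
  }
}
```

In a Spark version, the same sort would be an RDD sortBy on the distance key followed by take(k) on the driver.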
Spark Memory Bounds
I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following: (A) in-memory RDD use + (B) in-memory shuffle use + (C) transient memory used by all currently running tasks. I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C. It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory roughly follows: total_data_set_size / number_of_partitions * number_of_tasks_simultaneously_running_per_machine. Does this sound right? In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there are a bounded number of tasks in flight, my memory use has a rough upper limit. Keith
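To make the back-of-envelope concrete, the estimate for (C) can be written out as a tiny plain-Scala calculation. The figures below (64 GB dataset, 512 partitions, 8 concurrent tasks) are made up purely for illustration:

```scala
// Rough upper bound on (C), transient task memory, per the formula in the post:
// total_data_set_size / number_of_partitions * tasks_running_per_machine.
// This ignores per-record JVM object overhead and closure state, so it is a
// rough floor for planning, not a guarantee.
object TransientMemoryEstimate {
  def boundBytes(totalBytes: Long, numPartitions: Int, tasksPerMachine: Int): Long =
    totalBytes / numPartitions * tasksPerMachine

  def main(args: Array[String]): Unit = {
    val total = 64L * 1024 * 1024 * 1024 // hypothetical 64 GB dataset
    val bound = boundBytes(total, numPartitions = 512, tasksPerMachine = 8)
    println(s"~${bound / (1024 * 1024)} MB transient per machine") // ~1024 MB
  }
}
```

Doubling the partition count halves the per-machine bound, which matches the intuition in the post that more partitions means less transient memory per task.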
Re: Re: spark table to hive table
I already tried HiveContext as well as SqlContext, but it seems that Spark's HiveContext is not completely the same as Apache Hive. For example, SQL like 'SELECT RANK() OVER(ORDER BY VAL1 ASC) FROM TEST LIMIT 10' works fine in Apache Hive, but Spark's HiveContext raises an error. That's why I want to use Shark or Apache Hive in special cases. The following is the error message on Spark.

java.lang.RuntimeException: Unsupported language features in query: SELECT RANK() OVER(order by val1) FROM TEST LIMIT 10
TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME TEST TOK_INSERT TOK_DESTINATION TOK_DIR TOK_TMP_FILE TOK_SELECT TOK_SELEXPR TOK_FUNCTION RANK TOK_WINDOWSPEC TOK_PARTITIONINGSPEC TOK_ORDERBY TOK_TABSORTCOLNAMEASC TOK_TABLE_OR_COL v1 TOK_LIMIT 10
at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:236) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:81) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:23) at $iwC$$iwC$$iwC$$iwC.init(console:25) at $iwC$$iwC$$iwC.init(console:27) at $iwC$$iwC.init(console:29) at $iwC.init(console:31) at init(console:33) at .init(console:37) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:983) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) --- Original Message --- Sender : John Omernik j...@omernik.com Date : 2014-05-27 19:28 (GMT+09:00) Title : Re: spark table to hive table Did you try the Hive Context? Look under Hive Support here: http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html On Tue, May 27, 2014 at 2:09 AM, 정재부 itsjb.j...@samsung.com wrote: Hi all, I'm trying to compare functions available in Spark 1.0 hql to original HiveQL.
But, when I tested functions such as 'rank', Spark didn't support some HiveQL functions. In case of Shark, it supports functions as well as Hive, so I want to convert a parquet file / SparkSQL table to a Hive table and analyze it with Shark. Is there any way to do this? Thanks, Kevin _ Kevin Jung Assistant Engineer / BDA Lab T +82-2-6155-8349 M +82-10-9288-1984 F +82-2-6155-0251 E itsjb.j...@samsung.com
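One possible route for the conversion the original question asks about is an external Hive table over the Parquet location that Spark SQL wrote. This is only a hedged sketch: it assumes a Hive version with Parquet and windowing support (STORED AS PARQUET is native in Hive 0.13+; earlier versions need the parquet-hive SerDe, and RANK() OVER needs 0.11+), and the path and column names below are made up for illustration:

```sql
-- Hypothetical: /data/test_parquet is where SchemaRDD.saveAsParquetFile() wrote.
CREATE EXTERNAL TABLE test (val1 INT, val2 STRING)
STORED AS PARQUET
LOCATION '/data/test_parquet';

-- Windowing then works from Hive/Shark as usual:
SELECT RANK() OVER (ORDER BY val1 ASC) FROM test LIMIT 10;
```

Because the table is EXTERNAL, dropping it in Hive leaves the Parquet files that Spark wrote untouched.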
AMPCamp Training materials are broken due to overwritten AMIs?
Hi, Has anyone had luck going through previous archives of the AMPCamp exercises? Many of the archived bootcamps seem to be broken because they reference the same AMIs that are constantly being updated, which means they are no longer compatible with the old bootcamp instructions or the surrounding scripts. I'm not sure why they don't create separate AMIs so that they don't get overwritten. Their naming convention seems to indicate that was the intention, but all of them refer to http://s3.amazonaws.com/ampcamp-amis/latest-ampcamp3 Why do I want to use a previous bootcamp? Beyond the fact that they cover slightly different materials, it looks like the latest one is yet again broken due to changes introduced in the AMIs (specifically the MLlib exercise). Has anyone else had similar issues? -Toshi
Re: Spark Memory Bounds
Keith, do you mean bound as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task? If a, then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above JVM) mechanism. In this scope, Spark voluntarily tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark. If b, then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning. What I think Spark needs to evolve to is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote: I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following: (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory used by all currently running tasks I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C. It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory is roughly follows: total_data_set_size/number_of_partitions * number_of_tasks_simultaneously_running_per_machine Does this sound right? 
In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there are a bounded number of tasks in flight, my memory use has a rough upper limit. Keith
Re: Spark Memory Bounds
A dash of both. I want to know enough that I can reason about, rather than strictly control, the amount of memory Spark will use. If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources. Or alternatively, if it's even possible for Spark to process a data set over a certain size. And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption. It's my understanding that between certain key stages in a Spark DAG (i.e. group by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures. So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks. Is my understanding correct? Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw Java objects released before the next task is started? On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote: Keith, do you mean bound as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task? If a, then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above JVM) mechanism. In this scope, Spark voluntarily tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark. If b, then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning.
What I think Spark needs to evolve to is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote: I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following: (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory used by all currently running tasks I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C. It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory is roughly follows: total_data_set_size/number_of_partitions * number_of_tasks_simultaneously_running_per_machine Does this sound right? In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there are a bounded number of tasks in flight, my memory use has a rough upper limit. Keith