Re: How to close resources shared in executor?
Of course, I could create a connection inside the map function:

    val result = rdd.map { line =>
      val conf = HBaseConfiguration.create
      val connection = HConnectionManager.createConnection(conf)
      val table = connection.getTable("user")
      ...
      table.close()
      connection.close()
    }

but that would be too slow, which is also the reason I share conf and connection in the Util object. Maybe I do need a shutdown hook as the Javadoc says. Thank you!

2014-10-17 12:18 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Looking at Apache 0.98 code, you can follow the example in the class javadoc (line 144 of HConnectionManager.java):

    HTableInterface table = connection.getTable("table1");
    try {
      // Use the table as needed, for a single operation and a single thread
    } finally {
      table.close();
      connection.close();
    }

Cheers

On Thu, Oct 16, 2014 at 9:03 PM, Fengyun RAO raofeng...@gmail.com wrote:

Thanks, Ted. We use CDH 5.1, and the HBase version is 0.98.1-cdh5.1.0, in which the javadoc of HConnectionManager.java still recommends a shutdown hook. I looked into val table = Util.Connection.getTable("user"), and found it didn't invoke

    public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)

but

    public HTable(TableName tableName, final HConnection connection,
        final ExecutorService pool) throws IOException {
      if (connection == null || connection.isClosed()) {
        throw new IllegalArgumentException("Connection is null or closed.");
      }
      this.tableName = tableName;
      this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
      this.connection = connection;
      this.configuration = connection.getConfiguration();
      this.pool = pool;
      this.finishSetup();
    }

in which cleanupConnectionOnClose is false.

2014-10-16 22:51 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Which HBase release are you using? Let me refer to the 0.94 code. Take a look at the following method in src/main/java/org/apache/hadoop/hbase/client/HTable.java:

    public void close() throws IOException {
      ...
      if (cleanupConnectionOnClose) {
        if (this.connection != null) {
          this.connection.close();

When Connection.getTable() is called, the following is invoked:

    public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)

which sets cleanupConnectionOnClose to true. w.r.t. javadoc, the paragraph on the shutdown hook is in HConnectionManager.java of 0.94. You don't need to use a shutdown hook for 0.94+.

Cheers

On Wed, Oct 15, 2014 at 11:41 PM, Fengyun RAO raofeng...@gmail.com wrote:

I may have misunderstood your point.

    val result = rdd.map { line =>
      val table = Util.Connection.getTable("user")
      ...
      table.close()
    }

Did you mean this is enough, and there's no need to call Util.Connection.close() or HConnectionManager.deleteAllConnections()? Where is the documentation that states HConnectionManager would release the underlying connection automatically? If that's true, maybe the Javadoc which recommends a shutdown hook needs an update.

2014-10-16 14:20 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

Thanks, Ted. Util.Connection.close() should be called only once, so it can NOT be in a map function:

    val result = rdd.map { line =>
      val table = Util.Connection.getTable("user")
      ...
      Util.Connection.close()
    }

As you mentioned: "Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released." Yes, we should call table.close(), but it won't remove the HConnection in HConnectionManager, which is an HConnection pool.

As I look into the HConnectionManager Javadoc, it seems I have to implement a shutdown hook:

    Cleanup used to be done inside in a shutdown hook. On startup we'd
    register a shutdown hook that called {@link #deleteAllConnections()}
    on its way out but the order in which shutdown hooks run is not defined so
    were problematic for clients of HConnection that wanted to register their
    own shutdown hooks so we removed ours though this shifts the onus for
    cleanup to the client.

2014-10-15 22:31 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Pardon me - there was a typo in my previous email. Calling table.close() is the recommended approach. HConnectionManager does reference counting. When all references to the underlying connection are gone, the connection would be released.

Cheers

On Wed, Oct 15, 2014 at 7:13 AM, Ted Yu yuzhih...@gmail.com wrote:

Have you tried the following?

    val result = rdd.map { line =>
      val table = Util.Connection.getTable("user")
      ...
      Util.Connection.close()
    }

On Wed, Oct 15, 2014 at 6:09 AM, Fengyun RAO raofeng...@gmail.com wrote:

In order to share an HBase connection pool, we create an object

    object Util {
      val HBaseConf = HBaseConfiguration.create
      val Connection = HConnectionManager.createConnection(HBaseConf)
    }

which would be
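A middle ground between the two extremes discussed in this thread (a new connection per record, which is too slow, and one JVM-global connection whose close point is unclear) is to open one connection per partition. This is only an illustrative sketch against the HBase 0.98 client API used above, with "user" standing in for the real table name:

    rdd.foreachPartition { rows =>
      val conf = HBaseConfiguration.create()
      val connection = HConnectionManager.createConnection(conf)
      val table = connection.getTable("user")
      try {
        rows.foreach { row =>
          // one get/put per record, all sharing the partition's connection
        }
      } finally {
        table.close()
        connection.close()
      }
    }

This amortizes the connection cost over a whole partition and gives a deterministic place to close it, so no shutdown hook is needed.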
Re: Join with large data set
Hi Ankur,

If your RDDs have common keys, you can look at partitioning both your datasets with a custom partitioner based on those keys, so that you can avoid shuffling at join time and optimize join performance. HTH

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 17, 2014 at 4:27 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi, I have an RDD which holds my application data and is huge. I want to join it with reference data which is also too huge to fit in memory, so I do not want to use a broadcast variable. What other options do I have to perform such joins? I am using Cassandra as my data store, so should I just query Cassandra to get the reference data needed? Also, when I join two RDDs, will it result in an RDD scan, or would Spark do a hash partition on the two RDDs to get the data with the same keys on the same node?

Thanks
Ankur
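A minimal sketch of the co-partitioning Sonal describes; appData and refData are hypothetical pair RDDs keyed by the join key, and the partition count is a tuning assumption:

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(64)                // assumed partition count
    val left  = appData.partitionBy(p).persist()   // appData, refData: RDD[(K, V)]
    val right = refData.partitionBy(p).persist()
    val joined = left.join(right)                  // co-partitioned inputs: the join adds no extra shuffle

Each side is shuffled once by partitionBy; after that, any number of joins on the same keys are narrow operations.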
RE: error when maven build spark 1.1.0 with message You have 1 Scalastyle violation
Found the root cause:

[SPARK-3372] [MLlib] MLlib doesn't pass maven build / checkstyle due to a multi-byte character contained in Gradient.scala
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Closes #2248 https://github.com/apache/spark/pull/2248 from sarutak/SPARK-3372 and squashes the following commits:
73a28b8 https://github.com/apache/spark/commit/73a28b8edf79a6aae7260e514d5b68a438e34ba1 [Kousuke Saruta] Replaced UTF-8 hyphen with ascii hyphen
(cherry picked from commit 1bed0a3 https://github.com/apache/spark/commit/1bed0a3869a526241381d2a74ba064e5b3721336)
Signed-off-by: Xiangrui Meng m...@databricks.com

From: MA33 YTHung1
Sent: Friday, October 17, 2014 1:10 PM
To: user@spark.apache.org
Subject: RE: error when maven build spark 1.1.0 with message You have 1 Scalastyle violation

Hi All,

Another piece of information: somehow Gradient.scala throws an "input length = 2" error:

    error file=D:\tools\spark-1.1.0\mllib\src\main\scala\org\apache\spark\mllib\optimization\Gradient.scala message=Input length = 2

Best regards,
Henry Hung

From: MA33 YTHung1
Sent: Friday, October 17, 2014 1:05 PM
To: user@spark.apache.org
Subject: error when maven build spark 1.1.0 with message You have 1 Scalastyle violation

Hi All,

I'm using Windows 8.1 to build Spark 1.1.0 using this command:

    C:\apache-maven-3.0.5\bin\mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -e

Below is the error message:

[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution: You have 1 Scalastyle violation(s). - [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409)
at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352)
Caused by: org.apache.maven.plugin.MojoExecutionException: Failed during scalastyle execution
at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:238)
at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute(ScalastyleViolationCheckMojo.java:199)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209)
... 19 more
Caused by: org.apache.maven.plugin.MojoFailureException: You have 1 Scalastyle violation(s).
at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:230)
... 22 more
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :spark-mllib_2.10

Best regards,
Henry
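If you hit this on a source tree where the fix above isn't applied yet, a quick way to locate the offending character is to scan the file for anything outside 7-bit ASCII. A throwaway Scala sketch; the path is taken from the error message above:

    import scala.io.Source

    val path = """D:\tools\spark-1.1.0\mllib\src\main\scala\org\apache\spark\mllib\optimization\Gradient.scala"""
    Source.fromFile(path, "UTF-8").getLines().zipWithIndex.foreach { case (line, n) =>
      // flag any line containing a character above U+007F (1-based line numbers)
      if (line.exists(_ > '\u007f')) println(s"line ${n + 1}: $line")
    }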
Re: Can't create Kafka stream in spark shell
This is how you deal with deduplicate errors:

    libraryDependencies ++= Seq(
      ("org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided").
        exclude("org.eclipse.jetty.orbit", "javax.transaction").
        exclude("org.eclipse.jetty.orbit", "javax.mail").
        exclude("org.eclipse.jetty.orbit", "javax.activation").
        exclude("com.esotericsoftware.minlog", "minlog").
        exclude("commons-beanutils", "commons-beanutils-core").
        exclude("commons-logging", "commons-logging").
        exclude("commons-collections", "commons-collections").
        exclude("org.eclipse.jetty.orbit", "javax.servlet")
    )

Thanks
Best Regards

On Fri, Oct 17, 2014 at 2:53 AM, Gary Zhao garyz...@gmail.com wrote:

Same error. I saw someone report the same issue, e.g. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-kafka-error-td9106.html

Should I use sbt assembly? It failed with deduplicate errors though:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
[error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
[error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
[error] /Users/gzhao/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
[error] Total time: 4 s, completed Oct 16, 2014 1:58:41 PM

On Thu, Oct 16, 2014 at 12:11 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Can you try this sbt file?

    name := "Simple Project"
    version := "1.1"
    scalaVersion := "2.10.4"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.0",
      "org.apache.spark" %% "spark-streaming" % "1.1.0",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"
    )

Thanks
Best Regards

On Fri, Oct 17, 2014 at 12:36 AM, Gary Zhao garyz...@gmail.com wrote:

Thanks Akhil. I tried spark-submit and saw the same issue. I double checked the versions and they look ok. Are you seeing any obvious issues?
sbt:

    name := "Simple Project"
    version := "1.1"
    scalaVersion := "2.10.4"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.0",
      "org.apache.spark" %% "spark-streaming" % "1.1.0",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0",
      "org.apache.kafka" %% "kafka" % "0.8.0"
    )

Command:

    spark-1.1.0-bin-hadoop1/bin/spark-submit --class main.scala.SimpleApp --master local[2] simple-project_2.10-1.1.jar --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
at main.scala.SimpleApp$delayedInit$body.apply(SimpleApp.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at main.scala.SimpleApp$.main(SimpleApp.scala:11)
at main.scala.SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more

On Tue, Oct 14, 2014 at 12:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Just make sure you have the same version of the spark-streaming-kafka jar (http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10) and Spark in your classpath.

Thanks
Best Regards

On Tue, Oct 14, 2014 at 9:02 AM, Gary Zhao garyz...@gmail.com wrote:

Hello, I'm trying to connect to Kafka in the Spark shell, but it failed as below. Could you take a look at what I missed?

    scala> val kafkaStream = KafkaUtils.createStream(ssc, "test-vip.snc1:2181", "test_spark", Map("user-test" -> 1))
    error: bad symbolic reference. A
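The command above very likely explains the NoClassDefFoundError: spark-submit stops parsing its own options at the primary application jar, so everything after simple-project_2.10-1.1.jar (including --jars) is handed to SimpleApp.main() as program arguments and the Kafka jars never reach the classpath. Reordering should fix that part:

    spark-1.1.0-bin-hadoop1/bin/spark-submit \
      --class main.scala.SimpleApp \
      --master local[2] \
      --jars spark-streaming-kafka_2.10-1.1.0.jar,kafka_2.10-0.8.0.jar \
      simple-project_2.10-1.1.jar

Depending on the code paths hit, Kafka's transitive dependencies (e.g. zkclient, metrics-core) may also need to be added to --jars.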
rdd caching and use thereof
I'm trying to understand two things about how Spark is working.

(1) When I try to cache an RDD that fits well within memory (about 60g with about 600g of memory), I get seemingly random levels of caching, from around 60% to 100%, given the same tuning parameters. What governs how much of an RDD gets cached when there is enough memory?

(2) Even when cached, when I run some tasks over the data, I get various locality states. Sometimes it works perfectly, with everything PROCESS_LOCAL, and sometimes I get 10-20% of the data on locality ANY (and the task takes minutes instead of seconds); often this will vary if I run the task twice in a row in the same shell. Is there anything I can do to affect this? I tried caching with replication, but that caused everything to run out of memory nearly instantly (with the same 60g data set in 4-600g of memory).

Thanks for the help,
-Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
Re: rdd caching and use thereof
Oh, I forgot - I've set the following parameters at the moment (besides the standard location, memory, and core setup):

    spark.logConf true
    spark.shuffle.consolidateFiles true
    spark.ui.port 4042
    spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
    spark.shuffle.file.buffer.kb 500
    spark.speculation true

On Fri, Oct 17, 2014 at 2:46 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

I'm trying to understand two things about how Spark is working.

(1) When I try to cache an RDD that fits well within memory (about 60g with about 600g of memory), I get seemingly random levels of caching, from around 60% to 100%, given the same tuning parameters. What governs how much of an RDD gets cached when there is enough memory?

(2) Even when cached, when I run some tasks over the data, I get various locality states. Sometimes it works perfectly, with everything PROCESS_LOCAL, and sometimes I get 10-20% of the data on locality ANY (and the task takes minutes instead of seconds); often this will vary if I run the task twice in a row in the same shell. Is there anything I can do to affect this? I tried caching with replication, but that caused everything to run out of memory nearly instantly (with the same 60g data set in 4-600g of memory).

Thanks for the help,
-Nathan

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
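If the slow runs coincide with tasks falling back to ANY, one knob worth trying alongside the settings above is the locality wait, which controls how long (in milliseconds) the scheduler holds a task hoping for a local slot before shipping it elsewhere; 3000 is the 1.1 default, and the value below is just an example:

    # wait longer for PROCESS_LOCAL / NODE_LOCAL slots before falling back
    spark.locality.wait 10000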
how to submit multiple jar files when using spark-submit script in shell?
Hi, I am using the comma-separated style to submit multiple jar files in the following shell command, but it does not work:

    bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master yarn-cluster --execur-memory 2g --jars lib/spark-examples-1.0.2-hadoop2.2.0.jar,lib/spark-mllib_2.10-1.0.0.jar hdfs://master:8000/srcdata/kmeans 8 4

Thanks!

--
WangHaihua
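Two things look off in that command, hedged since it can't be rerun here: --execur-memory should be --executor-memory, and spark-submit treats the first non-option argument as the primary application jar, so the jar containing the main class must sit in that position while the extra jars go in --jars. Assuming JavaKMeans lives in the examples jar, the shape would be:

    bin/spark-submit \
      --class org.apache.spark.examples.mllib.JavaKMeans \
      --master yarn-cluster \
      --executor-memory 2g \
      --jars lib/spark-mllib_2.10-1.0.0.jar \
      lib/spark-examples-1.0.2-hadoop2.2.0.jar \
      hdfs://master:8000/srcdata/kmeans 8 4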
key class requirement for PairedRDD ?
Dear all,

Is it possible to use any kind of object as a key in a PairedRDD? When I use a case class as the key, the groupByKey operation doesn't behave as I expected. I want to use a case class to avoid using a large tuple, as it is easier to manipulate.

Cheers,
Jaonary
Re: key class requirement for PairedRDD ?
We use our own custom classes, which are Serializable and have well-defined hashCode and equals methods, as keys through the Java API. What's the issue you are getting?

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 17, 2014 at 12:28 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Dear all, Is it possible to use any kind of object as a key in a PairedRDD? When I use a case class as the key, the groupByKey operation doesn't behave as I expected. I want to use a case class to avoid using a large tuple, as it is easier to manipulate. Cheers, Jaonary
Re: key class requirement for PairedRDD ?
Here is what I'm trying to do. My case class is the following:

    case class PersonID(id: String, group: String, name: String)

I want to use PersonID as a key in a PairedRDD, but the default equals function doesn't seem to fit my need, because two PersonID("a", "a", "a") instances are not treated as the same key. When I use a tuple (String, String, String) as the key, it works.

On Fri, Oct 17, 2014 at 9:03 AM, Sonal Goyal sonalgoy...@gmail.com wrote:

We use our own custom classes, which are Serializable and have well-defined hashCode and equals methods, as keys through the Java API. What's the issue you are getting?

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 17, 2014 at 12:28 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Dear all, Is it possible to use any kind of object as a key in a PairedRDD? When I use a case class as the key, the groupByKey operation doesn't behave as I expected. I want to use a case class to avoid using a large tuple, as it is easier to manipulate. Cheers, Jaonary
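One thing worth checking before blaming equals: where the case class is defined. Case classes declared inside the spark-shell REPL have a long-standing problem when used as keys (see SPARK-2620, "case class cannot be used as key for reduce"), because the REPL wraps each input line in its own synthetic class. The same code compiled into an application behaves as expected; a minimal sketch:

    case class PersonID(id: String, group: String, name: String)

    val pairs = sc.parallelize(Seq(
      (PersonID("a", "a", "a"), 1),
      (PersonID("a", "a", "a"), 2)))
    // In a compiled app this yields a single key with both values;
    // in the REPL it may yield two keys, matching the symptom above.
    pairs.groupByKey().collect()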
MLlib and pyspark features
Hello,

I would like to use areaUnderROC from MLlib in Apache Spark. I am currently running Spark 1.1.0, and this function is available in Scala but not in pyspark. Is there a feature tracker that tracks the progress of porting Scala APIs to Python APIs? I have searched the official JIRA but could not find any ticket number corresponding to this.

Best,
poiuytrez
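For reference, the Scala call that already exists in 1.1 looks like this; the scoreAndLabels pairs are (predicted score, true label), and the values here are made up:

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

    val scoreAndLabels = sc.parallelize(Seq((0.9, 1.0), (0.2, 0.0), (0.7, 1.0)))
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println(metrics.areaUnderROC())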
Re: MLlib linking error Mac OS X
Hello MLnick,

Have you found a solution for installing MLlib on Mac OS? I am also having trouble installing the dependencies.

Best,
poiuytrez
why do RDD's partitions migrate between worker nodes in different iterations
Dear all,

In my test program, there are 3 partitions for each RDD. The iteration procedure is as follows:

    var rdd_0 = ... // init
    for (...) {
      rdd_1 = rdd_0.reduceByKey(...).partitionBy(p)   // calculate rdd_1 from rdd_0
      rdd_0 = rdd_0.partitionBy(p).join(rdd_1)...     // update rdd_0 by rdd_1
      rdd_0.action()
    }

I thought rdd_0 and rdd_1 would be partitioned by the same partitioner, with their corresponding partitions on the same node; for example, rdd_0's partition_0 and rdd_1's partition_0 on the same node in each iteration. But in fact, rdd_0's partition_0 changes its location between workers. Is there any way to keep rdd_0's and rdd_1's partitions from changing their locations, so that their corresponding partitions stay on the same node for a fast join()?

Best Regards,
randy
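If the two RDDs share the same partitioner object and the partitioned results are persisted, the join is a narrow dependency and the scheduler has stable preferred locations to aim at. A small self-contained sketch; the Int/Double payloads and iteration count are placeholders:

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(3)
    var rdd_0 = sc.parallelize(Seq((1, 1.0), (2, 2.0), (3, 3.0))).partitionBy(p).persist()
    for (i <- 1 to 10) {
      val rdd_1 = rdd_0.reduceByKey(p, _ + _)          // same partitioner: no reshuffle
      val next = rdd_0.join(rdd_1)                     // co-partitioned join: narrow dependency
        .mapValues { case (a, b) => a + b }
        .persist()
      next.count()                                     // materialize before dropping the parent
      rdd_0.unpersist()
      rdd_0 = next
    }

Without persist(), each iteration recomputes its lineage from scratch and the scheduler is free to place the recomputed partitions on different workers; caching is what gives partition_0 a home to stay in. Even then, locality is a preference, not a guarantee.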
Re: How does reading the data from Amazon S3 works?
Hi,

I have seen in a video from the Spark Summit that, when using HDFS, data is usually distributed across the whole cluster and the computation usually goes to the data. My question is: how does this work when I read the data from Amazon S3? Is the whole input dataset read by the master node and then distributed to the slave nodes? Or does the master node only determine which slave should read what, with the reading then performed independently by each of the slaves?

Thank you in advance for the clarification.
How to assure that there will be run only one map per cluster node?
hi,

I have a cluster with several nodes, and every node has several cores. I'd like to run a multi-core algorithm within every map, so I'd like to ensure that only one map task runs per cluster node. Is there some way to assure this? It seems it should be possible via spark.task.cpus, as described at https://spark.apache.org/docs/latest/configuration.html, but it's not clear to me whether the value is the total number of CPUs per cluster or the number of CPUs per cluster node.

Thank you in advance for any help and suggestions.
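To the last question: spark.task.cpus is the number of cores reserved per task on a worker, neither a cluster-wide nor a per-node total. So with one executor per node, setting it to the node's core count makes each node run a single task at a time. A hedged sketch, assuming 8 cores per node:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("one-map-per-node")
      // cores reserved by EACH task; with 8-core workers this means
      // at most one concurrent task per node
      .set("spark.task.cpus", "8")
    val sc = new SparkContext(conf)

Inside the map you are then free to spin up your own threads over the node's cores.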
Gracefully stopping a Spark Streaming application
Hi all,

I have a Spark Streaming application running on a cluster, deployed with the spark-submit script. I was reading here that it's possible to gracefully shut down the application in order to allow the deployment of a new one: http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications

Question is: how can I call StreamingContext.stop(...) (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext) from outside the application?

Thanks a lot,
Max

--
Massimiliano Tomassi
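One hedged approach, assuming Spark 1.x's two-argument stop: trigger the stop from a JVM shutdown hook, so an external kill -TERM of the driver process turns into a graceful stop:

    // register once, after creating ssc
    sys.addShutdownHook {
      // stopGracefully = true: stop receivers first, then let queued batches drain
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

Shutdown hooks race with JVM teardown, so another common pattern is a driver-side thread that polls an external marker (for example a file on HDFS) and calls ssc.stop(...) itself when the marker appears; that keeps the stop entirely under your control.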
Re: Help required on exercise Data Exploration using Spark SQL
Hi,

When I run the given Spark SQL commands in the exercise, they return unexpected results. I'm explaining the results below for quick reference:

1. The query wikiData.count() shows some count of rows in the file.

2. After running the following query:

    sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println)

I get output like below (only the last couple of lines are shown here). It doesn't show the actual results of the query. I tried increasing the driver memory as suggested in the exercise; however, it doesn't work, and the output is almost the same.

14/10/17 15:29:39 INFO executor.Executor: Finished task 199.0 in stage 2.0 (TID 401). 2170 bytes result sent to driver
14/10/17 15:29:39 INFO executor.Executor: Finished task 198.0 in stage 2.0 (TID 400). 2170 bytes result sent to driver
14/10/17 15:29:39 INFO scheduler.TaskSetManager: Finished task 198.0 in stage 2.0 (TID 400) in 13 ms on localhost (199/200)
14/10/17 15:29:39 INFO scheduler.TaskSetManager: Finished task 199.0 in stage 2.0 (TID 401) in 10 ms on localhost (200/200)
14/10/17 15:29:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/10/17 15:29:39 INFO scheduler.DAGScheduler: Stage 2 (takeOrdered at basicOperators.scala:171) finished in 1.296 s
14/10/17 15:29:39 INFO spark.SparkContext: Job finished: takeOrdered at basicOperators.scala:171, took 3.150021634 s

3. I tried some other Spark SQL commands as below:

    sqlContext.sql("SELECT username FROM wikiData LIMIT 10").collect().foreach(println)

output is:

[[B@787cf559]
[[B@53cfe3db]
[[B@757869d9]
[[B@346d61cf]
[[B@793077ec]
[[B@5d11651c]
[[B@21054100]
[[B@5fee77ef]
[[B@21041d1d]
[[B@15136bda]

    sqlContext.sql("SELECT * FROM wikiData LIMIT 10").collect().foreach(println)

output is:

[12140913,[B@1d74e696,1394582048,[B@65ce90f5,[B@5c8ef90a]
[12154508,[B@2e802eff,1393177457,[B@618d7f32,[B@1099dda7]
[12165267,[B@65a70774,1398418319,[B@38da84cf,[B@12454f32]
[12184073,[B@45264fd,1395243737,[B@3d642042,[B@7881ec8a]
[12194348,[B@19d095d5,1372914018,[B@4d1ce030,[B@22c296dd]
[12212394,[B@153e98ff,1389794332,[B@40ae983e,[B@68d2f9f]
[12224899,[B@1f317315,1396830262,[B@677a77b2,[B@19487c31]
[12240745,[B@65d181ee,1389890826,[B@1da9647b,[B@5c03d673]
[12258034,[B@7ff44736,1385050943,[B@7e6f6bda,[B@4511f60f]
[12279301,[B@1e317636,1382277991,[B@4147e2b6,[B@56753c35]

I'm sure the above output of the queries is not the correct content of the parquet file, and I'm not able to read the content of the parquet file directly. How can I validate the output of these queries against the actual content of the parquet file? What is the workaround for this issue? How can the file be read through Spark SQL? Is there a need to change the queries, and if so, what changes would get the exact result?

Regards,
Neeraj
What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable
Hi,

Probably I am missing a very simple principle, but something is wrong with my filter; I get an org.apache.spark.SparkException: Task not serializable exception. Here is my filter function:

    object OBJ {
      def f1(): Boolean = {
        var i = 1
        for (j <- 1 to 10) i = i + 1
        true
      }
    }

    rdd.filter(row => OBJ.f1())

And when I run it, I get the following exception:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
...

best,
/Shahab
Re: What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable
It might be that the object is nested within some class which is not serializable. You can also run the application with this JVM parameter to see detailed info about serialization:

    -Dsun.io.serialization.extendedDebugInfo=true

On Fri, Oct 17, 2014 at 4:07 PM, shahab shahab.mok...@gmail.com wrote:

Hi, Probably I am missing a very simple principle, but something is wrong with my filter; I get an org.apache.spark.SparkException: Task not serializable exception. Here is my filter function:

    object OBJ {
      def f1(): Boolean = {
        var i = 1
        for (j <- 1 to 10) i = i + 1
        true
      }
    }

    rdd.filter(row => OBJ.f1())

And when I run it, I get the following exception:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
...

best,
/Shahab

--
Sourav Chandra
Senior Software Engineer
sourav.chan...@livestream.com
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034
www.livestream.com
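For this particular trace, the telltale is that a SparkConf is being dragged into the closure, which usually means OBJ (or the code calling filter) is nested inside a class that holds the SparkContext/SparkConf. A sketch of the shape that serializes cleanly, reusing the logic from the original post:

    // Top-level object: referencing it from a closure does not pull in
    // any enclosing instance, so nothing non-serializable is captured.
    object OBJ {
      def f1(): Boolean = {
        var i = 1
        for (j <- 1 to 10) i = i + 1
        true
      }
    }

    rdd.filter(_ => OBJ.f1())

If the helper must stay inside a class, copying what it needs into a local val before the closure (so only that val is captured) is the other standard fix.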
Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?
Hi all,

I need to compute a similarity between elements of two large sets of high-dimensional feature vectors. Naively, I create all possible pairs of vectors with features1.cartesian(features2) and then map the resulting paired RDD with my similarity function. The problem is that the cartesian operation takes a lot of time, more than computing the similarity itself. If I save each of my feature vectors to disk, form a list of file-name pairs, and compute the similarity by reading the files, it runs significantly faster.

Any ideas will be helpful,
Cheers,
Jao
Designed behavior when master is unreachable.
Hi all,

I am running a standalone Spark cluster with a single master. No HA or failover is configured explicitly (no ZooKeeper etc). What is the default designed behavior for submission of new jobs when the single master goes down or becomes unreachable? I couldn't find it documented anywhere.

Thanks.
Regarding using spark sql with yarn
Hi,

I have been using Spark SQL with YARN. It works fine in yarn-client mode, but with yarn-cluster mode we are facing two issues. Is yarn-cluster mode not recommended for Spark SQL using HiveContext?

Problem #1

We are not able to run any query with even a very simple filtering operation, whereas a plain projection works:

    select x,y,z from table1               == works on yarn-client as well as yarn-cluster mode
    select x,y,z from table1 where z > 10  == works on yarn-client but NOT on yarn-cluster mode

The exception says "Unsupported Feature", tracing to HiveQL.scala. It is quite strange, in the sense that the code parsing the SQL is expected to be the same in both modes.

Problem #2

On another machine, where we have configured Hive, we are getting this issue:

java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:344)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:278)
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
NestedThrowables: java.lang.reflect.InvocationTargetException
at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:309)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:338)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:247)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:222)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:58)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:498)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:476)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:524)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.<init>(HiveMetaStore.java:398)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.<init>(HiveMetaStore.java:357)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:54)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4948)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:171)
... 31 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:282)
at org.datanucleus.store.AbstractStoreManager.<init>(AbstractStoreManager.java:240)
at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:286)
at
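Not confirmed by this thread, but a common cause of the HiveMetaStoreClient failure in yarn-cluster mode is that the driver runs inside the ApplicationMaster and never sees the local hive-site.xml, so it falls back to a default Derby metastore it cannot set up there. Shipping the config with the job is the usual first thing to try; the path below is an assumption:

    spark-submit --master yarn-cluster \
      --files /etc/hive/conf/hive-site.xml \
      --class your.main.Class your-app.jar

The parsing difference in problem #1 between yarn-client and yarn-cluster similarly suggests the cluster-side containers may be picking up a different Spark/Hive jar set than the client; comparing the effective classpaths of the two modes is a reasonable first check.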
Re: Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?
Cartesian joins of large datasets are usually going to be slow. If there is a way you can reduce the problem space to make sure you only join subsets with each other, that may be helpful. Maybe if you explain your problem in more detail, people on the list can come up with more suggestions.

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 17, 2014 at 4:13 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Hi all, I need to compute a similarity between elements of two large sets of high-dimensional feature vectors. Naively, I create all possible pairs of vectors with features1.cartesian(features2) and then map the resulting paired RDD with my similarity function. The problem is that the cartesian operation takes a lot of time, more than computing the similarity itself. If I save each of my feature vectors to disk, form a list of file-name pairs, and compute the similarity by reading the files, it runs significantly faster.

Any ideas will be helpful,
Cheers,
Jao
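One pattern that often beats cartesian() when one side is the smaller of the two: collect and broadcast that side, then compute all pairs locally inside each task. This is only a sketch; the (id, vector) layout and the dot-product similarity are assumptions standing in for the real data and metric:

    // features1, features2: RDD[(Long, Array[Double])]
    def dot(a: Array[Double], b: Array[Double]): Double =
      a.zip(b).map { case (x, y) => x * y }.sum

    val right = sc.broadcast(features2.collect())  // must fit in executor memory
    val sims = features1.flatMap { case (id1, v1) =>
      right.value.map { case (id2, v2) => ((id1, id2), dot(v1, v2)) }
    }

Compared with cartesian(), this ships the smaller set once per executor instead of shuffling both sides, which is often where the time goes. If neither side fits in memory, blocking both RDDs into coarse chunks and joining on block ids is the usual fallback.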
Re: ALS implicit error pyspark
Hi,

Thanks a lot for your reply. It is true that the Python API has default parameters for everything except rank (the default number of iterations is 5). At the very beginning, in order to estimate the speed of ALS.trainImplicit, I used ALS.trainImplicit(ratings, rank, 1) and it worked. So I tried ALS with more iterations, for example ALS.trainImplicit(ratings, rank, 10), and it didn't work. After several tests, I found that only iterations = 1 works in pyspark, while in Scala every value works.

Best
Gen

Davies Liu-2 wrote:

The Python API has default values for all other arguments, so you should call it with only rank=1 (there are no default iterations in Scala). I'm curious how you ran into this problem.

On Thu, Oct 16, 2014 at 9:53 AM, Gen <gen.tang86@...> wrote:

Hi, I am trying to use the ALS.trainImplicit method in pyspark.mllib.recommendation. However it didn't work. So I tried the example in the Python API documentation, such as:

    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])
    model = ALS.trainImplicit(ratings, 1)

It didn't work either. After searching on Google, I found that there are only two overloads for ALS.trainImplicit in the Scala code. So I tried model = ALS.trainImplicit(ratings, 1, 1), and it worked. But if I set iterations to anything other than 1, for example model = ALS.trainImplicit(ratings, 1, 2) or model = ALS.trainImplicit(ratings, 4, 2), it generated an error. The information is as follows:

count at ALS.scala:314

Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most recent failure: Lost task 6.3 in stage 189.0 (TID 626, ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:

It is really strange, because count at ALS.scala:314 is already outside the loop of iterations. Any idea? Thanks a lot in advance.

FYI: I used Spark 1.1.0, and ALS.train() works pretty well in all cases.
Re: ALS implicit error pyspark
Hi,

Today I tried again with the following code, but it didn't work. Could you please tell me your running environment?

    from pyspark.mllib.recommendation import ALS
    from pyspark import SparkContext

    sc = SparkContext()
    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])
    model = ALS.trainImplicit(ratings, 1)

I used spark-ec2 to create a cluster with 5 slaves (I made some modifications to spark_ec2.py, but the cluster launches and is configured correctly). I found that the task fails when one slave node tries to take the second task of count at ALS.scala:314. I will take a look at the logs and try to find the problem.

Best
Gen

Davies Liu-2 wrote:

I can run the following code against Spark 1.1:

    sc = SparkContext()
    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])
    model = ALS.trainImplicit(ratings, 1)

Davies

On Thu, Oct 16, 2014 at 2:45 PM, Davies Liu <davies@...> wrote:

Could you post the code that has the problem with pyspark? Thanks!

Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen <gen.tang86@...> wrote:

I tried the same data with Scala. It works pretty well. It seems that it is a problem of pyspark. The console shows the following logs:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
    ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
: An error occurred while calling o32.trainImplicitALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
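Until the pyspark issue is tracked down, the Scala API is a workaround the thread itself confirms: the same tiny dataset trains fine there with more than one iteration. A sketch for the Spark shell:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val ratings = sc.parallelize(Seq(Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)))
    val model = ALS.trainImplicit(ratings, 1, 10)   // rank = 1, 10 iterations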
Re: Submission to cluster fails (Spark SQL; NoSuchMethodError on SchemaRDD)
For posterity's sake, I solved this. The problem was that the Cloudera cluster I was submitting to is running 1.0, and I was compiling against the latest 1.1 release. Downgrading my compile to 1.0 got me past this.

On Tue, Oct 14, 2014 at 6:08 PM, Michael Campbell michael.campb...@gmail.com wrote:

Hey all, I'm trying a very basic Spark SQL job, and apologies as I'm new to a lot of this, but I'm getting this failure:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.SchemaRDD.take(I)[Lorg/apache/spark/sql/catalyst/expressions/Row;

I've tried a variety of uber-jar creation approaches, but it always comes down to this. Right now my jar has NO dependencies on other jars (other than Spark itself), and my uber jar contains essentially only the .class file of my own code. My own code simply reads a parquet file and does a count(*) on the contents. I'm sure this is very basic, but I'm at a loss. Thoughts and/or debugging tips welcome.

Here's my code, which I call from the main object, passing in the context, the Parquet file name, and some arbitrary SQL (which for this is just "select count(*) from flows"). I have run the equivalent successfully in spark-shell on the cluster.

    def readParquetFile(sparkCtx: SparkContext, dataFileName: String, sql: String) = {
      val sqlContext = new SQLContext(sparkCtx)
      val parquetFile = sqlContext.parquetFile(dataFileName)
      parquetFile.registerAsTable("flows")
      println(s"About to run $sql")
      val start = System.nanoTime()
      val countRDD = sqlContext.sql(sql)
      val rows: Array[Row] = countRDD.take(1) // DIES HERE
      val stop = System.nanoTime()
      println(s"result: ${rows(0)}")
      println(s"Query took ${(stop - start) / 1e9} seconds.")
      println(s"Query was $sql")
    }
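The general rule this illustrates: compile against exactly the Spark version the cluster runs, and mark it provided so the uber jar doesn't smuggle in a second copy. A hedged sbt sketch; the exact version string should come from the cluster (CDH builds may carry a vendor suffix):

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.0.0" % "provided"
    )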
What is akka-actor_2.10-2.2.3-shaded-protobuf.jar?
Hello,

My SBT pulls in, among others, the following dependency for Spark 1.1.0:

    akka-actor_2.10-2.2.3-shaded-protobuf.jar

What is this? How is this different from the regular Akka Actor JAR? How do I reconcile it with other libs that use Akka, such as Play?

Thanks!

Best,
Oliver

Oliver Ruebenacker | Solutions Architect
Altisource(tm)
290 Congress St, 7th Floor | Boston, Massachusetts 02210
P: (617) 728-5582 | ext: 275585
oliver.ruebenac...@altisource.com | www.Altisource.com
Strange duplicates in data when scaling up
Issue was solved by clearing the hashmap and hashset at the beginning of the call method.

From: Jacob Maloney [mailto:jmalo...@conversantmedia.com]
Sent: Thursday, October 16, 2014 5:09 PM
To: user@spark.apache.org
Subject: Strange duplicates in data when scaling up

I have a flatmap function that shouldn't possibly emit duplicates, and yet it does. The output of my function is a HashSet, so the function itself cannot output duplicates, and yet I see many copies of keys emitted from it (in one case up to 62). The curious thing is I can't get this to happen until I ramp up the size of the input to about 100,000 lines. For example:

(3587005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])

will expand to

(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
(3587005221,(80530632,0.37312230565577803))
.
.
.
(3587005221,(80530632,0.37312230565577803))

62 total times. If I run this line only as input, I get just the one line of output as expected. It seems to be a scaling-up issue. My code is as follows:

    JavaPairRDD<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> preAggData = indidKeyedJoinedData.groupByKey();
    JavaPairRDD<Long,Tuple2<Integer,Double>> aggregatedData = preAggData.flatMapToPair(new AggregateLikeSims());

Where:

    static class AggregateLikeSims implements PairFlatMapFunction<Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>>, Long, Tuple2<Integer,Double>> {
      // NOTE: these are instance fields, shared across every call() on this
      // function object; unless they are cleared at the start of each call,
      // results from earlier tuples leak into later ones (the cause of the
      // duplicates reported above, and why clearing them fixed the issue)
      HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output = new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
      Map<Integer,List<Double>> intermediateMap = new HashMap<Integer,List<Double>>();
      Iterator<Tuple2<Integer,Double>> intIterator;
      Tuple2<Integer,Double> currentTuple;
      Double MAX_RECO_VALUE = 1.0;
      Iterator<Iterable<Tuple2<Integer,Double>>> itIterator;
      Accumulator<Integer> accum;

      @Override
      public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(Tuple2<Long,Iterable<Iterable<Tuple2<Integer,Double>>>> inTuple) {
        itIterator = inTuple._2.iterator();
        while (itIterator.hasNext()) {
          intIterator = itIterator.next().iterator();
          while (intIterator.hasNext()) {
            currentTuple = intIterator.next();
            if (intermediateMap.containsKey(currentTuple._1)) {
              intermediateMap.get(currentTuple._1).add(currentTuple._2);
            } else {
              List<Double> listOfDoubles = new ArrayList<Double>();
              listOfDoubles.add(currentTuple._2);
              intermediateMap.put(currentTuple._1, listOfDoubles);
            }
          }
        }
        Iterator<Map.Entry<Integer,List<Double>>> it = intermediateMap.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<Integer,List<Double>> pairs = it.next();
          if (pairs.getValue().size() > 1) {
            output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1, new Tuple2<Integer, Double>(pairs.getKey(), aggregate(pairs.getValue()))));
          } else {
            output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1, new Tuple2<Integer, Double>(pairs.getKey(), pairs.getValue().get(0))));
          }
          it.remove();
        }
        return output;
      }

      private double aggregate(List<Double> simsList) {
        if (simsList == null) {
          return 0;
Re: Join with large data set
Hi Sonal,

Thank you for the response, but since we are joining to reference data, different partitions of application data would need to join with the same reference data, and thus I am not sure a Spark join would be a good fit for this.

E.g. our application data has a person with a zip code, and the reference data has attributes of the zip code (city, state etc), so person objects in different partitions of the Spark cluster may refer to the same zip. If I partition our application data by zip there will be a lot of shuffling, and later our application code would have to repartition by another key, with another shuffle of the whole application data. I think it would not be a good idea.

Thanks
Ankur

On Oct 16, 2014 11:06 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

Hi Ankur,

If your RDDs have common keys, you can look at partitioning both your datasets with a custom partitioner based on those keys, so that you can avoid shuffling and optimize join performance. HTH

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 17, 2014 at 4:27 AM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi, I have an RDD which holds my application data and is huge. I want to join it with reference data which is also too huge to fit in memory, so I do not want to use a broadcast variable. What other options do I have to perform such joins? I am using Cassandra as my data store, so should I just query Cassandra to get the reference data needed? Also, when I join two RDDs, will it result in an RDD scan, or would Spark do a hash partition on the two RDDs to get the data with the same keys on the same node?

Thanks
Ankur
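If both datasets can be keyed by zip, a plain RDD join may still be reasonable. A hedged sketch with hypothetical Person and ZipInfo shapes standing in for the real records:

    case class Person(id: String, zip: String)                    // hypothetical schema
    case class ZipInfo(zip: String, city: String, state: String)  // hypothetical schema

    val personsByZip = persons.map(p => (p.zip, p))   // persons: RDD[Person]
    val refByZip = zipRef.map(z => (z.zip, z))        // zipRef: RDD[ZipInfo]
    val joined = personsByZip.join(refByZip)          // RDD[(String, (Person, ZipInfo))]

Each side is shuffled once by key; with hash partitioning, every person record sharing a zip lands in the same partition as the single reference row for that zip, so the reference data is not broadcast or duplicated across partitions.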
Re: Help required on exercise Data Exploration using Spark SQL
Looks like this data was encoded with an old version of Spark SQL. You'll need to set the flag to interpret binary data as a string. More info on configuration can be found here: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration sqlContext.sql(set spark.sql.parquet.binaryAsString=true) Michael On Fri, Oct 17, 2014 at 6:32 AM, neeraj neeraj_gar...@infosys.com wrote: Hi, When I run given Spark SQL commands in the exercise, it returns with unexpected results. I'm explaining the results below for quick reference: 1. The output of query : wikiData.count() shows some count in the file. 2. after running following query: sqlContext.sql(SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username '' GROUP BY username ORDER BY cnt DESC LIMIT 10).collect().foreach(println) I get output like below. Couple of last lines of this output is shown here. It doesn't show the actual results of query. I tried increasing the driver memory as suggested in the exercise, however, id doesn't work. The output is almost same. 14/10/17 15:29:39 INFO executor.Executor: Finished task 199.0 in stage 2.0 (TID 401). 2170 bytes result sent to driver 14/10/17 15:29:39 INFO executor.Executor: Finished task 198.0 in stage 2.0 (TID 400). 2170 bytes result sent to driver 14/10/17 15:29:39 INFO scheduler.TaskSetManager: Finished task 198.0 in stage 2.0 (TID 400) in 13 ms on localhost (199/200) 14/10/17 15:29:39 INFO scheduler.TaskSetManager: Finished task 199.0 in stage 2.0 (TID 401) in 10 ms on localhost (200/200) 14/10/17 15:29:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 14/10/17 15:29:39 INFO scheduler.DAGScheduler: Stage 2 (takeOrdered at basicOperators.scala:171) finished in 1.296 s 14/10/17 15:29:39 INFO spark.SparkContext: Job finished: takeOrdered at basicOperators.scala:171, took 3.150021634 s 3. I tried some other Spark SQL commands as below: *sqlContext.sql(SELECT username FROM wikiData LIMIT 10).collect().foreach(println)* *output is* : [[B@787cf559] [[B@53cfe3db] [[B@757869d9] [[B@346d61cf] [[B@793077ec] [[B@5d11651c] [[B@21054100] [[B@5fee77ef] [[B@21041d1d] [[B@15136bda] *sqlContext.sql(SELECT * FROM wikiData LIMIT 10).collect().foreach(println)* *output is *: [12140913,[B@1d74e696,1394582048,[B@65ce90f5,[B@5c8ef90a] [12154508,[B@2e802eff,1393177457,[B@618d7f32,[B@1099dda7] [12165267,[B@65a70774,1398418319,[B@38da84cf,[B@12454f32] [12184073,[B@45264fd,1395243737,[B@3d642042,[B@7881ec8a] [12194348,[B@19d095d5,1372914018,[B@4d1ce030,[B@22c296dd] [12212394,[B@153e98ff,1389794332,[B@40ae983e,[B@68d2f9f] [12224899,[B@1f317315,1396830262,[B@677a77b2,[B@19487c31] [12240745,[B@65d181ee,1389890826,[B@1da9647b,[B@5c03d673] [12258034,[B@7ff44736,1385050943,[B@7e6f6bda,[B@4511f60f] [12279301,[B@1e317636,1382277991,[B@4147e2b6,[B@56753c35] I'm sure the about output of the queries is not the correct content of parquet file.. I'm not able to read the content of parquet file directly. How to validate the output of these queries with the actual content in the parquet file. What is the workaround for this issue. How to read the file through Spark SQL. Is there a need to change the queries? What changes can be made in the queries to get the exact result. Regards, Neeraj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-required-on-exercise-Data-Exploratin-using-Spark-SQL-tp16569p16673.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: ALS implicit error pyspark
Hi, I created an issue in JIRA: https://issues.apache.org/jira/browse/SPARK-3990 and uploaded the error information there. Thanks in advance for your help. Best, Gen

Davies Liu wrote:
It seems a bug, could you create a JIRA for it? thanks! Davies

On Thu, Oct 16, 2014 at 12:27 PM, Gen gen.tang86@ wrote:
I tried the same data with scala. It works pretty well. It seems that it is a problem of pyspark. In the console, it shows the following logs:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
  File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
    ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError
14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
: An error occurred while calling o32.trainImplicitALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
    com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at
bug with MapPartitions?
Hello, Maybe there is something I'm not understanding, but I believe this code should not throw any serialization error when I run it in the spark shell. Using similar code with map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

val inStream = new BufferedInputStream(new FileInputStream(inputFile))
val p = new DataPacket(inStream)
val c = Array(p)

def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next
    res .::= ("")
  }
  res.iterator
}

var r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    ...
Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    ...

Why is this code failing? The constructor of DataPacket just reads data, but does not keep any reference to the BufferedInputStream. Note that this is not the real code, but a simplification while trying to isolate the cause of the error I get. Using map on this instead of mapPartitions works fine.
Re: Folding an RDD in order
Thank you for sharing this Cheng! This is fantastic. I was able to implement it and it seems like it's working quite well. I'm definitely on the right track now! I'm still having a small problem with the rows inside each partition being out of order - but I suspect this is because in the current code I sortByKey, then use RangePartitioner (which I think does not maintain row order within each partition, due to the shuffle in RangePartitioner). I suspect I can work around this by doing the operations in this order:
- RangePartitioner
- mapValues to sort each partition in memory, maintaining partitioning
- aggregate
Michael

On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian lian.cs@gmail.com wrote: RDD.aggregate doesn't require the RDD elements to be pairs, so you don't need user_id to be the key of the RDD. For example, you can use an empty Map as the zero value of the aggregation. The key of the Map is the user_id you extracted from each tuple, and the value is the aggregated value:

keyByTimestamp.aggregate(Map.empty[String, Float].withDefaultValue(0.0f))({ (agg, rec) =>
  val (time, (user, amount)) = rec
  agg.updated(user, agg(user) + amount)
}, { (lhs, rhs) =>
  lhs.keys.foldLeft(rhs) { (combined, user) =>
    combined.updated(user, lhs(user) + rhs(user))
  }
})

Of course, you may use a mutable Map for better performance. One thing to notice: foldByKey is a transformation, while aggregate is an action. The final result of the code above is a single Map object rather than an RDD. If this map can be very large (say you have billions of users), then aggregate may OOM.

On 10/17/14 12:01 AM, Michael Misiewicz wrote: Thanks for the suggestion! That does look really helpful, I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process. I think the problem will still exist though - which is that I can't get the correct partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent.

On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian lian.cs@gmail.com wrote: Hi Michael, I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold. Cheng

On 10/16/14 11:15 PM, Michael Misiewicz wrote: Hi, I'm working on a problem where I'd like to sum items in an RDD in order (approximately). I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this: user_id, transaction_timestamp, transaction_amount. And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work). What do you think about the approach?
Here's some sample code that demonstrates what I'm thinking:

def myFold(v1: Float, v2: Float): Float = {
  val partialSum = v1 + v2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  return partialSum
}

val rawData = sc.textFile("hdfs://path/to/data").map { x =>
  // load data
  val l = x.split(" ")
  (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id: long, transaction_timestamp: long, transaction_amount: float
}
// rearrange to make timestamp the key (for sorting), convert to a PairRDD
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
// By this point, the RDD should be sorted and partitioned according to the timestamp.
// However, I need to now make user_id the key, because the output must be per user.
// At this point, since I change the keys of the PairRDD, I understand that I lose the
// partitioning - the consequence of which is that I can no longer be sure in my fold
// function that the ordering is retained.
val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(zeroValue = 0)(myFold)
finalResult.saveAsTextFile("hdfs://...")

The problem, as you'd expect, takes place in the folding function, after I've rearranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when the keys are changed.
Re: Folding an RDD in order
My goal is for rows to be partitioned according to timestamp bins (e.g. with each partition representing an even interval of time), and then ordered by timestamp *within* each partition. Ordering by user ID is not important. In my aggregate function, in the seqOp function, I am checking to verify this fact, and seeing that rows within partitions are not in order - but I think this should be very easy to solve with mapPartitions(preservesPartitioning = true) prior to aggregate(), which should maintain the evenly spaced ranges produced by RangePartitioner.

On Fri, Oct 17, 2014 at 11:04 AM, Cheng Lian lian.cs@gmail.com wrote: Hm, a little confused here. What exactly is the ordering you expect? It seems that you want all the elements in the RDD to be sorted first by timestamp and then by user_id. If this is true, then you can simply do this:

rawData.map { case (time, user, amount) => (time, user) -> amount }.sortByKey.aggregate(…)

On 10/17/14 10:44 PM, Michael Misiewicz wrote: Thank you for sharing this Cheng! This is fantastic. I was able to implement it and it seems like it's working quite well.
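For reference, a sketch of those three steps (assuming the keyByTimestamp pair RDD from the code earlier in the thread): sort each partition in memory via mapPartitions with preservesPartitioning = true, so the range partitioning survives into aggregate():

import org.apache.spark.RangePartitioner

val partitioned = keyByTimestamp.partitionBy(
  new RangePartitioner(500, keyByTimestamp))
val sortedWithin = partitioned.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator,   // in-memory sort of one partition by timestamp
  preservesPartitioning = true)
// sortedWithin is range-partitioned by timestamp and ordered within each
// partition, ready for the aggregate() shown above

Note that iter.toArray assumes a single partition fits in memory.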
Re: object in an rdd: serializable?
interesting. why does case class work for this? thanks boromir!

On Thu, Oct 16, 2014 at 10:41 PM, Boromir Widas vcsub...@gmail.com wrote: Making it a case class should work.

On Thu, Oct 16, 2014 at 8:30 PM, ll duy.huynh@gmail.com wrote: i got an exception complaining about serializable. the sample code is below...

class HelloWorld(val count: Int) {
  ...
}

object Test extends App {
  ...
  val data = sc.parallelize(List(new HelloWorld(1), new HelloWorld(2)))
  ...
}

what is the best way to serialize HelloWorld so that it can be contained in an RDD? thanks!
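For reference, a sketch of both routes. Case classes work because the compiler makes every case class extend scala.Serializable (along with Product), so Java serialization can ship instances inside a task; a plain class needs the trait mixed in by hand:

// route 1: a case class is Serializable for free
case class HelloWorld(count: Int)

// route 2: keep a regular class, mix in Serializable yourself
class HelloWorld2(val count: Int) extends Serializable

val data = sc.parallelize(List(HelloWorld(1), HelloWorld(2)))
data.map(_.count).collect()   // runs without NotSerializableException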
Re: Spark in cluster and errors
I wasn't the original person who posted the question, but this helped me! :) Thank you. I had a similar issue today when I tried to connect using the IP address (spark://master_ip:7077). I got it resolved by replacing it with the URL displayed in the Spark web console - in my case it is (spark://master_short_hostname:7077).
Re: Gracefully stopping a Spark Streaming application
You will have to write something into your app, like an endpoint or a button, that triggers this call from inside the application.

Hi all, I have a Spark Streaming application running on a cluster, deployed with the spark-submit script. I was reading here that it's possible to gracefully shut down the application in order to allow the deployment of a new one: http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications Question is: how can I call StreamingContext.stop(...) http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext from outside the application? Thanks a lot, Max -- Massimiliano Tomassi
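One common shape for such a trigger (a sketch, not from the thread; the marker-file path is hypothetical and ssc is your StreamingContext): poll for an external stop marker from a background thread and call the graceful stop overload, which lets in-flight batches finish.

import java.nio.file.{Files, Paths}

val stopMarker = Paths.get("/tmp/stop-streaming-app")   // hypothetical path
new Thread(new Runnable {
  def run(): Unit = {
    // wake up every few seconds and check for the marker
    while (!Files.exists(stopMarker)) Thread.sleep(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}).start()

ssc.start()
ssc.awaitTermination()   // returns once stop() completes

Creating the marker file from outside (e.g. over ssh) then acts as the external stop switch.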
Re: What is akka-actor_2.10-2.2.3-shaded-protobuf.jar?
They should be the same, except the package names are changed to avoid a protobuf conflict. You can use it just like the other Akka jars. Chester

Sent from my iPhone

On Oct 17, 2014, at 5:56 AM, Ruebenacker, Oliver A oliver.ruebenac...@altisource.com wrote: Hello, My SBT pulls in, among others, the following dependency for Spark 1.1.0: akka-actor_2.10-2.2.3-shaded-protobuf.jar What is this? How is it different from the regular Akka Actor JAR? How do I reconcile it with other libs that use Akka, such as Play? Thanks! Best, Oliver

Oliver Ruebenacker | Solutions Architect Altisource™ 290 Congress St, 7th Floor | Boston, Massachusetts 02210 P: (617) 728-5582 | ext: 275585 oliver.ruebenac...@altisource.com | www.Altisource.com
Attaching schema to RDD created from Parquet file
Hi, How can I convert an RDD loaded from a Parquet file back into its original type?

case class Person(name: String, age: Int)
val rdd: RDD[Person] = ...
rdd.saveAsParquetFile("people")
val rdd2 = sqlContext.parquetFile("people")

How can I map rdd2 back into an RDD[Person]? All of the examples just show how to use the RDD loaded from Parquet using SQL.
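In the Spark 1.1 API a SchemaRDD is an RDD[Row], so one way back (a sketch, assuming the columns come back in the order they were written) is to map each Row by position:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)
val sqlContext = new SQLContext(sc)
val rdd2 = sqlContext.parquetFile("people")
// rebuild the case class field by field; the casts assume column 0 is the
// name and column 1 is the age, matching the written schema
val people: RDD[Person] = rdd2.map { row =>
  Person(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])
}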
Re: bug with MapPartitions?
There seems to be some problem with what gets captured in the closure that's passed into mapPartitions (myfunc in your case). I've had a similar problem before: http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html

Try putting your myfunc in an object:

object Mapper {
  def myFunc = ...
}
val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()

On Fri, Oct 17, 2014 at 7:33 AM, davidkl davidkl...@hotmail.com wrote: Hello, Maybe there is something I'm not understanding, but I believe this code should not throw any serialization error when I run it in the spark shell. Using similar code with map instead of mapPartitions works just fine.
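An expanded sketch of that suggestion (the function body here is a stand-in, just to make it self-contained): defining the function on a top-level object means the task references the object rather than capturing the shell's enclosing line object, which is what drags the non-serializable BufferedInputStream along.

object Mapper {
  def myfunc[T](iter: Iterator[T]): Iterator[String] =
    iter.map(_.toString)   // stand-in body; the original builds up a List[String]
}

val r = sc.parallelize(c).mapPartitions(Mapper.myfunc[DataPacket]).collect()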
Spark/HIVE Insert Into values Error
Hi, When trying to insert records into Hive, I got an error. My Spark is 1.1.0 and Hive is 0.12.0. Any idea what could be wrong? Regards, Arthur

hive> CREATE TABLE students (name VARCHAR(64), age INT, gpa int);
OK
hive> INSERT INTO TABLE students VALUES ('fred flintstone', 35, 1);
NoViableAltException(26@[])
    at org.apache.hadoop.hive.ql.parse.HiveParser_SelectClauseParser.selectClause(HiveParser_SelectClauseParser.java:693)
    at org.apache.hadoop.hive.ql.parse.HiveParser.selectClause(HiveParser.java:31374)
    at org.apache.hadoop.hive.ql.parse.HiveParser.regular_body(HiveParser.java:29083)
    at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatement(HiveParser.java:28968)
    at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:28762)
    at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1238)
    at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:938)
    at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:190)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:424)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:259)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:216)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:781)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:675)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:614)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
FAILED: ParseException line 1:27 cannot recognize input near 'VALUES' '(' ''fred flintstone'' in select clause
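The parser error is because INSERT INTO ... VALUES only arrived in Hive 0.14 (HIVE-5317); Hive 0.12 cannot parse it at all. A common 0.12-era workaround is to insert through a SELECT. A sketch via Spark's HiveContext (the helper table name is an assumption):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// a one-row helper table to SELECT constants from (Hive 0.12 has no dual table)
hiveContext.sql("CREATE TABLE IF NOT EXISTS one_row (x INT)")
// ... load a single row into one_row once, then:
hiveContext.sql("INSERT INTO TABLE students SELECT 'fred flintstone', 35, 1 FROM one_row")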
Re: how to submit multiple jar files when using spark-submit script in shell?
Hm, it works for me. Are you sure you have provided the right jars? What happens if you pass in the `--verbose` flag?

2014-10-16 23:51 GMT-07:00 eric wong win19...@gmail.com: Hi, I am using the comma-separated style for submitting multiple jar files in the following shell command, but it does not work:

bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master yarn-cluster --executor-memory 2g --jars lib/spark-examples-1.0.2-hadoop2.2.0.jar,lib/spark-mllib_2.10-1.0.0.jar hdfs://master:8000/srcdata/kmeans 8 4

Thanks! -- WangHaihua
Re: how can I make the sliding window in Spark Streaming driven by data timestamp instead of absolute time
I believe I have a similar question to this. I would like to process an offline event stream for testing/debugging. The stream is stored in a CSV file where each row in the file has a timestamp. I would like to feed this file into Spark Streaming and have the concept of time be driven by the timestamp column. Has anyone done this before? I haven't seen anything in the docs. I would like to know if this is possible in Spark Streaming. Thanks!
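Spark Streaming's clock can't be driven by a data column directly, but for offline replay this is often faked with a queueStream: bucket the file by its timestamp column, build one RDD per bucket, and let each batch interval consume one bucket. A sketch (the path, the column position, and the one-minute bucket width are all assumptions):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val lines = sc.textFile("events.csv")            // hypothetical path
val byBucket = lines.map { line =>
  val cols = line.split(",")
  (cols(0).toLong / 60, line)                    // 1-minute buckets on an epoch-seconds column
}.cache()
val buckets = byBucket.keys.distinct().collect().sorted
val queue = mutable.Queue(buckets.map { b =>
  byBucket.filter(_._1 == b).values: RDD[String]
}: _*)
// each batch interval of the StreamingContext consumes the next bucket
val replayed = ssc.queueStream(queue, oneAtATime = true)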
complexity of each action / transformation
hello... is there a list that shows the complexity of each action/transformation? for example, what is the complexity of RDD.map()/filter() or RowMatrix.multiply() etc? that would be really helpful. thanks!
Re: why do RDD's partitions migrate between worker nodes in different iterations
The RDDs aren't changing; you are assigning new RDDs to rdd_0 and rdd_1. Operations like join and reduceByKey make distinct, new partitions that don't correspond 1-1 with old partitions anyway.

On Fri, Oct 17, 2014 at 5:32 AM, randylu randyl...@gmail.com wrote: Dear all, In my test program, there are 3 partitions for each RDD. The iteration procedure is as follows:

var rdd_0 = ... // init
for (...) {
  rdd_1 = rdd_0.reduceByKey(...).partitionBy(p) // calculate rdd_1 from rdd_0
  rdd_0 = rdd_0.partitionBy(p).join(rdd_1)...   // update rdd_0 by rdd_1
  rdd_0.action()
}

I thought rdd_0 and rdd_1 were partitioned by the same partitioner and that their corresponding partitions would be on the same node; for example, rdd_0's partition_0 and rdd_1's partition_0 on the same node in each iteration. But in fact, rdd_0's partition_0 changes its location between workers. Is there any way to keep rdd_0's and rdd_1's partitions from changing their locations, with corresponding partitions on the same node for a fast join()? Best Regards, randy
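A sketch of how to keep the two sides co-partitioned across iterations (small example data; the point is reusing one partitioner object and persisting both sides so join can use a narrow dependency instead of a shuffle):

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(3)
var rdd_0 = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(p).persist()
for (i <- 1 to 10) {
  val rdd_1 = rdd_0.reduceByKey(p, _ + _)           // pass p so no repartitioning happens
  rdd_0 = rdd_0.join(rdd_1)                         // both sides share p: no shuffle for the join
               .mapValues { case (v, w) => v + w }  // mapValues preserves the partitioner
               .persist()
}
rdd_0.count()

Even then, Spark schedules tasks by the preferred locations of cached blocks, so physical co-location on the same worker is best-effort rather than guaranteed.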
Re: how to build spark 1.1.0 to include org.apache.commons.math3 ?
It doesn't contain commons-math3 because Spark does not depend on it. Spark's tests do, but tests are not built into the Spark assembly.

On Thu, Oct 16, 2014 at 9:57 PM, Henry Hung ythu...@winbond.com wrote: Hi All, I tried to build Spark 1.1.0 using sbt with the command: sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly but the resulting spark-assembly-1.1.0-hadoop2.2.0.jar is still missing the Apache commons-math3 classes. How do I add math3 to the package? Best regards, Henry
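If your own application code needs commons-math3, declare it in your application's build instead of expecting it from the assembly. A sketch for an sbt build (the version number is an assumption):

libraryDependencies += "org.apache.commons" % "commons-math3" % "3.3"

Alternatively, ship the jar at submit time with spark-submit --jars commons-math3-3.3.jar.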
Re: how to submit multiple jar files when using spark-submit script in shell?
On top of what Andrew said, you shouldn't need to manually add the mllib jar to your jobs; it's already included in the Spark assembly jar.

On Thu, Oct 16, 2014 at 11:51 PM, eric wong win19...@gmail.com wrote: Hi, I am using the comma-separated style for submitting multiple jar files in the following shell command, but it does not work:

bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master yarn-cluster --executor-memory 2g --jars lib/spark-examples-1.0.2-hadoop2.2.0.jar,lib/spark-mllib_2.10-1.0.0.jar hdfs://master:8000/srcdata/kmeans 8 4

Thanks! -- WangHaihua

-- Marcelo
Re: Designed behavior when master is unreachable.
I'm not sure what the design is, but I think the current behavior if the driver can't reach the master is to attempt to connect once and fail if that attempt fails. Is that what you're observing? (Also, what version of Spark?)

On Fri, Oct 17, 2014 at 3:51 AM, preeze etan...@gmail.com wrote: Hi all, I am running a standalone spark cluster with a single master. No HA or failover is configured explicitly (no ZooKeeper etc). What is the designed default behavior for submission of new jobs when a single master goes down or becomes unreachable? I couldn't find it documented anywhere. Thanks.
Re: complexity of each action / transformation
On 10/17/2014 02:08 PM, ll wrote: hello... is there a list that shows the complexity of each action/transformation? for example, what is the complexity of RDD.map()/filter() or RowMatrix.multiply() etc? that would be really helpful. thanks!

I'm pretty new to Spark, so I only know about the RDD functions. The complexity of map() and filter() depends on the function that you pass to them, but it is also more than that.

If you view the RDD as a single array in memory, any linear operation (such as map and filter) is O(n). If you split the RDD between h hosts, the complexity becomes O(n/h).

Now add the function that you pass. Say, for example, that you run a function f with complexity O(f). The final complexity is O(n/h * f). If f is a call to String.split() and each String in the RDD has l characters, it has (I think) complexity O(l). So the complexity of calling:

myRdd.map(line => line.split(','))

is something along the lines of O(n/h * l). The complexity of a filter() call can be found the same way.

The complexity mostly depends on the complexity of the function being passed to map()/filter()/whatever, which is where the power of Spark comes in. According to O(n/h * f), the runtime increases linearly with RDD size (makes sense) and *decreases* linearly with the number of hosts, which is why anyone uses Spark.

Hope this helps, and explains why the complexities of various functions are not posted; they change based on the parameter passed. Alec
Unable to connect to Spark thrift JDBC server with pluggable authentication
Hi, If the Spark Thrift JDBC server is started in non-secure mode, it works fine. In secured mode with pluggable authentication, I placed the authentication class configuration in conf/hive-site.xml:

<property>
  <name>hive.server2.authentication</name>
  <value>CUSTOM</value>
</property>
<property>
  <name>hive.server2.custom.authentication.class</name>
  <value>org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl</value>
</property>

and the jar containing the implementation is on the Spark classpath. I am still getting an exception; it seems it couldn't find the authentication class I specified in the configuration:

14/10/17 12:44:33 ERROR server.TThreadPoolServer: Error occurred during processing of message.
java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hive.service.auth.PasswdAuthenticationProvider.<init>()
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
    at org.apache.hive.service.auth.CustomAuthenticationProviderImpl.<init>(CustomAuthenticationProviderImpl.java:38)
    at org.apache.hive.service.auth.AuthenticationProviderFactory.getAuthenticationProvider(AuthenticationProviderFactory.java:57)
    at org.apache.hive.service.auth.PlainSaslHelper$PlainServerCallbackHandler.handle(PlainSaslHelper.java:61)
    at org.apache.hive.service.auth.PlainSaslServer.evaluateResponse(PlainSaslServer.java:127)
    at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:509)
    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:264)
    at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
    at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
Caused by: java.lang.NoSuchMethodException: org.apache.hive.service.auth.PasswdAuthenticationProvider.<init>()
    at java.lang.Class.throwNoSuchMethodException(Class.java:367)
    at java.lang.Class.getDeclaredConstructor(Class.java:541)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)

Why is that? Thanks for your help! Jenny
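Two things worth noting here. First, the NoSuchMethodException names the PasswdAuthenticationProvider interface itself, which usually means the hive.server2.custom.authentication.class property was never read and Hive fell back to the interface as its default, so the hive-site.xml may not be visible to the thrift server process. Second, the provider is created reflectively, so it needs a public no-arg constructor. A skeleton of the provider (a sketch; the method name and exception follow Hive's interface, the body is hypothetical):

import javax.security.sasl.AuthenticationException
import org.apache.hive.service.auth.PasswdAuthenticationProvider

class WebConsoleAuthenticationProviderImpl extends PasswdAuthenticationProvider {
  // a public no-arg constructor comes for free in Scala
  @throws[AuthenticationException]
  override def Authenticate(user: String, password: String): Unit = {
    val ok = checkWithWebConsole(user, password)   // hypothetical helper
    if (!ok) throw new AuthenticationException("Authentication failed for " + user)
  }
  private def checkWithWebConsole(u: String, p: String): Boolean = ???
}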
Transforming the DStream vs transforming each RDD in the DStream
Hi, We have been implementing several Spark Streaming jobs that basically process data and insert it into Cassandra, sorting it among different keyspaces. We've been following this pattern:

dstream.foreachRDD { rdd =>
  val records = rdd.map(elem => record(elem))
  targets.foreach { target =>
    records.filter(record => isTarget(target, record)).writeToCassandra(target, table)
  }
}

I've been wondering whether there would be a performance difference in transforming the dstream instead of transforming the RDDs within the dstream, with regard to how the transformations get scheduled. Instead of the RDD-centric computation, I could transform the dstream until the last step, where I need an RDD to store. For example, the previous transformation could be written as:

val recordStream = dstream.map(elem => record(elem))
targets.foreach { target =>
  recordStream.filter(record => isTarget(target, record)).foreachRDD(_.writeToCassandra(target, table))
}

Would there be a difference in execution and/or performance? What would be the preferred way to do this?

Bonus question: is there a better (more performant) way to sort the data into different buckets, instead of filtering the full collection once per bucket? thanks, Gerard.
Re: Optimizing pairwise similarity computation or how to avoid RDD.cartesian operation ?
Hi Reza, Thank you for the suggestion. The number of points is not that large: about 1,000 in each set, so 1,000 x 1,000 pairs. But my similarity is obtained using metric learning to rank, and from Spark's point of view it is a black box, so my idea was just to distribute the computation of the 1,000 x 1,000 similarities.

After some investigation, I managed to make it run faster. My feature vectors are obtained from a join operation, and I didn't cache the result of that operation before the cartesian operation. Caching the result of the join makes my code run dramatically faster. So I think the real problem I had was a lack of good Spark programming practice. Best, Jao

On Fri, Oct 17, 2014 at 11:08 PM, Reza Zadeh r...@databricks.com wrote: Hi Jaonary, What are the numbers, i.e. the number of points you're trying to do all-pairs on, and the dimension of each? Have you tried the new implementation of columnSimilarities in RowMatrix? Setting the threshold high enough (potentially above 1.0) might solve your problem; here is an example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala This implements the DIMSUM sampling scheme, recently merged into master: https://github.com/apache/spark/pull/1778 Best, Reza

On Fri, Oct 17, 2014 at 3:43 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I need to compute a similarity between elements of two large sets of high-dimensional feature vectors. Naively, I create all possible pairs of vectors with features1.cartesian(features2) and then map the resulting paired RDD with my similarity function. The problem is that the cartesian operation takes a lot of time - more time than computing the similarity itself. If I save each of my feature vectors to disk, form a list of file-name pairs, and compute the similarity by reading the files, it runs significantly faster. Any ideas will be helpful. Cheers, Jao
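The shape of that fix, as a sketch (ids1, ids2, allFeatures and similarity are hypothetical stand-ins for the pipeline described above):

val features1 = ids1.join(allFeatures).values.cache()   // ~1,000 vectors
val features2 = ids2.join(allFeatures).values.cache()
val sims = features1.cartesian(features2)               // ~1,000,000 pairs
  .map { case (a, b) => similarity(a, b) }              // black-box learned metric

Without the cache() calls, the cartesian stage recomputes the upstream joins once per partition pairing, which is where the time was going.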
Re: Spark on YARN driver memory allocation bug?
It may also cause a problem when running in yarn-client mode. If --driver-memory is large, YARN has to allocate a lot of memory to the AM container, but the AM doesn't really need that memory. Boduo
mllib.linalg.Vectors vs Breeze?
hello... i'm looking at the source code for mllib.linalg.Vectors and it looks like it's a wrapper around Breeze with very small changes (mostly changing the names). i don't have any problem with using spark wrapper around Breeze or Breeze directly. i'm just curious to understand why this wrapper was created vs. pointing everyone to Breeze directly? https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
Re: mllib.linalg.Vectors vs Breeze?
I don't know the answer for sure, but just from an API perspective I'd guess that the Spark authors don't want to tie their API to Breeze. If at a future point they swap out a different implementation for Breeze, they don't have to change their public interface. MLlib's interface remains consistent while the internals are free to evolve. Nick

On Friday, October 17, 2014, ll duy.huynh@gmail.com wrote: hello... i'm looking at the source code for mllib.linalg.Vectors and it looks like it's a wrapper around Breeze with very small changes (mostly changing the names). i'm just curious to understand why this wrapper was created vs. pointing everyone to Breeze directly?
PySpark joins fail - please help
https://gist.github.com/rjurney/fd5c0110fe7eb686afc9 Any way I try to join my data fails. I can't figure out what I'm doing wrong. -- Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com
Re: mllib.linalg.Vectors vs Breeze?
Yes, I think that's the logic, but then what does toBreezeVector return if it's not based on Breeze? And it's called a lot by client code, since you often have to do something nontrivial with the vector. I suppose you could still have it return a Breeze vector and use Breeze for no other purpose. I think this abstraction leaks, though.

On Fri, Oct 17, 2014 at 7:48 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I don't know the answer for sure, but just from an API perspective I'd guess that the Spark authors don't want to tie their API to Breeze.
How to disable input split
Is it possible to disable input split if input is already small?
Re: How to write a RDD into One Local Existing File?
You can save to a local file. What are you trying, and what doesn't work? You can output one file by repartitioning to 1 partition, but this is probably not a good idea, as you bottleneck the output and some upstream computation by disabling parallelism. How about just combining the files on HDFS afterwards? Or just reading all the files instead of one? You can hdfs dfs -cat a bunch of files at once.

On Fri, Oct 17, 2014 at 6:46 PM, Parthus peng.wei@gmail.com wrote: Hi, I have a Spark MapReduce task which requires me to write the final RDD to an existing local file (appending to this file). I tried two ways, but neither works well:

1. Use the saveAsTextFile() API. Spark 1.1.0 claims that this API can write to local files, but I could never make it work. Moreover, the result is not one file but a series of part-xxxxx files, which is not what I hoped to get.

2. Collect the RDD to an array and write it to the driver node using Java's file IO. There are also two problems: 1) my RDD is huge (1 TB), which cannot fit into the memory of one driver node, so I have to split the task into small pieces, collecting and writing them part by part; 2) during the writing by Java IO, the Spark MapReduce task has to wait, which is not efficient.

Could anybody provide an efficient way to solve this problem? I wish the solution could be: appending a huge RDD to a local file without pausing the MapReduce job during writing.
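A sketch of the two options above (the paths are hypothetical):

// option 1: force a single output partition - one part-00000, one writer
rdd.coalesce(1).saveAsTextFile("file:///tmp/output")

// option 2: keep the parallel write and merge the parts outside Spark, e.g.:
//   hdfs dfs -cat /output/part-* >> /local/existing/file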
Re: PySpark joins fail - please help
Hey Russell, join() only works on RDDs of pairs (key, value), such as:

rdd1: (k, v1)
rdd2: (k, v2)

rdd1.join(rdd2) will be (k, (v1, v2)).

Spark SQL will be more useful for you, see http://spark.apache.org/docs/1.1.0/sql-programming-guide.html Davies

On Fri, Oct 17, 2014 at 5:01 PM, Russell Jurney russell.jur...@gmail.com wrote: https://gist.github.com/rjurney/fd5c0110fe7eb686afc9 Any way I try to join my data fails. I can't figure out what I'm doing wrong. -- Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com
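For reference, the pair-join contract in a nutshell (Scala shown; PySpark behaves the same way):

val r1 = sc.parallelize(Seq(("k1", 1), ("k2", 2)))
val r2 = sc.parallelize(Seq(("k1", "a")))
r1.join(r2).collect()   // Array(("k1", (1, "a"))) - the value is a tuple of both sides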
Re: PySpark joins fail - please help
Is that not exactly what I've done in j3/j4? The keys are identical strings. The k is the same; the value in both instances is an associative array.

devices = devices.map(lambda x: (dh.index('id'), {'deviceid': x[dh.index('id')], 'foo': x[dh.index('foo')], 'bar': x[dh.index('bar')]}))

bytes_in_out = transactions.map(lambda x: (x[th.index('deviceid')], {'deviceid': x[th.index('deviceid')], 'foo': x[th.index('foo')], 'bar': x[th.index('bar')], 'hello': x[th.index('hello')], 'world': x[th.index('world')]}))

j3 = bytes_in_out.join(devices, 10)
j3.take(1)

j4 = devices.join(bytes_int_out, 10)
j4.take(1)

On Fri, Oct 17, 2014 at 5:48 PM, Davies Liu dav...@databricks.com wrote: Hey Russell, join() only works on RDDs of pairs (key, value); rdd1.join(rdd2) will be (k, (v1, v2)).

-- Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com
Re: input split size
Thanks, Andrew. What about reading out of local? On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com wrote: When reading out of HDFS it's the HDFS block size. On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com wrote: What is the default input split size? How to change it?
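For reference, a sketch of the knobs involved (Spark 1.x with the classic Hadoop property names; the values shown are assumptions):

// ask for a minimum number of partitions up front
val rdd = sc.textFile("hdfs://master/path", 100)

// or raise the minimum split size so small inputs are not split further
sc.hadoopConfiguration.set("mapred.min.split.size", (128 * 1024 * 1024).toString)

For local files the same input-format machinery applies, with fs.local.block.size (32 MB by default) playing the role of the HDFS block size; for a small input, a large minimum split size effectively disables splitting.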
Re: PySpark joins fail - please help
There was a bug in the devices line: dh.index('id') should have been x[dh.index('id')].

On Fri, Oct 17, 2014 at 5:52 PM, Russell Jurney russell.jur...@gmail.com wrote: Is that not exactly what I've done in j3/j4? The keys are identical strings.

-- Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com