Re: Spark caching questions
Cached RDDs do not survive SparkContext deletion (they are scoped on a per-SparkContext basis). I am not sure what you mean by disk-based cache eviction; if you cache more RDDs than there is disk space for, the result will not be very pretty :) Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Sep 10, 2014 at 4:43 AM, Vladimir Rodionov vrodio...@splicemachine.com wrote: Hi, users 1. Disk-based cache eviction policy? The same LRU? 2. What is the scope of a cached RDD? Does it survive the application? What happens if I run the Java app next time? Will the RDD be recreated or read from the cache? If the answer is YES, then ... 3. Is there any way to invalidate a cached RDD automatically? RDD partitions? Some API like RDD.isValid()? 4. HadoopRDD is InputFormat-based. Some partitions (splits) may become invalid in the cache. Can we reload only those partitions into the cache? -Vladimir
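To make the scoping concrete, here is a minimal Scala sketch (path and names are illustrative, not from the thread): a persisted RDD lives exactly as long as the SparkContext that created it, so the next run of the application has to rebuild it from the source data.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  val sc = new SparkContext(new SparkConf().setAppName("cache-scope-demo").setMaster("local[2]"))
  val rdd = sc.textFile("hdfs:///data/input")   // illustrative input path
  rdd.persist(StorageLevel.MEMORY_AND_DISK)     // blocks spill to local disk when memory fills up
  rdd.count()                                   // first action materializes the cache
  rdd.unpersist()                               // explicit invalidation while the context is alive
  sc.stop()                                     // after this, every cached block of this context is gone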
Re: Spark Streaming and database access (e.g. MySQL)
I think she is checking for an empty RDD? But if the RDD is empty then nothing will happen, no db connections etc. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, Sep 8, 2014 at 1:32 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote: if (rdd.take(1).size == 1) { rdd foreachPartition { iterator => I was wondering: since take() is an output operation, isn't it computed twice (once for the take(1), once during the iteration)? Or will only one single element be computed for take(1)? Thanks Tobias
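For reference, a minimal sketch of the pattern under discussion (createConnection and writeRecord are hypothetical helpers, not Spark API). As far as I understand, take(1) launches a small job that only scans as many partitions as needed to find one element, so the full RDD is not recomputed just for the check.

  dstream.foreachRDD { rdd =>
    if (rdd.take(1).nonEmpty) {               // cheap emptiness check
      rdd.foreachPartition { iterator =>
        val conn = createConnection()         // hypothetical: one DB connection per partition
        iterator.foreach(record => writeRecord(conn, record))  // hypothetical write helper
        conn.close()
      }
    }
  }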
Re: RDD memory questions
Thank you very much for your kind help. I have some further questions: - If the RDD is stored in serialized format, does that mean that whenever the RDD is processed it will be unpacked and packed again, from and back to the JVM, even when both sides are located on the same machine? http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png - Can the RDD be partially unpacked from the serialized state, or whenever an RDD is touched must it be fully unpacked (and of course packed again afterward)? - When an RDD is cached, is it saved in deserialized or serialized format? If it is saved in deserialized format, is partial reading of the RDD from the JVM into the Python runtime possible? Thank you very much
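For the JVM side of the question, a small Scala sketch of the two cache formats (the Python/JVM boundary adds its own pickling on top of this). My understanding is that serialized blocks are deserialized one partition at a time when a task reads them, not all at once.

  import org.apache.spark.storage.StorageLevel

  val deser = sc.textFile("hdfs:///data/events")   // illustrative path
  deser.persist(StorageLevel.MEMORY_ONLY)          // cached as deserialized Java objects, fastest to reuse

  val ser = sc.textFile("hdfs:///data/events")
  ser.persist(StorageLevel.MEMORY_ONLY_SER)        // cached as serialized bytes: more compact,
                                                   // deserialized per partition on access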
Re: spark.cleaner.ttl and spark.streaming.unpersist
I somehow missed that parameter when I was reviewing the documentation; that should do the trick! Thank you! 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com: Hi Luis, The parameters "spark.cleaner.ttl" and "spark.streaming.unpersist" can both be used to remove useless, timed-out streaming data. The difference is that "spark.cleaner.ttl" is a time-based cleaner: it cleans not only streaming input data but also Spark's stale metadata. "spark.streaming.unpersist" is a reference-based cleaning mechanism: streaming data is removed once it falls out of the slide duration. Both parameters can reduce the memory occupation of Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation with Kafka, these two parameters cannot really mitigate the problem. You actually need to control the input rate so data is not injected so fast; you can try "spark.streaming.receiver.maxRate" to control the ingestion rate. Thanks Jerry *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] *Sent:* Wednesday, September 10, 2014 5:21 AM *To:* user@spark.apache.org *Subject:* spark.cleaner.ttl and spark.streaming.unpersist The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed. Regards, Luis
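For reference, a minimal sketch of the settings mentioned in this thread (the values are illustrative, and spark.streaming.receiver.maxRate is only available in recent releases, as far as I know):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("streaming-app")
    .set("spark.cleaner.ttl", "3600")                  // time-based cleanup of old data and metadata (seconds)
    .set("spark.streaming.unpersist", "true")          // drop input blocks once they fall out of the window
    .set("spark.streaming.receiver.maxRate", "10000")  // cap records/sec per receiver to throttle the backlog
  val ssc = new StreamingContext(conf, Seconds(10))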
Re: groupBy gives non deterministic results
Hi, I am using Spark 1.0.0. The bug is fixed in 1.0.1. Hao
Spark SQL -- more than two tables for join
Hi: I have a question about Spark SQL. First, I use a left join on two tables, like this: sql("SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey)").collect().foreach(println) and the result is what I expect. But when I use a left join on three tables or more, like this: sql("SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)").collect().foreach(println) I get this exception: Exception in thread "main" java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:69) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:181) at org.apache.spark.examples.sql.SparkSQLHBaseRelation$.main(SparkSQLHBaseRelation.scala:140) at org.apache.spark.examples.sql.SparkSQLHBaseRelation.main(SparkSQLHBaseRelation.scala) My questions are: 1. Is my SQL script wrong, or does Spark SQL not support joining more than two tables? 2. If Spark SQL does not support more than two tables, how can I meet my requirement? boyingk...@163.com
Re: groupBy gives non deterministic results
Great. And you should ask questions on the user@spark.apache.org mailing list. I believe many people don't subscribe to the incubator mailing list now. -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Wednesday, September 10, 2014 at 6:03 PM, redocpot wrote: Hi, I am using Spark 1.0.0. The bug is fixed in 1.0.1. Hao
Dependency Problem with Spark / ScalaTest / SBT
Hello, I am writing a Spark App which is already working so far. Now I have started to build some unit tests as well, but I am running into some dependency problems and I cannot find a solution right now. Perhaps someone could help me. I build my Spark project with SBT and it seems to be configured well, because compiling, assembling and running the built jar with spark-submit are working well. Now I started with the unit tests, which I located under /src/test/scala. When I call test in sbt, I get the following: 14/09/10 12:22:06 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP Server [trace] Stack trace suppressed: run last test:test for the full output. [error] Could not run test test.scala.SetSuite: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse [info] Run completed in 626 milliseconds. [info] Total number of tests run: 0 [info] Suites: completed 0, aborted 0 [info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [error] Error during tests: [error] test.scala.SetSuite [error] (test:test) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 3 s, completed 10.09.2014 12:22:06 last test:test gives me the following: last test:test [debug] Running TaskDef(test.scala.SetSuite, org.scalatest.tools.Framework$$anon$1@6e5626c8, false, [SuiteSelector]) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.start(HttpServer.scala:54) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at test.scala.SetSuite.init(SparkTest.scala:16) I also noticed right now that sbt run is not working either: 14/09/10 12:44:46 INFO spark.HttpServer: Starting HTTP Server [error] (run-main-2) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.start(HttpServer.scala:54) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at main.scala.PartialDuplicateScanner$.main(PartialDuplicateScanner.scala:29) at main.scala.PartialDuplicateScanner.main(PartialDuplicateScanner.scala) Here is my Testprojekt.sbt file:

  name := "Testprojekt"

  version := "1.0"

  scalaVersion := "2.10.4"

  libraryDependencies ++= {
    Seq(
      "org.apache.lucene" % "lucene-core" % "4.9.0",
      "org.apache.lucene" % "lucene-analyzers-common" % "4.9.0",
      "org.apache.lucene" % "lucene-queryparser" % "4.9.0",
      ("org.apache.spark" %% "spark-core" % "1.0.2").
        exclude("org.mortbay.jetty", "servlet-api").
        exclude("commons-beanutils", "commons-beanutils-core").
        exclude("commons-collections", "commons-collections").
        exclude("commons-collections", "commons-collections").
        exclude("com.esotericsoftware.minlog", "minlog").
        exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
        exclude("org.eclipse.jetty.orbit", "javax.transaction").
        exclude("org.eclipse.jetty.orbit", "javax.servlet")
    )
  }

  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
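One thing I would check (an assumption on my part, not verified against this exact build): the excludes above remove the javax.servlet artifact that Spark's embedded Jetty HTTP server needs, which matches the NoClassDefFoundError for javax/servlet/http/HttpServletResponse under sbt test and run. A possible workaround is to add a servlet API artifact back explicitly, for example:

  // artifact and version are assumptions; any servlet 3.0 API jar should provide the missing class
  libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"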
Re: Dependency Problem with Spark / ScalaTest / SBT
Hi, What is your SBT command and the parameters? Arthur On 10 Sep, 2014, at 6:46 pm, Thorsten Bergler sp...@tbonline.de wrote: Hello, I am writing a Spark App which is already working so far. Now I started to build also some UnitTests, but I am running into some dependency problems and I cannot find a solution right now.
Re: Dependency Problem with Spark / ScalaTest / SBT
Hi, I just called: test or run Thorsten On 10.09.2014 at 13:38, arthur.hk.c...@gmail.com wrote: Hi, What is your SBT command and the parameters? Arthur
Re: groupBy gives non deterministic results
Ah, thank you. I did not notice that.
nested rdd operation
Hi, I have a question on Spark. This program on spark-shell: val filerdd = sc.textFile("NOTICE", 2) val maprdd = filerdd.map( word => filerdd.map( word2 => (word2 + word) ) ) maprdd.collect() throws a NULL pointer exception. Can somebody explain why I cannot have a nested RDD operation? --pavlos
JavaPairRDD<String, Integer> to JavaPairRDD<String, String> based on key
Is it possible to generate a JavaPairRDD<String, Integer> from a JavaPairRDD<String, String>, where I can also use the key values? I have looked at, for instance, mapToPair, but this generates a new K/V pair based on the original value and does not give me information about the key. I need this in the initialization phase, where I have two RDDs with similar keys but with different types of values. Generating these is computationally intensive, and if I could use the first list to generate the second, it would save me a big map/reduce phase. Thanks!
Re: Spark SQL -- more than two tables for join
Hi, I too had tried SQL queries with joins, MINUS, subqueries etc. but they did not work in Spark SQL. I did not find any documentation on which queries work and which do not work in Spark SQL; maybe we have to wait for the Spark book to be released in Feb 2015. I believe you can try HiveQL in Spark for your requirement. Thanks, Arun
Re: nested rdd operation
You can't use an RDD inside an operation on an RDD. Here you have filerdd in your map function. It sort of looks like you want a cartesian product of the RDD with itself, so look at the cartesian() method. It may not be a good idea to compute such a thing. On Wed, Sep 10, 2014 at 1:57 PM, Pavlos Katsogridakis kats...@ics.forth.gr wrote: Hi, I have a question on Spark. This program on spark-shell: val filerdd = sc.textFile("NOTICE", 2) val maprdd = filerdd.map( word => filerdd.map( word2 => (word2 + word) ) ) maprdd.collect() throws a NULL pointer exception. Can somebody explain why I cannot have a nested RDD operation? --pavlos
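A minimal sketch of the cartesian() alternative Sean mentions, producing the same word combinations without nesting RDDs (beware: the result has N^2 elements):

  val filerdd = sc.textFile("NOTICE", 2)
  val pairs = filerdd.cartesian(filerdd)                        // RDD of all (word, word2) combinations
  val maprdd = pairs.map { case (word, word2) => word2 + word } // same concatenation as the nested version
  maprdd.collect()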
Re: groupBy gives non deterministic results
| Do the two mailing lists share messages ? I don't think so. I didn't receive this message from the user list. I am not in databricks, so I can't answer your other questions. Maybe Davies Liu dav...@databricks.com can answer you? -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote: Hi, Xianjin I checked user@spark.apache.org, and found my post there: http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser I am using nabble to send this mail, which indicates that the mail will be sent from my email address to the u...@spark.incubator.apache.org mailing list. Do the two mailing lists share messages ? Do we have a nabble interface for the user@spark.apache.org mailing list ? Thank you.
Re: JavaPairRDD<String, Integer> to JavaPairRDD<String, String> based on key
So, each key-value pair gets a new value for the original key? You want mapValues(). On Wed, Sep 10, 2014 at 2:01 PM, Tom thubregt...@gmail.com wrote: Is it possible to generate a JavaPairRDD<String, Integer> from a JavaPairRDD<String, String>, where I can also use the key values? I have looked at, for instance, mapToPair, but this generates a new K/V pair based on the original value and does not give me information about the key. I need this in the initialization phase, where I have two RDDs with similar keys but with different types of values. Generating these is computationally intensive, and if I could use the first list to generate the second, it would save me a big map/reduce phase. Thanks!
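A small sketch of the idea in Scala (the Java API has an equivalent JavaPairRDD.mapValues method): mapValues keeps the key and the partitioning, and if the key itself is needed to compute the new value, a plain map over the pair works too. Data here is illustrative only.

  import org.apache.spark.SparkContext._                           // pair-RDD functions on Spark 1.x

  val byKey = sc.parallelize(Seq(("a", "foo"), ("b", "quux")))     // RDD[(String, String)]
  val lengths = byKey.mapValues(_.length)                          // RDD[(String, Int)], keys untouched
  val keyed = byKey.map { case (k, v) => (k, (k + v).length) }     // when the key is needed for the new value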
Global Variables in Spark Streaming
Hi Friends, I'm using Spark Streaming as a Kafka consumer. I want to do CEP with Spark, so I need to store my sequence of events so that I can detect patterns. My question is: how can I save my events in a Java collection temporarily, so that I can detect a pattern from the *processed (temporarily stored) and upcoming events*? Cheers, Ravi Sharma
Some techniques for improving application performance
Spark friends, I recently wrote up a blog post with examples of some of the standard techniques for improving Spark application performance: http://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html The idea is that we start with readable but poorly-performing code and iteratively refine it, looking at the performance and operational consequences of a series of simple changes. My intention is that this would provide some context for folks who are new to Spark and looking to improve their prototype applications. If you have additional suggestions for techniques to address, let me know! I'd be happy to write a follow-up post. best, wb
Re: Spark SQL -- more than two tables for join
Hi, Maybe you can take a look at the following: http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html Good luck. Arthur On 10 Sep, 2014, at 9:09 pm, arunshell87 shell.a...@gmail.com wrote: Hi, I too had tried SQL queries with joins, MINUS, subqueries etc. but they did not work in Spark SQL. I did not find any documentation on which queries work and which do not work in Spark SQL; maybe we have to wait for the Spark book to be released in Feb 2015. I believe you can try HiveQL in Spark for your requirement. Thanks, Arun
Re: Task not serializable
You mention that you are creating a UserGroupInformation inside your function, but something is still serializing it. You should show your code since it may not be doing what you think. If you instantiate an object, it happens every time your function is called. map() is called once per data element; mapPartitions() once per partition. It depends. On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Sean, The solution of instantiating the non-serializable class inside the map is working fine, but I hit a road block. The solution is not working for singleton classes like UserGroupInformation. In my logic, as part of processing a HDFS file, I need to refer to some reference files which are again available in HDFS. So inside the map method I'm trying to instantiate UserGroupInformation to get an instance of FileSystem. Then using this FileSystem instance I read those reference files and use that data in my processing logic. This is throwing task not serializable exceptions for the 'UserGroupInformation' and 'FileSystem' classes. I also tried using 'SparkHadoopUtil' instead of 'UserGroupInformation', but it didn't resolve the issue. Request you to provide some pointers in this regard. I also have a query - when we instantiate a class inside the map method, does it create a new instance for every RDD it is processing? Thanks Regards, Sarath On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen so...@cloudera.com wrote: I disagree that the generally right change is to try to make the classes serializable. Usually, classes that are not serializable are not supposed to be serialized. You're using them in a way that's causing them to be serialized, and that's probably not desired. For example, this is wrong: val foo: SomeUnserializableManagerClass = ... rdd.map(d => foo.bar(d)) This is right: rdd.map { d => val foo: SomeUnserializableManagerClass = ... foo.bar(d) } In the first instance, you create the object on the driver and try to serialize and copy it to workers. In the second, you're creating SomeUnserializableManagerClass in the function and therefore on the worker. mapPartitions is better if this creation is expensive. On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi, I'm trying to migrate a map-reduce program to work with Spark. I migrated the program from Java to Scala. The map-reduce program basically loads a HDFS file and for each line in the file it applies several transformation functions available in various external libraries. When I execute this over Spark, it throws Task not serializable exceptions for each and every class being used from these external libraries. I included serialization for a few classes which are in my scope, but there are several other classes which are out of my scope, like org.apache.hadoop.io.Text. How to overcome these exceptions? ~Sarath.
Spark NLP
Hi all, What is your preferred Scala NLP lib? Why? Are there any items on Spark's roadmap to integrate NLP features? I basically need to perform NER line by line, so I don't need a deep integration with the distributed engine. I only want simple dependencies and the chance to build a dictionary for the Italian language. Any suggestions? Thanks Paolo Platter
Re: Task not serializable
Thanks Sean. Please find attached my code. Let me know your suggestions/ideas. Regards, *Sarath* On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen so...@cloudera.com wrote: You mention that you are creating a UserGroupInformation inside your function, but something is still serializing it. You should show your code since it may not be doing what you think. If you instantiate an object, it happens every time your function is called. map() is called once per data element; mapPartitions() once per partition. It depends.
class MapperNew extends Serializable {
  var hadoopConf: Configuration = _
  var recordValidator: RecordValidator = _
  var rulesLoader: RulesLoader = _
  var recordTransformer: TransFilEngine = _
  var ach: ConfigurationHadoop = _
  var file: RDD[(Long, String)] = _

  // Configuration is passed by caller
  def run(c: Configuration): Unit = {
    hadoopConf = c
    val sparkConf = new SparkConf()
      .setMaster(hadoopConf.get("sparkMaster"))
      .setAppName("NewMapper")
      .setSparkHome(hadoopConf.get("sparkHome"))
      .setJars(Seq("rbcommon.jar", "rbengine.jar"))
    val sparkContext = new SparkContext(sparkConf)
    val util = SparkHadoopUtil.get
    util.runAsSparkUser(() => {
      file = sparkContext.newAPIHadoopFile(hadoopConf.get("inputPath"), classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], hadoopConf).map(r => (r._1.get(), r._2.toString()))
    })
    // Works fine till this line
    println("File line count = " + file.count)
    val rulesFilePath = "hdfs://slave:54310/user/hduser/" + // Without this spark is trying to read from local file system
      hadoopConf.get("rulesFilePath") + "/" + hadoopConf.get("rulesFile")
    var processed = file.map(line => {
      // Doesn't work, throws 'task not serializable' exception for UserGroupInformation
      val ugi = UserGroupInformation.createRemoteUser(hadoopConf.get("remoteUser"))
      // Doesn't work, throws 'task not serializable' exception for FileSystem
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = {
          FileSystem.get(hadoopConf)
        }
      })
      // RulesLoader and TransFilEngine are
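Following Sean's earlier advice, a hedged sketch of how the closure above could be restructured so the non-serializable objects are created on the executors: capture only plain strings from the driver-side Configuration (Configuration itself is not serializable either) and build UserGroupInformation/FileSystem inside mapPartitions. The locally rebuilt Configuration may need the relevant keys copied in explicitly; this is an assumption, not a tested fix.

  // capture only serializable values outside the closure
  val remoteUser = hadoopConf.get("remoteUser")
  val processed = file.mapPartitions { lines =>
    // everything below runs on the executor, once per partition
    val conf = new Configuration()                 // rebuilt locally, not shipped from the driver
    val ugi = UserGroupInformation.createRemoteUser(remoteUser)
    val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
      def run(): FileSystem = FileSystem.get(conf)
    })
    lines.map { case (offset, line) =>
      // ... read the reference files through fs and transform the line here ...
      line
    }
  }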
Re: flattening a list in spark sql
Hi, Thanks, it worked; I really appreciate your help. I have also been trying to do multiple lateral views, but it doesn't seem to be working. Query: hiveContext.sql("Select t2 from fav LATERAL VIEW explode(TABS) tabs1 as t1 LATERAL VIEW explode(t1) tabs2 as t2") Exception: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 't2, tree: Regards, Gaurav
Re: pyspark and cassandra
Hi, I am trying to evaluate different options for Spark + Cassandra and I have a couple of additional questions. My aim is to use Cassandra only, without Hadoop: 1) Is it possible to use only Cassandra as input/output for PySpark? 2) In case I use Spark (Java, Scala), is it possible to use only Cassandra for input/output, without Hadoop? 3) I know there are a couple of strategies for storage level; in case my data set is quite big and I don't have enough memory to process it, can I use the DISK_ONLY option without Hadoop (having only Cassandra)? Thanks Oleg On Wed, Sep 3, 2014 at 3:08 AM, Kan Zhang kzh...@apache.org wrote: In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs. See examples/src/main/python/cassandra_inputformat.py for an example. You may need to write your own key/value converters. On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi All, Is it possible to have Cassandra as input data for PySpark? I found an example for Java - http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0 and I am looking for something similar for Python. Thanks Oleg.
Cassandra connector
Hi, I am having difficulty getting the Cassandra connector running within the spark shell. My jars look like: [wwilkins@phel-spark-001 logs]$ ls -altr /opt/connector/ total 14588 drwxr-xr-x. 5 root root 4096 Sep 9 22:15 .. -rw-r--r-- 1 root root 242809 Sep 9 22:20 spark-cassandra-connector-master.zip -rw-r--r-- 1 root root 541332 Sep 9 22:20 cassandra-driver-core-2.0.3.jar -rw-r--r-- 1 root root 1855685 Sep 9 22:20 cassandra-thrift-2.0.9.jar -rw-r--r-- 1 root root 30085 Sep 9 22:20 commons-codec-1.2.jar -rw-r--r-- 1 root root 315805 Sep 9 22:20 commons-lang3-3.1.jar -rw-r--r-- 1 root root 60686 Sep 9 22:20 commons-logging-1.1.1.jar -rw-r--r-- 1 root root 2228009 Sep 9 22:20 guava-16.0.1.jar -rw-r--r-- 1 root root 433368 Sep 9 22:20 httpclient-4.2.5.jar -rw-r--r-- 1 root root 227275 Sep 9 22:20 httpcore-4.2.4.jar -rw-r--r-- 1 root root 1222059 Sep 9 22:20 ivy-2.3.0.jar.bak -rw-r--r-- 1 root root 38460 Sep 9 22:20 joda-convert-1.2.jar.bak -rw-r--r-- 1 root root 98818 Sep 9 22:20 joda-convert-1.6.jar -rw-r--r-- 1 root root 581571 Sep 9 22:20 joda-time-2.3.jar -rw-r--r-- 1 root root 217053 Sep 9 22:20 libthrift-0.9.1.jar -rw-r--r-- 1 root root 618 Sep 9 22:20 log4j.properties -rw-r--r-- 1 root root 165505 Sep 9 22:20 lz4-1.2.0.jar -rw-r--r-- 1 root root 85448 Sep 9 22:20 metrics-core-3.0.2.jar -rw-r--r-- 1 root root 1231993 Sep 9 22:20 netty-3.9.0.Final.jar -rw-r--r-- 1 root root 26083 Sep 9 22:20 slf4j-api-1.7.2.jar.bak -rw-r--r-- 1 root root 26084 Sep 9 22:20 slf4j-api-1.7.5.jar -rw-r--r-- 1 root root 1251514 Sep 9 22:20 snappy-java-1.0.5.jar -rw-r--r-- 1 root root 776782 Sep 9 22:20 spark-cassandra-connector_2.10-1.0.0-beta1.jar -rw-r--r-- 1 root root 997458 Sep 9 22:20 spark-cassandra-connector_2.10-1.0.0-SNAPSHOT.jar.bak3 -rwxr--r-- 1 root root 1113208 Sep 9 22:20 spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar.bak -rw-r--r-- 1 root root 804 Sep 9 22:20 spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar.bak2 I launch the shell with this command: /data/spark/bin/spark-shell --driver-class-path $(echo /opt/connector/*.jar |sed 's/ /:/g') I run these commands: sc.stop import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import com.datastax.spark.connector._ val conf = new SparkConf() conf.set("spark.cassandra.connection.host", "10.208.59.164") val sc = new SparkContext(conf) val table = sc.cassandraTable("retail", "ordercf") And I get this problem: scala> val table = sc.cassandraTable("retail", "ordercf") java.lang.AbstractMethodError at org.apache.spark.Logging$class.log(Logging.scala:52) at com.datastax.spark.connector.cql.CassandraConnector$.log(CassandraConnector.scala:145) at org.apache.spark.Logging$class.logDebug(Logging.scala:63) at com.datastax.spark.connector.cql.CassandraConnector$.logDebug(CassandraConnector.scala:145) at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createCluster(CassandraConnector.scala:155) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:152) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:152) at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:72) at com.datastax.spark.connector.cql.Schema.keyspaces$lzycompute(Schema.scala:111) at 
com.datastax.spark.connector.cql.Schema.keyspaces(Schema.scala:110) at com.datastax.spark.connector.cql.Schema.tables$lzycompute(Schema.scala:123) at com.datastax.spark.connector.cql.Schema.tables(Schema.scala:122) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:196) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:195) at com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:202) at com.datastax.spark.connector.package$SparkContextFunctions.cassandraTable(package.scala:94) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:27) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:29) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC.init(console:33) at $iwC$$iwC$$iwC.init(console:35) at $iwC$$iwC.init(console:37) at $iwC.init(console:39) at init(console:41) at .init(console:45) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at
Re: Cassandra connector
Are you using Spark 1.1? If yes, you would have to update the DataStax Cassandra connector code and remove references to the log methods from CassandraConnector.scala. Regards, Gaurav
Re: Low Level Kafka Consumer for Spark
Hi, The latest changes, with Kafka message re-play by manipulating the ZK offset, seem to be working fine for us. This gives us some relief until the actual issue is fixed in Spark 1.2. I have some questions on how Spark processes the received data. The logic I used is basically to pull messages from individual partitions using dedicated receivers and do a union of these streams; after that I process the union stream. Today I wanted to test this consumer against our internal Kafka cluster which has around 50 million records. With this huge backlog I found Spark only running the receiver task and not running the processing task (or rather doing it very slowly). Is this an issue with the consumer or is it an issue on the Spark side? Ideally, when receivers durably write data to the store, the processing should start in parallel. Why does the processing task need to wait until the receiver consumes all 50 million messages? ...Or maybe I am doing something wrong? I can share the driver log if you want. In the driver I can see only storage.BlockManagerInfo: Added input... type messages, but I hardly see scheduler.TaskSetManager: Starting task... messages. I see data getting written to the target system at a very, very slow pace. Regards, Dibyendu On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Tathagata, I have managed to implement the logic in the Kafka-Spark consumer to recover from driver failure. This is just an interim fix until the actual fix is done on the Spark side. The logic is something like this. 1. When the individual receivers start for every topic partition, they write the Kafka messages along with certain metadata to the block store. This metadata contains the details of message offset, partition id, topic name and consumer id. You can see this logic in the PartitionManager.java next() method. 2. In the driver code (Consumer.java), I am creating the union of all these individual D-Streams and processing the data using a forEachRDD call. In the driver code, I receive the RDDs which contain the Kafka messages along with the metadata details, and I periodically commit the processed offset of the Kafka messages into ZK. 3. When the driver stops and restarts, the receiver starts again, and this time in PartitionManager.java I check what the actual committed offset for the partition is and what the actual processed offset of the same partition is. This logic is in the PartitionManager constructor. If this is a receiver restart and the processed offset is less than the committed offset, I start fetching again from the processed offset. This may lead to duplicate records, but our system can handle duplicates. I have tested with multiple driver kills/stops and I found no data loss in the Kafka consumer. In the driver code I have not done any checkpointing yet; I will test that tomorrow. One interesting thing I found: if I do a repartition of the original stream, I can still see the issue of data loss with this logic. What I believe is that during re-partitioning Spark might be changing the order of RDDs from the way they were generated from the Kafka stream. So in the re-partition case, even though I am committing the processed offset, as it is not in order I still see the issue. Not sure if this understanding is correct, but I am not able to find any other explanation. If I do not use repartition, this solution works fine. I can make this configurable, so that when the actual fix is available, this feature in the consumer can be turned off, as it is an overhead for the consumer.
Let me know what you think. Regards, Dibyendu On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Some thoughts on this thread to clarify the doubts. 1. Driver recovery: The current version (1.1, to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them. Only for HDFS is the data not lost on driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics, in all transformations. To guarantee this for all kinds of streaming computations, stateful and not-stateful, it is required that the data be replayed through Kafka in exactly the same order, and the underlying blocks of data in Spark be regenerated in the exact way they would have been if there had been no driver failure. This is quite tricky to implement; it requires manipulation of zookeeper offsets, etc., which is hard to do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very soon. 3.
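For readers following along, a rough sketch of the per-partition receiver plus union pattern Dibyendu describes (MyKafkaReceiver is a placeholder for the custom low-level receiver, and topic/numPartitions are assumptions, not part of any real API):

  val numPartitions = 3                                    // number of Kafka partitions for the topic (assumption)
  val streams = (0 until numPartitions).map { p =>
    ssc.receiverStream(new MyKafkaReceiver(topic, p))      // hypothetical custom Receiver, one per partition
  }
  val unified = ssc.union(streams)                         // single DStream over all partition receivers
  unified.foreachRDD { rdd =>
    // process the batch here, then commit the processed offsets to ZooKeeper as described above
  }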
Re: groupBy gives non deterministic results
I think the mails to spark.incubator.apache.org are forwarded to spark.apache.org. Here is the header of the first mail: from: redocpot julien19890...@gmail.com to: u...@spark.incubator.apache.org date: Mon, Sep 8, 2014 at 7:29 AM subject: groupBy gives non deterministic results mailing list: user.spark.apache.org Filter messages from this mailing list mailed-by: spark.apache.org I only subscribe to spark.apache.org, and I do see all the mails from him. On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com wrote: | Do the two mailing lists share messages ? I don't think so. I didn't receive this message from the user list. I am not in databricks, so I can't answer your other questions. Maybe Davies Liu dav...@databricks.com can answer you? -- Ye Xianjin Sent with Sparrow On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote: Hi, Xianjin I checked user@spark.apache.org, and found my post there: http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser I am using nabble to send this mail, which indicates that the mail will be sent from my email address to the u...@spark.incubator.apache.org mailing list. Do the two mailing lists share messages ? Do we have a nabble interface for the user@spark.apache.org mailing list ? Thank you.
Re: Global Variables in Spark Streaming
Yes, your understanding is correct. In that case one easy option would be to serialize the object and dump it somewhere in HDFS so that you will be able to recreate/update the object from the file. We have something similar, which you can find in BroadCastServer https://github.com/sigmoidanalytics/spork/blob/spork-0.9/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastServer.java and BroadCastClient https://github.com/sigmoidanalytics/spork/blob/spork-0.9/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastClient.java which we use internally to pass/update objects between master and worker nodes. Thanks Best Regards On Wed, Sep 10, 2014 at 7:50 PM, Ravi Sharma raviprincesha...@gmail.com wrote: Akhil, By using a broadcast variable, will I be able to change the values of the broadcast variable? As per my understanding it creates a final variable to access the value across the cluster. Please correct me if I'm wrong. Thanks, Cheers, Ravi Sharma On Wed, Sep 10, 2014 at 7:31 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at broadcast variables http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables Thanks Best Regards On Wed, Sep 10, 2014 at 7:25 PM, Ravi Sharma raviprincesha...@gmail.com wrote: Hi Friends, I'm using Spark Streaming as a Kafka consumer. I want to do CEP with Spark, so I need to store my sequence of events so that I can detect patterns. My question is: how can I save my events in a Java collection temporarily, so that I can detect a pattern from the *processed (temporarily stored) and upcoming events*? Cheers, Ravi Sharma
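A short Scala sketch of the read-only nature of broadcast variables: workers can only read the shipped value, so "updating" it means building a new value on the driver and broadcasting again (all names and data here are illustrative):

  var dict = Map("user1" -> 1)                               // driver-side state
  var bcast = sc.broadcast(dict)                             // snapshot shipped to executors, read-only there

  val events = sc.parallelize(Seq("user1", "user2"))         // illustrative data
  val hits = events.map(e => bcast.value.getOrElse(e, 0))    // workers only read bcast.value

  // to "change" it, rebuild the value on the driver and broadcast a new snapshot
  dict = dict + ("user2" -> 2)
  bcast = sc.broadcast(dict)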
Re: flattening a list in spark sql
My bad, please ignore, it works !!!
Re: Spark SQL -- more than two tables for join
What version of Spark SQL are you running here? I think a lot of your concerns have likely been addressed in more recent versions of the code / documentation. (Spark 1.1 should be published in the next few days) In particular, for serious applications you should use a HiveContext and HiveQL as this is a much more complete implementation of a SQL Parser. The one in SQL context is only suggested if the Hive dependencies conflict with your application. 1) spark sql does not support multiple join This is not true. What problem were you running into? 2) spark left join: has performance issue Can you describe your data and query more? 3) spark sql’s cache table: does not support two-tier query I'm not sure what you mean here. 4) spark sql does not support repartition You can repartition SchemaRDDs in the same way as normal RDDs.
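To tie this back to the original question, a hedged sketch of the HiveContext route for the three-table left join, plus repartitioning the resulting SchemaRDD like any RDD. Table names are taken from the original post; hql is the Spark 1.0.x entry point, and later versions use sql on a HiveContext instead.

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)
  val joined = hiveContext.hql(
    "SELECT * FROM youhao_data " +
    "LEFT JOIN youhao_age ON (youhao_data.rowkey = youhao_age.rowkey) " +
    "LEFT JOIN youhao_totalKiloMeter ON (youhao_age.rowkey = youhao_totalKiloMeter.rowkey)")
  joined.repartition(8).collect().foreach(println)   // a SchemaRDD repartitions like a normal RDD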
Re: Spark HiveQL support plan
HiveQL is the default language for the JDBC server which will be available as part of the 1.1 release (coming very soon!). Adding support for calling MLlib and other Spark libraries is on the roadmap, but not possible at this moment. On Tue, Sep 9, 2014 at 1:45 PM, XUE, Xiaohui xiaohui@sap.com wrote: Hi, On the Spark website, there's a plan to support HiveQL on top of Spark SQL and also to support JDBC/ODBC. I would like to know whether, in this "HiveQL" supported by Spark (or Spark SQL accessible through JDBC/ODBC), there is a plan to add extensions to leverage different Spark features like machine learning and streaming? I'm asking this as we're considering using Spark from a remote machine, so this would be easier through JDBC/ODBC. But it would be good to also take advantage of Spark features beyond the ones that exist in HiveQL. Thanks, Xiaohui Xue
RE: Cassandra connector
Thank you! I am running Spark 1.0, but your suggestion worked for me. I rem'ed out (//) all the logDebug calls in both CassandraConnector.scala and Schema.scala, and I am moving again. Regards, Wade -Original Message- From: gtinside [mailto:gtins...@gmail.com] Sent: Wednesday, September 10, 2014 8:49 AM To: u...@spark.incubator.apache.org Subject: Re: Cassandra connector Are you using Spark 1.1? If yes, you would have to update the DataStax Cassandra connector code and remove references to the log methods from CassandraConnector.scala. Regards, Gaurav
Re: how to run python examples in spark 1.1?
Just want to provide more information on how I ran the examples. Environment: Cloudera QuickStart VM 5.1.0 (HBase 0.98.1 installed). I created a table called 'data1' and 'put' two records in it. I can see that the table and data are fine in the hbase shell. I cloned the spark repo, checked out the 1.1 branch, and built it by running sbt/sbt assembly/assembly and sbt/sbt examples/assembly. The script is basically:

  if __name__ == "__main__":
      conf = SparkConf().setAppName('testspark_similar_users_v2')
      sc = SparkContext(conf=conf, batchSize=512)
      conf2 = {"hbase.zookeeper.quorum": "localhost", "hbase.mapreduce.inputtable": "data1"}
      hbase_rdd = sc.newAPIHadoopRDD(
          "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
          "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
          "org.apache.hadoop.hbase.client.Result",
          keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
          valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
          conf=conf2)
      output = hbase_rdd.collect()
      for (k, v) in output:
          print (k, v)
      sc.stop()

The error message is: 14/09/10 09:41:52 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=18 watcher=hconnection 14/09/10 09:41:52 INFO RecoverableZooKeeper: The identifier of this process is 25963@quickstart.cloudera 14/09/10 09:41:52 INFO ClientCnxn: Opening socket connection to server quickstart.cloudera/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 14/09/10 09:41:52 INFO ClientCnxn: Socket connection established to quickstart.cloudera/127.0.0.1:2181, initiating session 14/09/10 09:41:52 INFO ClientCnxn: Session establishment complete on server quickstart.cloudera/127.0.0.1:2181, sessionid = 0x1485b365c450016, negotiated timeout = 4 14/09/10 09:52:32 ERROR TableInputFormat: org.apache.hadoop.hbase.client.NoServerForRegionException: Unable to find region for data1,,99 after 10 tries. 
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:980) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:885) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:987) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:889) at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:846) at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:234) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:174) at org.apache.hadoop.hbase.client.HTable.init(HTable.java:133) at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:96) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1091) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:70) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:441) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Traceback (most recent call last): File /home/cloudera/workspace/recom_env_testing/bin/sparkhbasecheck.py, line 71, in module
Re: Global Variables in Spark Streaming
Hi Ravi, 2014-09-10 15:55 GMT+02:00 Ravi Sharma raviprincesha...@gmail.com: I'm using Spark Streaming as a Kafka consumer. I want to do CEP with Spark, so for that I need to store my sequence of events so that I can detect some pattern. Depending on what you're trying to accomplish, you might implement this using Spark Streaming only, by using the updateStateByKey transformation. [1] This will allow you to maintain global state that you can combine with other streaming operations. We have successfully used this approach to detect patterns in log sequences with Spark Streaming. [1] http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations Best, -- Santiago M. Mola sm...@stratio.com
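For illustration, a minimal sketch of the updateStateByKey pattern described above, assuming events arrive keyed by a sensor or session id; the Event type, history size, socket source, and checkpoint path are placeholders, not anything from this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object PatternStateSketch {
  // Hypothetical event type; replace with whatever your Kafka messages decode to.
  case class Event(sensorId: String, payload: String)

  // Keep a bounded history of events per key across batches; the pattern check
  // can then look at the whole history whenever new events arrive.
  def updateHistory(newEvents: Seq[Event], state: Option[Seq[Event]]): Option[Seq[Event]] =
    Some((state.getOrElse(Seq.empty) ++ newEvents).takeRight(100))

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("cep-sketch"), Seconds(5))
    ssc.checkpoint("checkpoints") // required for stateful transformations

    // In the real application this stream would come from KafkaUtils.createStream(...)
    val keyedEvents = ssc.socketTextStream("localhost", 9999).map { line =>
      val id = line.takeWhile(_ != ',')
      (id, Event(id, line))
    }

    val history = keyedEvents.updateStateByKey(updateHistory _)
    history.print() // or scan each (key, events) pair for the pattern of interest

    ssc.start()
    ssc.awaitTermination()
  }
}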
Re: groupBy gives non deterministic results
Well, That's weird. I don't see this thread in my mail box as sending to user list. Maybe because I also subscribe the incubator mail list? I do see mails sending to incubator mail list and no one replies. I thought it was because people don't subscribe the incubator now. -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Thursday, September 11, 2014 at 12:12 AM, Davies Liu wrote: I think the mails to spark.incubator.apache.org (http://spark.incubator.apache.org) will be forwarded to spark.apache.org (http://spark.apache.org). Here is the header of the first mail: from: redocpot julien19890...@gmail.com (mailto:julien19890...@gmail.com) to: u...@spark.incubator.apache.org (mailto:u...@spark.incubator.apache.org) date: Mon, Sep 8, 2014 at 7:29 AM subject: groupBy gives non deterministic results mailing list: user.spark.apache.org (http://user.spark.apache.org) Filter messages from this mailing list mailed-by: spark.apache.org (http://spark.apache.org) I only subscribe spark.apache.org (http://spark.apache.org), and I do see all the mails from he. On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com (mailto:advance...@gmail.com) wrote: | Do the two mailing lists share messages ? I don't think so. I didn't receive this message from the user list. I am not in databricks, so I can't answer your other questions. Maybe Davies Liu dav...@databricks.com (mailto:dav...@databricks.com) can answer you? -- Ye Xianjin Sent with Sparrow On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote: Hi, Xianjin I checked user@spark.apache.org (mailto:user@spark.apache.org), and found my post there: http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser I am using nabble to send this mail, which indicates that the mail will be sent from my email address to the u...@spark.incubator.apache.org (mailto:u...@spark.incubator.apache.org) mailing list. Do the two mailing lists share messages ? Do we have a nabble interface for user@spark.apache.org (mailto:user@spark.apache.org) mail list ? Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html Sent from the Apache Spark User List mailing list archive at Nabble.com (http://Nabble.com). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org)
Re: Task not serializable
You're using hadoopConf, a Configuration object, in your closure. That type is not serializable. You can use -Dsun.io.serialization.extendedDebugInfo=true to debug serialization issues. On Wed, Sep 10, 2014 at 8:23 AM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Thanks Sean. Please find attached my code. Let me know your suggestions/ideas. Regards, Sarath On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen so...@cloudera.com wrote: You mention that you are creating a UserGroupInformation inside your function, but something is still serializing it. You should show your code since it may not be doing what you think. If you instantiate an object, it happens every time your function is called. map() is called once per data element; mapPartitions() once per partition. It depends. On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi Sean, The solution of instantiating the non-serializable class inside the map is working fine, but I hit a road block. The solution is not working for singleton classes like UserGroupInformation. In my logic as part of processing a HDFS file, I need to refer to some reference files which are again available in HDFS. So inside the map method I'm trying to instantiate UserGroupInformation to get an instance of FileSystem. Then using this FileSystem instance I read those reference files and use that data in my processing logic. This is throwing task not serializable exceptions for 'UserGroupInformation' and 'FileSystem' classes. I also tried using 'SparkHadoopUtil' instead of 'UserGroupInformation'. But it didn't resolve the issue. Request you provide some pointers in this regard. Also I have a query - when we instantiate a class inside map method, does it create a new instance for every RDD it is processing? Thanks Regards, Sarath On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen so...@cloudera.com wrote: I disagree that the generally right change is to try to make the classes serializable. Usually, classes that are not serializable are not supposed to be serialized. You're using them in a way that's causing them to be serialized, and that's probably not desired. For example, this is wrong: val foo: SomeUnserializableManagerClass = ... rdd.map(d = foo.bar(d)) This is right: rdd.map { d = val foo: SomeUnserializableManagerClass = ... foo.bar(d) } In the first instance, you create the object on the driver and try to serialize and copy it to workers. In the second, you're creating SomeUnserializableManagerClass in the function and therefore on the worker. mapPartitions is better if this creation is expensive. On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: Hi, I'm trying to migrate a map-reduce program to work with spark. I migrated the program from Java to Scala. The map-reduce program basically loads a HDFS file and for each line in the file it applies several transformation functions available in various external libraries. When I execute this over spark, it is throwing me Task not serializable exceptions for each and every class being used from these from external libraries. I included serialization to few classes which are in my scope, but there there are several other classes which are out of my scope like org.apache.hadoop.io.Text. How to overcome these exceptions? ~Sarath. 
-- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
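A small sketch of the pattern Sean recommends: build the non-serializable objects on the workers inside mapPartitions instead of capturing them in the closure. The use of Hadoop's FileSystem here is only an illustration, not the exact code from this thread:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// `paths` stands in for an RDD whose processing needs a non-serializable helper.
def fileSizes(paths: RDD[String]): RDD[(String, Long)] =
  paths.mapPartitions { iter =>
    // Built once per partition, on the executor, so nothing non-serializable
    // travels from the driver inside the closure.
    val fs = FileSystem.get(new Configuration())
    iter.map(p => (p, fs.getFileStatus(new Path(p)).getLen))
  }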
Re: How to scale more consumer to Kafka stream
How are you creating your kafka streams in Spark? If you have 10 partitions for a topic, you can call createStream ten times to create 10 parallel receivers/executors and then use union to combine all the dStreams. On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote: Hi (my previous post as been used by someone else) I'm building a application the read from kafka stream event. In production we've 5 consumers that share 10 partitions. But on spark streaming kafka only 1 worker act as a consumer then distribute the tasks to workers so I can have only 1 machine acting as consumer but I need more because only 1 consumer means Lags. Do you've any idea what I can do ? Another point is interresting the master is not loaded at all I can get up more than 10 % CPU I've tried to increase the queued.max.message.chunks on the kafka client to read more records thinking it'll speed up the read but I only get ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576) java.lang.OutOfMemoryError: Java heap space Is someone have ideas ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
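A sketch of that approach, with placeholder ZooKeeper quorum, consumer group, and topic name (none of these values come from the thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-fanout"), Seconds(2))

// One receiver per createStream call; with 10 topic partitions, 10 receivers keep them all busy.
val numReceivers = 10
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(streams)

unified.count().print()
ssc.start()
ssc.awaitTermination()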
Re: spark.cleaner.ttl and spark.streaming.unpersist
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I don't think stale RDDs are an issue here. I did a jmap -histo on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by [B which is byte arrays. Still investigating more and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it. On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you! 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com: Hi Luis, The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be used to remove useless timeout streaming data, the difference is that “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming input data, but also Spark’s useless metadata; while “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming data will be removed when out of slide duration. Both these two parameter can alleviate the memory occupation of Spark Streaming. But if the data is flooded into Spark Streaming when start up like your situation using Kafka, these two parameters cannot well mitigate the problem. Actually you need to control the input data rate to not inject so fast, you can try “spark.straming.receiver.maxRate” to control the inject rate. Thanks Jerry *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] *Sent:* Wednesday, September 10, 2014 5:21 AM *To:* user@spark.apache.org *Subject:* spark.cleaner.ttl and spark.streaming.unpersist The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because is the first run and there are quite a few events on the kafka queues that are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDD are being batched while a RDD is being processed. Regards, Luis
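For anyone following along, a sketch of where these settings go; spark.streaming.receiver.maxRate appears to be the intended spelling of the rate-limit property mentioned above, and all values here are arbitrary examples:

import org.apache.spark.SparkConf

// Illustrative values only; tune for your own workload.
val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.cleaner.ttl", "3600")                  // seconds; time-based cleanup of old metadata/RDDs
  .set("spark.streaming.unpersist", "true")          // drop input data once it falls out of the window
  .set("spark.streaming.receiver.maxRate", "10000")  // max records per second per receiver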
Re: how to choose right DStream batch interval
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617 Slide 39 covers it. On Tue, Sep 9, 2014 at 9:23 PM, qihong qc...@pivotal.io wrote: Hi Mayur, Thanks for your response. I did write a simple test that set up a DStream with 5 batches; The batch duration is 1 second, and the 3rd batch will take extra 2 seconds, the output of the test shows that the 3rd batch causes backlog, and spark streaming does catch up on 4th and 5th batch (DStream.print was modified to output system time) --- Time: 1409959708000 ms, system time: 1409959708269 --- 1155 --- Time: 1409959709000 ms, system time: 1409959709033 --- 2255 delay 2000 ms --- Time: 140995971 ms, system time: 1409959712036 --- 3355 --- Time: 1409959711000 ms, system time: 1409959712059 --- 4455 --- Time: 1409959712000 ms, system time: 1409959712083 --- Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.cleaner.ttl and spark.streaming.unpersist
Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heapdump only contains a very large byte array consuming about 66%(the second link contains a picture of my heap -- I ran with a small heap to be able to get the failure quickly) I don't have solutions but wanted to affirm that I've observed a similar situation... On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote: I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I don't think stale RDDs are an issue here. I did a jmap -histo on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by [B which is byte arrays. Still investigating more and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it. On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you! 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com: Hi Luis, The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be used to remove useless timeout streaming data, the difference is that “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming input data, but also Spark’s useless metadata; while “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming data will be removed when out of slide duration. Both these two parameter can alleviate the memory occupation of Spark Streaming. But if the data is flooded into Spark Streaming when start up like your situation using Kafka, these two parameters cannot well mitigate the problem. Actually you need to control the input data rate to not inject so fast, you can try “spark.straming.receiver.maxRate” to control the inject rate. Thanks Jerry *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] *Sent:* Wednesday, September 10, 2014 5:21 AM *To:* user@spark.apache.org *Subject:* spark.cleaner.ttl and spark.streaming.unpersist The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because is the first run and there are quite a few events on the kafka queues that are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDD are being batched while a RDD is being processed. Regards, Luis
Re: spark-streaming Could not compute split exception
I had a similar issue and many others - all were basically symptoms for yarn killing the container for high memory usage. Haven't gotten to root cause yet. On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin van...@cloudera.com wrote: Your executor is exiting or crashing unexpectedly: On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza pesp...@societyconsulting.com wrote: org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1410224367331_0006_01_03 is : 1 2014-09-09 21:47:26,345 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1410224367331_0006_01_03 and exit code: 1 You can check the app logs (yarn logs --applicationId [id]) and see why the container is exiting. There's probably an exception happening somewhere. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Hadoop Distributed Cache
Hi, As part of SparkContext.newAPIHadoopRDD(). Would Spark support an InputFormat that uses Hadoop’s distributed cache? Thanks, Máximo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[spark upgrade] Error communicating with MapOutputTracker when running test cases in latest spark
I use https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala to help me with testing. In Spark 0.9.1 my tests depending on TestSuiteBase worked fine. As soon as I switched to the latest (1.0.1) all tests fail. My sbt import is: org.apache.spark %% spark-core % 1.1.0-SNAPSHOT % provided One exception I get is: Error communicating with MapOutputTracker org.apache.spark.SparkException: Error communicating with MapOutputTracker How can I fix this? Found a thread on this error but it was not very helpful: http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3ctencent_6b37d69c54f76819509a5...@qq.com%3E -Adrian
Re: pyspark and cassandra
Thanks for the clarification, Yadid. By Hadoop jobs, I meant Spark jobs that use Hadoop inputformats (as shown in the cassandra_inputformat.py example). A future possibility of accessing Cassandra from PySpark is when SparkSQL supports Cassandra as a data source. On Wed, Sep 10, 2014 at 11:37 AM, yadid ayzenberg ayz...@gmail.com wrote: You do not need to actually use Hadoop to read from cassandra. The hadoop inputformat is a standard way for hadoop jobs to read data from various sources. Spark can utilize input formats as well. The storage level has nothing to do with source of the data - be it cassandra or a file system such as HDFS. By using DISK_ONLY you are telling spark to cache the RDDs on disk only (and not memory). On Wed, Sep 10, 2014 at 11:31 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I try to evaluate different option of spark + cassandra and I have couple of additional questions. My aim is to use cassandra only without hadoop: 1) Is it possible to use only cassandra as input/output parameter for PySpark? 2) In case I'll use Spark (java,scala) is it possible to use only cassandra - input/output without hadoop? 3) I know there are couple of strategies for storage level, in case my data set is quite big and I have no enough memory to process - can I use DISK_ONLY option without hadoop (having only cassandra)? Thanks Oleg On Wed, Sep 3, 2014 at 3:08 AM, Kan Zhang kzh...@apache.org wrote: In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs. See examples/src/main/python/cassandra_inputformat.py for an example. You may need to write your own key/value converters. On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi All , Is it possible to have cassandra as input data for PySpark. I found example for java - http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0 and I am looking something similar for python. Thanks Oleg.
Re: Table not found: using jdbc console to query sparksql hive thriftserver
I used the hiveContext to register the tables and the tables are still not being found by the thrift server. Do I have to pass the hiveContext to JDBC somehow? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Accumulo and Spark
I've been doing some Googling and haven't found much info on how to incorporate Spark and Accumulo. Does anyone know of some examples of how to tie Spark to Accumulo (for both fetching data and dumping results)? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accumulo-and-Spark-tp13923.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Actually, when you register the table, it is only available within the SparkContext you are running it in. For Spark 1.1, the method name is changed to registerTempTable to better reflect that. The Thrift server runs as a separate process, meaning that it cannot see any of the tables generated within your SparkContext. You would need to save the table into Hive, and then the Thrift process would be able to see it. HTH! On Sep 10, 2014, at 13:08, alexandria1101 alexandria.shea...@gmail.com wrote: I used the hiveContext to register the tables and the tables are still not being found by the thrift server. Do I have to pass the hiveContext to JDBC somehow? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
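A sketch of what “save the table into Hive” can look like in Spark 1.1, assuming a HiveContext; the table names and sample data are made up, and saveAsTable is shown as one way to make the data visible to the separately running Thrift server:

import org.apache.spark.sql.hive.HiveContext

case class Record(id: Long, value: String)

val hiveContext = new HiveContext(sc)
import hiveContext._ // brings createSchemaRDD and other implicits into scope

val records = sc.parallelize(Seq(Record(1L, "a"), Record(2L, "b")))

// Only visible within this SparkContext / HiveContext:
records.registerTempTable("records_tmp")

// Written through the Hive metastore, so the Thrift server (a separate
// process) can see and query it over JDBC/ODBC:
records.saveAsTable("records")
// or: hiveContext.sql("CREATE TABLE records AS SELECT * FROM records_tmp")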
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Hi Denny, There is a related question by the way. I have a program that reads in a stream of RDD¹s, each of which is to be loaded into a hive table as one partition. Currently I do this by first writing the RDD¹s to HDFS and then loading them to hive, which requires multiple passes of HDFS I/O and serialization/deserialization. I wonder if it is possible to do it more efficiently with Spark 1.1 streaming + SQL, e.g., by registering the RDDs into a hive context so that the data is loaded directly into the hive table in cache and meanwhile visible to jdbc/odbc clients. In the spark source code, the method registerTempTable you mentioned works on SqlContext instead of HiveContext. Thanks, Du On 9/10/14, 1:21 PM, Denny Lee denny.g@gmail.com wrote: Actually, when registering the table, it is only available within the sc context you are running it in. For Spark 1.1, the method name is changed to RegisterAsTempTable to better reflect that. The Thrift server process runs under a different process meaning that it cannot see any of the tables generated within the sc context. You would need to save the sc table into Hive and then the Thrift process would be able to see them. HTH! On Sep 10, 2014, at 13:08, alexandria1101 alexandria.shea...@gmail.com wrote: I used the hiveContext to register the tables and the tables are still not being found by the thrift server. Do I have to pass the hiveContext to JDBC somehow? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using -jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
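For reference, a sketch of the mechanics of appending each streaming batch to a Hive table, assuming Spark 1.1 with a HiveContext; this still writes through the Hive warehouse on HDFS rather than keeping the data purely in cache, and the Event type and table name are placeholders:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

case class Event(id: Long, value: String)

// Each batch RDD is converted to a SchemaRDD (via the HiveContext implicits)
// and appended to an existing Hive table with insertInto.
def loadIntoHive(stream: DStream[Event], hiveContext: HiveContext): Unit = {
  import hiveContext._
  stream.foreachRDD { rdd =>
    rdd.insertInto("events") // the table must already exist in the metastore
  }
}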
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
Hey guys, After rebuilding from the master branch this morning, I’ve started to see these errors that I’ve never gotten before while running connected components. Anyone seen this before? 14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 87 spilling in-memory batch of 1020 MB to disk (1 spill so far) 14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 58 spilling in-memory batch of 1020 MB to disk (1 spill so far) 14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 57 spilling in-memory batch of 1020 MB to disk (1 spill so far) 14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 1020 MB to disk (1 spill so far) 14/09/10 20:39:15 ERROR executor.Executor: Exception in task 275.0 in stage 3.0 (TID 994) java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2 at org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329) at org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271) at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/09/10 20:39:15 ERROR executor.Executor: Exception in task 176.0 in stage 3.0 (TID 894) java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2 at org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329) at org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271) at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: how to setup steady state stream partitions
Just a guess. updateStateByKey has overloaded variants with partitioner as parameter. Can it help? -Original Message- From: qihong [mailto:qc...@pivotal.io] Sent: Tuesday, September 09, 2014 9:13 PM To: u...@spark.incubator.apache.org Subject: Re: how to setup steady state stream partitions Thanks for your response. I do have something like: val inputDStream = ... val keyedDStream = inputDStream.map(...) // use sensorId as key val partitionedDStream = keyedDstream.transform(rdd = rdd.partitionBy(new MyPartitioner(...))) val stateDStream = partitionedDStream.updateStateByKey[...](udpateFunction) The partitionedDStream does have steady partitions, but stateDStream does not have steady partitions, i.e., in the partition 0 of partitionedDStream, there's only data for sensors 0 to 999, but the partition 0 of stateDStream contains data for some sensors from 0 to 999 range, and lot of sensor from other partitions of partitionedDStream. I wish the partition 0 of stateDStream only contains the data from the partition 0 of partitionedDStream, partiton 1 of stateDStream only from partition 1 of partitionedDStream, and so on. Anyone knows how to implement that? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13853.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DELIVERY FAILURE: Error transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count exceeded. Message probably in a routing loop.
Great, good to know it's not just me... 2014-09-10 11:25 GMT-07:00 Marcelo Vanzin van...@cloudera.com: Yes please pretty please. This is really annoying. On Sun, Sep 7, 2014 at 6:31 AM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: I keep getting below reply every time I send a message to the Spark user list? Can this person be taken off the list by powers that be? Thanks! Ognen Forwarded Message Subject: DELIVERY FAILURE: Error transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count exceeded. Message probably in a routing loop. Date: Sun, 7 Sep 2014 08:29:23 -0500 From: postmas...@sgcib.com To: ognen.duzlev...@gmail.com Your message Subject: Re: Adding quota to the ephemeral hdfs on a standalone spark cluster on ec2 was not delivered to: pierre.lanvin-...@sgcib.com because: Error transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count exceeded. Message probably in a routing loop. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: Accumulo and Spark
It's very straightforward to set up a Hadoop RDD to use AccumuloInputFormat. Something like this will do the trick:

private JavaPairRDD<Key, Value> newAccumuloRDD(JavaSparkContext sc, AgileConf agileConf,
    String appName, Authorizations auths) throws IOException, AccumuloSecurityException {
  Job hadoopJob = Job.getInstance(agileConf, appName);
  // configureAccumuloInput is exactly the same as for an MR job
  // sets zookeeper instance, credentials, table name, auths etc.
  configureAccumuloInput(hadoopJob, ACCUMULO_TABLE, auths);
  return sc.newAPIHadoopRDD(hadoopJob.getConfiguration(),
      AccumuloInputFormat.class, Key.class, Value.class);
}

There's tons of docs around how to operate on a JavaPairRDD. But you're right, there's hardly anything at all re. how to plug Accumulo into Spark. -Russ On Wed, Sep 10, 2014 at 1:17 PM, Megavolt jbru...@42six.com wrote: I've been doing some Googling and haven't found much info on how to incorporate Spark and Accumulo. Does anyone know of some examples of how to tie Spark to Accumulo (for both fetching data and dumping results)? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Accumulo-and-Spark-tp13923.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark + AccumuloInputFormat
To answer my own question... I didn't realize that I was responsible for telling Spark how much parallelism I wanted for my job. I figured that between Spark and Yarn they'd figure it out for themselves. Adding --executor-memory 3G --num-executors 24 to my spark-submit command took the query time down to 30s from 18 minutes and I'm seeing much better utilization of my accumulo tablet servers. -Russ On Tue, Sep 9, 2014 at 5:13 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, I'm trying to execute Spark SQL queries on top of the AccumuloInputFormat. Not sure if I should be asking on the Spark list or the Accumulo list, but I'll try here. The problem is that the workload to process SQL queries doesn't seem to be distributed across my cluster very well. My Spark SQL app is running in yarn-client mode. The query I'm running is select count(*) from audit_log (or a similarly simple query) where my audit_log table has 14.3M rows, 504M key value pairs spread fairly evenly across 8 tablet servers. Looking at the Accumulo monitor app, I only ever see a maximum of 2 tablet servers with active scans. Since the data is spread across all the tablet servers, I hoped to see 8! I realize there are a lot of moving parts here but I'd any advice about where to start looking. Using Spark 1.0.1 with Accumulo 1.6. Thanks! -Russ
RE: how to setup steady state stream partitions
Thanks for your response! I found that too, and it does the trick! Here's the refined code:

val inputDStream = ...
val keyedDStream = inputDStream.map(...) // use sensorId as key
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction, new MyPartitioner(...))

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13931.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop
On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote: This structure is not specific to Hadoop, but in theory works in any JAR file. You can put JARs in JARs and refer to them with Class-Path entries in META-INF/MANIFEST.MF. Funny that you mention that, since someone internally asked the same question, and I spend some time looking at it. That's not actually how Class-Path works in the manifest. You can't have jars inside other jars; the Class-Path items reference things in the filesystem itself. So that solution doesn't work. It would be nice to add the feature Steve is talking about, though. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD memory questions
On Wed, Sep 10, 2014 at 1:05 AM, Boxian Dong box...@indoo.rs wrote: Thank you very much for your kindly help. I rise some another questions: - If the RDD is stored in serialized format, is that means that whenever the RDD is processed, it will be unpacked and packed again from and back to the JVM even they are located on the same machine? http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png In PySpark, Yes. But in Spark generally, no, you have several choice to cache RDD in Scala, serialized or not. - Can the RDD be partially unpacked from the serialized state? or when every a RDD is touched, it must be fully unpacked, and of course pack again afterword. The items in RDD are deserialized batch by batch, so if you call rdd.take(), only first small parts of items are deserialized. The cache of RDD are kept in JVM, you do not need to pack again after visiting them. - When a RDD is cached, is it saved in a unserialized format or serialized format? If it's saved in a unserialized format, is the partially reading of RDD from JVM to PYTHON runtime possible? For PySpark, they are all saved in serialized format. During a transformation of RDD, you can only see the current partition, you can not access other partitions or other RDD. The RDD always are read-only, so you can not modify them any time. (all the modification will be dropped.) Thank you very much -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805p13862.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop
Hm, so it is: http://docs.oracle.com/javase/tutorial/deployment/jar/downman.html I'm sure I've done this before though and thought is was this mechanism. It must be something custom. What's the Hadoop jar structure in question then? Is it something special like a WAR file? I confess I had never heard of this so thought this was about generic JAR stuff. Is the question about a lib dir in the Hadoop home dir? On Sep 10, 2014 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote: This structure is not specific to Hadoop, but in theory works in any JAR file. You can put JARs in JARs and refer to them with Class-Path entries in META-INF/MANIFEST.MF. Funny that you mention that, since someone internally asked the same question, and I spend some time looking at it. That's not actually how Class-Path works in the manifest. You can't have jars inside other jars; the Class-Path items reference things in the filesystem itself. So that solution doesn't work. It would be nice to add the feature Steve is talking about, though. -- Marcelo
Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop
In modern projects there are a bazillion dependencies - when I use Hadoop I just put them in a lib directory in the jar - If I have a project that depends on 50 jars I need a way to deliver them to Spark - maybe wordcount can be written without dependencies but real projects need to deliver dependencies to the cluster On Wed, Sep 10, 2014 at 11:44 PM, Sean Owen so...@cloudera.com wrote: Hm, so it is: http://docs.oracle.com/javase/tutorial/deployment/jar/downman.html I'm sure I've done this before though and thought is was this mechanism. It must be something custom. What's the Hadoop jar structure in question then? Is it something special like a WAR file? I confess I had never heard of this so thought this was about generic JAR stuff. Is the question about a lib dir in the Hadoop home dir? On Sep 10, 2014 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote: This structure is not specific to Hadoop, but in theory works in any JAR file. You can put JARs in JARs and refer to them with Class-Path entries in META-INF/MANIFEST.MF. Funny that you mention that, since someone internally asked the same question, and I spend some time looking at it. That's not actually how Class-Path works in the manifest. You can't have jars inside other jars; the Class-Path items reference things in the filesystem itself. So that solution doesn't work. It would be nice to add the feature Steve is talking about, though. -- Marcelo -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop
On Wed, Sep 10, 2014 at 3:44 PM, Sean Owen so...@cloudera.com wrote: What's the Hadoop jar structure in question then? Is it something special like a WAR file? I confess I had never heard of this so thought this was about generic JAR stuff. What I've been told (and Steve's e-mail alludes to) is that you can put jars inside a lib/ directory in your jar (so jars-within-a-jar), and the MR app classloader will automatically add those to your app's class path (by using a special class loader or exploding the jar or something, don't know the exact mechanism). -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PrintWriter error in foreach
Try providing full path to the file you want to write, and make sure the directory exists and is writable by the Spark process. On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com wrote: I have a spark program that worked in local mode, but throws an error in yarn-client mode on a cluster. On the edge node in my home directory, I have an output directory (called transout) which is ready to receive files. The spark job I'm running is supposed to write a few hundred files into that directory, once for each iteration of a foreach function. This works in local mode, and my only guess as to why this would fail in yarn-client mode is that the RDD is distributed across many nodes and the program is trying to use the PrintWriter on the datanodes, where the output directory doesn't exist. Is this what's happening? Any proposed solution? abbreviation of the code: import java.io.PrintWriter ... rdd.foreach { val outFile = new PrintWriter(transoutput/output.%s.format(id)) outFile.println(test) outFile.close() } Error: 14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26) 14/09/10 16:57:09 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException java.io.FileNotFoundException: transoutput/input.598718 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:194) at java.io.FileOutputStream.init(FileOutputStream.java:84) at java.io.PrintWriter.init(PrintWriter.java:146) at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98) at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)
GraphX : AssertionError
Hi, I have a small graph with about 3.3M vertices and close to 7.5M edges. It's a pretty innocent graph with the max degree of 8. Unfortunately, graph.traingleCount is failing on me with the exception below. I'm running a spark-shell on CDH5.1 with the following params : SPARK_DRIVER_MEM=10g ADD_JARS=./path/to/my-jar-with-dependencies.jar SPARK_WORKER_INSTANCES=120 SPARK_WORKER_MEMORY=5g SPARK_YARN_APP_NAME=VipulsSparkShell MASTER=yarn-client spark-shell Any clue anyone? Vipul 14/09/10 16:12:22 INFO cluster.YarnClientClusterScheduler: Stage 80 was cancelled 14/09/10 16:12:22 INFO scheduler.TaskSetManager: Loss was due to java.lang.AssertionError: assertion failed [duplicate 8] 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 326 was killed. 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 325 was killed. 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 320 was killed. 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 324 was killed. 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 322 was killed. org.apache.spark.SparkException: Job aborted due to stage failure: Task 80.0:6 failed 4 times, most recent failure: Exception failure in TID 321 on host abc.xyz.com: java.lang.AssertionError: assertion failed scala.Predef$.assert(Predef.scala:165) org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:89) org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:86) org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:125) org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:192) org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:189) org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:87) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
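Not a definitive diagnosis, but the GraphX documentation for this release notes that triangleCount expects edges in canonical orientation (srcId < dstId), no duplicate edges, and a graph partitioned with partitionBy; the assertion above is the kind of failure that usually follows when those preconditions are not met. A sketch of preparing a graph that way (the edge data here is made up):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, PartitionStrategy}

def countTriangles(sc: SparkContext) = {
  // Made-up edge list; in practice this would be the real (src, dst) pairs.
  val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (3L, 1L)))

  // Canonical orientation (src < dst) with duplicates removed, then partitionBy,
  // both of which triangleCount assumes.
  val canonical = rawEdges.map { case (a, b) => if (a < b) (a, b) else (b, a) }.distinct()
  val graph = Graph.fromEdgeTuples(canonical, defaultValue = 1)
    .partitionBy(PartitionStrategy.RandomVertexCut)

  graph.triangleCount().vertices
}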
Re: PrintWriter error in foreach
Ok, so I don't think the workers on the data nodes will be able to see my output directory on the edge node. I don't think stdout will work either, so I'll write to HDFS via rdd.saveAsTextFile(...) On Wed, Sep 10, 2014 at 3:51 PM, Daniil Osipov daniil.osi...@shazam.com wrote: Try providing full path to the file you want to write, and make sure the directory exists and is writable by the Spark process. On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com wrote: I have a spark program that worked in local mode, but throws an error in yarn-client mode on a cluster. On the edge node in my home directory, I have an output directory (called transout) which is ready to receive files. The spark job I'm running is supposed to write a few hundred files into that directory, once for each iteration of a foreach function. This works in local mode, and my only guess as to why this would fail in yarn-client mode is that the RDD is distributed across many nodes and the program is trying to use the PrintWriter on the datanodes, where the output directory doesn't exist. Is this what's happening? Any proposed solution? abbreviation of the code: import java.io.PrintWriter ... rdd.foreach { val outFile = new PrintWriter(transoutput/output.%s.format(id)) outFile.println(test) outFile.close() } Error: 14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26) 14/09/10 16:57:09 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException java.io.FileNotFoundException: transoutput/input.598718 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:194) at java.io.FileOutputStream.init(FileOutputStream.java:84) at java.io.PrintWriter.init(PrintWriter.java:146) at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98) at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)
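A sketch of that alternative, assuming the goal is simply to land the per-record output on a filesystem every executor can reach; the path and output formatting are placeholders:

import org.apache.spark.rdd.RDD

// `records` stands in for the RDD that was previously written with a PrintWriter
// inside foreach; saveAsTextFile runs on the workers and writes one part-* file
// per partition into HDFS.
def writeOutput(records: RDD[String]): Unit = {
  val lines = records.map(r => s"test $r")
  lines.saveAsTextFile("hdfs:///user/someuser/transoutput")
}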
Re: Re: Spark SQL -- more than two tables for join
Hi, Michael: I think Arthur.hk.chan isn't here now, but I can show something: 1) My Spark version is 1.0.1. 2) When I use a multiple join like this: sql(SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey)) where youhao_data, youhao_age and youhao_totalKiloMeter were registered with registerAsTable, I get this exception: Exception in thread main java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found SELECT * FROM youhao_data left join youhao_age on (youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on (youhao_age.rowkey=youhao_totalKiloMeter.rowkey) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:69) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:181) at org.apache.spark.examples.sql.SparkSQLHBaseRelation$.main(SparkSQLHBaseRelation.scala:140) at org.apache.spark.examples.sql.SparkSQLHBaseRelation.main(SparkSQLHBaseRelation.scala) boyingk...@163.com From: Michael Armbrust Date: 2014-09-11 00:28 To: arthur.hk.c...@gmail.com CC: arunshell87; u...@spark.incubator.apache.org Subject: Re: Spark SQL -- more than two tables for join What version of Spark SQL are you running here? I think a lot of your concerns have likely been addressed in more recent versions of the code / documentation. (Spark 1.1 should be published in the next few days.) In particular, for serious applications you should use a HiveContext and HiveQL, as this is a much more complete implementation of a SQL parser. The one in SQLContext is only suggested if the Hive dependencies conflict with your application. 1) spark sql does not support multiple join This is not true. What problem were you running into? 2) spark left join: has performance issue Can you describe your data and query more? 3) spark sql’s cache table: does not support two-tier query I'm not sure what you mean here. 4) spark sql does not support repartition You can repartition SchemaRDDs in the same way as normal RDDs.
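Following Michael's suggestion, the same chained LEFT JOIN goes through when parsed as HiveQL. A sketch assuming Spark 1.0.x/1.1 with a HiveContext and the three tables registered in that same context:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// The basic SqlParser behind SQLContext.sql rejects the chained join above;
// the Hive parser accepts it.
val joined = hiveContext.hql(
  """SELECT *
    |FROM youhao_data d
    |LEFT JOIN youhao_age a ON (d.rowkey = a.rowkey)
    |LEFT JOIN youhao_totalKiloMeter t ON (a.rowkey = t.rowkey)
  """.stripMargin)

joined.take(10).foreach(println)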
Re: How to scale more consumer to Kafka stream
Hi, You can use this Kafka Spark Consumer. https://github.com/dibbhatt/kafka-spark-consumer This is exactly does that . It creates parallel Receivers for every Kafka topic partitions. You can see the Consumer.java under consumer.kafka.client package to see an example how to use it. There is some discussion on this Consumer you can find it here : https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689 Regards, Dib On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith secs...@gmail.com wrote: How are you creating your kafka streams in Spark? If you have 10 partitions for a topic, you can call createStream ten times to create 10 parallel receivers/executors and then use union to combine all the dStreams. On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote: Hi (my previous post as been used by someone else) I'm building a application the read from kafka stream event. In production we've 5 consumers that share 10 partitions. But on spark streaming kafka only 1 worker act as a consumer then distribute the tasks to workers so I can have only 1 machine acting as consumer but I need more because only 1 consumer means Lags. Do you've any idea what I can do ? Another point is interresting the master is not loaded at all I can get up more than 10 % CPU I've tried to increase the queued.max.message.chunks on the kafka client to read more records thinking it'll speed up the read but I only get ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576) java.lang.OutOfMemoryError: Java heap space Is someone have ideas ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.cleaner.ttl and spark.streaming.unpersist
I switched from Yarn to standalone mode and haven't had the OOM issue yet. However, now I have Akka issues killing the executor:
2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Before I switched from Yarn to standalone, I tried looking at the heaps of running executors. What I found odd was that while both jmap -histo:live and jmap -histo showed heap usage of a few hundred MBytes, Yarn kept showing that memory utilization was several gigabytes, eventually leading to the container being killed. I would appreciate it if someone could duplicate what I am seeing. Basically:
1. Tail your Yarn container logs and see what they report as memory used by the JVM.
2. In parallel, run jmap -histo:live <pid> or jmap -histo <pid> on the executor process.
They should be about the same, right? Also, in the heap dump, 99% of the heap seems to be occupied by unreachable objects (and most of it is byte arrays).

On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:
Actually, I am not doing any explicit shuffle/updateByKey or other transform functions. In my program flow, I take in data from Kafka, match each message against a list of regexes, and if a message matches a regex I extract the groups, stuff them into JSON and push them back out to Kafka (a different topic). So there is really no dependency between two messages in terms of processing. Here's my container histogram: http://pastebin.com/s3nAT3cY Essentially, my app is a cluster grep on steroids.

On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote:
Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heap dump only contains a very large byte array consuming about 66% (the second link contains a picture of my heap; I ran with a small heap to be able to get the failure quickly). I don't have solutions but wanted to affirm that I've observed a similar situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so I don't think stale RDDs are the issue here. I did a jmap -histo on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by [B, which is byte arrays. Still investigating, and I would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it.

On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote:
I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you!
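For anyone landing on this thread later, here is a minimal sketch of how the settings discussed above would be wired into a streaming job. The property names are the ones from the Spark 1.x configuration docs (spark.cleaner.ttl, spark.streaming.unpersist, spark.streaming.receiver.maxRate); the TTL, rate limit and the socket source are illustrative values only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingCleanupConfig {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("streaming-cleanup-example")
      // Time-based cleaner: drops old RDDs and metadata after the TTL (seconds).
      .set("spark.cleaner.ttl", "3600")
      // Reference-based cleanup: unpersist input RDDs once they fall out of the window.
      .set("spark.streaming.unpersist", "true")
      // Cap the ingest rate per receiver (records/sec) so a backlog at startup
      // cannot flood the executors.
      .set("spark.streaming.receiver.maxRate", "10000")

    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

These properties are read when the context is created, so they need to be set up front (or passed via spark-submit --conf) rather than changed while the job is running.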
Setting up jvm in pyspark from shell
Hi,
I am using the pyspark shell and am trying to create an RDD from a numpy matrix:
rdd = sc.parallelize(matrix)
I am getting the following error:
JVMDUMP039I Processing dump event systhrow, detail java/lang/OutOfMemoryError at 2014/09/10 22:41:44 - please wait.
JVMDUMP032I JVM requested Heap dump using '/global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd' in response to an event
JVMDUMP010I Heap dump written to /global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd
JVMDUMP032I JVM requested Java dump using '/global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt' in response to an event
JVMDUMP010I Java dump written to /global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt
JVMDUMP032I JVM requested Snap dump using '/global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc' in response to an event
JVMDUMP010I Snap dump written to /global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc
JVMDUMP013I Processed dump event systhrow, detail java/lang/OutOfMemoryError.
Exception AttributeError: "'SparkContext' object has no attribute '_jsc'" in <bound method SparkContext.__del__ of <pyspark.context.SparkContext object at 0x11f9450>> ignored
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 271, in parallelize
    jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
  File "/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:804)
I did try sc.setSystemProperty("spark.executor.memory", "20g").
How do I increase the JVM heap from the shell?
--
Mohit
When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates