Re: Bind Exception
I closed the Spark Shell and tried again, but there was no change. Here is the error:

15/01/17 14:33:39 INFO AbstractConnector: Started SocketConnector@0.0.0.0:59791
15/01/17 14:33:39 INFO Server: jetty-8.y.z-SNAPSHOT
15/01/17 14:33:39 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:191)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:99)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:223)
at org.apache.spark.examples.SparkAPSP$.main(SparkAPSP.scala:21)
at org.apache.spark.examples.SparkAPSP.main(SparkAPSP.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/01/17 14:33:39 WARN AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@f1b69ca: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:191)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:99)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:223)
at org.apache.spark.examples.SparkAPSP$.main(SparkAPSP.scala:21)
at org.apache.spark.examples.SparkAPSP.main(SparkAPSP.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/01/17 14:33:39 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/01/17 14:33:39 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/01/17 14:33:39 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/01/17 14:33:39 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
..

On Tue, Jan 20, 2015 at 9:52 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I had the Spark Shell running throughout. Is it because of that? On Tue, Jan 20, 2015 at 9:47 AM, Ted Yu yuzhih...@gmail.com wrote: Was there another instance of
Bind Exception
Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
spark.sql.parquet.filterPushdown=true has been turned on. But with spark.sql.hive.convertMetastoreParquet set to false, the first parameter loses its effect!!! 2015-01-20 6:52 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: If you're talking about filter pushdowns for parquet files, this also has to be turned on explicitly. Try spark.sql.parquet.filterPushdown=true. It's off by default. On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang wangxy...@gmail.com wrote: Yes it works! But the filter can't be pushed down!!! Should a custom parquet inputformat implement the data source API instead? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang wangxy...@gmail.com: Thanks yana! I will try it! On Jan 16, 2015, at 20:51, yana yana.kadiy...@gmail.com wrote: I think you might need to set spark.sql.hive.convertMetastoreParquet to false, if I understand that flag correctly. Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: Xiaoyu Wang Date: 01/16/2015 5:09 AM (GMT-05:00) To: user@spark.apache.org Subject: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan? Hi all! In Spark SQL 1.2.0 I create a hive table with a custom parquet inputformat and outputformat, like this:

CREATE TABLE test(
  id string,
  msg string)
CLUSTERED BY (id)
SORTED BY (id ASC)
INTO 10 BUCKETS
ROW FORMAT SERDE 'com.a.MyParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.a.MyParquetInputFormat'
OUTPUTFORMAT 'com.a.MyParquetOutputFormat';

And in the spark shell the plan of select * from test is:

[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

Not HiveTableScan!!! So it doesn't execute my custom inputformat! Why? How can it execute my custom inputformat? Thanks!
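For reference, both flags can also be flipped from code. A minimal sketch (assuming an existing SparkContext named sc and the Spark SQL 1.2 setConf API; the query is illustrative):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Keep the Hive SerDe/InputFormat path (HiveTableScan) for metastore
// parquet tables instead of Spark SQL's native ParquetTableScan:
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
// Enable pushdown for the native parquet path (off by default):
hiveContext.setConf("spark.sql.parquet.filterPushdown", "true")
hiveContext.sql("SELECT * FROM test WHERE id = '42'").collect()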
Re: Bind Exception
I had the Spark Shell running throughout. Is it because of that? On Tue, Jan 20, 2015 at 9:47 AM, Ted Yu yuzhih...@gmail.com wrote: Was there another instance of Spark running on the same machine? Can you pastebin the full stack trace? Cheers On Mon, Jan 19, 2015 at 8:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
Re: Bind Exception
Yes, I have increased the driver memory in spark-defaults.conf to 2g. Still the error persists. On Tue, Jan 20, 2015 at 10:18 AM, Ted Yu yuzhih...@gmail.com wrote: Have you seen these threads? http://search-hadoop.com/m/JW1q5tMFlb http://search-hadoop.com/m/JW1q5dabji1 Cheers On Mon, Jan 19, 2015 at 8:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi Ted, When I run the same job with small data, I am able to run. But when I run it with a relatively bigger set of data, it gives me OutOfMemoryError: GC overhead limit exceeded. The first time I run the job, there is no output. When I run it a second time, I get this error. I am aware that the memory is getting full, but is there any way to avoid this? I have a single-node Spark cluster. Thank You On Tue, Jan 20, 2015 at 9:52 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I had the Spark Shell running throughout. Is it because of that? On Tue, Jan 20, 2015 at 9:47 AM, Ted Yu yuzhih...@gmail.com wrote: Was there another instance of Spark running on the same machine? Can you pastebin the full stack trace? Cheers On Mon, Jan 19, 2015 at 8:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
Re: Finding most occurrences in a JSON Nested Array
I just checked the post. Do you still need help? I think getAs[Seq[String]] should help; see the example below. If you are still stuck, let me know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p21252.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
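Something along these lines (an untested sketch; the file name and the "tags" array field are made up for illustration, and it assumes a Spark 1.2 SQLContext named sqlContext):

import org.apache.spark.SparkContext._  // pair RDD functions

// Count the most frequent values inside a nested JSON array field.
val events = sqlContext.jsonFile("events.json")
events.registerTempTable("events")

val counts = sqlContext.sql("SELECT tags FROM events")
  .flatMap(row => row.getAs[Seq[String]](0))  // unpack the array column
  .map(tag => (tag, 1L))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)            // most occurrences first

counts.take(10).foreach(println)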
Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?
Hi, On Sat, Jan 17, 2015 at 3:37 AM, Peng Cheng pc...@uow.edu.au wrote: I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |        |
                   V        V
                  RDD3 -> RDD4 -> Action!

In my experience, the chance that RDD1 gets recalculated is unpredictable: sometimes once, sometimes twice. That should not happen if your access pattern to RDD2 and RDD3 is always the same. A related problem might be in SQLContext.jsonRDD(), since the source jsonRDD is used twice (once for schema inference, once for reading the data). It almost guarantees that the source jsonRDD is calculated twice. Has this problem been addressed so far? That's exactly why schema inference is expensive. However, I am afraid that in general you have to make a decision between storing and recomputing (cf. http://en.wikipedia.org/wiki/Space%E2%80%93time_tradeoff). There is no way to avoid recomputation on each access other than storing the value, I guess. Tobias
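To make the store-or-recompute choice concrete, a small sketch (assuming an existing SparkContext sc; the input path is arbitrary):

import org.apache.spark.storage.StorageLevel

// RDD1 from the diagram; without persist(), each action below may
// recompute the whole lineage from the input file.
val rdd1 = sc.textFile("hdfs:///some/input").map(_.length)
rdd1.persist(StorageLevel.MEMORY_AND_DISK)  // trade storage for compute

val rdd2 = rdd1.map(_ * 2)
val rdd3 = rdd1.filter(_ > 10)

// Both actions now reuse rdd1's stored blocks instead of re-reading input.
println(rdd2.count())
println(rdd3.count())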
RE: MatchError in JsonRDD.toLong
Yes, actually that is exactly what I mean. And maybe you missed my last response: you can use the API jsonRDD(json: RDD[String], schema: StructType) to specify your schema explicitly. For numbers bigger than Long, we can use DecimalType. Thanks, Daoyuan From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Tuesday, January 20, 2015 9:26 AM To: Wang, Daoyuan Cc: user Subject: Re: MatchError in JsonRDD.toLong Hi, On Fri, Jan 16, 2015 at 6:14 PM, Wang, Daoyuan daoyuan.w...@intel.com wrote: The second parameter of jsonRDD is the sampling ratio when we infer schema. OK, I was aware of this, but I guess I understand the problem now. My sampling ratio is so low that I only see the Long values of data items and infer it's a Long. When I meet data that's actually longer than Long, I get the error I posted; basically it's the same situation as when specifying a wrong schema manually. So is there any way around this other than increasing the sample ratio to discover also the very BigDecimal-sized numbers? Thanks Tobias
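A sketch of that call (untested; the schema fields and sample record are made up, and it assumes Spark 1.2's sql types plus an existing SparkContext/SQLContext):

import org.apache.spark.sql._

// "id" can exceed Long.MaxValue, so declare it as an unlimited decimal
// instead of letting the sampling-based inference pick LongType.
val schema = StructType(Seq(
  StructField("id", DecimalType.Unlimited, nullable = false),
  StructField("name", StringType, nullable = true)))

val json = sc.parallelize(Seq(
  """{"id": 123456789012345678901234567890, "name": "a"}"""))

// Explicit schema: no sampling, and no MatchError from a wrong inference.
val records = sqlContext.jsonRDD(json, schema)
records.registerTempTable("records")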
How to compute RDD[(String, Set[String])] that include large Set
I want to compute an RDD[(String, Set[String])] where some of the Set[String] values are very large.
--
val hoge: RDD[(String, Set[String])] = ...
val reduced = hoge.reduceByKey(_ ++ _) // creates large Sets (shuffle read size 7GB)
val counted = reduced.map { case (key, strSeq) => s"$key\t${strSeq.size}" }
counted.saveAsTextFile("/path/to/save/dir")
--
Looking at the Spark UI, in the saveAsTextFile stage executors are lost and tasks are resubmitted, and Spark then keeps losing executors. I thought one approach to this problem would be to build an RDD[(String, RDD[String])], union the inner RDD[String]s, and do a distinct count, but creating an RDD inside an RDD throws a NullPointerException, so that operation is probably impossible. What might be the issue, and what are possible solutions? Please lend me your wisdom. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
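One possible workaround for the code above: avoid building giant Sets at all by flattening to (key, element) pairs and deduplicating in a shuffle (an untested sketch using the same hoge RDD; see also the replies further down):

import org.apache.spark.SparkContext._  // pair RDD functions

// Explode each Set into (key, element) pairs, dedup across the cluster,
// then count per key; no single task has to hold a whole Set in memory.
val uniqueCounts = hoge
  .flatMap { case (key, strs) => strs.map(s => (key, s)) }
  .distinct()
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)

uniqueCounts
  .map { case (key, n) => s"$key\t$n" }
  .saveAsTextFile("/path/to/save/dir")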
Re: Does Spark automatically run different stages concurrently when possible?
Sean, A related question: when should we persist the RDD, after step 2 or after step 3? (Nothing would actually happen before step 3, I assume.) On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen so...@cloudera.com wrote: From the OP:

(1) val lines = Import full dataset using sc.textFile
(2) val ABonly = Filter out all rows from lines that are not of type A or B
(3) val processA = Process only the A rows from ABonly
(4) val processB = Process only the B rows from ABonly

I assume that 3 and 4 are actions, or else nothing happens here at all. When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen after 3, and may even cause 1 and 2 to happen again if nothing is persisted. You can invoke 3 and 4 in parallel on the driver if you like (see the sketch below). That's fine. But actions are blocking in the driver. On Mon, Jan 19, 2015 at 8:21 AM, davidkl davidkl...@hotmail.com wrote: Hi Jon, I am looking for an answer to a similar question in the doc now; so far no clue. I would need to know what Spark's behaviour is in a situation like the example you provided, taking into account also that there are multiple partitions/workers. I could imagine it's possible that different Spark workers are not synchronized in terms of waiting for each other to progress to the next step/stage for the partitions of data they get assigned, while I believe in streaming they would wait for the current batch to complete before they start working on a new one. In the code I am working on, I need to make sure a particular step is completed (in all workers, for all partitions) before the next transformation is applied. Would be great if someone could clarify or point to these issues in the doc! :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- thanks ashish Blog: http://www.ashishpaliwal.com/blog My Photo Galleries: http://www.pbase.com/ashishpaliwal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
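A sketch of invoking steps 3 and 4 in parallel from the driver (hypothetical: isTypeA/isTypeB and processA/processB stand in for the OP's logic; persisting after step 2 keeps either branch from recomputing steps 1-2):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Persist after step 2 so neither branch recomputes steps 1 and 2.
ABonly.cache()

// Two blocking actions fired from separate driver threads; the Spark
// scheduler is then free to run their jobs concurrently.
val futureA = Future { ABonly.filter(isTypeA).map(processA).count() }
val futureB = Future { ABonly.filter(isTypeB).map(processB).count() }

val countA = Await.result(futureA, Duration.Inf)
val countB = Await.result(futureB, Duration.Inf)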
Re: How to compute RDD[(String, Set[String])] that include large Set
Instead of counted.saveAsTextFile("/path/to/save/dir"), what happens if you call counted.collect? If you still face the same issue, please paste the stack trace here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21250.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Bind Exception
Have you seen these threads? http://search-hadoop.com/m/JW1q5tMFlb http://search-hadoop.com/m/JW1q5dabji1 Cheers On Mon, Jan 19, 2015 at 8:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi Ted, When I run the same job with small data, I am able to run. But when I run it with a relatively bigger set of data, it gives me OutOfMemoryError: GC overhead limit exceeded. The first time I run the job, there is no output. When I run it a second time, I get this error. I am aware that the memory is getting full, but is there any way to avoid this? I have a single-node Spark cluster. Thank You On Tue, Jan 20, 2015 at 9:52 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I had the Spark Shell running throughout. Is it because of that? On Tue, Jan 20, 2015 at 9:47 AM, Ted Yu yuzhih...@gmail.com wrote: Was there another instance of Spark running on the same machine? Can you pastebin the full stack trace? Cheers On Mon, Jan 19, 2015 at 8:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
Re: How to get the master URL at runtime inside driver program?
Hi, On Sun, Jan 18, 2015 at 11:08 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Driver programs submitted by the spark-submit script will get the runtime spark master URL, but how does it get the URL inside the main method when creating the SparkConf object? The master will be stored in the spark.master property. I use the following snippet:

// When run through spark-submit, the Java system property spark.master
// will contain the master passed to spark-submit and we *must* use the
// same; otherwise use local[3].
val master = scala.util.Properties.propOrElse("spark.master", "local[3]")

Tobias
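And then feeding that into the conf, roughly (a sketch; the app name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// Works both under spark-submit (spark.master is set for us) and when
// launching the main class directly (falls back to local[3]).
val master = scala.util.Properties.propOrElse("spark.master", "local[3]")
val conf = new SparkConf().setAppName("my-app").setMaster(master)
val sc = new SparkContext(conf)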
Re: How to compute RDD[(String, Set[String])] that include large Set
As far as I know, the steps before saveAsTextFile are transformations, so they are computed lazily. The saveAsTextFile action then runs all the transformations, and your Set[String] grows at that point. It creates a large collection if you have few keys, and this easily causes OOM when your executor memory and fraction settings are not suited to this computation. If you only want collection counts by key, you can use countByKey(), or map() the RDD[(String, Set[String])] to an RDD[(String, Long)] right after creating the hoge RDD, so that reduceByKey only aggregates counts of keys. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Issues with constants in Spark HiveQL queries
Yes, we tried the master branch (sometime last week) and there was no issue, but the above repro is for branch 1.2 and Hive 0.13. Isn't that the final release branch for Spark 1.2? If so, does a patch need to be created or back-ported from master? (Yes, the obvious typo in the column name was introduced in this email only, so it is irrelevant to the error.) On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: yeah, that makes sense. Pala, are you on a prebuilt version of Spark -- I just tried the CDH4 prebuilt... Here is what I get for the = token: [image: Inline image 1] The literal type shows as 290, not 291, and 290 is numeric. According to this http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser 291 is the token PLUS, which is really weird... On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao hao.ch...@intel.com wrote: The log showed it failed in parsing, so the typo stuff shouldn't be the root cause. BUT I couldn't reproduce that with the master branch. I did the test as follows:

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] Sent: Wednesday, January 14, 2015 11:12 PM To: Pala M Muthaia Cc: user@spark.apache.org Subject: Re: Issues with constants in Spark HiveQL queries Just a guess, but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues (where the epoch's type is bigint). You can see the source here https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what ASTNode type 291 is, but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id='20141210' (single quotes around the string). On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, We are testing Spark SQL-Hive QL on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing the constant. Query: SELECT user_id FROM actions where conversion_aciton_id=20141210 Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210 Any ideas? This seems very basic, so we may be missing something basic, but I haven't figured out what it is.
---
Full shell output below:

scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210
TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        actions
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_TABLE_OR_COL
          user_id
    TOK_WHERE
      =
        TOK_TABLE_OR_COL
          conversion_aciton_id
        20141210

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 :
20141210 +
org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
Re: MatchError in JsonRDD.toLong
Hi, On Fri, Jan 16, 2015 at 6:14 PM, Wang, Daoyuan daoyuan.w...@intel.com wrote: The second parameter of jsonRDD is the sampling ratio when we infer schema. OK, I was aware of this, but I guess I understand the problem now. My sampling ratio is so low that I only see the Long values of data items and infer it's a Long. When I meet the data that's actually longer than Long, I get the error I posted; basically it's the same situation as when specifying a wrong schema manually. So is there any way around this other than increasing the sample ratio to discover also the very BigDecimal-sized numbers? Thanks Tobias
How to output to S3 and keep the order
Hi, I am using Spark on AWS and want to write the output to S3. The output is relatively small, and I don't want it written as multiple part files, so I use result.repartition(1).saveAsTextFile("s3://..."). However, as long as I use the saveAsTextFile method, the output doesn't keep the original order. But if I use a BufferedWriter in Java to write the output, I can only write to the master machine instead of S3 directly. Is there a way that I could write to S3 and at the same time keep the order? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-output-to-S3-and-keep-the-order-tp21246.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
If you're talking about filter pushdowns for parquet files, this also has to be turned on explicitly. Try spark.sql.parquet.filterPushdown=true. It's off by default. On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang wangxy...@gmail.com wrote: Yes it works! But the filter can't be pushed down!!! Should a custom parquet inputformat implement the data source API instead? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang wangxy...@gmail.com: Thanks yana! I will try it! On Jan 16, 2015, at 20:51, yana yana.kadiy...@gmail.com wrote: I think you might need to set spark.sql.hive.convertMetastoreParquet to false, if I understand that flag correctly. Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: Xiaoyu Wang Date: 01/16/2015 5:09 AM (GMT-05:00) To: user@spark.apache.org Subject: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan? Hi all! In Spark SQL 1.2.0 I create a hive table with a custom parquet inputformat and outputformat, like this:

CREATE TABLE test(
  id string,
  msg string)
CLUSTERED BY (id)
SORTED BY (id ASC)
INTO 10 BUCKETS
ROW FORMAT SERDE 'com.a.MyParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.a.MyParquetInputFormat'
OUTPUTFORMAT 'com.a.MyParquetOutputFormat';

And in the spark shell the plan of select * from test is:

[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

Not HiveTableScan!!! So it doesn't execute my custom inputformat! Why? How can it execute my custom inputformat? Thanks!
Aggregations based on sort order
Hi, I am trying to aggregate a key based on some timestamp, and I believe that spilling to disk is changing the order of the data fed into the combiner. I have some timeseries data that is of the form: (key, date, other data)

Partition 1
(A, 2, ...)
(B, 4, ...)
(A, 1, ...)
(A, 3, ...)
(B, 6, ...)

which I then partition by key, then sort within each partition:

Partition 1
(A, 1, ...)
(A, 2, ...)
(A, 3, ...)
(A, 4, ...)

Partition 2
(B, 4, ...)
(B, 6, ...)

If I run a combineByKey with the same partitioner, then the items for each key will be fed into the ExternalAppendOnlyMap in the correct order. However, if I spill, then the time slices are spilled to disk as multiple partial combiners. When it's time to merge the spilled combiners for each key, the combiners are combined in the wrong order. For example, if during a groupByKey, [(A, 1, ...), (A, 2, ...)] and [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the combiners are combined in the wrong order, like [(A, 3, ...), (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that all the values for A are passed in order to the combiners. I'm not an expert, but I suspect that this is because we use a heap ordered by key when iterating, which doesn't retain the order of the spilled combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? This would mean that we would pop and merge the combiners of each key in order, resulting in [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)]. Thanks in advance for the help! If there is a way to do this already in Spark 1.2, can someone point it out to me? Best, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregations-based-on-sort-order-tp21245.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
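Incidentally, the "partition by key, sort within partition" setup described above can be written in Spark 1.2 with repartitionAndSortWithinPartitions. A sketch (it does not by itself change the spill-merge behaviour discussed here; the types and partition count are illustrative):

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._  // ordered/pair RDD functions
import org.apache.spark.rdd.RDD

// Composite (key, date) sort key, payload as the value.
def sortedByKeyThenDate(records: RDD[((String, Int), String)]): RDD[((String, Int), String)] = {
  // Partition on the key component only, so all of A's rows colocate,
  // while the default tuple Ordering sorts by key and then date.
  val byKeyOnly = new Partitioner {
    private val hash = new HashPartitioner(8)
    def numPartitions: Int = hash.numPartitions
    def getPartition(composite: Any): Int =
      hash.getPartition(composite.asInstanceOf[(String, Int)]._1)
  }
  records.repartitionAndSortWithinPartitions(byKeyOnly)
}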
Re: How to output to S3 and keep the order
When you repartition, ordering can get lost. You would need to sort after repartitioning. Aniket On Tue, Jan 20, 2015, 7:08 AM anny9699 anny9...@gmail.com wrote: Hi, I am using Spark on AWS and want to write the output to S3. The output is relatively small, and I don't want it written as multiple part files, so I use result.repartition(1).saveAsTextFile("s3://..."). However, as long as I use the saveAsTextFile method, the output doesn't keep the original order. But if I use a BufferedWriter in Java to write the output, I can only write to the master machine instead of S3 directly. Is there a way that I could write to S3 and at the same time keep the order? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-output-to-S3-and-keep-the-order-tp21246.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
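Concretely, something like this should give a single, ordered output file (a sketch; it assumes each row carries something to sort on, here a hypothetical (index, line) pair):

// Sort globally into a single partition, then write: one part file,
// rows in order.
val ordered = result
  .sortBy({ case (index, _) => index }, ascending = true, numPartitions = 1)
  .map { case (_, line) => line }

ordered.saveAsTextFile("s3://bucket/output")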
Re: Bind Exception
Was there another instance of Spark running on the same machine? Can you pastebin the full stack trace? Cheers On Mon, Jan 19, 2015 at 8:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
Re: Does Spark automatically run different stages concurrently when possible?
Hi Jon, I am looking for an answer to a similar question in the doc now; so far no clue. I would need to know what Spark's behaviour is in a situation like the example you provided, taking into account also that there are multiple partitions/workers. I could imagine it's possible that different Spark workers are not synchronized in terms of waiting for each other to progress to the next step/stage for the partitions of data they get assigned, while I believe in streaming they would wait for the current batch to complete before they start working on a new one. In the code I am working on, I need to make sure a particular step is completed (in all workers, for all partitions) before the next transformation is applied. Would be great if someone could clarify or point to these issues in the doc! :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Need some help to create user defined type for ML pipeline
Hi all, I'm trying to implement a pipeline for computer vision based on the latest ML package in Spark. The first step of my pipeline is to decode images (jpeg for instance) stored in a parquet file. For this, I began by creating a UserDefinedType that represents a decoded image stored in an array of bytes. Here is my first attempt:

@SQLUserDefinedType(udt = classOf[ByteImageUDT])
class ByteImage(channels: Int, width: Int, height: Int, data: Array[Byte])

private[spark] class ByteImageUDT extends UserDefinedType[ByteImage] {

  override def sqlType: StructType = {
    // A decoded image: three integer dimensions plus the raw bytes.
    StructType(Seq(
      StructField("channels", IntegerType, nullable = false),
      StructField("width", IntegerType, nullable = false),
      StructField("height", IntegerType, nullable = false),
      StructField("data", BinaryType, nullable = false)))
  }

  override def serialize(obj: Any): Row = {
    val row = new GenericMutableRow(4)
    val img = obj.asInstanceOf[ByteImage]
    ...
  }

  override def deserialize(datum: Any): ByteImage = {
    ...
  }

  override def pyUDT: String = "pyspark.mllib.linalg.VectorUDT"

  override def userClass: Class[ByteImage] = classOf[ByteImage]
}

I took VectorUDT as a starting point, but there is a lot here that I don't really understand, so any help on defining the serialize and deserialize methods will be appreciated. Best Regards, Jao
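Not an authoritative answer, but mirroring how VectorUDT fills its row, serialize/deserialize for the 4-field layout above could look roughly like this (an untested sketch against the catalyst internals of that era):

// Sketch only: must match the sqlType declared above field-for-field.
override def serialize(obj: Any): Row = {
  val img = obj.asInstanceOf[ByteImage]
  val row = new GenericMutableRow(4)
  row.setInt(0, img.channels)
  row.setInt(1, img.width)
  row.setInt(2, img.height)
  row.update(3, img.data)  // the BinaryType column holds the raw bytes
  row
}

override def deserialize(datum: Any): ByteImage = datum match {
  case row: Row =>
    require(row.length == 4, s"expected 4 fields, got ${row.length}")
    new ByteImage(row.getInt(0), row.getInt(1), row.getInt(2),
      row.getAs[Array[Byte]](3))
}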
Re: Newbie Question on How Tasks are Executed
Hello Mixtou, if you want to look at partition ID, I believe you want to use mapPartitionsWithIndex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-Question-on-How-Tasks-are-Executed-tp21064p21228.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
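For example, a quick sketch (rdd stands for whatever RDD you are inspecting):

// Tag each element with the index of the partition it lives in.
val withPartition = rdd.mapPartitionsWithIndex { (partitionId, iter) =>
  iter.map(elem => (partitionId, elem))
}

withPartition.take(5).foreach { case (pid, elem) =>
  println(s"partition $pid: $elem")
}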
Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
Yes it works! But the filter can't be pushed down!!! Should a custom parquet inputformat implement the data source API instead? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang wangxy...@gmail.com: Thanks yana! I will try it! On Jan 16, 2015, at 20:51, yana yana.kadiy...@gmail.com wrote: I think you might need to set spark.sql.hive.convertMetastoreParquet to false, if I understand that flag correctly. Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: Xiaoyu Wang Date: 01/16/2015 5:09 AM (GMT-05:00) To: user@spark.apache.org Subject: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan? Hi all! In Spark SQL 1.2.0 I create a hive table with a custom parquet inputformat and outputformat, like this:

CREATE TABLE test(
  id string,
  msg string)
CLUSTERED BY (id)
SORTED BY (id ASC)
INTO 10 BUCKETS
ROW FORMAT SERDE 'com.a.MyParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.a.MyParquetInputFormat'
OUTPUTFORMAT 'com.a.MyParquetOutputFormat';

And in the spark shell the plan of select * from test is:

[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

Not HiveTableScan!!! So it doesn't execute my custom inputformat! Why? How can it execute my custom inputformat? Thanks!
Re: How to compute RDD[(String, Set[String])] that include large Set
In your code, you're combining large sets, like (set1 ++ set2).size, which is not a good idea. (rdd1 ++ rdd2).distinct is an equivalent implementation and computes in a distributed manner. I'm just not very sure whether your computation on keyed sets is feasible to transform into RDDs. Regards, Kevin On Tue Jan 20 2015 at 1:57:52 PM Kevin Jung itsjb.j...@samsung.com wrote: As far as I know, the steps before saveAsTextFile are transformations, so they are computed lazily. The saveAsTextFile action then runs all the transformations, and your Set[String] grows at that point. It creates a large collection if you have few keys, and this easily causes OOM when your executor memory and fraction settings are not suited to this computation. If you only want collection counts by key, you can use countByKey(), or map() the RDD[(String, Set[String])] to an RDD[(String, Long)] right after creating the hoge RDD, so that reduceByKey only aggregates counts of keys. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
Hi all! I am trying to use Kinesis and Spark Streaming together. When I execute the program I get the exception com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain. Here is my piece of code:

val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
  KinesisProperties.AWS_SECRET_KEY)
var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
  KinesisProperties.KINESIS_SERVICE_NAME,
  KinesisProperties.KINESIS_REGION_ID)
System.setProperty("aws.accessKeyId", KinesisProperties.AWS_ACCESS_KEY_ID)
System.setProperty("aws.secretKey", KinesisProperties.AWS_SECRET_KEY)
System.setProperty("AWS_ACCESS_KEY_ID", KinesisProperties.AWS_ACCESS_KEY_ID)
System.setProperty("AWS_SECRET_KEY", KinesisProperties.AWS_SECRET_KEY)
val numShards = kinesisClient.describeStream(KinesisProperties.MY_STREAM_NAME)
  .getStreamDescription().getShards().size()
val numStreams = numShards
val ssc = StreamingHelper.getStreamingInstance(
  new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL))
ssc.addStreamingListener(new MyStreamListener)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, KinesisProperties.MY_STREAM_NAME,
    KinesisProperties.KINESIS_ENDPOINT_URL,
    new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL),
    InitialPositionInStream.TRIM_HORIZON,
    StorageLevel.MEMORY_AND_DISK_2)
}
/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)
val tmp_stream = unionStreams.map(byteArray => new String(byteArray))
val data = tmp_stream.window(Seconds(KinesisProperties.WINDOW_INTERVAL),
  Seconds(KinesisProperties.SLIDING_INTERVAL))
data.foreachRDD((rdd: RDD[String], time: Time) => {
  if (rdd.take(1).size == 1) {
    rdd.saveAsTextFile(KinesisProperties.Sink + time.milliseconds)
  }
})
ssc.start()
ssc.awaitTermination()

Any suggestion? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/com-amazonaws-AmazonClientException-Unable-to-load-AWS-credentials-from-any-provider-in-the-chain-tp21255.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
Try this piece of code:

System.setProperty("AWS_ACCESS_KEY_ID", access_key)
System.setProperty("AWS_SECRET_KEY", secret)
val streamName = "mystream"
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com/"
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName)
  .getStreamDescription().getShards().size()
val numStreams = numShards
val kinesisCheckpointInterval = Seconds(10)
val kinesisStreams = (0 until 10).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
    kinesisCheckpointInterval, InitialPositionInStream.LATEST,
    StorageLevel.MEMORY_AND_DISK_2)
}
/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)
unionStreams.print()

Thanks Best Regards On Tue, Jan 20, 2015 at 12:51 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi all! I am trying to use Kinesis and Spark Streaming together. When I execute the program I get the exception com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain. Here is my piece of code:

val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
  KinesisProperties.AWS_SECRET_KEY)
var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
  KinesisProperties.KINESIS_SERVICE_NAME,
  KinesisProperties.KINESIS_REGION_ID)
System.setProperty("aws.accessKeyId", KinesisProperties.AWS_ACCESS_KEY_ID)
System.setProperty("aws.secretKey", KinesisProperties.AWS_SECRET_KEY)
System.setProperty("AWS_ACCESS_KEY_ID", KinesisProperties.AWS_ACCESS_KEY_ID)
System.setProperty("AWS_SECRET_KEY", KinesisProperties.AWS_SECRET_KEY)
val numShards = kinesisClient.describeStream(KinesisProperties.MY_STREAM_NAME)
  .getStreamDescription().getShards().size()
val numStreams = numShards
val ssc = StreamingHelper.getStreamingInstance(
  new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL))
ssc.addStreamingListener(new MyStreamListener)
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, KinesisProperties.MY_STREAM_NAME,
    KinesisProperties.KINESIS_ENDPOINT_URL,
    new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL),
    InitialPositionInStream.TRIM_HORIZON,
    StorageLevel.MEMORY_AND_DISK_2)
}
/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)
val tmp_stream = unionStreams.map(byteArray => new String(byteArray))
val data = tmp_stream.window(Seconds(KinesisProperties.WINDOW_INTERVAL),
  Seconds(KinesisProperties.SLIDING_INTERVAL))
data.foreachRDD((rdd: RDD[String], time: Time) => {
  if (rdd.take(1).size == 1) {
    rdd.saveAsTextFile(KinesisProperties.Sink + time.milliseconds)
  }
})
ssc.start()
ssc.awaitTermination()

Any suggestion? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/com-amazonaws-AmazonClientException-Unable-to-load-AWS-credentials-from-any-provider-in-the-chain-tp21255.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to compute RDD[(String, Set[String])] that include large Set
That is what I want to do: get a unique count for each key. So map() or countByKey() doesn't get me a unique count (because duplicate strings would be counted)... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21254.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to compute RDD[(String, Set[String])] that include large Set
If there are not too many keys, you can do it like this:

val data = List(
  ("A", Set(1, 2, 3)),
  ("A", Set(1, 2, 4)),
  ("B", Set(1, 2, 3)))
val rdd = sc.parallelize(data)
rdd.persist()
rdd.filter(_._1 == "A").flatMap(_._2).distinct.count
rdd.filter(_._1 == "B").flatMap(_._2).distinct.count
rdd.unpersist()

==

data: List[(String, scala.collection.mutable.Set[Int])] = List((A,Set(1, 2, 3)), (A,Set(1, 2, 4)), (B,Set(1, 2, 3)))
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.Set[Int])] = ParallelCollectionRDD[6940] at parallelize at <console>:66
res332: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66
res334: Long = 4
res335: Long = 3
res336: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66

Regards, Kevin On Tue Jan 20 2015 at 2:53:22 PM jagaximo takuya_seg...@dwango.co.jp wrote: That is what I want to do: get a unique count for each key. So map() or countByKey() doesn't get me a unique count (because duplicate strings would be counted)... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21254.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Bind Exception
Deep, Yes, you have another Spark shell or application sticking around somewhere. Try inspecting the running processes, look out for a java process, and kill it. This might be helpful: https://www.digitalocean.com/community/tutorials/how-to-use-ps-kill-and-nice-to-manage-processes-in-linux Also, that is just a warning. FYI, Spark ignores the BindException, probes for the next available port, and continues, so your application is fine if that particular error comes up. Prashant Sharma On Tue, Jan 20, 2015 at 10:30 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have increased the driver memory in spark-defaults.conf to 2g. Still the error persists. On Tue, Jan 20, 2015 at 10:18 AM, Ted Yu yuzhih...@gmail.com wrote: Have you seen these threads? http://search-hadoop.com/m/JW1q5tMFlb http://search-hadoop.com/m/JW1q5dabji1 Cheers On Mon, Jan 19, 2015 at 8:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi Ted, When I run the same job with small data, I am able to run. But when I run it with a relatively bigger set of data, it gives me OutOfMemoryError: GC overhead limit exceeded. The first time I run the job, there is no output. When I run it a second time, I get this error. I am aware that the memory is getting full, but is there any way to avoid this? I have a single-node Spark cluster. Thank You On Tue, Jan 20, 2015 at 9:52 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I had the Spark Shell running throughout. Is it because of that? On Tue, Jan 20, 2015 at 9:47 AM, Ted Yu yuzhih...@gmail.com wrote: Was there another instance of Spark running on the same machine? Can you pastebin the full stack trace? Cheers On Mon, Jan 19, 2015 at 8:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running a Spark job. I get the output correctly, but when I look at the log file I see the following: AbstractLifeCycle: FAILED...: java.net.BindException: Address already in use... What could be the reason for this? Thank You
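If you would rather not rely on the port probing at all, you can pin each application's UI to its own port (a sketch; 4050 is an arbitrary free port):

import org.apache.spark.{SparkConf, SparkContext}

// Give each concurrent Spark app on the machine its own UI port so the
// driver never collides with the shell already holding 4040.
val conf = new SparkConf()
  .setAppName("my-job")
  .set("spark.ui.port", "4050")
val sc = new SparkContext(conf)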
Re: Join DStream With Other Datasets
I don't think this has anything to do with transferring anything from the driver, or per task. I'm talking about a singleton object in the JVM that loads whatever you want from wherever you want and holds it in memory once per JVM. That is, I do not think you have to use broadcast, or even any Spark mechanism. On Mon, Jan 19, 2015 at 2:35 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi Sean, Thanks for your advice, a normal 'val' will suffice. But will it be serialized and transferred every batch and every partition? That's why broadcast exists, right? For now I'm going to use 'val', but I'm still looking for a broadcast-way solution. On Sun, Jan 18, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: I think that this problem is not Spark-specific since you are simply side loading some data into memory. Therefore you do not need an answer that uses Spark. Simply load the data and then poll for an update each time it is accessed? Or some reasonable interval? This is just something you write in Java/Scala. On Jan 17, 2015 2:06 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I want to join a DStream with some other dataset, e.g. join a click stream with a spam ip list. I can think of two possible solutions, one is use broadcast variable, and the other is use transform operation as is described in the manual. But the problem is the spam ip list will be updated outside of the spark streaming program, so how can it be noticed to reload the list? For broadcast variables, they are immutable. For transform operation, is it costly to reload the RDD on every batch? If it is, and I use RDD.persist(), does it mean I need to launch a thread to regularly unpersist it so that it can get the updates? Any ideas will be appreciated. Thanks. -- Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
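To make that concrete, one possible shape for such a singleton (not Sean's code, just a sketch; the file path and refresh interval are placeholders):

// Loaded lazily once per executor JVM; refreshed when stale.
object SpamIpList {
  private val refreshMs = 60 * 1000L
  @volatile private var loadedAt = 0L
  @volatile private var ips: Set[String] = Set.empty

  def get(): Set[String] = {
    val now = System.currentTimeMillis()
    if (now - loadedAt > refreshMs) synchronized {
      if (now - loadedAt > refreshMs) {
        val src = scala.io.Source.fromFile("/path/to/spam-ips.txt")
        try { ips = src.getLines().toSet } finally { src.close() }
        loadedAt = now
      }
    }
    ips
  }
}

// Used inside a DStream transformation, i.e. on the executors:
// clickStream.filter(click => !SpamIpList.get().contains(click.ip))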
Re: Guava 11 dependency issue in Spark 1.2.0
I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, I removed the hadoop dependencies from the app and ran it on the cluster. It gives a java.io.EOFException 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with curMem=0, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.0 KB, free 1911.2 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with curMem=177166, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.9 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:45 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at SparkPlan.scala:84 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:84) with 2 output partitions (allowLocal=false) 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:84) 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List() 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with curMem=202668, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with curMem=207532, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.100.5.109): java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at 
org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at
Fwd: UnknownhostException : home
-- Forwarded message --
From: Rapelly Kartheek kartheek.m...@gmail.com
Date: Mon, Jan 19, 2015 at 3:03 PM
Subject: UnknownhostException : home
To: user@spark.apache.org

Hi, I get the following exception when I run my application:

karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt
log4j:WARN No such property [target] in org.apache.log4j.FileAppender.
Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: home
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:569)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:512)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366)
at org.apache.spark.util.FileLogger.<init>(FileLogger.scala:90)
at org.apache.spark.scheduler.EventLoggingListener.<init>(EventLoggingListener.scala:63)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:352)
at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13)
at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: home
... 20 more

I couldn't trace the cause of this exception. Any help in this regard? Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-UnknownhostException-home-tp21230.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Guava 11 dependency issue in Spark 1.2.0
Actually there is already someone on Hadoop-Common-Dev taking care of removing the old Guava dependency http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201501.mbox/browser https://issues.apache.org/jira/browse/HADOOP-11470 *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Jan 19, 2015 at 4:03 PM, Romi Kuntsman r...@totango.com wrote: I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, I removed the hadoop dependencies from the app and ran it on the cluster. It gives a java.io.EOFException 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with curMem=0, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.0 KB, free 1911.2 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with curMem=177166, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.9 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:45 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at SparkPlan.scala:84 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:84) with 2 output partitions (allowLocal=false) 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:84) 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List() 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with curMem=202668, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with curMem=207532, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.100.5.109): java.io.EOFException at 
java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at
Re: Guava 11 dependency issue in Spark 1.2.0
Please see this thread: http://search-hadoop.com/m/LgpTk2aVYgr/Hadoop+guava+upgradesubj=Re+Time+to+address+the+Guava+version+problem On Jan 19, 2015, at 6:03 AM, Romi Kuntsman r...@totango.com wrote: I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? Romi Kuntsman, Big Data Engineer http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, I removed the hadoop dependencies from the app and ran it on the cluster. It gives a java.io.EOFException
Spark SQL: Assigning several aliases to the output (several return values) of an UDF
Hello, I use Hive on Spark and have an issue with assigning several aliases to the output (several return values) of a UDF. I ran into several issues and ended up with a workaround (described at the end of this message). - Is assigning several aliases to the output of a UDF not supported by Spark SQL yet? - Is there a smarter solution than the one I finally ended up with - see (3)?

1) A query with the following syntax is rejected due to the assignment of multiple aliases.

Query:
SELECT my_function(param_one, param_two) AS (return_one, return_two, return_three) FROM my_table;

Error:
Unsupported language features in query: SELECT my_function(param_one, param_two) AS (return_one, return_two, return_three) FROM my_table; TOK_QUERY TOK_FROM TOK_TABREF TOK_TABNAME my_table TOK_SELECT TOK_SELEXPR TOK_FUNCTION my_function TOK_TABLE_OR_COL param_one TOK_TABLE_OR_COL param_two return_one return_two return_three

2) Because of this error I searched for a way to avoid assigning multiple aliases to the UDF. I ended up with the following query and encountered another error. Note: this error only occurs when c_0 is in the select clause; selecting only c_1 and c_2 works fine.

Query:
SELECT return.c_0 AS return_one, return.c_1 AS return_two, return.c_2 AS return_three FROM (SELECT my_function(param_one, param_two) FROM my_table) return;

Error:
java.lang.RuntimeException: Couldn't find c_0#504 in [c_0#521L,c_1#522,c_2#523]

3) My final (working) workaround is wrapping the actual query (the one with the UDF) in an additional select statement.

Query:
SELECT result.c_0 AS return_one, result.c_1 AS return_two, result.c_2 AS return_three FROM (SELECT * FROM (SELECT my_function(param_one, param_two) FROM my_table) return) result;

Error:
No error :)

Kind regards Max -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Assigning-several-aliases-to-the-output-several-return-values-of-an-UDF-tp21236.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
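As an illustration, here is a minimal, untested sketch of running the working query (3) through Spark SQL's HiveContext (Spark 1.2 API); the table, UDF, and column names are taken from the message above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("udf-multiple-aliases"))
val hiveCtx = new HiveContext(sc)

// wrap the UDF query twice, as in workaround (3), so that each generated
// column (c_0, c_1, c_2) can be renamed in the outermost select
val rows = hiveCtx.sql(
  """SELECT result.c_0 AS return_one,
    |       result.c_1 AS return_two,
    |       result.c_2 AS return_three
    |FROM (SELECT * FROM
    |       (SELECT my_function(param_one, param_two) FROM my_table) return
    |     ) result""".stripMargin)
rows.collect().foreach(println)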
[SQL] Using HashPartitioner to distribute by column
Is it possible to use a HashPartitioner or something similar to distribute a SchemaRDD's data by the hash of a particular column or set of columns? Having done this, I would then hope that GROUP BY could avoid a shuffle. E.g. set up a HashPartitioner on the CustomerCode field so that SELECT CustomerCode, SUM(Cost) FROM Orders GROUP BY CustomerCode would not need to shuffle. Cheers Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
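A hedged sketch of the closest plain-RDD equivalent, since (as of Spark 1.2) there is no supported way to tell the SQL planner that a SchemaRDD is already hash-partitioned: key the rows by CustomerCode, partition once, persist, and aggregate with reduceByKey, which reuses the existing partitioner and so avoids a shuffle. The Order type and sample data are hypothetical:

import org.apache.spark.HashPartitioner

case class Order(customerCode: String, cost: Double)
val orders = sc.parallelize(Seq(
  Order("C1", 10.0), Order("C2", 5.0), Order("C1", 2.5)))

// (customerCode, cost) pairs, hash-partitioned once and kept in memory
val byCustomer = orders
  .map(o => (o.customerCode, o.cost))
  .partitionBy(new HashPartitioner(8))
  .persist()

// because byCustomer already carries a HashPartitioner, this
// aggregation runs without a further shuffle
val totalPerCustomer = byCustomer.reduceByKey(_ + _)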
Spark 1.20 resource issue with Mesos .21.1
' in ZooKeeper I0119 02:41:44.112208 19320 detector.cpp:138] Detected a new leader: (id='2') I0119 02:41:44.113049 19315 group.cpp:659] Trying to get '/mesos/info_02' in ZooKeeper I0119 02:41:44.115067 19316 detector.cpp:433] A new leading master (UPID= master@192.0.3.12:5050) is detected I0119 02:41:44.118728 19317 sched.cpp:234] New master detected at master@192.0.3.12:5050 I0119 02:41:44.119282 19317 sched.cpp:242] No credentials provided. Attempting to register without authentication I0119 02:41:44.123064 19317 sched.cpp:408] Framework registered with 20150119-003609-201523392-5050-7198-0002 15/01/19 02:41:44 INFO MesosSchedulerBackend: Registered as framework ID 20150119-003609-201523392-5050-7198-0002 15/01/19 02:41:44 INFO NettyBlockTransferService: Server created on 54462 15/01/19 02:41:44 INFO BlockManagerMaster: Trying to register BlockManager 15/01/19 02:41:44 INFO BlockManagerMasterActor: Registering block manager master1:54462 with 267.3 MB RAM, BlockManagerId(driver, master1, 54462) 15/01/19 02:41:44 INFO BlockManagerMaster: Registered BlockManager 15/01/19 02:41:44 INFO SparkContext: Starting job: reduce at SparkPi.scala:35 15/01/19 02:41:44 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 3 output partitions (allowLocal=false) 15/01/19 02:41:44 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35) 15/01/19 02:41:44 INFO DAGScheduler: Parents of final stage: List() 15/01/19 02:41:44 INFO DAGScheduler: Missing parents: List() 15/01/19 02:41:44 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at SparkPi.scala:31), which has no missing parents 15/01/19 02:41:45 INFO MemoryStore: ensureFreeSpace(1728) called with curMem=0, maxMem=280248975 15/01/19 02:41:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1728.0 B, free 267.3 MB) 15/01/19 02:41:45 INFO MemoryStore: ensureFreeSpace(1235) called with curMem=1728, maxMem=280248975 15/01/19 02:41:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1235.0 B, free 267.3 MB) 15/01/19 02:41:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on master1:54462 (size: 1235.0 B, free: 267.3 MB) 15/01/19 02:41:45 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/19 02:41:45 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838 15/01/19 02:41:45 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at SparkPi.scala:31) 15/01/19 02:41:45 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks 15/01/19 02:42:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory and it keeps repeating Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory I have verified that http://192.0.3.11:8081/spark-1.2.0.tgz is accessible from all the slave nodes. 
My Spark environment variables list:

Runtime Information
Java Home /usr/lib/jvm/java-7-openjdk-amd64/jre
Java Version 1.7.0_65 (Oracle Corporation)
Scala Version version 2.10.4

Spark Properties
spark.app.id 20150119-003609-201523392-5050-7198-0005
spark.app.name Spark Pi
spark.driver.host master1
spark.driver.port 46107
spark.executor.id driver
spark.fileserver.uri http://192.0.3.11:55424
spark.jars file:/home/vagrant/spark-1.2.0/examples/target/scala-2.10/spark-examples-1.2.0-hadoop1.0.4.jar
spark.master mesos://zk://192.0.3.11:2181,192.0.3.12:2181,192.0.3.13:2181/mesos
spark.scheduler.mode FIFO
spark.tachyonStore.folderName spark-3dffd4bb-f23b-43f7-a498-54b401dc591b

System Properties
SPARK_SUBMIT true
awt.toolkit sun.awt.X11.XToolkit
file.encoding UTF-8
file.encoding.pkg sun.io
file.separator /
java.awt.graphicsenv sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version 51.0
java.endorsed.dirs /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/endorsed
java.ext.dirs /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext:/usr/java/packages/lib/ext
java.home /usr/lib/jvm/java-7-openjdk-amd64/jre
java.io.tmpdir /tmp
java.library.path /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
java.runtime.name OpenJDK Runtime Environment
java.runtime.version 1.7.0_65-b32
java.specification.name Java Platform API Specification
java.specification.vendor Oracle Corporation
java.specification.version 1.7
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version 1.7.0_65
java.vm.info mixed mode
java.vm.name OpenJDK 64-Bit Server VM
java.vm.specification.name Java Virtual Machine Specification
java.vm.specification.vendor Oracle
unit tests with java.io.IOException: Could not create FileClient
Hi, I created some unit tests to test some of the functions in my project which use Spark. However, when I built the project with sbt and ran `sbt test`, I ran into java.io.IOException: Could not create FileClient: 2015-01-19 08:50:38,1894 ERROR Client fs/client/fileclient/cc/client.cc:385 Thread: -2 Failed to initialize client for cluster 127.0.0.1:7222, error Unknown error(108) num lines: 21 [info] TextFileAdapterTestSuite: [info] - Checking the RDD Vector Length *** FAILED *** [info] java.io.IOException: Could not create FileClient [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:351) [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:363) [info] at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:795) [info] at com.mapr.fs.MapRFileSystem.getFileStatus(MapRFileSystem.java:822) [info] at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1419) [info] at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1092) [info] at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1031) [info] at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:231) [info] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277) [info] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) [info] ... The only tests that failed, which I believe led to this exception, are the ones where my functions call SparkContext's textFile() function. I tried to debug this and found that the exception seems to occur inside textFile(). Does anybody know what the issue is and how to fix it? I used the local host for the SparkContext; does that have anything to do with this exception? Thanks, Jianguo
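For reference, a minimal sketch (assuming ScalaTest) of the kind of test in question: a local-mode SparkContext reading a plain file. If a MapR Hadoop client is on the test classpath, as the stack trace suggests, textFile() can go through MapRFileSystem even in local mode. The file path is a placeholder; forcing the file: scheme sidesteps whatever default filesystem is configured on the classpath:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class TextFileAdapterTestSuite extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("unit-test"))
  }

  override def afterAll(): Unit = sc.stop()

  test("Checking the RDD Vector Length") {
    // explicit file: scheme so the lookup never touches maprfs/hdfs
    val lines = sc.textFile("file:///tmp/sample.txt")
    assert(lines.count() > 0)
  }
}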
Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?
I think it's always twice. Could you provide a demo case where RDD1 is calculated only once? On Sat, Jan 17, 2015 at 2:37 AM, Peng Cheng pc...@uow.edu.au wrote: I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) --> RDD1 --> RDD2
                    |         |
                    V         V
                   RDD3 ---> RDD4 --> Action!

In my experience, the chance RDD1 gets recalculated is volatile: sometimes once, sometimes twice. When calculation of this RDD is expensive (e.g. it involves using a RESTful service that charges me money), this compels me to persist RDD1, which takes extra memory, and since the Action! doesn't always happen, I don't know when to unpersist it to free that memory. A related problem might be in SQLContext.jsonRDD(), since the source jsonRDD is used twice (once for schema inference, once for reading the data). It almost guarantees that the source jsonRDD is calculated twice. Has this problem been addressed so far? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- ~Yours, Xuefeng Wu/吴雪峰 敬上
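A small runnable sketch of the usual remedy: persist RDD1 before the DAG branches, trigger the action, then unpersist. The input, the "expensive" step, and the combining operation are trivial stand-ins:

import org.apache.spark.storage.StorageLevel

val source = sc.parallelize(1 to 100)       // stand-in input
val rdd1 = source.map(x => x * 2)           // stand-in for the expensive step
rdd1.persist(StorageLevel.MEMORY_AND_DISK)  // computed at most once now

val rdd2 = rdd1.map(_ + 1)
val rdd3 = rdd1.map(_ - 1)
val rdd4 = rdd3.union(rdd2)                 // stand-in for the real combine

rdd4.collect()                              // the single action
rdd1.unpersist()                            // release once the action has run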
Re: UnknownhostException : home
It's not able to resolve home to an IP. Assuming it's your local machine, add an entry like the following to your /etc/hosts file (use sudo to edit it) and then run the program again: 127.0.0.1 home On Mon, Jan 19, 2015 at 3:03 PM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks -- thanks ashish Blog: http://www.ashishpaliwal.com/blog My Photo Galleries: http://www.pbase.com/ashishpaliwal
Re: UnknownhostException : home
Sorry, to be clear, you need to write hdfs:///home/ Note three slashes; there is an empty host between the 2nd and 3rd. This is true of most URI schemes with a host. On Mon, Jan 19, 2015 at 9:56 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Yes yes.. hadoop/etc/hadoop/hdfs-site.xml file has the path like: hdfs://home/... On Mon, Jan 19, 2015 at 3:21 PM, Sean Owen so...@cloudera.com wrote: I bet somewhere you have a path like hdfs://home/... which would suggest that 'home' is a hostname, when I imagine you mean it as a root directory. On Mon, Jan 19, 2015 at 9:33 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: UnknownhostException : home
+1 to Sean suggestion On Mon, Jan 19, 2015 at 3:21 PM, Sean Owen so...@cloudera.com wrote: I bet somewhere you have a path like hdfs://home/... which would suggest that 'home' is a hostname, when I imagine you mean it as a root directory. On Mon, Jan 19, 2015 at 9:33 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- thanks ashish Blog: http://www.ashishpaliwal.com/blog My Photo Galleries: http://www.pbase.com/ashishpaliwal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file
I am quoting the reply I got on this - which for some reason did not get posted here. The suggestion in the reply below worked perfectly for me. The error mentioned in the reply is not related (or old). Hope this is helpful to someone. Cheers, BB Hi, BB Ideally you can do the query like: select key, value.percent from mytable_data lateral view explode(audiences) f as key, value limit 3; But there is a bug in HiveContext: https://issues.apache.org/jira/browse/SPARK-5237 I am working on it now, hopefully make a patch soon. Cheng Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/using-hiveContext-to-select-a-nested-Map-data-type-from-an-AVROmodel-parquet-file-tp21168p21231.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
is there documentation on spark sql catalyst?
Where can I find good documentation on SQL Catalyst? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-there-documentation-on-spark-sql-catalyst-tp21232.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: UnknownhostException : home
Yes yes.. hadoop/etc/hadoop/hdfs-site.xml file has the path like: hdfs://home/... On Mon, Jan 19, 2015 at 3:21 PM, Sean Owen so...@cloudera.com wrote: I bet somewhere you have a path like hdfs://home/... which would suggest that 'home' is a hostname, when I imagine you mean it as a root directory. On Mon, Jan 19, 2015 at 9:33 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks
possible memory leak when re-creating SparkContext's in the same JVM
Problem we're seeing is a gradual memory leak in the driver's JVM. We execute jobs using a long-running Java app which creates relatively short-lived SparkContexts, so our Spark drivers are created as part of this application's JVM. We're using standalone cluster mode, Spark 1.0.2. Root cause of the memory leak seems to be Spark's DiskBlockManager - it registers a JVM shutdown hook that's responsible for deleting local Spark dirs: Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { ... }) (this doesn't seem to have changed in Spark 1.2). The problem is that this causes the entire Akka actor system of each application to stay in memory, via the retention chain: Runtime shutdown hooks -> DiskBlockManager -> ShuffleBlockManager -> BlockManager -> ActorSystemImpl. Has anyone come across this issue as well? I would imagine that with YARN, when using yarn-cluster mode, this would not be an issue, as the JVM running the Spark driver would itself be short-lived. Is that the case? Is there no way of creating short-lived SparkContext applications using the same JVM then? Is the only alternative using one long-running SparkContext? I did see examples of Java applications re-creating SparkContexts - for example, Ooyala's spark-jobserver - so I would imagine this is possible, no? https://github.com/ooyala/spark-jobserver/blob/master/job-server/src/spark.jobserver/JobManagerActor.scala#L104 Thanks, Noam Barcay Developer // Kenshoo Office +972 3 746-6500 *427 // Mobile +972 54 475-3142 www.Kenshoo.com http://kenshoo.com/
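A minimal sketch of the lifecycle being described: short-lived contexts inside one long-running JVM. sc.stop() releases most resources, but (per the analysis above) the shutdown hook registered during construction stays in Runtime's hook list until the JVM exits, pinning what it references. The master URL and workload are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

def runJob(jobName: String): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("spark://master:7077").setAppName(jobName))
  try {
    sc.parallelize(1 to 1000).map(_ * 2).count() // stand-in workload
  } finally {
    // frees executors, caches, etc., but does not remove the
    // already-registered DiskBlockManager shutdown hook
    sc.stop()
  }
}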
Is there any way to support multiple users executing SQL on thrift server?
Is there any way to support multiple users executing SQL on one thrift server? I think there are some problems in Spark 1.2.0. For example: 1. Start the thrift server as user A. 2. Connect to the thrift server via beeline as user B. 3. Execute "insert into table dest select … from table src". Then we found these items on HDFS:

drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1
drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary
drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0
drwxr-xr-x - A supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/_temporary
drwxr-xr-x - A supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00
-rw-r--r-- 3 A supergroup 2671 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00/part-0

You can see that all the temporary paths created on the driver side (thrift server side) are owned by user B (which is what we expected), but all the output data created on the executor side is owned by user A (which is NOT what we expected). The wrong owner of the output data causes an org.apache.hadoop.security.AccessControlException while the driver side moves the output data into the dest table. Does anyone know how to resolve this problem?
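For reference, connecting as a different user than the one running the server looks roughly like this with beeline (host and port are placeholders; -u gives the JDBC URL, -n the username):

beeline -u jdbc:hive2://thriftserver-host:10000 -n B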
Re: ALS.trainImplicit running out of mem when using higher rank
The problem is clearly to do with the executor exceeding YARN allocations, so this can't be in local mode. He said at the outset that this was running on YARN. On Mon, Jan 19, 2015 at 2:27 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: If you are running spark in local mode, executor parameters are not used as there is no executor. You should try to set the corresponding driver parameter to effect it. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
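As a hedged illustration of the usual remedy when YARN kills executors for exceeding their allocation: raise the executor memory and, if needed, the off-heap overhead headroom. The values and application jar below are placeholders, not tuned recommendations:

./bin/spark-submit --master yarn-client \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.ALSJob als-job.jar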
Re: Spark SQL Parquet - data are reading very very slow
Added a JIRA to track https://issues.apache.org/jira/browse/SPARK-5309 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Parquet-data-are-reading-very-very-slow-tp21061p21229.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
UnknownhostException : home
Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks
Re: Determine number of running executors
Hi, On Sat, Jan 17, 2015 at 3:05 AM, Shuai Zheng szheng.c...@gmail.com wrote: Can you share more information about how you do that? I also have a similar question about this. Not very proud of it ;-), but here you go:

// find the number of workers available to us by parsing the launch command
val _runCmd = scala.util.Properties.propOrElse("sun.java.command", "")
val numCoresRe = ".*--executor-cores ([0-9]+) --num-executors ([0-9]+).*".r
val totalNumCores = _runCmd match {
  case numCoresRe(coresPerExecutor, numExecutors) =>
    coresPerExecutor.toInt * numExecutors.toInt
  case _ => 0
}
if (totalNumCores > 0)
  logger.debug("total number of cores: " + totalNumCores)
else
  logger.warn("could not extract number of cores from run command: " + _runCmd)

Tobias
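A hedged alternative that avoids parsing the launch command, using the driver's block-manager registry (a developer API on SparkContext in Spark 1.x); it reports one storage status per executor plus one for the driver:

// number of executors currently registered with the driver
val numExecutors = sc.getExecutorStorageStatus.length - 1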
Re: UnknownhostException : home
I bet somewhere you have a path like hdfs://home/... which would suggest that 'home' is a hostname, when I imagine you mean it as a root directory. On Mon, Jan 19, 2015 at 9:33 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: UnknownhostException : home
Actually, I don't have any entry in my /etc/hosts file with hostname: home. In fact, I didn't use this hostname anywhere. Then why is it trying to resolve this? On Mon, Jan 19, 2015 at 3:15 PM, Ashish paliwalash...@gmail.com wrote: It's not able to resolve home to an IP. Assuming it's your local machine, add an entry like the following to your /etc/hosts file (use sudo to edit it) and then run the program again: 127.0.0.1 home On Mon, Jan 19, 2015 at 3:03 PM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks -- thanks ashish Blog: http://www.ashishpaliwal.com/blog My Photo Galleries: http://www.pbase.com/ashishpaliwal
Re: UnknownhostException : home
Yeah... I made that mistake in spark/conf/spark-defaults.conf for setting: spark.eventLog.dir. Now it works Thank you Karthik On Mon, Jan 19, 2015 at 3:29 PM, Sean Owen so...@cloudera.com wrote: Sorry, to be clear, you need to write hdfs:///home/ Note three slashes; there is an empty host between the 2nd and 3rd. This is true of most URI schemes with a host. On Mon, Jan 19, 2015 at 9:56 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Yes yes.. hadoop/etc/hadoop/hdfs-site.xml file has the path like: hdfs://home/... On Mon, Jan 19, 2015 at 3:21 PM, Sean Owen so...@cloudera.com wrote: I bet somewhere you have a path like hdfs://home/... which would suggest that 'home' is a hostname, when I imagine you mean it as a root directory. On Mon, Jan 19, 2015 at 9:33 AM, Rapelly Kartheek kartheek.m...@gmail.com wrote: Hi, I get the following exception when I run my application: karthik@karthik:~/spark-1.2.0$ ./bin/spark-submit --class org.apache.spark.examples.SimpleApp001 --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar out1.txt log4j:WARN No such property [target] in org.apache.log4j.FileAppender. Exception in thread main java.lang.IllegalArgumentException: java.net.UnknownHostException: home at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377) at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237) at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:569) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:512) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:366) at org.apache.spark.util.FileLogger.init(FileLogger.scala:90) at org.apache.spark.scheduler.EventLoggingListener.init(EventLoggingListener.scala:63) at org.apache.spark.SparkContext.init(SparkContext.scala:352) at org.apache.spark.examples.SimpleApp001$.main(SimpleApp001.scala:13) at org.apache.spark.examples.SimpleApp001.main(SimpleApp001.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.UnknownHostException: home ... 20 more I couldn't trace the cause of this exception. Any help in this regard? Thanks
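With that fix, the relevant entries in conf/spark-defaults.conf presumably end up looking like the following (the log directory itself is illustrative):

spark.eventLog.enabled true
spark.eventLog.dir     hdfs:///user/karthik/sparkEventLogs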
Re: Does Spark automatically run different stages concurrently when possible?
+1, I too need to know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does Spark automatically run different stages concurrently when possible?
From the OP: (1) val lines = Import full dataset using sc.textFile (2) val ABonly = Filter out all rows from lines that are not of type A or B (3) val processA = Process only the A rows from ABonly (4) val processB = Process only the B rows from ABonly I assume that 3 and 4 are actions, or else nothing happens here at all. When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen after 3, and may even cause 1 and 2 to happen again if nothing is persisted. You can invoke 3 and 4 in parallel on the driver if you like. That's fine. But actions are blocking in the driver. On Mon, Jan 19, 2015 at 8:21 AM, davidkl davidkl...@hotmail.com wrote: Hi Jon, I am looking for an answer for a similar question in the doc now, so far no clue. I would need to know what is spark behaviour in a situation like the example you provided, but taking into account also that there are multiple partitions/workers. I could imagine it's possible that different spark workers are not synchronized in terms of waiting for each other to progress to the next step/stage for the partitions of data they get assigned, while I believe in streaming they would wait for the current batch to complete before they start working on a new one. In the code I am working on, I need to make sure a particular step is completed (in all workers, for all partitions) before next transformation is applied. Would be great if someone could clarify or point to these issues in the doc! :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
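A minimal sketch of that driver-side parallelism, using the step numbers from the original post. The sample data and filter/process functions are trivial stand-ins, and abOnly is persisted so steps (1)-(2) are not recomputed by each action:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val lines = sc.parallelize(Seq("A:1", "B:2", "C:3"))   // step (1) stand-in
val abOnly = lines
  .filter(l => l.startsWith("A") || l.startsWith("B")) // step (2)
  .persist()

// steps (3) and (4): two actions submitted from separate driver threads,
// so the scheduler can run their jobs concurrently
val fa = Future { abOnly.filter(_.startsWith("A")).map(_.length).count() }
val fb = Future { abOnly.filter(_.startsWith("B")).map(_.length).count() }

Await.result(fa, Duration.Inf)
Await.result(fb, Duration.Inf)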
Re: Streaming with Java: Expected ReduceByWindow to Return JavaDStream
For anyone who finds this later, looks like Jerry already took care of this here: https://issues.apache.org/jira/browse/SPARK-5315 Thanks! On Sun, Jan 18, 2015 at 10:28 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jeff, from my understanding it seems more like a bug: since JavaDStreamLike is used from Java code, returning a Scala DStream is not reasonable. You can fix this by submitting a PR, or I can help you to fix this. Thanks Jerry From: Jeff Nadler [mailto:jnad...@srcginc.com] Sent: Monday, January 19, 2015 2:04 PM To: user@spark.apache.org Subject: Streaming with Java: Expected ReduceByWindow to Return JavaDStream Can anyone tell me if my expectations are sane? I'm trying to do a reduceByWindow using the 3-arg signature (not providing an inverse reduce function):

JavaDStream<whatevs> reducedStream = messages.reduceByWindow((x, y) -> reduce(x, y), Durations.seconds(5), Durations.seconds(5));

This isn't building; looks like it's returning DStream, not JavaDStream. From JavaDStreamLike.scala, it looks like this signature returns DStream, while the 4-arg signature with the inverse reduce returns JavaDStream:

def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T] = {
  dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}

So I'm just a noob. Is this a bug or am I missing something? Thanks! Jeff Nadler
Re: unit tests with java.io.IOException: Could not create FileClient
Your classpath has some MapR jar. Is that intentional ? Cheers On Mon, Jan 19, 2015 at 6:58 AM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, I created some unit tests to test some of the functions in my project which use Spark. However, when I used the sbt tool to build it and then ran the sbt test, I ran into java.io.IOException: Could not create FileClient: 2015-01-19 08:50:38,1894 ERROR Client fs/client/fileclient/cc/client.cc:385 Thread: -2 Failed to initialize client for cluster 127.0.0.1:7222, error Unknown error(108) num lines: 21 [info] TextFileAdapterTestSuite: [info] - Checking the RDD Vector Length *** FAILED *** [info] java.io.IOException: Could not create FileClient [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:351) [info] at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:363) [info] at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:795) [info] at com.mapr.fs.MapRFileSystem.getFileStatus(MapRFileSystem.java:822) [info] at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1419) [info] at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1092) [info] at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1031) [info] at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:231) [info] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:277) [info] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) [info] ... The only tests failed, which I believe led to this exception are the ones where my functions call the SparkContext's function textFile(). I tried to debug this, and found that the exception seems to take place within the textFile() function. Does anybody know what is the issue and how to fix it? I used the local host for the SparkContext, does it have anything to do with this exception. Thanks, Jianguo
Re: Does Spark automatically run different stages concurrently when possible?
Hi John and David, I tried this to run them concurrently:

List(RDD1, RDD2, ...).par.foreach { rdd =>
  rdd.collect().foreach(println)
}

This successfully registered the tasks, but the parallelism of the stages was limited: sometimes it ran 4 of them at once and sometimes only one, which was not consistent. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21240.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Does Spark automatically run different stages concurrently when possible?
Keep in mind that your executors will be able to run some fixed number of tasks in parallel, given your configuration. You should not necessarily expect that arbitrarily many RDDs and tasks would schedule simultaneously. On Mon, Jan 19, 2015 at 5:34 PM, critikaled isasmani@gmail.com wrote: Hi, john and david I tried this to run them concurrently List(RDD1,RDD2,.).par.foreach{ rdd= rdd.collect().foreach(println) } this was able to successfully register the task but the parallelism of the stages is limited it was able run 4 of them some time and only one of them some time which was not consistent. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
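For reference, that ceiling is the number of cores granted to the application; on a standalone cluster it is capped with settings like the following (16 is an illustrative value):

./bin/spark-submit --total-executor-cores 16 ...
# or equivalently, via configuration: spark.cores.max=16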
Re: EC2 VPC script
I also found this issue. I have reported it as a bug https://issues.apache.org/jira/browse/SPARK-5242 and submitted a fix. You can find a link to the fixed fork in the comments on the issue page. Please vote on the issue; hopefully the guys will then accept the pull request faster :) Regards, Vladimir On Mon, Dec 29, 2014 at 7:48 PM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: I am running the master branch. I finally made it work by replacing all occurrences of the public_dns_name property with private_ip_address in the spark_ec2.py script. My VPC instances always have a null value in the public_dns_name property. Now my script only works for VPC instances. Regards Eduardo On Sat, Dec 20, 2014 at 7:53 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: What version of the script are you running? What did you see in the EC2 web console when this happened? Sometimes instances just don't come up in a reasonable amount of time and you have to kill and restart the process. Does this always happen, or was it just once? Nick On Thu, Dec 18, 2014 at 9:42 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi guys. I ran the following command to launch a new cluster: ./spark-ec2 -k test -i test.pem -s 1 --vpc-id vpc-X --subnet-id subnet-X launch vpc_spark The instances started OK but the command never ends, with the following output: Setting up security groups... Searching for existing cluster vpc_spark... Spark AMI: ami-5bb18832 Launching instances... Launched 1 slaves in us-east-1a, regid = r-e9d603c4 Launched master in us-east-1a, regid = r-89d104a4 Waiting for cluster to enter 'ssh-ready' state... Any ideas what happened? Regards Eduardo
Re: Why Parquet Predicate Pushdown doesn't work?
Hi guys, Does this issue affect 1.2.0 only, or all previous releases as well? Best Regards, Jerry On Thu, Jan 8, 2015 at 1:40 AM, Xuelin Cao xuelincao2...@gmail.com wrote: Yes, the problem is, I've turned the flag on. One possible reason for this is that parquet files support predicate pushdown by storing statistical min/max values of each column in each parquet block. If, in my test, groupID=10113000 is scattered across all parquet blocks, then the predicate pushdown fails. But I'm not quite sure about that, and I don't know whether there is any other reason that could lead to this. On Wed, Jan 7, 2015 at 10:14 PM, Cody Koeninger c...@koeninger.org wrote: But Xuelin already posted in the original message that the code was using SET spark.sql.parquet.filterPushdown=true On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote: Quoting Michael: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that causes null pointer exceptions when there are full row groups that are null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration Daniel On 7 בינו׳ 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote: Hi, I'm testing the parquet file format, and predicate pushdown is a very useful feature for us. However, it looks like predicate pushdown doesn't work after I set sqlContext.sql("SET spark.sql.parquet.filterPushdown=true"). Here is my SQL: sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect Then I checked the amount of input data on the web UI. The amount of input data is ALWAYS 80.2M, regardless of whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure if there is anything I must do when generating the parquet file to make the predicate pushdown available. (With ORC files, when creating the file, I need to explicitly sort the field that will be used for predicate pushdown.) Anyone have any idea? And does anyone know the internal mechanism for parquet predicate pushdown? Thanks
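A hedged sketch of the row-group statistics point above: rewriting the data sorted by the filter column gives each parquet block a tight min/max range for groupId, so the pushed-down predicate can skip whole blocks. API names are from Spark 1.2 (SchemaRDD.saveAsParquetFile / SQLContext.parquetFile); the paths are placeholders, and a global ORDER BY is itself an expensive job:

sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

// rewrite the table clustered by the filter column
sqlContext.sql("SELECT * FROM ad ORDER BY groupId")
  .saveAsParquetFile("/data/ad_sorted.parquet")

// queries on the rewritten file can now prune non-matching row groups
val adSorted = sqlContext.parquetFile("/data/ad_sorted.parquet")
adSorted.registerTempTable("ad_sorted")
sqlContext.sql("SELECT adId, adTitle FROM ad_sorted WHERE groupId = 10113000").collect()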
Error for first run from iPython Notebook
Hi, I've set up my first Spark cluster (1 master, 2 workers) and an iPython notebook server that I'm trying to configure to access the cluster. I'm running the workers from Anaconda to make sure the Python setup is correct on each box. The iPy notebook server appears to have everything set up correctly, and I'm able to initialize Spark and push a job out. However, the job is failing, and I'm not sure how to troubleshoot. Here's the code:

from pyspark import SparkContext

CLUSTER_URL = 'spark://192.168.1.20:7077'
sc = SparkContext(CLUSTER_URL, 'pyspark')

def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / 20)

And here's the error: Py4JJavaError Traceback (most recent call last) <ipython-input-4-e8dce94b43bb> in <module>() 3 return 1 if x*x + y*y < 1 else 0 4 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b) 6 print "Pi is roughly %f" % (4.0 * count / 20) /opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f) 713 yield reduce(f, iterator, initial) 714 --> 715 vals = self.mapPartitions(func).collect() 716 if vals: 717 return reduce(f, vals) /opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self) 674 675 with SCCallSiteSync(self.context) as css: --> 676 bytesInJava = self._jrdd.collect().iterator() 677 return list(self._collect_iterator_through_file(bytesInJava)) 678 /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, --> 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o28.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 0.0 (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/spark-1.2.0/python/pyspark/worker.py", line 107, in main process() File "/opt/spark-1.2.0/python/pyspark/worker.py", line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/spark-1.2.0/python/pyspark/serializers.py", line 227, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/spark-1.2.0/python/pyspark/rdd.py", line 710, in func initial = next(iterator) File "<ipython-input-4-e8dce94b43bb>", line 2, in sample TypeError: 'module' object is not callable at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at
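The worker-side TypeError ('module' object is not callable) suggests that in the notebook the name random was bound to the module (import random) rather than to the function, so random() fails inside sample on the executors. A minimal corrected sketch of the same Pi estimate, assuming that diagnosis and the cluster URL from the post:

from random import random  # bind the function, not the module

from pyspark import SparkContext

CLUSTER_URL = 'spark://192.168.1.20:7077'
sc = SparkContext(CLUSTER_URL, 'pyspark')

def sample(p):
    # random() now refers to the function from the random module
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / 20)

Alternatively, keep import random and call random.random() inside sample; either way, the name being called on the workers must refer to the function, not the module.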
Re: How to get the master URL at runtime inside driver program?
If you pass the Spark master URL to spark-submit, you don't need to pass the same to the SparkConf object. You can create the SparkConf without this property, or for that matter any other property that you pass to spark-submit. On Sun, Jan 18, 2015 at 7:38 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi, Driver programs submitted by the spark-submit script will get the runtime Spark master URL, but how can the program get the URL inside the main method when creating the SparkConf object? Regards,
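A minimal sketch of this pattern in pyspark (the app name is arbitrary): create the SparkConf without a master, let spark-submit inject spark.master at runtime, and read it back from the running context if the driver needs the URL:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("my-app")  # note: no setMaster() here
sc = SparkContext(conf=conf)

# spark-submit --master spark://host:7077 ... sets spark.master for us;
# the driver can read it back at runtime:
master_url = sc.master  # or sc.getConf().get("spark.master")
print "Master URL: %s" % master_url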
Re: ERROR TaskSchedulerImpl: Lost an executor
I am trying to run the SparkR shell on AWS, and I am unable to access the worker nodes' web UI. 15/01/19 19:57:17 ERROR TaskSchedulerImpl: Lost an executor 0 (already removed): remote Akka client disassociated 15/01/19 19:57:17 ERROR TaskSchedulerImpl: Lost an executor 1 (already removed): remote Akka client disassociated 15/01/19 19:57:17 ERROR TaskSchedulerImpl: Lost an executor 2 (already removed): remote Akka client disassociated 15/01/19 19:57:50 ERROR Remoting: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/01/19 19:57:50 ERROR Remoting: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161) at akka.serialization.Serialization.deserialize(Serialization.scala:98) at
Re: ERROR TaskSchedulerImpl: Lost an executor
Have you seen this thread? http://search-hadoop.com/m/JW1q5PgA7X What Spark release are you running? Cheers On Mon, Jan 19, 2015 at 12:04 PM, suresh lanki.sur...@gmail.com wrote: I am trying to run the SparkR shell on AWS, and I am unable to access the worker nodes' web UI. ..
Re: ERROR TaskSchedulerImpl: Lost an executor
Hi Yu, I am able to run the Spark examples, but I am unable to run the SparkR examples (only the Pi example runs on SparkR). Thank you. Regards, Suresh On Mon, Jan 19, 2015 at 3:08 PM, Ted Yu yuzhih...@gmail.com wrote: Have you seen this thread? http://search-hadoop.com/m/JW1q5PgA7X What Spark release are you running? Cheers On Mon, Jan 19, 2015 at 12:04 PM, suresh lanki.sur...@gmail.com wrote: I am trying to run the SparkR shell on AWS, and I am unable to access the worker nodes' web UI. ..