Re: Spark ec2 launch problem
spark-ec2 is the way to go, but you may need to debug connectivity issues. For example, do you know that the servers were correctly set up in AWS, and can you access each node using ssh? If not, then you need to work out why (it's not a Spark issue). If yes, then you will need to work out why ssh via the spark-ec2 script is not working. I've used spark-ec2 successfully many times but have never used the --vpc-id and --subnet-id options, and they may be the source of your problem, especially since it appears to be a hostname resolution issue. If you can confirm the above questions then maybe someone on the list can help diagnose the specific problem. --- Robin East Spark GraphX in Action Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ On 24 Aug 2015, at 13:45, Garry Chen g...@cornell.edu wrote: So what is the best way to deploy a Spark cluster in an EC2 environment, any suggestions? Garry From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Friday, August 21, 2015 4:27 PM To: Garry Chen g...@cornell.edu Cc: user@spark.apache.org Subject: Re: Spark ec2 launch problem It may happen that the version of the spark-ec2 script you are using is buggy, or sometimes AWS has problems provisioning machines. On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a Spark EC2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but I keep getting the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known
How to evaluate custom UDF over window
The ultimate aim of my program is to be able to wrap an arbitrary Scala function (mostly will be statistics / customized rolling window metrics) in a UDF and evaluate them on DataFrames using the window functionality. So my main question is how do I express that a UDF takes a Frame of rows from a DataFrame as an argument instead of just a single row? And what sort of arguments would the arbitrary Scala function need to take in order to handle the raw data from the Frame? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-evaluate-custom-UDF-over-window-tp24419.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading already existing tables in spark shell
Hi Jeetendra, I faced this issue. I did not specify the database where this table exists. Please set the database by using the "USE <database>" command before executing the query. Regards, Ishwardeep From: Jeetendra Gangele gangele...@gmail.com Sent: Monday, August 24, 2015 5:47 PM To: user Subject: Loading already existing tables in spark shell Hi All, I have a few tables in Hive and I wanted to run queries against them with Spark as the execution engine. Can I directly load these tables in the spark shell and run queries? I tried with 1. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 2. sqlContext.sql("FROM event_impressions select count(*)") where event_impressions is the table name. It gives me an error saying org.apache.spark.sql.AnalysisException: no such table event_impressions; line 1 pos 5 Has anybody hit similar issues? regards jeetendra
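A minimal sketch of the suggestion above, assuming a HiveContext in the spark shell and a database named my_db (the real database name is whichever one the table was created under):

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("USE my_db")  // switch to the database that owns the table
sqlContext.sql("SELECT count(*) FROM event_impressions").show()
// alternatively, qualify the table name instead of switching databases:
sqlContext.sql("SELECT count(*) FROM my_db.event_impressions").show()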
Performance - Python streaming vs. Scala streaming
I am new to Spark Streaming. I was running the kafka_wordcount example with a local Kafka and Spark instance. It was very easy to set this up and get going :) I tried running both the Scala and Python versions of the word count example. The Python version seems to be extremely slow; sometimes it has delays of more than a couple of minutes. On the other hand, the Scala version seems to be way better. I am running on a Windows machine. I am trying to understand what causes the slowness in Python streaming. Is there anything that I am missing? For real-time streaming analysis, should I prefer Scala? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Python-streaming-v-s-Scala-streaming-tp24415.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Joining using multimap or array
Thanks, but I think this is not a case of multiple Spark contexts (nevertheless I tried your suggestion - it didn't work). The problem is joining two datasets using an array item's value: attribute.value in my case. Does anyone have ideas? On 24 Aug 2015, at 15:01, satish chandra j jsatishchan...@gmail.com wrote: Hi, If your join logic is correct, it seems to be a similar issue to one I faced recently. Can you try SparkContext(conf).set("spark.driver.allowMultipleContexts","true") Regards, Satish Chandra On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote: Hi guys, I'm confused about joining columns in SparkSQL and need your advice. I want to join 2 datasets of profiles. Each profile has a name and an array of attributes (age, gender, email etc). There can be multiple instances of an attribute with the same name, e.g. a profile has 2 emails - so 2 attributes with name = 'email' in the array. Now I want to join the 2 datasets using the 'email' attribute. I can't find a way to do it :( The code is below. The result of the join is empty, while I expect to see 1 row with all of Alice's emails.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Attribute(name: String, value: String, weight: Float)
case class Profile(name: String, attributes: Seq[Attribute])

object SparkJoinArrayColumn {
  def main(args: Array[String]) {
    val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
    val sqlContext: SQLContext = new SQLContext(sc)
    import sqlContext.implicits._
    val a: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f), Attribute("email", "a.jo...@mail.com", 1.0f)))
    )).toDF.as("a")
    val b: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f), Attribute("age", "29", 0.2f)))
    )).toDF.as("b")
    a.where($"a.attributes.name" === "email")
      .join(
        b.where($"b.attributes.name" === "email"),
        $"a.attributes.value" === $"b.attributes.value"
      )
      .show()
  }
}

Thanks forward! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Joining using multimap or array
Hi, If you join logic is correct, it seems to be a similar issue which i faced recently Can you try by *SparkContext(conf).set(spark.driver.allowMultipleContexts,true)* Regards, Satish Chandra On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote: Hi, guys I'm confused about joining columns in SparkSQL and need your advice. I want to join 2 datasets of profiles. Each profile has name and array of attributes(age, gender, email etc). There can be mutliple instances of attribute with the same name, e.g. profile has 2 emails - so 2 attributes with name = 'email' in array. Now I want to join 2 datasets using 'email' attribute. I cant find the way to do it :( The code is below. Now result of join is empty, while I expect to see 1 row with all Alice emails. import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} case class Attribute(name: String, value: String, weight: Float) case class Profile(name: String, attributes: Seq[Attribute]) object SparkJoinArrayColumn { def main(args: Array[String]) { val sc: SparkContext = new SparkContext(new SparkConf().setMaster(local).setAppName(getClass.getSimpleName)) val sqlContext: SQLContext = new SQLContext(sc) import sqlContext.implicits._ val a: DataFrame = sc.parallelize(Seq( Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f), Attribute(email, a.jo...@mail.com, 1.0f))) )).toDF.as(a) val b: DataFrame = sc.parallelize(Seq( Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f), Attribute(age, 29, 0.2f))) )).toDF.as(b) a.where($a.attributes.name === email) .join( b.where($b.attributes.name === email), $a.attributes.value === $b.attributes.value ) .show() } } Thanks forward! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Difficulties developing a Specs2 matcher for Spark Streaming
Hi, I've had some troubles developing a Specs2 matcher that checks that a predicate holds for all the elements of an RDD, and using it for testing a simple Spark Streaming program. I've finally been able to get a code that works, you can see it in https://gist.github.com/juanrh/dffd060e3a371676b83c, but I wanted to check with the list that I'm using the right approach. First I defined the matcher as follows: def foreachRecord[T](predicate : T = Boolean) : Matcher[RDD[T]] = { (rdd : RDD[T]) = val failingRecords = rdd.filter(! predicate(_)) ( failingRecords.isEmpty, each record fulfils the predicate, spredicate failed for records ${failingRecords.take(4).mkString(, )} ... ) } which works ok for examples like def simpleBatchTest = { val rdd = sc.parallelize(1 to 100, 3) rdd should foreachRecord(_ 0) } The problem started when I tried to use it to check that a predicate holds for all batches / RDDs of a DStream. The idea was using foreachRDD to update a driver local org.specs2.execute.Result object, to make an and for all the batches: def simpleStreamingTest : Result = { val ssc = new StreamingContext(sc, Duration(300)) val record = hola val batches = Seq.fill(5)(Seq.fill(10)(record)) val queue = new Queue[RDD[String]] queue ++= batches.map(batch = sc.parallelize(batch, numSlices = 2)) val inputDStream = ssc.queueStream(queue, oneAtATime = true) var result : Result = ok inputDStream.foreachRDD { rdd = val r = AsResult { rdd should foreachRecord(_ == record) } result = result and r } ssc.start() StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc) ssc.stop(stopSparkContext=false, stopGracefully=false) println(sresult : ${result}) result } This leads to an exception running the test, because AsResult { rdd should foreachRecord(_ == record) } fails because the closure argument of foreachRecord needs to access the local variable 'record', and to do that it tries to serialize the whole context, including 'result', for which it fails with: Driver stacktrace:,org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.InvalidClassException: org.specs2.execute.Success; no valid constructor To solve this I followed http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ to specify more carefully which values would be accessed by the closure, defining the matcher def foreachRecord[T,C](predicateContext : C)(toPredicate : C = (T = Boolean)) : Matcher[RDD[T]] = { val predicate = toPredicate(predicateContext) foreachRecord(predicate) } and then replacing the call above with val r = AsResult { rdd should foreachRecord(record)(r = { _ == r} ) } I'm writing to the list because I wanted to conform that this is a proper solution, and wanted to ask you guys if somebody can imagine a better solution, as this is not too bad but the call foreachRecord(record)(r = { _ == r} ) is still a bit ugly. It is curious that Spark's closure cleaner is smart enough to avoid sending 'result' to the closure in this very similar example: val r2 = AsResult { rdd.filter(_ != record).count === 0 } Also, I wanted to confirm that the approach of updating a var in driver in a foreachRDD is a good idea, I understand that foreachRDD runs in the driver, so that should be ok as long as the local variable is updated using action results only. Thanks a lot in advance. Greetings, Juan
RE: Spark ec2 launch problem
So what is the best way to deploy a Spark cluster in an EC2 environment, any suggestions? Garry From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Friday, August 21, 2015 4:27 PM To: Garry Chen g...@cornell.edu Cc: user@spark.apache.org Subject: Re: Spark ec2 launch problem It may happen that the version of the spark-ec2 script you are using is buggy, or sometimes AWS has problems provisioning machines. On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a Spark EC2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but I keep getting the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known
Re: Joining using multimap or array
In your example, a.attributes.name is a list and is not a string . Run this to find it out : a.select($a.attributes.name).show() On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote: Hi, guys I'm confused about joining columns in SparkSQL and need your advice. I want to join 2 datasets of profiles. Each profile has name and array of attributes(age, gender, email etc). There can be mutliple instances of attribute with the same name, e.g. profile has 2 emails - so 2 attributes with name = 'email' in array. Now I want to join 2 datasets using 'email' attribute. I cant find the way to do it :( The code is below. Now result of join is empty, while I expect to see 1 row with all Alice emails. import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} case class Attribute(name: String, value: String, weight: Float) case class Profile(name: String, attributes: Seq[Attribute]) object SparkJoinArrayColumn { def main(args: Array[String]) { val sc: SparkContext = new SparkContext(new SparkConf().setMaster(local).setAppName(getClass.getSimpleName)) val sqlContext: SQLContext = new SQLContext(sc) import sqlContext.implicits._ val a: DataFrame = sc.parallelize(Seq( Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f), Attribute(email, a.jo...@mail.com, 1.0f))) )).toDF.as(a) val b: DataFrame = sc.parallelize(Seq( Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f), Attribute(age, 29, 0.2f))) )).toDF.as(b) a.where($a.attributes.name === email) .join( b.where($b.attributes.name === email), $a.attributes.value === $b.attributes.value ) .show() } } Thanks forward! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
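Building on the observation above, one possible way to express this join (not from the original thread) is to flatten the attributes array first, so that each attribute becomes its own row, and then join on the email value. A rough sketch, assuming the same a and b DataFrames and the sqlContext.implicits._ import from the original snippet; the explode function is in org.apache.spark.sql.functions from Spark 1.4 onwards:

import org.apache.spark.sql.functions._

// one row per (profile name, attribute), keeping only email attributes
val aFlat = a.select($"name".as("a_name"), explode($"attributes").as("attr"))
  .where($"attr.name" === "email")
  .select($"a_name", $"attr.value".as("a_email"))

val bFlat = b.select($"name".as("b_name"), explode($"attributes").as("attr"))
  .where($"attr.name" === "email")
  .select($"b_name", $"attr.value".as("b_email"))

// join profiles that share at least one email value
aFlat.join(bFlat, $"a_email" === $"b_email").show()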
RE: DataFrame#show cost 2 Spark Jobs ?
The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao From: Jeff Zhang [mailto:zjf...@gmail.com] Sent: Monday, August 24, 2015 6:20 PM To: user@spark.apache.org Subject: DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang
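A small sketch of the suggestion above, using the two columns visible in the plan from the question; with an explicit schema the initial inference pass (the first job) is skipped:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))

val df = sqlContext.read.schema(schema)
  .json("/Users/hadoop/github/spark/examples/src/main/resources/people.json")
df.show()  // only the job that actually reads and shows the rows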
Re: How to set environment of worker applications
System properties and environment variables are two different things.. One can use spark.executor.extraJavaOptions to pass system properties and spark-env.sh to pass environment variables. -raghav On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com wrote: That's surprising. Passing the environment variables using spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then fetching them using System.getProperty(myenvvar) has worked for me. What is the error that you guys got? On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: spark-env.sh works for me in Spark 1.4 but not spark.executor.extraJavaOptions. On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think the only way to pass on environment variables to worker node is to write it in spark-env.sh file on each worker node. On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
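To make the two mechanisms in this thread concrete, a sketch of both; the variable names are made up for illustration:

// 1) JVM system property, passed via spark-submit:
//      spark-submit --conf "spark.executor.extraJavaOptions=-Dmyenvvar=xxx" ...
//    read back in executor-side code:
val fromProperty = System.getProperty("myenvvar")

// 2) real environment variable, exported in conf/spark-env.sh on every worker:
//      export MYENVVAR=xxx
//    read back with:
val fromEnvironment = sys.env.get("MYENVVAR")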
RE: Loading already existing tables in spark shell
And be sure the hive-site.xml is under the classpath or under the path of $SPARK_HOME/conf Hao From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in] Sent: Monday, August 24, 2015 8:57 PM To: user Subject: Re: Loading already existing tables in spark shell Hi Jeetendra, I faced this issue. I did not specify the database where this table exists. Please set the database by using use database command before executing the query. Regards, Ishwardeep From: Jeetendra Gangele gangele...@gmail.commailto:gangele...@gmail.com Sent: Monday, August 24, 2015 5:47 PM To: user Subject: Loading already existing tables in spark shell Hi All I have few tables in hive and I wanted to run query against them with spark as execution engine. Can I direct;y load these tables in spark shell and run query? I tried with 1.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 2.qlContext.sql(FROM event_impressions select count(*)) where event_impressions is the table name. It give me error saying org.apache.spark.sql.AnalysisException: no such table event_impressions; line 1 pos 5 Does anybody hit similar issues? regards jeetendra NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
DataFrame/JDBC very slow performance
I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]). When I tried with a BIG table (5B records), no results were returned upon completion of the query. I am using Spark 1.4.1, and it is set up on a very powerful machine (2 CPUs, 24 cores, 126G RAM). I have tried several memory setups and tuning options to make it work faster, but none of them made a huge impact. I am sure there is something I am missing, and below is my final try, which took about 11 minutes to get this simple count, whereas it only took 40 seconds using a JDBC connection through R to get the count. bin/pyspark --driver-memory 40g --executor-memory 40g df = sqlContext.read.jdbc("jdbc:teradata://..") df.count()
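One thing that often helps here is to let Spark open several JDBC connections in parallel instead of pulling the whole table through a single connection. A hedged sketch using the partitioned variant of read.jdbc available in Spark 1.4; the table name, column name and bounds below are assumptions and must be replaced with a numeric, roughly evenly distributed column from the real table:

import java.util.Properties

val props = new Properties()  // add user, password and driver entries as needed

val df = sqlContext.read.jdbc(
  "jdbc:teradata://..",   // same connection URL as above
  "my_schema.my_table",   // assumed table name
  "id",                   // assumed numeric column to partition on
  1L,                     // lower bound of that column
  100000000L,             // upper bound of that column
  24,                     // number of partitions / parallel connections
  props)

df.count()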
Re: Got wrong md5sum for boto
Additional info...If I use an online md5sum check then it matches...So, it's either windows or python (using 2.7.10) On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com wrote: When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, python problem, spark problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to evaluate custom UDF over window
For now, user-defined window function is not supported. We will add it in future. On Mon, Aug 24, 2015 at 6:26 AM, xander92 alexander.fra...@ompnt.com wrote: The ultimate aim of my program is to be able to wrap an arbitrary Scala function (mostly will be statistics / customized rolling window metrics) in a UDF and evaluate them on DataFrames using the window functionality. So my main question is how do I express that a UDF takes a Frame of rows from a DataFrame as an argument instead of just a single row? And what sort of arguments would the arbitrary Scala function need to take in order to handle the raw data from the Frame? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-evaluate-custom-UDF-over-window-tp24419.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
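As the reply above notes, a UDF cannot be evaluated as a window function in this Spark version. A workaround that became possible in later releases (this sketch assumes Spark 2.0+, where collect_list can be used over a window, and a df with columns key, ts and a numeric value) is to collect the frame into an array column and apply an ordinary UDF to that array:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// the frame: current row plus the two preceding rows, per key, ordered by ts
val frame = Window.partitionBy("key").orderBy("ts").rowsBetween(-2, 0)

// arbitrary Scala function over the rows of the frame, here a rolling mean
val rollingMetric = udf { (xs: Seq[Double]) => xs.sum / xs.size }

val result = df.withColumn("rolling",
  rollingMetric(collect_list(col("value")).over(frame)))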
Re: Got wrong md5sum for boto
I found this solution: https://stackoverflow.com/questions/3390484/python-hashlib-md5-differs-between-linux-windows Does anybody see a reason why I shouldn't put in a PR to make this change? FROM with open(tgz_file_path) as tar: TO with open(tgz_file_path, rb) as tar: On Mon, Aug 24, 2015 at 11:58 AM, Justin Pihony justin.pih...@gmail.com wrote: Additional info...If I use an online md5sum check then it matches...So, it's either windows or python (using 2.7.10) On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com wrote: When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, python problem, spark problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Drop table and Hive warehouse
That's not the expected behavior. What version of Spark? On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote: When I store a DataFrame as a table with the saveAsTable command and then execute DROP TABLE in SparkSQL, it doesn't actually delete the files in the Hive warehouse. The table disappears from the table list but the data files are still alive. Because of this, I can't saveAsTable with the same name before dropping the table. Is this a normal situation? If it is, I will delete the files manually ;) Kevin
Got wrong md5sum for boto
When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now seen this on two different machines. I am running on windows, but I would imagine that shouldn't affect the md5. Is this a boto problem, python problem, spark problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Got-wrong-md5sum-for-boto-tp24420.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Spark Streaming on Mesos (good practices)]
Here is the answer to my question if somebody needs it Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. The resource is http://spark.apache.org/docs/latest/streaming-programming-guide.html On Mon, Aug 24, 2015 at 12:15 PM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: which are the best practices to submit spark streaming application on mesos. I would like to know about scheduler mode. Is `coarse-grained` mode right solution? Thanks
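For reference, coarse-grained mode is selected through a configuration flag on the SparkConf; a minimal sketch (the Mesos master URL and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")  // placeholder master URL
  .set("spark.mesos.coarse", "true")                  // coarse-grained Mesos mode
val ssc = new StreamingContext(conf, Seconds(10))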
Re: Unable to catch SparkContext methods exceptions
textFile is a lazy operation. It doesn't evaluate until you call an action on it, such as .count(). Therefore, you won't catch the exception there. Best, Burak On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello folks, I'm experiencing an unexpected behaviour, that suggests me thinking about my missing notions on how Spark works. Let's say I have a Spark driver that invokes a function like: - in myDriver - val sparkContext = new SparkContext(mySparkConf) val inputPath = file://home/myUser/project/resources/date=*/* val myResult = new MyResultFunction()(sparkContext, inputPath) - in MyResultFunctionOverRDD -- class MyResultFunction extends Function2[SparkContext, String, RDD[String]] with Serializable { override def apply(sparkContext: SparkContext, inputPath: String): RDD[String] = { try { sparkContext.textFile(inputPath, 1) } catch { case t: Throwable = { myLogger.error(serror: ${t.getStackTraceString}\n) sc.makeRDD(Seq[String]()) } } } } What happens is that I'm *unable to catch exceptions* thrown by the textFile method within the try..catch clause in MyResultFunction. In fact, in a unit test for that function where I call it passing an invalid inputPath, I don't get an empty RDD as result, but the unit test exits (and fails) due to exception not handled. What am I missing here? Thank you. Best regards, Roberto
Re: Spark Direct Streaming With ZK Updates
Forgot to include the PR I was referencing: https://github.com/apache/spark/pull/4805/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24424.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Direct Streaming With ZK Updates
It doesn't matter if shuffling occurs. Just update ZK from the driver, inside the foreachRDD, after all your dynamodb updates are done. Since you're just doing it for monitoring purposes, that should be fine. On Mon, Aug 24, 2015 at 12:11 PM, suchenzang suchenz...@gmail.com wrote: Forgot to include the PR I was referencing: https://github.com/apache/spark/pull/4805/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24424.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
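A rough sketch of the advice above, assuming the Kafka direct stream API from spark-streaming-kafka, an existing StreamingContext ssc, and a hypothetical updateZk helper that records offsets in ZooKeeper for monitoring:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder brokers
val topics = Set("my_topic")                                     // placeholder topic

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // the cast works because the RDD handed to foreachRDD is the original KafkaRDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch, write results to DynamoDB ...

  // then, still on the driver, record the offsets for monitoring
  offsetRanges.foreach { o =>
    updateZk(o.topic, o.partition, o.untilOffset)  // hypothetical helper
  }
}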
Re: Kafka Spark Partition Mapping
Dear Cody, Thanks for your response, I am trying to do decoration which means when a message comes from Kafka (partitioned by key) in to the Spark I want to add more fields/data to it. How Does normally people do it in Spark? If it were you how would you decorate message without hitting database for every message? Our current strategy is, decoration data comes from local in Memory Cache (Guava LoadingCache) and/or from SQL DB if not in cache. If we take this approach we want cached decoration data available locally to RDDs most of the time. Our Kafka and Spark run on separate machines and thats why I just wants kafka-partition to go to same Spark RDD partition most of time so I can utilized cached decoration Data. Do you think if I Create JdbcRDD for décorarion data and join it with JavaPairReceiverInputDStream it will always stays where JdbcRDD lives? Nehal From: Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org Date: Thursday, August 20, 2015 at 6:33 PM To: Microsoft Office User nehal_s...@cable.comcast.commailto:nehal_s...@cable.comcast.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Kafka Spark Partition Mapping In general you cannot guarantee which node an RDD will be processed on. The preferred location for a kafkardd is the kafka leader for that partition, if they're deployed on the same machine. If you want to try to override that behavior, the method is getPreferredLocations But even in that case, location preferences are just a scheduler hint, the rdd can still be scheduled elsewhere. You can turn up spark.locality.wait to a very high value to decrease the likelihood. On Thu, Aug 20, 2015 at 5:47 PM, nehalsyed nehal_s...@cable.comcast.commailto:nehal_s...@cable.comcast.com wrote: I have data in Kafka topic-partition and I am reading it from Spark like this: JavaPairReceiverInputDStreamString, String directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]); I want that message from a kafka partition always land on same machine on Spark rdd so I can cache some decoration data locally and later reuse with other messages (that belong to same key). Can anyone tell me how can I achieve it? Thanks View this message in context: Kafka Spark Partition Mappinghttp://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Partition-Mapping-tp24372.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
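One pattern that is sometimes used for this kind of decoration, regardless of which node a partition lands on, is to keep the Guava LoadingCache in a lazily initialized singleton so that each executor JVM builds it once and reuses it across batches. A sketch under that assumption; DecorationCache, loadFromSqlDb, decorate and messages are hypothetical stand-ins for the SQL-backed loader, the enrichment logic and the input stream of (key, value) pairs:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object DecorationCache {
  // built once per executor JVM, on first use
  lazy val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
    .maximumSize(100000)
    .build(new CacheLoader[String, String] {
      override def load(key: String): String = loadFromSqlDb(key)  // hypothetical DB lookup
    })
}

val decorated = messages.mapPartitions { iter =>
  val cache = DecorationCache.cache
  iter.map { case (key, value) => (key, decorate(value, cache.get(key))) }  // hypothetical decorate()
}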
Re: Unable to catch SparkContext methods exceptions
The laziness is hard to deal with in these situations. I would suggest trying to handle expected cases FileNotFound, etc using other methods before even starting a Spark job. If you really want to try.catch a specific portion of a Spark job, one way is to just follow it with an action. You can even call persist() before the action, so that you can re-use the rdd. Best, Burak On Mon, Aug 24, 2015 at 10:52 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hi Burak, thanks for your answer. I have a new MyResultFunction()(sparkContext, inputPath).collect in the unit test (so to evaluate the actual result), and there I can observe and catch the exception. Even considering Spark's laziness, shouldn't I catch the exception while occurring in the try..catch statement that encloses the textFile invocation? Best, Roberto On Mon, Aug 24, 2015 at 7:38 PM, Burak Yavuz brk...@gmail.com wrote: textFile is a lazy operation. It doesn't evaluate until you call an action on it, such as .count(). Therefore, you won't catch the exception there. Best, Burak On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello folks, I'm experiencing an unexpected behaviour, that suggests me thinking about my missing notions on how Spark works. Let's say I have a Spark driver that invokes a function like: - in myDriver - val sparkContext = new SparkContext(mySparkConf) val inputPath = file://home/myUser/project/resources/date=*/* val myResult = new MyResultFunction()(sparkContext, inputPath) - in MyResultFunctionOverRDD -- class MyResultFunction extends Function2[SparkContext, String, RDD[String]] with Serializable { override def apply(sparkContext: SparkContext, inputPath: String): RDD[String] = { try { sparkContext.textFile(inputPath, 1) } catch { case t: Throwable = { myLogger.error(serror: ${t.getStackTraceString}\n) sc.makeRDD(Seq[String]()) } } } } What happens is that I'm *unable to catch exceptions* thrown by the textFile method within the try..catch clause in MyResultFunction. In fact, in a unit test for that function where I call it passing an invalid inputPath, I don't get an empty RDD as result, but the unit test exits (and fails) due to exception not handled. What am I missing here? Thank you. Best regards, Roberto
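Along the lines of the suggestion above, a sketch of forcing the failure inside the guarded block by following textFile with a cheap action; it mirrors the function from the original post (myLogger comes from there):

override def apply(sparkContext: SparkContext, inputPath: String): RDD[String] = {
  try {
    val rdd = sparkContext.textFile(inputPath, 1)
    rdd.persist()  // keep it around, since the next line materializes it
    rdd.count()    // forces evaluation here, so a bad path fails inside the try
    rdd
  } catch {
    case t: Throwable =>
      myLogger.error(s"error: ${t.getStackTraceString}\n")
      sparkContext.makeRDD(Seq[String]())
  }
}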
RE: Spark Sql behaves strangely with tables with a lot of partitions
Can you please remove me from this distribution list? (Filling up my inbox too fast) From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Monday, August 24, 2015 2:13 PM To: Philip Weaver philip.wea...@gmail.com Cc: Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao hao.ch...@intel.com Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions I think we are mostly bottlenecked at this point by how fast we can make listStatus calls to discover the folders. That said, we are happy to accept suggestions or PRs to make this faster. Perhaps you can describe how your home grown partitioning works? On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote: 1 minute to discover 1000s of partitions -- yes, that is what I have observed. And I would assert that is very slow. On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stacktrace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively. On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote: anybody has any suggestions? On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate if someone can explain what spark is trying to do here and what is an easy way to turn this off. Thanks all! On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.commailto:raghavendra.pan...@gmail.com wrote: Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! 
On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. From: Jerrick Hoang [mailto:jerrickho...@gmail.commailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 1:46 PM To: Cheng, Hao Cc: Philip Weaver; user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao From: Jerrick Hoang [mailto:jerrickho...@gmail.commailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To:
Local Spark talking to remote HDFS?
I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine:

val words = sc.textFile("hdfs:///tmp/people.txt")
words.count

However, if I try to access it from a local Spark app on my Windows host, it doesn't work:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
words.count

emits an error (not shown here). The port 8020 is open, and if I choose the wrong file name, it will tell me. My pom has the dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.4.1</version>
  <scope>provided</scope>
</dependency>

Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
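Not an answer from the thread, but two things that commonly matter in this setup: the client should address the namenode by the hostname the cluster advertises (the sandbox VM usually does not call itself localhost), and the application needs a Hadoop client on the classpath that matches the cluster version. A hedged sketch; the hostname and Hadoop version below are assumptions for an HDP 2.3 sandbox:

// use the namenode's advertised hostname (add it to the Windows hosts file if needed)
val words = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/people.txt")
words.count()

<!-- in pom.xml, alongside spark-core: a Hadoop client matching the cluster -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.1</version>
</dependency>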
Re: Spark Sql behaves strangely with tables with a lot of partitions
@Michael: would listStatus calls read the actual parquet footers within the folders? On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott scott.sere...@nielsen.com wrote: Can you please remove me from this distribution list? (Filling up my inbox too fast) *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Monday, August 24, 2015 2:13 PM *To:* Philip Weaver philip.wea...@gmail.com *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao hao.ch...@intel.com *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I think we are mostly bottlenecked at this point by how fast we can make listStatus calls to discover the folders. That said, we are happy to accept suggestions or PRs to make this faster. Perhaps you can describe how your home grown partitioning works? On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com wrote: 1 minute to discover 1000s of partitions -- yes, that is what I have observed. And I would assert that is very slow. On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com wrote: We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stacktrace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively. On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote: anybody has any suggestions? On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate if someone can explain what spark is trying to do here and what is an easy way to turn this off. Thanks all! On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! 
On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip
Re: Spark Sql behaves strangely with tables with a lot of partitions
No, starting with Spark 1.5 we should by default only be reading the footers on the executor side (that is unless schema merging has been explicitly turned on). On Mon, Aug 24, 2015 at 12:20 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Michael: would listStatus calls read the actual parquet footers within the folders? On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott scott.sere...@nielsen.com wrote: Can you please remove me from this distribution list? (Filling up my inbox too fast) *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Monday, August 24, 2015 2:13 PM *To:* Philip Weaver philip.wea...@gmail.com *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao hao.ch...@intel.com *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I think we are mostly bottlenecked at this point by how fast we can make listStatus calls to discover the folders. That said, we are happy to accept suggestions or PRs to make this faster. Perhaps you can describe how your home grown partitioning works? On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com wrote: 1 minute to discover 1000s of partitions -- yes, that is what I have observed. And I would assert that is very slow. On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com wrote: We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stacktrace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively. On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote: anybody has any suggestions? On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate if someone can explain what spark is trying to do here and what is an easy way to turn this off. Thanks all! On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. 
about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:
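For readers hitting the footer-reading cost mentioned above: schema merging can also be controlled explicitly when reading Parquet; a small sketch with the option and config names as in Spark 1.5 (the path is a placeholder):

// per read
val df = sqlContext.read.option("mergeSchema", "false").parquet("s3a://bucket/path/to/table")

// or globally
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")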
Re: Memory-efficient successive calls to repartition()
Hi Aurelien, The first code should create a new RDD in memory at each iteration (check the webui). The second code will unpersist the RDD but that's not the main problem. I think you have trouble due to long lineage as .cache() keep track of lineage for recovery. You should have a look at checkpointing : https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala You can also have a look at the code of others iterative algorithms in mlllib for best practices. 2015-08-20 17:26 GMT+08:00 abellet aurelien.bel...@telecom-paristech.fr: Hello, For the need of my application, I need to periodically shuffle the data across nodes/partitions of a reasonably-large dataset. This is an expensive operation but I only need to do it every now and then. However it seems that I am doing something wrong because as the iterations go the memory usage increases, causing the job to spill onto HDFS, which eventually gets full. I am also getting some Lost executor errors that I don't get if I don't repartition. Here's a basic piece of code which reproduces the problem: data = sc.textFile(ImageNet_gist_train.txt,50).map(parseLine).cache() data.count() for i in range(1000): data=data.repartition(50).persist() # below several operations are done on data What am I doing wrong? I tried the following but it doesn't solve the issue: for i in range(1000): data2=data.repartition(50).persist() data2.count() # materialize rdd data.unpersist() # unpersist previous version data=data2 Help and suggestions on this would be greatly appreciated! Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Alexis GILLAIN
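A sketch of the periodic-checkpointing idea, in Scala rather than the original PySpark; the checkpoint directory, interval and parseLine (which stands in for the poster's parsing function) are placeholders:

sc.setCheckpointDir("hdfs:///tmp/checkpoints")  // placeholder directory

var data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()

for (i <- 1 to 1000) {
  val next = data.repartition(50).persist()
  if (i % 10 == 0) next.checkpoint()  // truncate the lineage every few iterations
  next.count()                        // materialize (and write the checkpoint when scheduled)
  data.unpersist()                    // drop the previous cached copy
  data = next
  // ... the rest of the per-iteration work on `data` ...
}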
[Spark Streaming on Mesos (good practices)]
which are the best practices to submit spark streaming application on mesos. I would like to know about scheduler mode. Is `coarse-grained` mode right solution? Thanks
Drop table and Hive warehouse
When I store a DataFrame as a table with the saveAsTable command and then execute DROP TABLE in SparkSQL, it doesn't actually delete the files in the Hive warehouse. The table disappears from the table list but the data files are still alive. Because of this, I can't saveAsTable with the same name before dropping the table. Is this a normal situation? If it is, I will delete the files manually ;) Kevin
DataFrame#show cost 2 Spark Jobs ?
It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang
Determinant of Matrix
Hi, Is there any function to find the determinant of a mllib.linalg.Matrix (a covariance matrix) using Spark? Regards, Naveen - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
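There is no determinant function in mllib.linalg itself as far as I know, but the Breeze library that MLlib already depends on has one; a sketch of converting and delegating, assuming the matrix is square and dense:

import breeze.linalg.{det, DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Matrix

def determinant(m: Matrix): Double = {
  require(m.numRows == m.numCols, "determinant is only defined for square matrices")
  // Matrix.toArray is column-major, which matches Breeze's DenseMatrix layout
  det(new BDM(m.numRows, m.numCols, m.toArray))
}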
How to remove worker node but let it finish first?
Hi, I have a Spark standalone cluster with 100s of applications per day, and it changes size (more or fewer workers) at various hours. The driver runs on a separate machine outside the Spark cluster. When a job is running and its worker is killed (because at that hour the number of workers is reduced), it sometimes fails instead of redistributing the work to other workers. How is it possible to decommission a worker, so that it doesn't receive any new work but does finish all existing work before shutting down? Thanks!
Re: How to set environment of worker applications
That's surprising. Passing the environment variables using spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then fetching them using System.getProperty(myenvvar) has worked for me. What is the error that you guys got? On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: spark-env.sh works for me in Spark 1.4 but not spark.executor.extraJavaOptions. On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think the only way to pass on environment variables to worker node is to write it in spark-env.sh file on each worker node. On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Joining using multimap or array
Hi guys, I'm confused about joining columns in SparkSQL and need your advice. I want to join 2 datasets of profiles. Each profile has a name and an array of attributes (age, gender, email etc). There can be multiple instances of an attribute with the same name, e.g. a profile has 2 emails - so 2 attributes with name = 'email' in the array. Now I want to join the 2 datasets using the 'email' attribute. I can't find a way to do it :( The code is below. The result of the join is empty, while I expect to see 1 row with all of Alice's emails.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Attribute(name: String, value: String, weight: Float)
case class Profile(name: String, attributes: Seq[Attribute])

object SparkJoinArrayColumn {
  def main(args: Array[String]) {
    val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
    val sqlContext: SQLContext = new SQLContext(sc)
    import sqlContext.implicits._
    val a: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f), Attribute("email", "a.jo...@mail.com", 1.0f)))
    )).toDF.as("a")
    val b: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f), Attribute("age", "29", 0.2f)))
    )).toDF.as("b")
    a.where($"a.attributes.name" === "email")
      .join(
        b.where($"b.attributes.name" === "email"),
        $"a.attributes.value" === $"b.attributes.value"
      )
      .show()
  }
}

Thanks forward! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Memory-efficient successive calls to repartition()
Hi Aurelien, The first code should create a new RDD in memory at each iteration (check the webui). The second code will unpersist the RDD but that's not the main problem. I think you have trouble due to long lineage as .cache() keep track of lineage for recovery. You should have a look at checkpointing : https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala You can also have a look at the code of others iterative algorithms in mlllib for best practices. 2015-08-20 17:26 GMT+08:00 abellet aurelien.bel...@telecom-paristech.fr: Hello, For the need of my application, I need to periodically shuffle the data across nodes/partitions of a reasonably-large dataset. This is an expensive operation but I only need to do it every now and then. However it seems that I am doing something wrong because as the iterations go the memory usage increases, causing the job to spill onto HDFS, which eventually gets full. I am also getting some Lost executor errors that I don't get if I don't repartition. Here's a basic piece of code which reproduces the problem: data = sc.textFile(ImageNet_gist_train.txt,50).map(parseLine).cache() data.count() for i in range(1000): data=data.repartition(50).persist() # below several operations are done on data What am I doing wrong? I tried the following but it doesn't solve the issue: for i in range(1000): data2=data.repartition(50).persist() data2.count() # materialize rdd data.unpersist() # unpersist previous version data=data2 Help and suggestions on this would be greatly appreciated! Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
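To make the checkpointing suggestion concrete, here is a minimal Scala sketch of the pattern (the original code is PySpark; the data, the 50 partitions and the every-10-iterations interval are placeholders):

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")
var data = sc.parallelize(1 to 1000000, 50).persist()
data.count()
for (i <- 1 to 100) {
  val next = data.repartition(50).persist()
  if (i % 10 == 0) next.checkpoint()  // periodically truncate the lineage
  next.count()                        // materialize before dropping the previous copy
  data.unpersist()
  data = next
}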
Re: Transformation not happening for reduceByKey or GroupByKey
HI All, Please find fix info for users who are following the mail chain of this issue and the respective solution below: *reduceByKey: Non working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array() is empty *reduceByKey: Working snippet* import org.apache.spark.Context import org.apache.spark.Context._ import org.apache.spark.SparkConf val conf = new SparkConf() val sc = new SparkContext(conf).set(spark.driver.allowMultipleContexts,true) val DataRDD = SC.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4))) DataRDD.reduceByKey(_+_).collect Result: Array((0,3),(1,5),(2,4)) Regards, Satish Chandra On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, Currently using DSE 4.7 and Spark 1.2.2 version Regards, Satish On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote: What version of Spark you are using, or comes with DSE 4.7? We just cannot reproduce it in Spark. yzhang@localhost$ more test.spark val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs.reduceByKey((x,y) = x + y).collect yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.1 /_/ Using Scala version 2.10.4 Spark context available as sc. SQL context available as sqlContext. Loading test.spark... pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at console:21 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Yong -- Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org HI Abhishek, I have even tried that but rdd2 is empty Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) = x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on Values for each key Code: RDD.reduceByKey((x,y) = x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at console:73 res:Array[(Int,Int)] = Array() Command as mentioned dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
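The snippets above lost their quotes in the archive; a cleaned-up reading of the working version (note the imports should be org.apache.spark.SparkContext, and the allowMultipleContexts flag is set on the SparkConf, not on the SparkContext):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)
val dataRDD = sc.makeRDD(Seq((0, 1), (0, 2), (1, 2), (1, 3), (2, 4)))
dataRDD.reduceByKey(_ + _).collect()  // Array((0,3), (1,5), (2,4))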
History server is not receiving any event
Hi, I am working on a streaming application. I tried to configure the history server to persist the application's events to the Hadoop file system (HDFS). However, it is not logging any events. I am running Apache Spark 1.4.1 (pyspark) under Ubuntu 14.04 with three nodes. Here is my configuration:

File /usr/local/spark/conf/spark-defaults.conf (on all three nodes):
spark.eventLog.enabled true
spark.eventLog.dir hdfs://master-host:port/usr/local/hadoop/spark_log

On the master node:
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://host:port/usr/local/hadoop/spark_log"

Can someone give a list of steps to configure the history server? Thanks and regards, b.bhavesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/History-server-is-not-receiving-any-event-tp24426.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
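For what it's worth, the usual sequence is roughly the following (paths, hosts and ports are examples; adjust to your cluster):

1. Create the log directory up front, e.g. hdfs dfs -mkdir -p /usr/local/hadoop/spark_log
2. In spark-defaults.conf on the machine you submit from:
   spark.eventLog.enabled true
   spark.eventLog.dir hdfs://master-host:8020/usr/local/hadoop/spark_log
3. In spark-env.sh on the node that will run the history server:
   export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://master-host:8020/usr/local/hadoop/spark_log"
4. Start it with sbin/start-history-server.sh and check http://master-host:18080.

The event-log directory and the history server's logDirectory must point at the same path, and the directory has to exist before the applications run.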
Array Out OF Bound Exception
Hi , I am using SPARK 1.4 and I am getting an array out of bound Exception when I am trying to read from a registered table in SPARK. For example If I have 3 different text files with the content as below: Scenario 1: A1|B1|C1 A2|B2|C2 Scenario 2: A1| |C1 A2| |C2 Scenario 3: A1| B1| A2| B2| So for Scenario 1 and 2 it's working fine but for Scenario 3 I am getting the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:38) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Please help. Thanks, Debobrota
Re: Meaning of local[2]
Just to add you can also look into SPARK_WORKER_INSTANCES configuration in the spark-env.sh file. On Aug 17, 2015 3:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Hi Praveen, On Mon, Aug 17, 2015 at 12:34 PM, praveen S mylogi...@gmail.com wrote: What does this mean in .setMaster(local[2]) Local mode (executor in the same JVM) with 2 executor threads. Is this applicable only for standalone Mode? It is not applicable for standalone mode, only for local. Can I do this in a cluster setup, eg: . setMaster(hostname:port[2]).. No. It's faster to try than to ask a mailing list, actually. Also it's documented at http://spark.apache.org/docs/latest/submitting-applications.html#master-urls . Is it number of threads per worker node? You can control the number of total threads with spark-submit's --total-executor-cores parameter, if that's what you're looking for.
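As a concrete illustration (host name and values are placeholders):

// local mode: driver and executors share one JVM, with 2 task threads
val localConf = new SparkConf().setAppName("demo").setMaster("local[2]")

// cluster mode: the master URL carries no thread count; cap the cores with a config instead
val clusterConf = new SparkConf().setAppName("demo")
  .setMaster("spark://master-host:7077")
  .set("spark.cores.max", "8")  // same effect as spark-submit --total-executor-cores 8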
Re: Spark Sql behaves strangely with tables with a lot of partitions
Follow the directions here: http://spark.apache.org/community.html On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott scott.sere...@nielsen.com wrote: Can you please remove me from this distribution list? (Filling up my inbox too fast) *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Monday, August 24, 2015 2:13 PM *To:* Philip Weaver philip.wea...@gmail.com *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao hao.ch...@intel.com *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I think we are mostly bottlenecked at this point by how fast we can make listStatus calls to discover the folders. That said, we are happy to accept suggestions or PRs to make this faster. Perhaps you can describe how your home grown partitioning works? On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com wrote: 1 minute to discover 1000s of partitions -- yes, that is what I have observed. And I would assert that is very slow. On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com wrote: We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stacktrace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively. On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote: anybody has any suggestions? On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate if someone can explain what spark is trying to do here and what is an easy way to turn this off. Thanks all! On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! 
On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip Weaver *Cc:* user
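For readers following this thread, the two workarounds mentioned above look roughly like this (a sketch; the path is a made-up example):

// disable the automatic partition discovery
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")

// or read only the partition directories you need, so only those get listed
val df = sqlContext.read.parquet("s3a://bucket/test_table/date_prefix=20150819/hour=00")
df.count()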
`show tables like 'tmp*';` does not work in Spark 1.3.0+
Hi guys and gals, I have a Spark 1.2.0 instance running that I connect to via the thrift interface using beeline. On this instance I can send a command like `show tables like 'tmp*';` and I get a list of all tables that start with `tmp`. When testing this same command out on a server that is running Spark 1.3.0 or higher, I now get an error message: 0: jdbc:hive2://localhost:10001 show tables like 'tmp*'; Error: java.lang.RuntimeException: [1.13] failure: ``in'' expected but identifier like found show tables like 'tmp*' ^ (state=,code=0) 0: jdbc:hive2://localhost:10001 I'm wondering if wildcard matching was inadvertently removed, or if there is another way to accomplish the same thing without having to do this filtering on the client side. Cheers! Doug -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/show-tables-like-tmp-does-not-work-in-Spark-1-3-0-tp24429.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
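Until that is resolved, one client-side workaround (a sketch using the Scala API rather than beeline) is to list the tables and filter them yourself:

val tmpTables = sqlContext.tableNames().filter(_.startsWith("tmp"))
tmpTables.foreach(println)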
Strange ClassNotFoundException in spark-shell
Hi, I am using spark 1.4 M1 with the Cassandra Connector and run into a strange error when using the spark shell. This works: sc.cassandraTable(events, bid_events).select(bid,type).take(10).foreach(println) But as soon as I put a map() in there (or filter): sc.cassandraTable(events, bid_events).select(bid,type).map(r = r).take(10).foreach(println) I get the exception below. The spark-shell call is: /opt/spark/bin/spark-shell --master spark://x-1:7077 --conf spark.cassandra.connection.host=$(hostname -i) --driver-class-path $(echo /root/*.jar |sed 's/ /:/g') --jar spark-cassandra-connector-assembly-1.4.0-M1-SNAPSHOT.jar Can anyone provide ideas how to approach debugging this? Jan 15/08/24 23:54:43 INFO DAGScheduler: Job 0 failed: take at console:32, took 1.999875 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.31.39.116): java.lang.ClassNotFoundException: $anonfun$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) scala - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Running spark shell on mesos with zookeeper on spark 1.3.1
I have setup up apache mesos using mesosphere on Cent OS 6 with Java 8.I have 3 slaves which total to 3 cores and 8 gb ram. I have set no firewalls. I am trying to run the following lines of code to test whether the setup is working: val data = 1 to 1 val distData = sc.parallelize(data) distData.filter(_ 10).collect() I get the following on my console 5/08/24 20:54:57 INFO SparkContext: Starting job: collect at console:26 15/08/24 20:54:57 INFO DAGScheduler: Got job 0 (collect at console:26) with 8 output partitions (allowLocal=false) 15/08/24 20:54:57 INFO DAGScheduler: Final stage: Stage 0(collect at console:26) 15/08/24 20:54:57 INFO DAGScheduler: Parents of final stage: List() 15/08/24 20:54:57 INFO DAGScheduler: Missing parents: List() 15/08/24 20:54:57 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at filter at console:26), which has no missing parents 15/08/24 20:54:57 INFO MemoryStore: ensureFreeSpace(1792) called with curMem=0, maxMem=280248975 15/08/24 20:54:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1792.0 B, free 267.3 MB) 15/08/24 20:54:57 INFO MemoryStore: ensureFreeSpace(1293) called with curMem=1792, maxMem=280248975 15/08/24 20:54:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1293.0 B, free 267.3 MB) 15/08/24 20:54:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-31-46-176.ec2.internal:33361 (size: 1293.0 B, free: 267.3 MB) 15/08/24 20:54:57 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/08/24 20:54:57 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839 15/08/24 20:54:57 INFO DAGScheduler: Submitting 8 missing tasks from Stage 0 (MapPartitionsRDD[1] at filter at console:26) 15/08/24 20:54:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 8 tasks 15/08/24 20:55:12 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 15/08/24 20:55:27 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources . Here are the logs /var/log/mesos attached mesos-slave.22202 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-slave.22202 mesos-slave.22202 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-slave.22202 mesos-master.22181 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-master.22181 mesos-master.22181 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-master.22181 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-on-mesos-with-zookeeper-on-spark-1-3-1-tp24430.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
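That warning usually means either that no slaves have registered with the master or that the job asks for more cores or memory than any single offer contains. One thing to try (values are examples only) is to cap the request explicitly before creating the SparkContext:

val conf = new SparkConf()
  .set("spark.cores.max", "2")         // do not ask for more cores than the cluster offers
  .set("spark.executor.memory", "1g")  // keep the executor inside what a slave can offer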
Run Spark job from within iPython+Spark?
I set up iPython Notebook to work with the pyspark shell, and now I'd like use %run to basically 'spark-submit' another Python Spark file, and leave the objects accessible within the Notebook. I tried this, but got a ValueError: Cannot run multiple SparkContexts at once error. I then tried taking out the 'sc = SparkContext()' line from the .py file, but then it couldn't access sc. How can I %run another Python Spark file within iPython Notebook? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Run-Spark-job-from-within-iPython-Spark-tp24427.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Array Out OF Bound Exception
This top line here is indicating that the exception is being throw from your code (i.e. code written in the console). at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40) Check to make sure that you are properly handling data that has less columns than you would expect. On Mon, Aug 24, 2015 at 12:41 PM, SAHA, DEBOBROTA ds3...@att.com wrote: Hi , I am using SPARK 1.4 and I am getting an array out of bound Exception when I am trying to read from a registered table in SPARK. For example If I have 3 different text files with the content as below: *Scenario 1*: A1|B1|C1 A2|B2|C2 *Scenario 2*: A1| |C1 A2| |C2 *Scenario 3*: A1| B1| A2| B2| So for Scenario 1 and 2 it’s working fine but for Scenario 3 I am getting the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:38) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Please help. Thanks, Debobrota
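A concrete guess at what handling short rows means here: java.lang.String.split drops trailing empty fields by default, so a line that ends in the delimiter yields a shorter array than expected. For example:

"A1| B1|".split("\\|").length      // 2, so indexing position 2 throws ArrayIndexOutOfBoundsException
"A1| B1|".split("\\|", -1).length  // 3, a negative limit keeps the trailing empty field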
Re: Performance - Python streaming v/s Scala streaming
The scala version of the Kafka is something that we have been working on for a while, and is likely to be more optimized than the python one. The python one definitely requires pass the data back and forth between JVM and Python VM and decoding the raw bytes to the Python strings (probably less efficient that Java's Byte to UTF8 decoder), so that may cause some extra overheads compared to scala. Also consider trying the direct API. Read more in the Kafka integration guide - http://spark.apache.org/docs/latest/streaming-kafka-integration.html That overall has a much higher throughput that the earlier receiver based approach. BTW, disclaimer. Do not consider this difference as generalization of the performance difference between Scala and Python for all of Spark, For example, DataFrames provide performance parity between Scala and Python APIs. On Mon, Aug 24, 2015 at 5:22 AM, utk.pat utkarsh.pat...@gmail.com wrote: I am new to SPARK streaming. I was running the kafka_wordcount example with a local KAFKA and SPARK instance. It was very easy to set this up and get going :) I tried running both SCALA and Python versions of the word count example. Python versions seems to be extremely slow. Sometimes it has delays of more than couple of minutes. On the other hand SCALA versions seems to be way better. I am running on a windows machine. I am trying to understand what is the cause slowness in python streaming? Is there anything that I am missing? For real time streaming analysis should I prefer SCALA? -- View this message in context: Performance - Python streaming v/s Scala streaming http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Python-streaming-v-s-Scala-streaming-tp24415.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
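For reference, the direct approach mentioned above looks roughly like this in Scala (the broker address and topic name are placeholders, and ssc is assumed to be an existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("wordcount"))
stream.flatMap(_._2.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()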
ExternalSorter: Thread *** spilling in-memory map of 352.6 MB to disk (38 times so far)
Hello, I'm trying to run a spark 1.5 job with: ./spark-shell --driver-java-options -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1044 -Xms16g -Xmx48g -Xss128m I get lots of error messages like : 15/08/24 20:24:33 INFO ExternalSorter: Thread 172 spilling in-memory map of 352.2 MB to disk (40 times so far) 15/08/24 20:24:33 INFO ExternalSorter: Thread 179 spilling in-memory map of 352.2 MB to disk (39 times so far) 15/08/24 20:24:34 INFO ExternalSorter: Thread 197 spilling in-memory map of 352.2 MB to disk (39 times so far) 15/08/24 20:24:34 INFO ExternalSorter: Thread 192 spilling in-memory map of 352.2 MB to disk (39 times so far) 15/08/24 20:24:36 INFO ShuffleMemoryManager: TID 798 waiting for at least 1/2N of shuffle memory pool to be free 15/08/24 20:24:36 INFO ExternalSorter: Thread 170 spilling in-memory map of 352.2 MB to disk (39 times so far) 15/08/24 20:24:36 INFO ExternalSorter: Thread 171 spilling in-memory map of 352.2 MB to disk (40 times so far) When I force a stack trace with jstack, I get stack traces like this for each of my 36 cores: Executor task launch worker-44 daemon prio=10 tid=0x7fb9c404d000 nid=0x417f runnable [0x7fb5e3ffe000] java.lang.Thread.State: RUNNABLE at scala.collection.mutable.ResizableArray$class.$init$(ResizableArray.scala:32) at scala.collection.mutable.ArrayBuffer.init(ArrayBuffer.scala:49) at scala.collection.mutable.ArrayBuffer.init(ArrayBuffer.scala:63) at org.apache.spark.util.SizeEstimator$SearchState.init(SizeEstimator.scala:154) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:183) at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:262) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166) at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:254) at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:238) at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186) at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54) at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) at org.apache.spark.util.collection.PartitionedPairBuffer.afterUpdate(PartitionedPairBuffer.scala:30) at org.apache.spark.util.collection.PartitionedPairBuffer.insert(PartitionedPairBuffer.scala:53) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:214) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Jmap Shows: root@ip-172-31-28-227:~# jmap -heap 16421 Attaching to process ID 16421, please wait... Debugger attached successfully. Server compiler detected. JVM version is 24.79-b02 using thread-local object allocation. 
Parallel GC with 25 thread(s) Heap Configuration: MinHeapFreeRatio = 0 MaxHeapFreeRatio = 100 MaxHeapSize = 51539607552 (49152.0MB) NewSize = 1310720 (1.25MB) MaxNewSize = 17592186044415 MB OldSize = 5439488 (5.1875MB) NewRatio = 2 SurvivorRatio= 8 PermSize = 21757952 (20.75MB) MaxPermSize = 268435456 (256.0MB) G1HeapRegionSize = 0 (0.0MB) Heap Usage: PS Young Generation Eden Space: capacity = 15290335232 (14582.0MB) used = 11958528232 (11404.541236877441MB) free = 3331807000 (3177.4587631225586MB) 78.20971908433302% used From Space: capacity = 869269504 (829.0MB) used = 868910560 (828.6576843261719MB) free = 358944 (0.342315673828125MB) 99.9587073976082% used To Space: capacity = 965738496 (921.0MB) used = 0 (0.0MB) free = 965738496 (921.0MB) 0.0% used PS Old Generation capacity = 11453595648 (10923.0MB) used = 1423152248 (1357.223747253418MB) free = 10030443400 (9565.776252746582MB) 12.425375329611077% used PS Perm Generation capacity = 107479040 (102.5MB) used = 107107360 (102.14553833007812MB) free = 371680 (0.354461669921875MB) 99.65418373666158% used 27990 interned Strings occupying
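Those INFO lines are not errors in themselves; they mean the in-memory shuffle buffers keep filling up and spilling. If the spilling is hurting, two knobs that sometimes help (a sketch; the values are guesses and spark.shuffle.memoryFraction only applies to the pre-1.6 memory manager):

val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")  // give the shuffle maps a larger share of the heap
  .set("spark.executor.memory", "16g")
// and/or use more, smaller shuffle partitions, e.g. rdd.repartition(400)
// before the wide operation (spark.sql.shuffle.partitions for SQL jobs)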
Re: Spark ec2 lunch problem
Hey Garry, Have you verified that your particular VPC and subnet are open to the world? In particular, have you verified the route table attached to your VPC / subnet contains an internet gateway open to the public? I've run into this issue myself recently and that was the problem for me. -Andrew 2015-08-24 5:58 GMT-07:00 Robin East robin.e...@xense.co.uk: spark-ec2 is the way to go however you may need to debug connectivity issues. For example do you know that the servers were correctly setup in AWS and can you access each node using ssh? If no then you need to work out why (it’s not a spark issue). If yes then you will need to work out why ssh via the spark-ec2 script is not working. I’ve used spark-ec2 successfully many times but have never used the —vpc-id and —subnet-id options and that may be the source of your problems, especially since it appears to be a hostname resolution issue. If you could confirm the above questions then maybe someone on the list can help diagnose the specific problem. --- Robin East *Spark GraphX in Action* Michael Malak and Robin East Manning Publications Co. http://www.manning.com/malak/ On 24 Aug 2015, at 13:45, Garry Chen g...@cornell.edu wrote: So what is the best way to deploy spark cluster in EC2 environment any suggestions? Garry *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com ak...@sigmoidanalytics.com] *Sent:* Friday, August 21, 2015 4:27 PM *To:* Garry Chen g...@cornell.edu *Cc:* user@spark.apache.org *Subject:* Re: Spark ec2 lunch problem It may happen that the version of spark-ec2 script you are using is buggy or sometime AWS have problem provisioning machines. On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to lunch a spark ec2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but getting following message endless. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known
Exclude slf4j-log4j12 from the classpath via spark-submit
Continuing this discussion: http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html I am getting this error when I use logback-classic. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] I need to use logback-classic for my current project, so I am trying to ignore slf4j-log4j12 from spark: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency Now, when I run my job from Intellij (which sets the classpath), things work perfectly. But when I run my job via spark-submit: ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar My job fails because spark-submit sets up the classpath and it re-adds the slf4j-log4j12. I am not adding spark jar to the uber-jar via the maven assembly plugin: dependencySets dependencySet .. useTransitiveDependenciesfalse/useTransitiveDependencies excludes excludeorg.apache.spark:spark-core_2.10/exclude /excludes /dependencySet /dependencySets So how can I exclude slf4j-log4j12.jar when I submit a job via spark-submit (on a per job basis)? -- Thanks, -Utkarsh
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
Hi Utkarsh, Unfortunately that's not going to be easy. Since Spark bundles all dependent classes into a single fat jar file, to remove that dependency you'd need to modify Spark's assembly jar (potentially in all your nodes). Doing that per-job is even trickier, because you'd probably need some kind of script to inject the correct binding into Spark's classpath. That being said, that message is not an error, it's more of a noisy warning. I'd expect slf4j to use the first binding available - in your case, logback-classic. Is that not the case? On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Continuing this discussion: http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html I am getting this error when I use logback-classic. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] I need to use logback-classic for my current project, so I am trying to ignore slf4j-log4j12 from spark: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency Now, when I run my job from Intellij (which sets the classpath), things work perfectly. But when I run my job via spark-submit: ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar My job fails because spark-submit sets up the classpath and it re-adds the slf4j-log4j12. I am not adding spark jar to the uber-jar via the maven assembly plugin: dependencySets dependencySet .. useTransitiveDependenciesfalse/useTransitiveDependencies excludes excludeorg.apache.spark:spark-core_2.10/exclude /excludes /dependencySet /dependencySets So how can I exclude slf4j-log4j12.jar when I submit a job via spark-submit (on a per job basis)? -- Thanks, -Utkarsh -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark and scala-2.11
We're going to be upgrading from spark 1.0.2 and using hadoop-1.2.1 so need to build by hand. (Yes, I know. Use hadoop-2.x but standard resource constraints apply.) I want to build against scala-2.11 and publish to our artifact repository but finding build/spark-2.10.4 and tracing down what build/mvn was doing had me concerned that I was missing something. I'll hold the course and build it as instructed. Thanks for the info, all. PS - Since asked -- PATH=./build/apache-maven-3.2.5/bin:$PATH; build/mvn -Phadoop-1 -Dhadoop.version=1.2.1 -Dscala-2.11 -DskipTests package On Mon, Aug 24, 2015 at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote: I've used the instructions and it worked fine. Can you post exactly what you're doing, and what it fails with? Or are you just trying to understand how it works? 2015-08-24 15:48 GMT-04:00 Lanny Ripple la...@spotright.com: Hello, The instructions for building spark against scala-2.11 indicate using -Dspark-2.11. When I look in the pom.xml I find a profile named 'spark-2.11' but nothing that would indicate I should set a property. The sbt build seems to need the -Dscala-2.11 property set. Finally build/mvn does a simple grep of scala.version (which doesn't change after running dev/ change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4 scala library. Anyone know (from having done it and used it in production) if the build instructions for spark-1.4.1 against Scala-2.11 are correct? Thanks. -Lanny
Re: spark and scala-2.11
I've used the instructions and it worked fine. Can you post exactly what you're doing, and what it fails with? Or are you just trying to understand how it works? 2015-08-24 15:48 GMT-04:00 Lanny Ripple la...@spotright.com: Hello, The instructions for building spark against scala-2.11 indicate using -Dspark-2.11. When I look in the pom.xml I find a profile named 'spark-2.11' but nothing that would indicate I should set a property. The sbt build seems to need the -Dscala-2.11 property set. Finally build/mvn does a simple grep of scala.version (which doesn't change after running dev/ change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4 scala library. Anyone know (from having done it and used it in production) if the build instructions for spark-1.4.1 against Scala-2.11 are correct? Thanks. -Lanny
Re: spark and scala-2.11
The property scala-2.11 triggers the profile scala-2.11 -- and additionally disables the scala-2.10 profile, so that's the way to do it. But yes, you also need to run the script before-hand to set up the build for Scala 2.11 as well. On Mon, Aug 24, 2015 at 8:48 PM, Lanny Ripple la...@spotright.com wrote: Hello, The instructions for building spark against scala-2.11 indicate using -Dspark-2.11. When I look in the pom.xml I find a profile named 'spark-2.11' but nothing that would indicate I should set a property. The sbt build seems to need the -Dscala-2.11 property set. Finally build/mvn does a simple grep of scala.version (which doesn't change after running dev/change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4 scala library. Anyone know (from having done it and used it in production) if the build instructions for spark-1.4.1 against Scala-2.11 are correct? Thanks. -Lanny - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
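Putting the two steps together, a 1.4.x build for Scala 2.11 looks like this (the hadoop profile and version are whatever your cluster needs, here taken from the command quoted earlier in the thread):

./dev/change-version-to-2.11.sh
build/mvn -Phadoop-1 -Dhadoop.version=1.2.1 -Dscala-2.11 -DskipTests clean package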
Re: Local Spark talking to remote HDFS?
Changing the ip to the guest IP address just never connects. The VM has port tunnelling, and it passes through all the main ports, 8020 included to the host VM. You can tell that it was talking to the guest VM before, simply because it said when file not found Error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098 file=/tmp/people.txt but I have no idea what it means by that. It certainly can find the file and knows it exists. On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote: When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x) so on your windows host you should use that address (you can find out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use oftenif you use vagrant, there's also a vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R. 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine val words=sc.textFile(hdfs:///tmp/people.txt) words.count However if I try to access it from a local Spark app on my Windows host, it doesn't work val conf = new SparkConf().setMaster(local).setAppName(My App) val sc = new SparkContext(conf) val words=sc.textFile(hdfs://localhost:8020/tmp/people.txt) words.count Emits The port 8020 is open, and if I choose the wrong file name, it will tell me My pom has dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.11/artifactId version1.4.1/version scopeprovided/scope /dependency Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
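The 10.0.2.15 in that BlockMissingException is most likely the NAT-private address the datanode registered under; the client on the Windows host reaches the namenode through the forwarded port, but then tries to read blocks directly from that unroutable IP. A sketch of one workaround (the property is the standard HDFS client setting, and the hostname is an example you would also map in the Windows hosts file):

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
// ask the HDFS client to contact datanodes by hostname instead of the address they report
sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
val words = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/people.txt")
println(words.count())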
Re: Local Spark talking to remote HDFS?
When you launch your HDP guest VM, most likely it gets launched with NAT and an address on a private network (192.168.x.x) so on your windows host you should use that address (you can find out using ifconfig on the guest OS). I usually add an entry to my /etc/hosts for VMs that I use oftenif you use vagrant, there's also a vagrant module that can do that automatically. Also, I am not sure how the default HDP VM is set up, that is, if it only binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat -a. R. 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com: I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM. If I go into the guest spark-shell and refer to the file thus, it works fine val words=sc.textFile(hdfs:///tmp/people.txt) words.count However if I try to access it from a local Spark app on my Windows host, it doesn't work val conf = new SparkConf().setMaster(local).setAppName(My App) val sc = new SparkContext(conf) val words=sc.textFile(hdfs://localhost:8020/tmp/people.txt) words.count Emits The port 8020 is open, and if I choose the wrong file name, it will tell me My pom has dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.11/artifactId version1.4.1/version scopeprovided/scope /dependency Am I doing something wrong? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark and scala-2.11
Hello, The instructions for building spark against scala-2.11 indicate using -Dspark-2.11. When I look in the pom.xml I find a profile named 'spark-2.11' but nothing that would indicate I should set a property. The sbt build seems to need the -Dscala-2.11 property set. Finally build/mvn does a simple grep of scala.version (which doesn't change after running dev/ change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4 scala library. Anyone know (from having done it and used it in production) if the build instructions for spark-1.4.1 against Scala-2.11 are correct? Thanks. -Lanny
Re: rdd count is throwing null pointer exception
Move your count operation outside the foreach and use a broadcast to access it inside the foreach. On Aug 17, 2015 10:34 AM, Priya Ch learnings.chitt...@gmail.com wrote: Looks like because of Spark-5063 RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x = rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. On Mon, Aug 17, 2015 at 8:13 PM, Preetam preetam...@gmail.com wrote: The error could be because of the missing brackets after the word cache - .ticketRdd.cache() On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, Thank you very much for the detailed explanation. I have scenario like this- I have rdd of ticket records and another rdd of booking records. for each ticket record, i need to check whether any link exists in booking table. val ticketCachedRdd = ticketRdd.cache ticketRdd.foreach{ ticket = val bookingRecords = queryOnBookingTable (date, flightNumber, flightCarrier) // this function queries the booking table and retrieves the booking rows println(ticketCachedRdd.count) // this is throwing Null pointer exception } Is there somthing wrong in the count, i am trying to use the count of cached rdd when looping through the actual rdd. whats wrong in this ? Thanks, Padma Ch
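Spelling the suggestion out (a sketch that reuses the names from the original mail, so ticketRdd, sc and the booking lookup are assumed to exist):

val ticketCachedRdd = ticketRdd.cache()
val ticketCount = sc.broadcast(ticketCachedRdd.count())  // count once, on the driver
ticketRdd.foreach { ticket =>
  // val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
  println(ticketCount.value)  // read the broadcast value instead of calling count() inside the closure
}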
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
Hi Marcelo, When I add this exclusion rule to my pom: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency The SparkRunner class works fine (from IntelliJ) but when I build a jar and submit it to spark-submit: I get this error: Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68) at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28) at com.opentable.logging.Log.clinit(Log.java:31) Which is this here (our logging lib is open sourced): https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68 Thanks, -Utkarsh On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Utkarsh, Unfortunately that's not going to be easy. Since Spark bundles all dependent classes into a single fat jar file, to remove that dependency you'd need to modify Spark's assembly jar (potentially in all your nodes). Doing that per-job is even trickier, because you'd probably need some kind of script to inject the correct binding into Spark's classpath. That being said, that message is not an error, it's more of a noisy warning. I'd expect slf4j to use the first binding available - in your case, logback-classic. Is that not the case? On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Continuing this discussion: http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html I am getting this error when I use logback-classic. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] I need to use logback-classic for my current project, so I am trying to ignore slf4j-log4j12 from spark: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency Now, when I run my job from Intellij (which sets the classpath), things work perfectly. But when I run my job via spark-submit: ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar My job fails because spark-submit sets up the classpath and it re-adds the slf4j-log4j12. I am not adding spark jar to the uber-jar via the maven assembly plugin: dependencySets dependencySet .. useTransitiveDependenciesfalse/useTransitiveDependencies excludes excludeorg.apache.spark:spark-core_2.10/exclude /excludes /dependencySet /dependencySets So how can I exclude slf4j-log4j12.jar when I submit a job via spark-submit (on a per job basis)? -- Thanks, -Utkarsh -- Marcelo -- Thanks, -Utkarsh
Re: Spark Direct Streaming With ZK Updates
When updating the ZK offset in the driver (within foreachRDD), there is somehow a serialization exception getting thrown: 15/08/24 15:45:40 ERROR JobScheduler: Error in job generator java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) The code looks something like: directKafkaStream.transform { rdd = currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD { rdd = ... /*do stuff with shuffling involved, then update DynamoDB*/ val props = new Properties() kafkaConf.foreach(param = props.put(param._1, param._2)) props.setProperty(AUTO_OFFSET_COMMIT, false) val consumerConfig = new ConsumerConfig(props) assert(!consumerConfig.autoCommitEnable) zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) offsetRanges.foreach { osr = val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic) val zkPath = s${topicDirs.consumerOffsetDir}/${osr.partition} ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString) } } Why would there be serialization issues when I'm not trying to pass ZkClient to any of the workers? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
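One guess, given that the failure comes from the job generator: zkClient and currOffsetRanges are assigned without val or var in the snippet, so they are presumably fields of the enclosing class or object; the transform and foreachRDD closures then capture the whole outer instance, and the ZkClient gets dragged into what Spark has to serialize. Keeping the client strictly local to foreachRDD avoids that. A sketch (consumerConfig, groupId and directKafkaStream come from the original code and are assumed to be in scope):

import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... the shuffle / DynamoDB work goes here ...
  val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
  offsetRanges.foreach { osr =>
    val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
    ZkUtils.updatePersistentPath(zkClient, s"${topicDirs.consumerOffsetDir}/${osr.partition}",
      osr.untilOffset.toString)
  }
  zkClient.close()  // nothing ZooKeeper-related escapes this block
}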
Re: DataFrame#show cost 2 Spark Jobs ?
Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Monday, August 24, 2015 6:20 PM *To:* user@spark.apache.org *Subject:* DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
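For reference, a minimal sketch of Hao's suggestion of passing the schema explicitly so the extra schema-inference job is skipped (the field names and types below are assumed from the people.json example file):

import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}

val schema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))

// with an explicit schema, read.json should not need a separate job to infer it
val df = sqlContext.read.schema(schema)
  .json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
df.show()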
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
I get the same error even when I set the SPARK_CLASSPATH: export SPARK_CLASSPATH=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.1.jar:/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar And I run the job like this: /spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar I am not able to find the code in spark which adds these jars before the spark classes in classpath. Or maybe its a bug. Any suggestions on workarounds? Thanks, -Utkarsh On Mon, Aug 24, 2015 at 4:32 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: I assumed that's the case beacause of the error I got and the documentation which says: Extra classpath entries to append to the classpath of the driver. This is where I stand now: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency And no exclusions from my logging lib. And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar And I get the same error: Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68) at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28) at com.opentable.logging.Log.clinit(Log.java:31) ... 16 more Thanks, -Utkarsh On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: That didn't work since extraClassPath flag was still appending the jars at the end, so its still picking the slf4j jar provided by spark. Out of curiosity, how did you verify this? The extraClassPath options are supposed to prepend entries to the classpath, and the code seems to be doing that. If it's not really doing that in some case, it's a bug that needs to be fixed. Another option is those is setting the SPARK_CLASSPATH env variable, which is deprecated, but might come in handy in case there is actually a bug in handling those options. -- Marcelo -- Thanks, -Utkarsh -- Thanks, -Utkarsh
Where is Redgate's HDFS explorer?
http://hortonworks.com/blog/windows-explorer-experience-hdfs/ This seemed to exist, but now there is no sign of it. Is there anything similar for tying HDFS into Windows Explorer? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Where-is-Redgate-s-HDFS-explorer-tp24431.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
Hi Utkarsh, A quick look at slf4j's source shows it loads the first StaticLoggerBinder in your classpath. How are you adding the logback jar file to spark-submit? If you use spark.driver.extraClassPath and spark.executor.extraClassPath to add the jar, it should take precedence over the log4j binding embedded in the Spark assembly. On Mon, Aug 24, 2015 at 3:15 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Hi Marcelo, When I add this exclusion rule to my pom: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency The SparkRunner class works fine (from IntelliJ) but when I build a jar and submit it to spark-submit: I get this error: Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68) at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28) at com.opentable.logging.Log.clinit(Log.java:31) Which is this here (our logging lib is open sourced): https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68 Thanks, -Utkarsh On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Utkarsh, Unfortunately that's not going to be easy. Since Spark bundles all dependent classes into a single fat jar file, to remove that dependency you'd need to modify Spark's assembly jar (potentially in all your nodes). Doing that per-job is even trickier, because you'd probably need some kind of script to inject the correct binding into Spark's classpath. That being said, that message is not an error, it's more of a noisy warning. I'd expect slf4j to use the first binding available - in your case, logback-classic. Is that not the case? On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Continuing this discussion: http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html I am getting this error when I use logback-classic. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] I need to use logback-classic for my current project, so I am trying to ignore slf4j-log4j12 from spark: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency Now, when I run my job from Intellij (which sets the classpath), things work perfectly. But when I run my job via spark-submit: ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar My job fails because spark-submit sets up the classpath and it re-adds the slf4j-log4j12. I am not adding spark jar to the uber-jar via the maven assembly plugin: dependencySets dependencySet .. 
useTransitiveDependenciesfalse/useTransitiveDependencies excludes excludeorg.apache.spark:spark-core_2.10/exclude /excludes /dependencySet /dependencySets So how can I exclude slf4j-log4j12.jar when I submit a job via spark-submit (on a per job basis)? -- Thanks, -Utkarsh -- Marcelo -- Thanks, -Utkarsh -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DataFrame/JDBC very slow performance
Much appreciated! I am not comparing against select count(*) for performance; it was just one simple thing I tried in order to check the performance :). It now makes sense, since Spark tries to extract all records before doing the count. I thought submitting an aggregate-function query over JDBC would let Teradata do the heavy lifting. We currently only push down filters, since there is a lot of variability in what types of aggregations various databases support. You can manually push down whatever you want by replacing the table name with a subquery (i.e. (SELECT ... FROM ...)). - How come my second query over (5B) records didn't return anything even after a long processing time? If I understood correctly, Spark would try to fit it in memory and, if it doesn't fit, might use disk space, which I have available? Nothing should be held in memory for a query like this (other than a single count per partition), so I don't think that is the problem. There is likely an error buried somewhere. - Am I supposed to do any Spark-related tuning to make it work? My main need is to access data from these large table(s) on demand and provide aggregated and calculated results much more quickly; that is why I was trying out Spark. As a next step I am thinking of exporting the data to Parquet files and giving that a try. Do you have any suggestions on how to deal with the problem? Exporting to Parquet will likely be faster than trying to query through JDBC, since we have many more opportunities for parallelism there.
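To make the subquery trick concrete, here is a rough Scala sketch; the JDBC URL, driver class, table and column names are invented for illustration, and whether the derived table needs an alias depends on the database:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "myuser")
props.setProperty("password", "mypassword")
props.setProperty("driver", "com.teradata.jdbc.TeraDriver")

// the aggregation runs inside the database, so Spark only pulls back the small result
val agg = sqlContext.read.jdbc(
  "jdbc:teradata://td-host/DATABASE=mydb",
  "(SELECT some_key, COUNT(*) AS cnt FROM big_table GROUP BY some_key) t",
  props)
agg.show()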
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
That didn't work since extraClassPath flag was still appending the jars at the end, so its still picking the slf4j jar provided by spark. Although I found this flag: --conf spark.executor.userClassPathFirst=true (http://spark.apache.org/docs/latest/configuration.html) and tried this: ➜ simspark git:(bulkrunner) ✗ spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner --jars /.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar,/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar --conf spark.executor.userClassPathFirst=true --conf spark.driver.userClassPathFirst=true target/ds-tetris-simspark-0.1-SNAPSHOT-jar-with-dependencies.jar But this led to another error: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version' Thanks, -Utkarsh On Mon, Aug 24, 2015 at 3:25 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Utkarsh, A quick look at slf4j's source shows it loads the first StaticLoggerBinder in your classpath. How are you adding the logback jar file to spark-submit? If you use spark.driver.extraClassPath and spark.executor.extraClassPath to add the jar, it should take precedence over the log4j binding embedded in the Spark assembly. On Mon, Aug 24, 2015 at 3:15 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Hi Marcelo, When I add this exclusion rule to my pom: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency The SparkRunner class works fine (from IntelliJ) but when I build a jar and submit it to spark-submit: I get this error: Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68) at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28) at com.opentable.logging.Log.clinit(Log.java:31) Which is this here (our logging lib is open sourced): https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68 Thanks, -Utkarsh On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Utkarsh, Unfortunately that's not going to be easy. Since Spark bundles all dependent classes into a single fat jar file, to remove that dependency you'd need to modify Spark's assembly jar (potentially in all your nodes). Doing that per-job is even trickier, because you'd probably need some kind of script to inject the correct binding into Spark's classpath. That being said, that message is not an error, it's more of a noisy warning. I'd expect slf4j to use the first binding available - in your case, logback-classic. Is that not the case? On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: Continuing this discussion: http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html I am getting this error when I use logback-classic. SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] I need to use logback-classic for my current project, so I am trying to ignore slf4j-log4j12 from spark: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency Now, when I run my job from Intellij (which sets the classpath), things work perfectly. But when I run my job via spark-submit: ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner spark-0.1-SNAPSHOT-jar-with-dependencies.jar My job fails because spark-submit sets up the classpath and it re-adds the slf4j-log4j12. I am not adding spark jar to the uber-jar via the maven assembly plugin: dependencySets dependencySet .. useTransitiveDependenciesfalse/useTransitiveDependencies
Re: Protobuf error when streaming from Kafka
Can you show the complete stack trace ? Which Spark / Kafka release are you using ? Thanks On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote: Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though. I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: That didn't work since extraClassPath flag was still appending the jars at the end, so its still picking the slf4j jar provided by spark. Out of curiosity, how did you verify this? The extraClassPath options are supposed to prepend entries to the classpath, and the code seems to be doing that. If it's not really doing that in some case, it's a bug that needs to be fixed. Another option is those is setting the SPARK_CLASSPATH env variable, which is deprecated, but might come in handy in case there is actually a bug in handling those options. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Protobuf error when streaming from Kafka
Hi, I am storing messages in Kafka using protobuf and reading them into Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got java.lang.UnsupportedOperationException for older messages. However, even for new messages I get the same error. Spark does convert it though. I see my messages. How do I get rid of this error? java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses. at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180) at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407) at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)
Re: Exclude slf4j-log4j12 from the classpath via spark-submit
I assumed that's the case beacause of the error I got and the documentation which says: Extra classpath entries to append to the classpath of the driver. This is where I stand now: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.4.1/version exclusions exclusion groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId /exclusion /exclusions /dependency And no exclusions from my logging lib. And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar --conf spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar --conf spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar And I get the same error: Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68) at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28) at com.opentable.logging.Log.clinit(Log.java:31) ... 16 more Thanks, -Utkarsh On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote: That didn't work since extraClassPath flag was still appending the jars at the end, so its still picking the slf4j jar provided by spark. Out of curiosity, how did you verify this? The extraClassPath options are supposed to prepend entries to the classpath, and the code seems to be doing that. If it's not really doing that in some case, it's a bug that needs to be fixed. Another option is those is setting the SPARK_CLASSPATH env variable, which is deprecated, but might come in handy in case there is actually a bug in handling those options. -- Marcelo -- Thanks, -Utkarsh
What does Attribute and AttributeReference mean in Spark SQL
There are many case classes and concepts such as Attribute/AttributeReference/Expression in Spark SQL. I would like to ask what Attribute/AttributeReference/Expression mean. Given a SQL query like select a, b from c, are a and b two Attributes, and is a + b an Expression? It looks like I am misunderstanding it, because Attribute extends Expression in the code, which means an Attribute is itself an Expression. Thanks.
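One way to see this from the shell is to look at the analyzed plan of a small query (a minimal sketch; the table and columns are invented, and queryExecution is an internal/developer API, so the exact output varies by version):

case class C(a: Int, b: Int)
val df = sqlContext.createDataFrame(Seq(C(1, 2), C(3, 4)))
df.registerTempTable("c")

val plan = sqlContext.sql("select a, a + b from c").queryExecution.analyzed
// `a` and `b` resolve to AttributeReferences (named, resolved columns),
// while `a + b` is an Add expression whose children are those AttributeReferences
plan.expressions.foreach(e => println(e.getClass.getSimpleName + ": " + e))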
Re: Too many files/dirs in hdfs
Any help would be appreciated On Wed, Aug 19, 2015 at 9:38 AM, Mohit Anchlia mohitanch...@gmail.com wrote: My question was how to do this in Hadoop? Could somebody point me to some examples? On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Of course, Java or Scala can do that: 1) Create a FileWriter with append or roll over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to File when you want to write (The duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
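As a rough Scala sketch of the "skip empty batches" idea from this thread (the stream name and output path are placeholders; this still produces one directory per non-empty batch, so merging or roll-over would need the separate copyMerge/cleanup step mentioned above):

dstream.foreachRDD { (rdd, time) =>
  // only batches that actually contain records produce output, so no 0-byte files
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs:///logs/out-${time.milliseconds}")
  }
}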
Re: MLlib Prefixspan implementation
CCing the mailing list again. It's currently not on the radar. Do you have a use case for it? I can bring it up during 1.6 roadmap planning tomorrow. On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com wrote: Hi, I just realized the article I mentioned is cited in the jira and not in the code so I guess you didn't use this result. Do you plan to implement sequence with timestamp and gap constraint as in : https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com: Hi Alexis, Unfortunately, both of the papers you referenced appear to be translations and are quite difficult to understand. We followed http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan. Perhaps you can find the relevant lines in there so I can elaborate further? Feynman On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com wrote: I want to use prefixspan so I had a look at the code and the cited paper : Distributed PrefixSpan Algorithm Based on MapReduce. There is a result in the paper I didn't really undertstand and I could'nt find where it is used in the code. Suppose a sequence database S = {1,2...n}, a sequence a... is a length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a prefix of a length-(L-1) sequential pattern a...a, when the support count of a is not less than min_support, it is equal to obtaining a length-L sequential pattern a ... a from projected databases that obtaining a length-L sequential pattern a ... a from a sequence database S. According to the paper It's supposed to add a pruning step in the reduce function but I couldn't find where. This result seems to come from a previous paper : Wang Linlin, Fan Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J]. Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to understand it and how it can improve the algorithm.
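For anyone who just wants to try the current implementation, usage looks roughly like the sketch below (based on the MLlib API around the 1.5 line; the input sequences are made up):

import org.apache.spark.mllib.fpm.PrefixSpan

// each sequence is an Array of itemsets, and each itemset an Array of items
val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))), 2)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
val model = prefixSpan.run(sequences)

model.freqSequences.collect().foreach { fs =>
  println(fs.sequence.map(_.mkString("[", ",", "]")).mkString(" ") + ", " + fs.freq)
}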
Re: DataFrame#show cost 2 Spark Jobs ?
Hao, I can reproduce it using the master branch. I'm curious why you cannot reproduce it. Did you check if the input HadoopRDD did have two partitions? My test code is val df = sqlContext.read.json(examples/src/main/resources/people.json) df.show() Best Regards, Shixiong Zhu 2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com: Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in the `df.show()` with latest code, we did refactor the code for json data source recently, not sure you’re running an earlier version of it. And a known issue is Spark SQL will try to re-list the files every time when loading the data for JSON, it’s probably causes longer time for ramp up with large number of files/partitions. *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Tuesday, August 25, 2015 8:11 AM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: DataFrame#show cost 2 Spark Jobs ? Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json( file:///Users/hadoop/github/spark/examples/src/main/resources/people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Monday, August 24, 2015 6:20 PM *To:* user@spark.apache.org *Subject:* DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
RE: DataFrame#show cost 2 Spark Jobs ?
Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark jobs in the `df.show()` with latest code, we did refactor the code for json data source recently, not sure you’re running an earlier version of it. And a known issue is Spark SQL will try to re-list the files every time when loading the data for JSON, it’s probably causes longer time for ramp up with large number of files/partitions. From: Jeff Zhang [mailto:zjf...@gmail.com] Sent: Tuesday, August 25, 2015 8:11 AM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: DataFrame#show cost 2 Spark Jobs ? Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.jsonfile:///\\Users\hadoop\github\spark\examples\src\main\resources\people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao From: Jeff Zhang [mailto:zjf...@gmail.commailto:zjf...@gmail.com] Sent: Monday, August 24, 2015 6:20 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: build spark 1.4.1 with JDK 1.6
I'm trying to build Spark 1.4 with Java 7 and despite having that as my JAVA_HOME, I get [INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ spark-launcher_2.10 --- [INFO] Using zinc server for incremental compilation [info] Compiling 8 Java sources to /Users/eric/spark/spark/launcher/target/scala-2.10/classes... [error] javac: invalid source release: 1.7 [error] Usage: javac options source files [error] use -help for a list of possible options [error] Compile failed at Aug 24, 2015 7:44:40 PM [0.020s] [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 3.109 s] [INFO] Spark Project Launcher . FAILURE [ 4.493 s] On Fri, Aug 21, 2015 at 9:43 AM, Marcelo Vanzin van...@cloudera.com wrote: That was only true until Spark 1.3. Spark 1.4 can be built with JDK7 and pyspark will still work. On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote: Thanks Sean. So how PySpark is supported. I thought PySpark needs jdk 1.6. Chen On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7. Anyone has done this before? Thanks, -- Chen Song -- Chen Song -- --- You received this message because you are subscribed to the Google Groups CDH Users group. To unsubscribe from this group and stop receiving emails from it, send an email to cdh-user+unsubscr...@cloudera.org. For more options, visit https://groups.google.com/a/cloudera.org/d/optout. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DataFrame#show cost 2 Spark Jobs ?
Because defaultMinPartitions is 2 (See https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/core/src/main/scala/org/apache/spark/SparkContext.scala#L2057 ), your input people.json will be split to 2 partitions. At first, `take` will start a job for the first partition. However, the limit is 21, but the first partition only has 2 records. So it will continue to start a new job for the second partition. You can check implementation details in SparkPlan.executeTake: https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L185 Best Regards, Shixiong Zhu 2015-08-25 8:11 GMT+08:00 Jeff Zhang zjf...@gmail.com: Hi Cheng, I know that sqlContext.read will trigger one spark job to infer the schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 jobs. Here's the command I use: val df = sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.json) // trigger one spark job to infer schema df.show()// trigger 2 spark jobs which is weird On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote: The first job is to infer the json schema, and the second one is what you mean of the query. You can provide the schema while loading the json file, like below: sqlContext.read.schema(xxx).json(“…”)? Hao *From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Monday, August 24, 2015 6:20 PM *To:* user@spark.apache.org *Subject:* DataFrame#show cost 2 Spark Jobs ? It's weird to me that the simple show function will cost 2 spark jobs. DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs. == Parsed Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Analyzed Logical Plan == age: bigint, name: string Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Optimized Logical Plan == Relation[age#0L,name#1] JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json] == Physical Plan == Scan JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1] -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
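A quick way to see the behaviour Shixiong describes from the shell (just a sketch; the file is the same example file used earlier in the thread):

val df = sqlContext.read.json("examples/src/main/resources/people.json")
// people.json is tiny, so with defaultMinPartitions = 2 each partition holds only a
// couple of records; show() asks for up to 21 rows, so after the first partition
// comes up short a second job is started for the remaining partition
println(df.rdd.partitions.length)   // should print 2, per the explanation above
df.show()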
Re: Drop table and Hive warehouse
Thanks, Michael. I discovered it myself. Finally, it was not a bug from Spark. I have two HDFS cluster and Hive uses hive.metastore.warehouse.dir + fs.defaultFS(HDFS1) for saving internal tables and also reference a default database URI(HDFS2) in DBS table from metastore. It may not be a problem if URI of default database is same as fs.defaultFS. Maybe few of people set their default database URI to another HDFS like me. I copied hive-site.xml into spark conf then Hive and Spark had same metastore configuration. But the result table of saveAsTable has its metadata in HDFS1 and its data in HDFS2. DESCRIBE FORMATTED table_name will show the difference between Location of table(HDFS1) and Path in Storage Desc Params(HDFS2) even though table is type of MANAGED_TABLE. That is why DROP TABLE deletes only metadata in HDFS1 and NOT delete data files in HDFS2. So I can not reproduce a table with same location and same name. If I update DBS table in metastoredb to set default database URI to HDFS1, it works perfectly. Kevin --- Original Message --- Sender : Michael Armbrustmich...@databricks.com Date : 2015-08-25 00:43 (GMT+09:00) Title : Re: Drop table and Hive warehouse Thats not the expected behavior. What version of Spark? On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote: When I store DataFrame as table with command saveAsTable and then execute DROP TABLE in SparkSQL, it doesn't actually delete files in hive warehouse. The table disappears from a table list but the data files are still alive. Because of this, I can't saveAsTable with a same name before dropping table. Is it a normal situation? If it is, I will delete files manually ;) Kevin 상기 메일은 지정된 수신인만을 위한 것이며, 부정경쟁방지 및 영업비밀보호에 관한 법률,개인정보 보호법을 포함하여 관련 법령에 따라 보호의 대상이 되는 영업비밀, 산업기술,기밀정보, 개인정보 등을 포함하고 있을 수 있습니다. 본 문서에 포함된 정보의 전부 또는 일부를 무단으로 복사 또는 사용하거나 제3자에게 공개, 배포, 제공하는 것은 엄격히 금지됩니다. 본 메일이 잘못 전송된 경우 발신인 또는 당사에게 알려주시고 본 메일을 즉시 삭제하여 주시기 바랍니다. The contents of this e-mail message and any attachments are confidential and are intended solely for addressee. The information may also be legally privileged. This transmission is sent in trust, for the sole purpose of delivery to the intended recipient. If you have received this transmission in error, any use, reproduction or dissemination of this transmission is strictly prohibited. If you are not the intended recipient, please immediately notify the sender by reply e-mail or phone and delete this message and its attachments, if any.
Re: Spark Direct Streaming With ZK Updates
I'd start off by trying to simplify that closure - you don't need the transform step, or currOffsetRanges to be scoped outside of it. Just do everything in foreachRDD. LIkewise, it looks like zkClient is also scoped outside of the closure passed to foreachRDD i.e. you have zkClient = new ZkClient(consumerConfig.zkConnec instead of val zkClient = new ZkClient(consumerConfig.zkConnec On Mon, Aug 24, 2015 at 5:53 PM, suchenzang suchenz...@gmail.com wrote: When updating the ZK offset in the driver (within foreachRDD), there is somehow a serialization exception getting thrown: 15/08/24 15:45:40 ERROR JobScheduler: Error in job generator java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) The code looks something like: directKafkaStream.transform { rdd = currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD { rdd = ... /*do stuff with shuffling involved, then update DynamoDB*/ val props = new Properties() kafkaConf.foreach(param = props.put(param._1, param._2)) props.setProperty(AUTO_OFFSET_COMMIT, false) val consumerConfig = new ConsumerConfig(props) assert(!consumerConfig.autoCommitEnable) zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) offsetRanges.foreach { osr = val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic) val zkPath = s${topicDirs.consumerOffsetDir}/${osr.partition} ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString) } } Why would there be serialization issues when I'm not trying to pass ZkClient to any of the workers? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
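Putting Cody's two suggestions together — drop the transform step and keep zkClient as a local val inside foreachRDD — the driver-side update would look roughly like the sketch below. This is only a sketch: kafkaConf, groupId and the processing step are assumed from the original post, and the Kafka/ZkClient calls are carried over from it unchanged.

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd, update DynamoDB, etc. ...

  val props = new Properties()
  kafkaConf.foreach { case (k, v) => props.put(k, v) }
  props.setProperty("auto.commit.enable", "false")
  val consumerConfig = new ConsumerConfig(props)

  // a local val, so nothing outside this block gets pulled into a checkpointed closure
  val zkClient = new ZkClient(consumerConfig.zkConnect,
    consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs,
    ZKStringSerializer)
  try {
    offsetRanges.foreach { osr =>
      val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
      ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
    }
  } finally {
    zkClient.close()
  }
}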
RE: Test case for the spark sql catalyst
Yes, check the source code under: https://github.com/apache/spark/tree/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst From: Todd [mailto:bit1...@163.com] Sent: Tuesday, August 25, 2015 1:01 PM To: user@spark.apache.org Subject: Test case for the spark sql catalyst Hi, Are there test cases for the spark sql catalyst, such as testing the rules of transforming unsolved query plan? Thanks!
Re: Spark
I think you could try sorting the endPointsCount and then doing a take. This should be a distributed process and only the result would get returned to the driver. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ http://in.linkedin.com/in/sonalgoyal On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: I was running a Spark Job to crunch a 9GB apache log file When I saw the following error: 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost) 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59) 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster. 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411) . . Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346) at org.apache.spark.SparkContext.stop(SparkContext.scala:1380) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143) . . Looking further, it seems like takeOrdered (called by my application) uses collect() internally and hence drains out all the Drive memory. 
line 361, in top10EndPoints topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1]) File /home/hadoop/spark/python/pyspark/rdd.py, line 1174, in takeOrdered return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge) File /home/hadoop/spark/python/pyspark/rdd.py, line 739, in reduce vals = self.mapPartitions(func).collect() File /home/hadoop/spark/python/pyspark/rdd.py, line 713, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ self.target_id, self.name) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value format(target_id, '.', name), value) How can I rewrite this code endpointCounts = (access_logs .map(lambda log: (log.endpoint, 1)) .reduceByKey(lambda a, b : a + b)) #Endpoints is now a list of Tuples of [(endpoint1, count1), (endpoint2, count2), ] topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1]) so that this error does not happen?
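In Scala terms, the sort-then-take approach Sonal describes would be roughly the sketch below (the accessLogs RDD and its endpoint field are assumed from the original post; the PySpark equivalent uses the same sortBy/take calls). This only illustrates the suggestion, not a guaranteed fix for the executor loss:

val endpointCounts = accessLogs.map(log => (log.endpoint, 1)).reduceByKey(_ + _)
// sort by count descending across the cluster, then bring only the top 10 back to the driver
val topEndpoints = endpointCounts.sortBy(_._2, ascending = false).take(10)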
Re: How to list all dataframes and RDDs available in current session?
Okay but how? thats what I am trying to figure out ? Any command you would suggest? Sent from my iPhone, plaese excuse any typos :) On Aug 21, 2015, at 11:45 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You get the list of all the persistet rdd using spark context... On Aug 21, 2015 12:06 AM, Rishitesh Mishra rishi80.mis...@gmail.com wrote: I am not sure if you can view all RDDs in a session. Tables are maintained in a catalogue . Hence its easier. However you can see the DAG representation , which lists all the RDDs in a job , with Spark UI. On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote: Apologies I accidentally included Spark User DL on BCC. The actual email message is below. = Hi: I have been working on few example using zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that has been created in current session. Anyone knows if there is any such commands available? Something similar to SparkSQL to list all temp tables : show tables; Thanks, Dhaval On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com wrote: Hi: I have been working on few example using zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that has been created in current session. Anyone knows if there is any such commands available? Something similar to SparkSQL to list all temp tables : show tables; Thanks, Dhaval
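The SparkContext call Raghavendra is presumably referring to is getPersistentRDDs; a small sketch (temp tables registered on the SQLContext can be listed separately with tableNames):

// RDDs that have been marked persistent (cached) in this SparkContext
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"id=$id name=${rdd.name} storageLevel=${rdd.getStorageLevel}")
}

// temporary tables registered on the SQLContext
sqlContext.tableNames().foreach(println)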
Spark
I was running a Spark Job to crunch a 9GB apache log file When I saw the following error: 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411) . .Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346) at org.apache.spark.SparkContext.stop(SparkContext.scala:1380) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143) . . Looking further, it seems like takeOrdered (called by my application) uses collect() internally and hence drains out all the Drive memory. line 361, in top10EndPoints topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1]) File /home/hadoop/spark/python/pyspark/rdd.py, line 1174, in takeOrdered return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge) File /home/hadoop/spark/python/pyspark/rdd.py, line 739, in reduce vals = self.mapPartitions(func).collect() File /home/hadoop/spark/python/pyspark/rdd.py, line 713, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ self.target_id, self.name) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value format(target_id, '.', name), value) How can I rewrite this code endpointCounts = (access_logs .map(lambda log: (log.endpoint, 1)) .reduceByKey(lambda a, b : a + b)) #Endpoints is now a list of Tuples of [(endpoint1, count1), (endpoint2, count2), ] topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1]) so that this error does not happen?
Test case for the spark sql catalyst
Hi, Are there test cases for the spark sql catalyst, such as testing the rules of transforming unsolved query plan? Thanks!
How to access Spark UI through AWS
I am using the steps from this article https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get Spark up and running on EMR through YARN. Once it is up and running I ssh in, cd to the Spark bin directory, and run spark-shell --master yarn. Once this spins up I can see that the UI is started at the internal IP on port 4040. If I hit the public DNS on port 4040 with dynamic port tunneling and FoxyProxy, I get a crude UI (the CSS seems broken); however, the proxy continuously redirects me to the main page, so I cannot drill into anything. I then tried static tunneling, but I can't seem to get through. So, how can I access the Spark UI when running a spark-shell on AWS under YARN? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI-through-AWS-tp24436.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org