Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Hey, so I start the context at the very end, when all the piping is done. BTW, a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solves the problem... What does this exception mean in general?

On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote:

When are you getting this exception? After starting the context?

TD

On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote:

Hi,

I am getting this serialization exception and I am not too sure what "Graph is unexpectedly null when DStream is being serialized" means.

15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable)
Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    [...]
Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
    at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)

The operation comes down to something like this:

    dstream.map(tuple => {
      val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
      (tuple._1, (tuple._2, w))
    })

And StreamState is a very simple standalone object:

    object StreamState {
      def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
    }

However, if I remove the context bounds from K in fetch (i.e. remove ClassTag and Ordering), then everything is fine.

If anyone has some pointers, I'd really appreciate it.

Thanks,
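One plausible explanation, offered as a hypothesis rather than a confirmed diagnosis: with the context bounds, calling `fetch[K, W]` inside the closure needs implicit `ClassTag[K]` and `Ordering[K]` evidence. If that evidence is a field of the enclosing class (which is what context bounds on a class's own type parameters compile to), the closure captures `this`, and since `this` also holds DStreams, Spark ends up trying to serialize them. The capture rule itself can be shown in plain Java without Spark; `Pipeline`, `capturingThis`, and `capturingLocal` are hypothetical names, and the int field only stands in for the implicit evidence.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class ClosureCaptureDemo {
    // A serializable function type, standing in for a closure that Spark must ship to executors.
    interface SerFunction<A, B> extends Function<A, B>, Serializable {}

    // Hypothetical enclosing class. Like a class that holds DStreams, it is NOT Serializable.
    static final class Pipeline {
        // Stand-in for implicit evidence (e.g. a ClassTag) stored as an instance field.
        private final int evidence;

        Pipeline(int evidence) {
            this.evidence = evidence;
        }

        // Reading the field really reads this.evidence, so the lambda captures `this`.
        SerFunction<Integer, Integer> capturingThis() {
            return x -> x + evidence;
        }

        // Copying the field into a local first means only the int value is captured.
        SerFunction<Integer, Integer> capturingLocal() {
            final int localEvidence = evidence;
            return x -> x + localEvidence;
        }
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline(42);
        System.out.println("lambda capturing this:  " + isSerializable(p.capturingThis()));
        System.out.println("lambda capturing local: " + isSerializable(p.capturingLocal()));
    }
}
```

In the Scala pipeline, the analogous workaround would likely be to bind whatever the closure needs to local vals right before `dstream.map`, so the closure captures those values instead of the enclosing object.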
Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?
Hello,

I have the naive question above, if anyone could help: why not use a row-based file format to save row-based DataFrames/RDDs?

Thanks,
Lan
Re: Streaming Linear Regression problem
Did you keep adding new files under the `train/` folder? What was the exact warning message?

-Xiangrui

On Fri, Apr 17, 2015 at 4:56 AM, barisak baris.akg...@gmail.com wrote:

Hi,

I wrote this code just to train the streaming linear regression, but I got a "no data found" warning, so the weights were not updated. Is there any solution for this?

Thanks

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingLinearRegression {
      def main(args: Array[String]) {
        val numFeatures = 3
        val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegression")
        val ssc = new StreamingContext(conf, Seconds(30))

        val trainingData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse).cache()
        val testData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/test").map(LabeledPoint.parse)

        val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

        model.trainOn(trainingData)
        model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
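Some background on Xiangrui's question: as I understand the `textFileStream` contract, it only picks up files that appear in the monitored directory after the streaming context has started, and files should be moved (renamed) into it atomically rather than written in place. A hedged plain-Java sketch of feeding the `train/` directory that way follows; the staging directory should sit on the same file system as `train/` but outside it. The helper `dropIntoStream`, the directory names, and the file names are hypothetical, and the sample lines use the `(label,[f1,f2,f3])` text form that `LabeledPoint.parse` reads.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

final class StreamFeeder {
    // Writes the lines to a staging file, then atomically moves the finished file into the
    // monitored directory, so the stream never sees a half-written file. Both directories should
    // be on the same file system; otherwise ATOMIC_MOVE throws AtomicMoveNotSupportedException.
    static Path dropIntoStream(Path stagingDir, Path monitoredDir, String fileName, List<String> lines)
            throws IOException {
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, lines, StandardCharsets.UTF_8);
        return Files.move(staged, monitoredDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical directories; monitoredDir would be the folder passed to textFileStream.
        Path staging = Files.createTempDirectory("staging");
        Path train = Files.createTempDirectory("train");
        // A fresh name per batch, so each drop is a new file in the monitored directory.
        String name = "batch-" + System.currentTimeMillis() + ".txt";
        Path moved = dropIntoStream(staging, train, name,
                Arrays.asList("(1.0,[1.0,2.0,3.0])", "(2.0,[2.0,4.0,6.0])"));
        System.out.println("moved to " + moved);
    }
}
```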
ChiSquared Test from user response flat files to RDD[Vector]?
Hi Spark community,

I'm very new to the Apache Spark community, but if this (very active) group is anything like the other Apache project user groups I've worked with, I'm going to enjoy discussions here very much. Thanks in advance!

Use case: I am trying to go from flat files of user response data, to contingency tables of frequency counts, to Pearson chi-square correlation statistics, and then perform a chi-squared hypothesis test. The user response data represents a multiple-choice question-answer (MCQ) format. The goal is to compute the contingency tables for all choose-two combinations of questions (precondition, question X question). Each cell of a contingency table is the intersection of the sets of users who gave a particular option for each of the table's two questions.

An overview of the problem:

    // data ingestion and typing schema:
    //   Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
    // a question (q) has a finite set of response options (v) per which a user (u) responds
    // additional response fields are not required for this test
    for (precondition a) {
      for (q_i in lex-ordered questions) {
        for (q_j in lex-ordered questions, q_j > q_i) {
          forall v_k \in q_i: get set of distinct users {u}_ik
          forall v_l \in q_j: get set of distinct users {u}_jl
          forall cells of table (a, q_i, q_j): define C_ijkl = |intersect({u}_ik, {u}_jl)|  // contingency table construction
          compute the chi-square test for this contingency table and persist it
        }
      }
    }

The Scala main I'm testing is provided below. I was planning to use the example at https://spark.apache.org/docs/1.3.1/mllib-statistics.html, however I am not sure how to go from my RDD[Observation] to the RDD[Vector] that the example expects as input.

    def main(args: Array[String]): Unit = {
      // set up the main space for the test
      val conf = new SparkConf().setAppName("TestMain")
      val sc = new SparkContext(conf)

      // data ETL and typing schema
      case class Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
      val date_format = new java.text.SimpleDateFormat("MMdd")
      val data_project_abs_dir = "/my/path/to/data/files"
      val data_files = data_project_abs_dir + "/part-*.gz"
      val data = sc.textFile(data_files)
      val observations = data.map(line => line.split(",").map(_.trim)).map(r =>
        Observation(r(0).toString, date_format.parse(r(1).toString), r(2).toString, r(3).toString, r(4).toString, r(5).toInt))
      observations.cache

      // ToDo: the basic keying of the space, possibly...
      val qvu = observations.map(o => ((o.a, o.q, o.v), o.u)).distinct

      // ToDo: ok, so now how to get this into the precondition RDD[Vector] from the Spark example to make a contingency table?...
      // ToDo: perform then persist the resulting chisq and p-value on these contingency tables...
    }

Any help is appreciated.

Thanks!
-Dan
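The per-table step in the pseudocode above (distinct user sets per option, pairwise intersections, then Pearson's statistic) can be pinned down on a single machine before distributing it. This is a hedged plain-Java illustration, not a Spark implementation: `Response`, `usersByOption`, `contingency`, and `pearsonChiSquare` are hypothetical names, the precondition `a` is left out, and no p-value is computed. For the p-value, the MLlib statistics guide linked above also shows `Statistics.chiSqTest` taking a local contingency `Matrix` (built with `Matrices.dense`, in column-major order) rather than an `RDD[Vector]`, so each table could be passed to it once its counts exist.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ContingencyChiSquare {
    // One observation: `user` chose `option` for `question`.
    static final class Response {
        final String user, question, option;

        Response(String user, String question, String option) {
            this.user = user;
            this.question = question;
            this.option = option;
        }
    }

    // Distinct users per option of one question, i.e. {u}_ik for a fixed q_i.
    static Map<String, Set<String>> usersByOption(List<Response> rs, String question) {
        Map<String, Set<String>> m = new HashMap<>();
        for (Response r : rs) {
            if (r.question.equals(question)) {
                m.computeIfAbsent(r.option, k -> new HashSet<>()).add(r.user);
            }
        }
        return m;
    }

    // C[k][l] = |{u}_ik intersect {u}_jl| for the given option orderings of q_i and q_j.
    static long[][] contingency(List<Response> rs, String qi, List<String> optsI, String qj, List<String> optsJ) {
        Map<String, Set<String>> ui = usersByOption(rs, qi);
        Map<String, Set<String>> uj = usersByOption(rs, qj);
        long[][] c = new long[optsI.size()][optsJ.size()];
        for (int k = 0; k < optsI.size(); k++) {
            for (int l = 0; l < optsJ.size(); l++) {
                Set<String> both = new HashSet<>(ui.getOrDefault(optsI.get(k), Collections.emptySet()));
                both.retainAll(uj.getOrDefault(optsJ.get(l), Collections.emptySet()));
                c[k][l] = both.size();
            }
        }
        return c;
    }

    // Pearson's statistic: sum of (O - E)^2 / E with E = rowTotal * colTotal / grandTotal.
    // Cells whose expected count is 0 (an empty row or column) are skipped.
    static double pearsonChiSquare(long[][] c) {
        int rows = c.length, cols = c[0].length;
        double[] rowTot = new double[rows], colTot = new double[cols];
        double total = 0;
        for (int k = 0; k < rows; k++) {
            for (int l = 0; l < cols; l++) {
                rowTot[k] += c[k][l];
                colTot[l] += c[k][l];
                total += c[k][l];
            }
        }
        double stat = 0;
        for (int k = 0; k < rows; k++) {
            for (int l = 0; l < cols; l++) {
                double expected = rowTot[k] * colTot[l] / total;
                if (expected > 0) {
                    double d = c[k][l] - expected;
                    stat += d * d / expected;
                }
            }
        }
        return stat;
    }
}
```

In Spark, one way to get the same intersection counts without materializing user sets might be to re-key the distinct `((a, q, v), u)` pairs in `qvu` by `(a, u)`, self-join them, keep pairs with `q_i < q_j`, and count by `(a, q_i, v_k, q_j, v_l)`; each count is one cell `C_ijkl`.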
RE: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster
Hi Marcelo,

Exactly what I need to track, thanks for the JIRA pointer.

Date: Mon, 20 Apr 2015 14:03:55 -0700
Subject: Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster
From: van...@cloudera.com
To: alee...@hotmail.com
CC: user@spark.apache.org

I think you want to take a look at: https://issues.apache.org/jira/browse/SPARK-6207

On Mon, Apr 20, 2015 at 1:58 PM, Andrew Lee alee...@hotmail.com wrote:

Hi All,

Affected versions: Spark 1.2.1 / 1.2.2 / 1.3-rc1

I'm posting this problem to the user group first to see if anyone else is encountering it.

When submitting Spark jobs that invoke HiveContext APIs on a Kerberos Hadoop + YARN (2.4.1) cluster, I'm getting this error:

javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Apparently, the Kerberos ticket is not on the remote data nodes or compute nodes, since we don't deploy Kerberos tickets there, and that is not a good practice either. On the other hand, we can't just SSH to every machine and run kinit for those users; that is neither practical nor secure.

The point here is: shouldn't there be a delegation token during the doAs, so that the token is used instead of the ticket? I'm trying to understand what is missing in Spark's HiveContext API, given that a normal MapReduce job that invokes Hive APIs works, while Spark SQL does not.

Any insights or feedback are appreciated. Has anyone got this running without pre-deploying (pre-initializing) all the tickets node by node? Is this worth filing a JIRA?
15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with URI thrift://alee-cluster.test.testserver.com:9083
15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
    at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
    at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
    at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
    at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:214)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
    at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.<init>(HiveMetastoreCatalog.scala:55)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:253)
    at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253)
    at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253)
    at org.apache.spark.sql.hive.HiveContext$$anon$4.<init>(HiveContext.scala:263)
    at
Re: SparkSQL performance
Does anybody have an idea? A clue? A hint? Thanks!

Renato M.

2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo renatoj.marroq...@gmail.com:

Hi all,

I have a simple query, "Select * from tableX where attribute1 between 0 and 5", that I run over a Kryo file with four partitions that ends up being around 3.5 million rows in our case. If I run this query by doing a simple map().filter(), it takes around 9.6 seconds, but when I apply the schema, register the table into a SQLContext, and then run the query, it takes around 16 seconds. This is using Spark 1.2.1 with Scala 2.10.0.

I am wondering why there is such a big performance gap if it is just a filter. Internally, the relation files are mapped to a JavaBean. Could this different data representation (JavaBeans vs. Spark SQL's internal representation) lead to such a difference? Is there anything I could do to bring the performance closer to the hard-coded option?

Thanks in advance for any suggestions or ideas.

Renato M.
Re: SparkSQL performance
Spark SQL's advantage comes primarily from column pruning and predicate pushdown. Here you are not taking advantage of either. I am curious to know what goes into your filter function, as you are not using a filter on the SQL side.

Best,
Ayan
Re: ChiSquared Test from user response flat files to RDD[Vector]?
You can find the user guide for vector creation here: http://spark.apache.org/docs/latest/mllib-data-types.html#local-vector.

-Xiangrui
Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
Responses inline.

On Mon, Apr 20, 2015 at 3:27 PM, Ankit Patel patel7...@hotmail.com wrote:

What you said is correct, and I am expecting the printlns to show up in my console or in my Spark UI. I do not see them in either place.

Can you actually log in to the machine running the executor that runs the receiver, and check that executor's logs on that machine to see whether the expected onStart logs are there?

However, if you run the program, then the printlns do print for the constructor of the receiver and for the foreach statements, with a total count of 0.

That's expected. In all cases, with or without a master, the receiver object is constructed in the driver. With a master, the receiver gets serialized and sent to a worker machine, and the rest of the prints go to the executor logs. When you run it in regular mode with no master attached, you will see the counts being printed out in the console as well.

Please compile my program and try it out. I have spent significant time debugging where it can go wrong and could not find an answer.

As I said, check the executor logs on the worker machine running the receiver. You can identify that machine from the Streaming tab in the Spark UI.

I also see the "starting receiver" logs from Spark when no master is defined, but do not see them when there is one. Also, I am running some other simple code with spark-submit with printlns, and I do see them in my Spark UI, but not for Spark Streaming.

That's expected, for the same reason. Without a master, the receiver logs are in the same driver process. With a master, they go to the executor logs. As such, it's not clear what you are trying to debug. If you just want to see the printlns, then this seems to be the expected behavior; nothing is wrong. Run without a master for convenient debugging (you can see all logs and prints), and then run in distributed mode for actual deployment (where it's harder to see those logs and prints).
Thanks,
Ankit

--
From: t...@databricks.com
Date: Mon, 20 Apr 2015 13:29:31 -0700
Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
To: patel7...@hotmail.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Well, the receiver always runs as part of an executor. When running locally (that is, spark-submit without --master), the executor is in the same process as the driver, so you see the printlns. If you are running with --master spark://cluster, then the executors are running in different processes and possibly on different nodes. Hence you don't see the printlns in the output of the driver process. If you look at the output of the executors in the Spark UI, then you may find those prints.

TD

On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com wrote:

The code I've written is simple: it just starts a thread that calls the store method on the Receiver class. I see this code with printlns working fine when I try:

    spark-submit --jars jar --class test.TestCustomReceiver jar

However, it does not work when I try the same command with --master spark://masterURL:

    spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar

I also tried setting the master in the conf that I created, but that does not work either. I do not see the onStart println being printed when I use the --master option. Please advise.

Also, the master I am attaching to has multiple workers across hosts with many threads available to it.
The code is pasted below (classes: TestReceiver, TestCustomReceiver):

    package test;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    public class TestReceiver extends Receiver<String> {
        public TestReceiver() {
            super(StorageLevel.MEMORY_ONLY());
            System.out.println("Ankit: Created TestReceiver");
        }

        @Override
        public void onStart() {
            System.out.println("Start TestReceiver");
            new TestThread().start();
        }

        @Override
        public void onStop() {}

        @SuppressWarnings("unused")
        private class TestThread extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep((long) (Math.random() * 3000));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    store("Time: " + System.currentTimeMillis());
                }
            }
        }
    }

    package test;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import
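TD's point about where each print appears can be reproduced without Spark: a receiver-like object is constructed once (in the driver), serialized, and deserialized elsewhere, and Java deserialization does not run the constructor again, so only the lifecycle callbacks fire on the executor side. A plain-Java sketch, assuming a hypothetical `FakeReceiver` in place of Spark's `Receiver` (which is also `Serializable`); the round trip through a byte array stands in for shipping the object to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ShipReceiverDemo {
    // Hypothetical stand-in for a custom receiver.
    static final class FakeReceiver implements Serializable {
        private static final long serialVersionUID = 1L;
        static int constructorCalls = 0; // static, so it is not serialized; counts calls in this JVM
        boolean started = false;

        FakeReceiver() {
            constructorCalls++;
            System.out.println("constructor: prints where the object is created (the driver)");
        }

        void onStart() {
            started = true;
            System.out.println("onStart: prints where the copy is running (an executor)");
        }
    }

    // Serialize, then deserialize, standing in for shipping the object to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeReceiver onDriver = new FakeReceiver();
        FakeReceiver onExecutor = ship(onDriver); // deserialization does not call the constructor
        onExecutor.onStart();
        System.out.println("constructor calls: " + FakeReceiver.constructorCalls);
    }
}
```

In the real cluster run, the `onStart` output would land in the stdout log of the executor running the receiver, on the worker that the Streaming tab of the Spark UI points to.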
HiveContext vs SQLContext
Is HiveContext still preferred over SQLContext? What are the current (1.3.1) differences between them?

Thanks,
Daniel
Re: MLlib - Naive Bayes Problem
Could you attach the full stack trace? Please also include the stack traces from the executors, which you can find in the Spark web UI.

-Xiangrui

On Thu, Apr 16, 2015 at 1:00 PM, riginos samarasrigi...@gmail.com wrote:

I have a big dataset of car categories and car descriptions. I want to give the program a description of a car and have it classify the category of that car, so I decided to use multinomial naive Bayes. I created a unique ID for each word and replaced the words in my whole category,description data with those IDs.

My input:

    2,25187 15095 22608 28756 17862 29523 499 32681 9830 24957 18993 19501 16596 17953 16596
    20,1846 29058 16252 20446 9835
    52,16861 808 26785 17874 18993 18993 18993 18269 34157 33811 18437 6004 2791 27923 19141
    ...
    ...

Why do I get errors like these?

    ERROR Executor: Exception in task 0.0 in stage 211.0 (TID 392)
    java.lang.IndexOutOfBoundsException: 13 not in [-13,13)
    ERROR Executor: Exception in task 1.0 in stage 211.0 (TID 393)
    java.lang.IndexOutOfBoundsException: 17 not in [-17,17)
    ERROR TaskSetManager: Task 0 in stage 211.0 failed 1 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 211.0 failed 1 times, most recent failure: Lost task 0.0 in stage 211.0 (TID 392, localhost): java.lang.IndexOutOfBoundsException: 13 not in [-13,13)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
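One possible cause, offered as a guess rather than a diagnosis: a message of the form `13 not in [-13,13)` looks like Breeze's vector bounds check, which would fit feature vectors of different lengths being combined, for example if each description's word-ID list was turned directly into a dense vector. Multinomial naive Bayes in MLlib expects every example to have the same dimension, with non-negative term counts as values. A plain-Java sketch of turning one input line into a fixed-size sparse count vector (sorted indices plus counts); `TermCountVector` and the vocabulary size are assumptions, and the vocabulary size must exceed the largest word ID (the sample above goes up to 34157).

```java
import java.util.Map;
import java.util.TreeMap;

final class TermCountVector {
    final double label;
    final int size;        // vocabulary size: the same for every example
    final int[] indices;   // distinct word ids, in increasing order
    final double[] values; // how many times each word id occurs in the description

    TermCountVector(double label, int size, int[] indices, double[] values) {
        this.label = label;
        this.size = size;
        this.indices = indices;
        this.values = values;
    }

    // Parses "label,id id id ..." into a sparse term-count vector of dimension vocabSize.
    static TermCountVector parse(String line, int vocabSize) {
        String[] parts = line.split(",", 2);
        double label = Double.parseDouble(parts[0].trim());
        TreeMap<Integer, Integer> counts = new TreeMap<>(); // keeps word ids sorted
        if (parts.length > 1 && !parts[1].trim().isEmpty()) {
            for (String token : parts[1].trim().split("\\s+")) {
                int id = Integer.parseInt(token);
                if (id < 0 || id >= vocabSize) {
                    throw new IllegalArgumentException("word id " + id + " is outside [0, " + vocabSize + ")");
                }
                counts.merge(id, 1, Integer::sum);
            }
        }
        int[] indices = new int[counts.size()];
        double[] values = new double[counts.size()];
        int i = 0;
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            indices[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        return new TermCountVector(label, vocabSize, indices, values);
    }
}
```

From Java, each result could then become `new LabeledPoint(v.label, Vectors.sparse(v.size, v.indices, v.values))` before training, so that every example shares one dimension.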
Instantiating/starting Spark jobs programmatically
Greetings,

We have an analytics workflow system in production. This system is built in Java and uses other services (including Apache Solr). It works fine with a moderate level of data/processing load. However, when the load goes beyond a certain limit (e.g., more than 10 million messages/documents), delays start to show up. No doubt this is a scalability issue, and the Hadoop ecosystem, especially Spark, can be handy in this situation. The simplest approach would be to rebuild the entire workflow using Spark, Kafka and other components. However, we decided to handle the problem in a couple of phases. In the first phase we identified a few pain points (areas where performance suffers most) and have started building corresponding mini Spark applications (so as to take advantage of parallelism).

For now my question is: how can we instantiate/start our mini Spark jobs programmatically (e.g., from Java applications)? The only option I see in the documentation is to run the jobs from the command line (using spark-submit). Any insight in this area would be highly appreciated.

In the longer term, I want to construct a collection of mini Spark applications (each performing one specific task, similar to web services), and architect/design bigger Spark-based applications which in turn will call these mini Spark applications programmatically. There is a possibility that the Spark community has already started building such a collection of services. Can you please provide some information/tips/best practices in this regard?

Cheers!
Ajay
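Two hedged options, neither confirmed by this thread. A long-running Java service can create a `JavaSparkContext` itself and run jobs in-process. Alternatively, it can run `spark-submit` as a child process, which keeps each mini-application isolated; later Spark releases also added a launcher library (`org.apache.spark.launcher.SparkLauncher`) for this purpose. A sketch of the child-process route using only `ProcessBuilder`; the Spark home, master URL, main class, jar path, and arguments are hypothetical placeholders, and only the standard `--master` and `--class` options are used.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SparkSubmitRunner {
    // Builds: <sparkHome>/bin/spark-submit --master <master> --class <mainClass> <appJar> [appArgs...]
    static List<String> buildCommand(String sparkHome, String master, String mainClass,
                                     String appJar, List<String> appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(sparkHome + File.separator + "bin" + File.separator + "spark-submit");
        cmd.addAll(Arrays.asList("--master", master, "--class", mainClass, appJar));
        cmd.addAll(appArgs);
        return cmd;
    }

    // Starts spark-submit, forwards its output to this process, and returns its exit code.
    static int run(List<String> cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd).inheritIO().start();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = buildCommand("/opt/spark", "spark://master-host:7077",
                "com.example.MiniJob", "/path/to/mini-job.jar", Arrays.asList("input", "output"));
        System.out.println(String.join(" ", cmd));
        // int exit = run(cmd); // uncomment on a machine where Spark is installed
    }
}
```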
Re: Spark SQL vs map reduce tableInputOutput
Please take a look at https://issues.apache.org/jira/browse/PHOENIX-1815

On Mon, Apr 20, 2015 at 10:11 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Thanks for the reply. Would using Phoenix inside Spark be useful? What is the best way to bring data from HBase into Spark in terms of application performance?

Regards,
Jeetendra

On 20 April 2015 at 20:49, Ted Yu yuzhih...@gmail.com wrote:

To my knowledge, Spark SQL currently doesn't provide range scan capability against HBase.

Cheers

On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Hi All,

I am querying HBase, combining the results, and using them in my Spark job. I am querying HBase using the HBase client API inside my Spark job. Can anybody tell me whether Spark SQL will be fast enough and support range queries?

Regards,
Jeetendra
Re: Spark SQL vs map reduce tableInputOutput
I think the recommended approach is to create a DataFrame using HBase as the source. Then you can run any SQL on that DataFrame. In 1.2, you can create a base RDD and then apply a schema in the same manner.
Re: SparkSQL performance
There is a cost to converting from JavaBeans to Rows and this code path has not been optimized. That is likely what you are seeing.
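To make the reply above concrete: when a schema is applied to an RDD of JavaBeans, each record first has to become a generic row, which (assuming a reflection-based path along the lines of what Spark does for beans) means one reflective getter call per property per record, on top of the filter itself. A plain-Java sketch of that extra per-record work next to the hand-written filter; `TableXBean`, its `name` property, and the helper methods are hypothetical, and no timing claim is made.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class BeanToRowDemo {
    // Hypothetical JavaBean standing in for the records the relation files are mapped to.
    public static class TableXBean {
        private int attribute1;
        private String name;

        public TableXBean() {}
        public TableXBean(int attribute1, String name) { this.attribute1 = attribute1; this.name = name; }

        public int getAttribute1() { return attribute1; }
        public void setAttribute1(int attribute1) { this.attribute1 = attribute1; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Hand-written path: call the getter directly and filter; no conversion step.
    static long countDirect(List<TableXBean> data) {
        long n = 0;
        for (TableXBean b : data) {
            int v = b.getAttribute1();
            if (v >= 0 && v <= 5) n++; // BETWEEN 0 AND 5 is inclusive
        }
        return n;
    }

    // Schema-applied path (sketch): turn every bean into a generic Object[] row through
    // reflective getter calls, then filter on the column. The conversion is the extra work.
    static long countViaRows(List<TableXBean> data) throws Exception {
        // Stop at Object.class so getClass() is not treated as a property.
        BeanInfo info = Introspector.getBeanInfo(TableXBean.class, Object.class);
        PropertyDescriptor[] props = info.getPropertyDescriptors();
        Method[] getters = new Method[props.length];
        int col = -1;
        for (int i = 0; i < props.length; i++) {
            getters[i] = props[i].getReadMethod();
            if (props[i].getName().equals("attribute1")) col = i;
        }
        long n = 0;
        for (TableXBean b : data) {
            Object[] row = new Object[getters.length];
            for (int i = 0; i < getters.length; i++) {
                row[i] = getters[i].invoke(b); // reflective call; ints come back boxed
            }
            int v = (Integer) row[col];
            if (v >= 0 && v <= 5) n++;
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        List<TableXBean> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) data.add(new TableXBean(i % 10, "row" + i));
        System.out.println(countDirect(data) + " " + countViaRows(data));
    }
}
```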
Re: spark sql error with proto/parquet
You are probably using an encoding that we don't support. I think this PR may be adding that support: https://github.com/apache/spark/pull/5422

On Sat, Apr 18, 2015 at 5:40 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote:

I have created a bunch of protobuf-based Parquet files that I want to read/inspect using Spark SQL. However, I am running into exceptions and am not able to proceed much further.

This succeeds (probably because there is no action yet). I can also printSchema() and count() without any issues:

    scala> val df = sqlContext.load("my_root_dir/201504101000", "parquet")

    scala> df.select(df("summary")).first

15/04/18 17:03:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 27, xxx.yyy.com): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxx.yyy.com:8020/my_root_dir/201504101000/0.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional group …

I could convert my protos into JSON and then back to Parquet, but that seems wasteful! Also, I would be happy to contribute and make protobuf work with Spark SQL if I can get some guidance/help/pointers.

Help appreciated.

-Abhishek-
Re: Updating a Column in a DataFrame
You can always create another DataFrame using a map. Operations are lazy, so only the final value will actually be computed. Can you describe your use case in a little more detail? On 21 Apr 2015 08:39, ARose ashley.r...@telarix.com wrote: In my Java application, I want to update the values of a Column in a given DataFrame. However, I realize DataFrames are immutable, and therefore cannot be updated by conventional means. Is there a workaround for this sort of transformation? If so, can someone provide an example? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
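Because a DataFrame cannot be changed in place, "updating" a column means deriving a new dataset whose rows carry the new value. A minimal pure-Scala model of that idea, assuming a hypothetical `Rate` record with a `price` column (neither is from the original question); no Spark is needed to run it:

```scala
// Hypothetical record type standing in for one row of the DataFrame.
case class Rate(code: String, price: Double)

object ColumnUpdate {
  // "Update" a column on immutable data: build new rows with copy(),
  // leaving the input collection untouched. This mirrors deriving a new
  // DataFrame from the old one instead of mutating it.
  def scalePrices(rows: Seq[Rate], factor: Double): Seq[Rate] =
    rows.map(r => r.copy(price = r.price * factor))
}
```

In Spark 1.3 the usual DataFrame counterpart is `withColumn`, which returns a new DataFrame, e.g. `df.withColumn("price2", df("price") * 2)` in Scala or `df.withColumn("price2", df.col("price").multiply(2))` from Java; the column names here are illustrative only.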
Why is Columnar Parquet used as default for saving Row-based DataFrames/RDD?
Hello, I have the naive question above, if anyone could help: why not use a row-based file format to save row-based DataFrames/RDDs? Thanks, Lan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Columnar-Parquet-used-as-default-for-saving-Row-based-DataFrames-RDD-tp22579.html
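One commonly cited reason for columnar storage is that analytical queries tend to read a few columns of many rows: a columnar file keeps each column contiguous, so a projection only has to touch the columns it needs, and same-typed values stored together usually compress better. A toy pure-Scala sketch of the two layouts (the `Sale` record and its fields are hypothetical):

```scala
// Hypothetical row type used only for this illustration.
case class Sale(id: Long, region: String, amount: Double)

// Columnar layout: one array per column instead of one object per row.
final case class SaleColumns(ids: Array[Long], regions: Array[String], amounts: Array[Double])

object Layouts {
  // Transpose row-oriented records into a column-oriented layout.
  def toColumns(rows: Seq[Sale]): SaleColumns =
    SaleColumns(rows.map(_.id).toArray, rows.map(_.region).toArray, rows.map(_.amount).toArray)

  // Row layout: every whole record is visited just to pull out one field.
  def totalFromRows(rows: Seq[Sale]): Double = rows.map(_.amount).sum

  // Columnar layout: the same aggregate scans only the "amount" column.
  def totalFromColumns(cols: SaleColumns): Double = cols.amounts.sum
}
```

Both functions return the same total; the difference is how much data each has to walk, which in a file format translates into bytes read and decoded.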
Re: how to make a spark cluster ?
Thank you :) On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote: Hi, If you have just one physical machine, then I would try out Docker instead of a full VM (a VM would waste memory and CPU). Best regards On 20 Apr 2015 00:11, hnahak harihar1...@gmail.com wrote: Hi All, I have a big physical machine with 16 CPUs, 256 GB RAM and a 20 TB hard disk. What is the best way to set up a Spark cluster on it, given that I need to process TBs of data?
1. Only one machine, which contains everything: driver, executor, job tracker and task tracker.
2. Create 4 VMs, each with 4 CPUs and 64 GB RAM.
3. Create 8 VMs, each with 2 CPUs and 32 GB RAM.
Please give me your views/suggestions. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
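If the machine is used without VMs or containers, Spark's standalone mode can also start several worker processes on one host. A hedged `conf/spark-env.sh` sketch approximating option 2 (four workers with 4 cores each), assuming standalone mode; the 56g figure is an illustrative choice that leaves headroom for the OS and the driver instead of handing out all 256 GB:

```shell
# conf/spark-env.sh (standalone mode) -- illustrative values only
export SPARK_WORKER_INSTANCES=4   # number of worker processes started on this machine
export SPARK_WORKER_CORES=4       # cores each worker may give to executors
export SPARK_WORKER_MEMORY=56g    # memory each worker may give to executors
```

When more than one worker instance is configured, SPARK_WORKER_CORES should be set explicitly; otherwise each worker tries to use all of the machine's cores.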
RE: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
What you said is correct, and I expect the printlns to show up either in my console or in the Spark UI, but I do not see them in either place. However, when I run the program with a master, the printlns do print for the receiver's constructor and for the foreachRDD statement, with a total count of 0. When you run it in regular mode with no master attached, you will see the counts printed in the console as well. Please compile my program and try it out; I have spent significant time debugging where it could go wrong and could not find an answer. I also see Spark's "starting receiver" logs when no master is defined, but not when one is. Also, I am running some other simple code with spark-submit that uses printlns, and I do see those in my Spark UI, but not for Spark Streaming. Thanks, Ankit From: t...@databricks.com Date: Mon, 20 Apr 2015 13:29:31 -0700 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers To: patel7...@hotmail.com CC: ak...@sigmoidanalytics.com; user@spark.apache.org Well, the receiver always runs as part of an executor. When running locally (that is, spark-submit without --master), the executor is in the same process as the driver, so you see the printlns. If you are running with --master spark://cluster, then the executors are running in different processes and possibly on different nodes. Hence you don't see the printlns in the output of the driver process. If you look at the executors' output in the Spark UI, you may find those prints. TD On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com wrote: The code I've written is simple: it just starts a thread and calls the store method on the Receiver class. 
The printlns work fine when I run:
spark-submit --jars jar --class test.TestCustomReceiver jar
However, it does not work when I run the same command with --master spark://masterURL:
spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar
I also tried setting the master in the SparkConf I create, but that does not work either. I do not see the onStart println when I use the --master option. Please advise. Also, the master I am attaching to has multiple workers across hosts with many threads available to it. The code is pasted below (classes: TestReceiver, TestCustomReceiver):

package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {
    public TestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
        System.out.println("Ankit: Created TestReceiver");
    }

    @Override
    public void onStart() {
        System.out.println("Start TestReceiver");
        new TestThread().start();
    }

    @Override
    public void onStop() {}

    @SuppressWarnings("unused")
    private class TestThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    sleep((long) (Math.random() * 3000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Hand one record to Spark Streaming
                store("Time: " + System.currentTimeMillis());
            }
        }
    }
}

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
        TestReceiver receiver = new TestReceiver();
        JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
        stream.map(new Function<String, String>() {
            @Override
            public String call(String arg0) throws Exception {
                // Runs inside an executor, so with --master this goes to the executor's stdout
                System.out.println("Received: " + arg0);
                return arg0;
            }
        }).foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                // Runs on the driver; count() launches a job on the executors
                System.out.println("Total Count: " + rdd.count());
                return null;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

Thanks, Ankit Date: Mon, 20 Apr 2015 12:22:03 +0530 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers From:
Updating a Column in a DataFrame
In my Java application, I want to update the values of a Column in a given DataFrame. However, I realize DataFrames are immutable, and therefore cannot be updated by conventional means. Is there a workaround for this sort of transformation? If so, can someone provide an example? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use them with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving a UserDefinedType into a Parquet file. The problem is that the code works when I run it inside IntelliJ IDEA but fails when I create the assembly jar and run it with spark-submit. I am using the master branch of Spark.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Seq[Double] = {
    obj match {
      case features: MyDenseVector => features.data.toSeq
    }
  }

  override def deserialize(datum: Any): MyDenseVector = {
    datum match {
      case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
    }
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}

case class Toto(imageAnnotation: MyDenseVector)

object TestUserDefinedType {
  case class Params(input: String = null, partitions: Int = 12, outputDir: String = "images.parquet")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val rawImages = sc.parallelize((1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF()
    rawImages.printSchema()
    rawImages.show()
    rawImages.save("toto.parquet") // This fails with the assembly jar
    sc.stop()
  }
}

My build.sbt is as follows:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
  // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  // case "application.conf" => MergeStrategy.concat
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  // case x =>
  //   val oldStrategy = (assemblyMergeStrategy in assembly).value
  //   oldStrategy(x)
  case _ => MergeStrategy.first
}

As I said, this code works without problems when I run it inside IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly and run it with spark-submit, I get the following error:
15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at scala.util.Try.getOrElse(Try.scala:77) at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716) at
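The schema in the error records the UDT as "class":"MyDenseVectorUDT", with no package, so one way to follow Xiangrui's advice is to check whether that exact name can be loaded in the process that fails. A small hypothetical helper (not a Spark API) that tries to resolve a class by its fully qualified name:

```scala
import scala.util.Try

object ClasspathCheck {
  // True if `className` (fully qualified, e.g. "com.example.MyDenseVectorUDT")
  // can be resolved by the current thread's context class loader, falling back
  // to this object's own loader. initialize = false: resolve only, do not run
  // static initializers.
  def isLoadable(className: String): Boolean = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(className, false, loader)).isSuccess
  }
}
```

Calling it both on the driver and inside a task, for example `sc.parallelize(1 to 1).map(_ => ClasspathCheck.isLoadable("...")).collect()` with the class name filled in, would show whether the executors see the class that the driver sees.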
Re: Spark Performance on Yarn
I have exactly the same problem, except that I'm running on a standalone master. Can you tell me the equivalent parameter on a standalone master for increasing the memory overhead in the same way? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22580.html
MLlib - Collaborative Filtering - trainImplicit task size
I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue? Thanks! -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
meet weird exception when studying rdd caching
Hi, I am studying the RDD caching function and wrote a small program to verify it. I run the program in a Spark 1.3.0 environment on a YARN cluster, but I encounter a weird exception. It doesn't always appear in the log; I only see it sometimes, and it does not affect the output of my program. Could anyone explain why this happens and how to eliminate it? My program and the exception are listed below. Thanks very much for the help! *Program*

import org.apache.spark.{SparkConf, SparkContext}

object TestSparkCaching01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Array[MyClass1]]))
    val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
    val sc = new SparkContext(conf)
    val rdd = sc.textFile(inFile)
    rdd.cache()
    rdd.map("Cache String: " + _).foreach(println)
    sc.stop()
  }
}

*Exception* 15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown by an exception handler. 
java.util.concurrent.RejectedExecutionException: Worker has already been shutdown at org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72) at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56) at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) at org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34) at org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496) at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) at org.jboss.netty.channel.Channels.disconnect(Channels.java:781) at org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211) at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223) at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222) at scala.util.Success.foreach(Try.scala:205) at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at 
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
It also is a little more evidence to Jonathan's suggestion that there is a null / 0 record that is getting grouped together. To fix this, do I need to run a filter?
val viEventsRaw = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val viEvents = viEventsRaw.filter { case (itemId, viEvent) => itemId != 0 }
On Wed, Apr 15, 2015 at 2:04 AM, Imran Rashid iras...@cloudera.com wrote: Shuffle write could be a good indication of skew, but it looks like the task in question hasn't generated any shuffle write yet, because it's still working on the shuffle-read side. So I wouldn't read too much into the fact that the shuffle write is 0 for a task that is still running. The shuffle read is larger than for the other tasks (3.0 GB vs. 2.2 GB, or more importantly, 55M records vs. 1M records). So it might not be that the raw data volume is much higher on that task, but it's getting a ton more small records, which will also generate a lot of work. It also is a little more evidence to Jonathan's suggestion that there is a null / 0 record that is getting grouped together. On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com wrote: I'm not 100% sure of Spark's implementation, but in the MR frameworks it would have a much larger shuffle write size, because that node is dealing with a lot more data and as a result has a lot more to shuffle. 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com: If it is really due to data skew, wouldn't the hanging task have a much bigger Shuffle Write Size? In this case, the shuffle write size for that task is 0, and the rest of this task's IO is not much larger than that of the tasks that finished quickly; is that normal? I am also interested in this case: which statistics in the UI indicate that a task could have skewed data? Yong -- Date: Mon, 13 Apr 2015 12:58:12 -0400 Subject: Re: Equi Join is taking for ever. 
1 Task is Running while other 199 are complete From: jcove...@gmail.com To: deepuj...@gmail.com CC: user@spark.apache.org I can promise you that this is also a problem in the Pig world :) Not sure why it's not a problem for this data set, though... Are you sure that the two are running exactly the same code? You should inspect your source data. Make a histogram for each and see what the data distribution looks like. If there is a value or bucket with a disproportionate set of values, you know you have an issue. 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Do you mean there is a tuple in either RDD that has itemID = 0 or null? And what is a catch-all? Does that imply it is a good idea to run a filter on each RDD first? We do not do this using Pig on M/R. Is it required in the Spark world? On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote: My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0? Lots of data sets have this sort of value, and it always kills joins. 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Code:
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents).map {
  case (itemId, (listing, viDetail)) =>
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
}
Running Tasks:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | | 0.0 B / 0 | 21.2 GB | 1902.6 MB
2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB
1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB
4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB
Command: ./bin/spark-submit -v --master yarn-cluster --driver-class-path
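Jonathan's suggestion to histogram the join keys can be sketched without Spark: count the records per key and list the heaviest ones. A pure-Scala sketch on local collections (the object and function names are hypothetical); on a pair RDD the same counts are commonly obtained with `rdd.map(_._1).countByValue()`, or, when there are many distinct keys, with `rdd.mapValues(_ => 1L).reduceByKey(_ + _)` followed by `top(n)` ordered by count:

```scala
object KeySkew {
  // Count the records per key and return the `n` most frequent keys, heaviest first.
  // One key (e.g. a 0 or null "catch-all" id) holding a large share of the
  // records is a typical cause of a single straggling join task.
  def heaviestKeys[K, V](pairs: Seq[(K, V)], n: Int): Seq[(K, Int)] =
    pairs
      .groupBy(_._1)
      .map { case (k, group) => (k, group.size) }
      .toSeq
      .sortBy { case (_, count) => -count }
      .take(n)

  // Fraction of all records that fall on the single heaviest key (0.0 for empty input).
  def topKeyShare[K, V](pairs: Seq[(K, V)]): Double =
    if (pairs.isEmpty) 0.0
    else heaviestKeys(pairs, 1).head._2.toDouble / pairs.size
}
```

If the heaviest key turns out to be a placeholder such as 0, a filter like `itemId != 0` before the join removes it; if it is a legitimate value, the filter does not help and the join itself needs a different strategy.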
Re: Map-Side Join in Spark
In my understanding, you need to create a key out of the data and repartition both datasets to achieve a map-side join. On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open it in Chrome with the Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak
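The point of repartitioning both sides is that if both datasets are partitioned with the same function of the key, matching keys land in the same partition index, so each pair of partitions can be joined locally. A pure-Scala model of that, assuming a hash rule of the same form as Spark's `HashPartitioner` (null keys to 0, otherwise the non-negative `hashCode` modulo the partition count); the object name is hypothetical. In Spark the corresponding step would be calling `partitionBy(new HashPartitioner(n))` on both pair RDDs before `join`:

```scala
object CoPartitionedJoin {
  // Hash-partitioning rule: null keys go to partition 0, otherwise the
  // non-negative remainder of hashCode modulo the number of partitions.
  def partitionOf(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
    }

  // The "repartition" step: split a keyed dataset into n buckets using partitionOf.
  def partition[K, V](data: Seq[(K, V)], n: Int): IndexedSeq[Seq[(K, V)]] =
    (0 until n).map(i => data.filter { case (k, _) => partitionOf(k, n) == i })

  // Inner join done bucket by bucket: because both sides use the same rule,
  // bucket i of `left` can only ever match bucket i of `right`.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], n: Int): Seq[(K, (A, B))] = {
    val lp = partition(left, n)
    val rp = partition(right, n)
    (0 until n).flatMap { i =>
      val rightByKey = rp(i).groupBy(_._1)
      for {
        (k, a) <- lp(i)
        (_, b) <- rightByKey.getOrElse(k, Seq.empty)
      } yield (k, (a, b))
    }
  }
}
```

Co-partitioning does not fix skew on its own: every record with the same hot key still ends up in a single partition.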
Spark and accumulo
Hi all, Is there anything to integrate Spark with Accumulo, or to make Spark process data stored in Accumulo? Thanks Madhvi Gupta
Re: Instantiating/starting Spark jobs programmatically
I have built a data analytics SaaS platform by creating REST endpoints; based on the type of job request, I invoke the necessary Spark job(s) and return the results as JSON (asynchronously). I used yarn-client mode to submit the jobs to the YARN cluster. Hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
After the above changes:
1) The filter stage shows this:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records
0 | 1 | 0 | SUCCESS | ANY | 1 / phxaishdc9dn1571.stratus.phx.ebay.com | 2015/04/20 20:55:31 | 7.4 min | 21 s | 129.7 MB (hadoop) / 100 | 18 s | 1106.2 MB / 718687
2) lstgItem.join(viEvents).map [equi-join] shows this:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | 17 | 0 | RUNNING | PROCESS_LOCAL | 8 / phxaishdc9dn0556.phx.ebay.com | 2015/04/20 21:02:56 | 4.3 min | 20 s | 1097.3 MB / 55906817 | | 0.0 B / 0 | 0.0 B | 0.0 B
1 | 18 | 0 | SUCCESS | PROCESS_LOCAL | 3 / phxaishdc9dn0374.phx.ebay.com | 2015/04/20 21:02:56 | 1.4 min | 1 s | 251.0 MB / 831341 | 2 ms | 377.8 KB / 226 | 9.6 GB | 173.3 MB
2 | 19 | 0 | SUCCESS | PROCESS_LOCAL | 9 / phxaishdc9dn0121.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 4 s | 250.6 MB / 830896 | 89 ms | 280.4 KB / 168 | 4.4 GB | 267.9 MB
3 | 20 | 0 | SUCCESS | PROCESS_LOCAL | 4 / phxaishdc9dn1703.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 1.9 min | 1.0 s | 250.6 MB / 831180 | 2 ms | 330.3 KB / 198 | 7.4 GB | 285.2 MB
4 | 21 | 0 | SUCCESS | PROCESS_LOCAL | 5 / phxaishdc9dn1350.stratus.phx.ebay.com | 2015/04/20 21:02:56 | 2.1 min | 3 s | 249.7 MB / 830966 | 3 ms | 303.9 KB / 182 | 3.7 GB | 282.8 MB
Task 0 (ID 17) will run for 30 minutes. I was wondering whether increasing the executors' input data size would solve this problem. Will a map-side join help?
Map-Side Join in Spark
Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open it in Chrome with the Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak
Re: Map-Side Join in Spark
Could you do it using flatMap? Punya On Tue, Apr 21, 2015 at 12:19 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: The reason I am asking is that I do not understand how to do a skip. 1) Broadcast the small table-1 as a map. 2) Then just do .map() on the large table-2. When you do .map(), you must map each element to a new element. However, with a map-side join, when I get the broadcast map, I will look up each key in it, and if the key is not found in the map, I want to skip that input altogether. (This is what happens when you do .join: it skips automatically.) With a map-side join you need to do it yourself. I am assuming you do it with mapPartitions and yield. Working code would help me understand it better. On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share their working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open it in Chrome with the Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak -- Deepak
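Punya's `flatMap` suggestion answers the "skip" question: a function that returns an `Option` emits zero elements for keys missing from the broadcast map and one element otherwise. A pure-Scala sketch of the per-partition step, assuming the small side has already been collected into a `Map` (the object and function names are hypothetical). In Spark the map would come from `sc.broadcast(...)`, and the function would be used as `large.mapPartitions(iter => BroadcastJoin.joinPartition(bc.value, iter))`:

```scala
object BroadcastJoin {
  // Map-side (broadcast hash) inner join for one partition of the large side.
  // Keys absent from `small` yield None and therefore produce no output,
  // which is the same "skip" that RDD.join applies to unmatched keys.
  def joinPartition[K, A, B](small: collection.Map[K, A], iter: Iterator[(K, B)]): Iterator[(K, (A, B))] =
    iter.flatMap { case (k, b) => small.get(k).map(a => (k, (a, b))) }
}
```

A single `get` covers both the existence check and the lookup; the same lambda also works per element with `large.flatMap { case (k, b) => ... }`, while `mapPartitions` reads `bc.value` only once per partition.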
Re: Map-Side Join in Spark
I did this:
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap()
val broadCastMap = sc.broadcast(lstgItemMap)
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.mapPartitions({ iter =>
  val lstgItemMap = broadCastMap.value
  for {
    (itemId, viDetail) <- iter
    if (lstgItemMap.contains(itemId))
  } yield ({
    val listing = lstgItemMap.get(itemId).get
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
  })
})
Earlier:
val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents).map {
  case (itemId, (listing, viDetail)) =>
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
}
Waiting for the run to complete. -- Deepak
Re: invalidate caching for hadoopFile input?
You can use rdd.unpersist(). It's documented in the Spark programming guide, under the "Removing Data" section. Ayan On 21 Apr 2015 13:16, Wei Wei vivie...@gmail.com wrote: Hey folks, I am trying to load a directory of Avro files like this in spark-shell:
val data = sqlContext.avroFile("hdfs://path/to/dir/*").cache
data.count
This works fine, but when more files are uploaded to that directory, running these two lines again yields the same result. I suspect there is some metadata caching in HadoopRDD, so the new files are ignored. Does anyone know why this is happening? Is there a way to force a reload of the whole directory without restarting spark-shell? Thanks. W
invalidate caching for hadoopFile input?
Hey folks, I am trying to load a directory of Avro files like this in spark-shell:

val data = sqlContext.avroFile("hdfs://path/to/dir/*").cache
data.count

This works fine, but when more files are uploaded to that directory, running these two lines again yields the same result. I suspect there is some metadata caching in HadoopRDD, so new files are ignored. Does anyone know why this is happening? Is there a way to force a reload of the whole directory without restarting spark-shell? Thanks. W
Re: Map-Side Join in Spark
The reason I am asking is that I don't understand how to do a skip. 1) Broadcast small table-1 as a map. 2) Then just do .map() on large table-2. When you do .map() you must map each element to a new element. However, with a map-side join, when I get the broadcast map I look up each key in it, and if the key is not found in the map I want to skip that input altogether. (This is what happens when you do .join; it skips automatically.) With a map-side join you need to do it yourself. I am assuming you do it with mapPartitions and yield. Working code would help me understand it better.

On Tue, Apr 21, 2015 at 9:40 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open it in Chrome with the Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak -- Deepak
Re: Map-Side Join in Spark
What is repartition?

On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote: In my understanding, you need to create a key out of the data and repartition both datasets to achieve a map-side join.

On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Can someone share working code for a map-side join in Spark + Scala (no Spark SQL)? The only resource I could find was this (open it in Chrome with the Chinese-to-English translator): http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/ -- Deepak -- Deepak
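The repartitioning Ayan describes is a different technique from broadcasting: both datasets are hash-partitioned on the join key into the same number of partitions, so equal keys always land in the same partition index and each pair of partitions can be joined locally. A minimal plain-Scala model under that assumption; `CoPartitionSketch` and its methods are hypothetical, and `partitionFor` only mirrors the idea behind Spark's `HashPartitioner` (a non-negative `hashCode` modulo the partition count).

```scala
object CoPartitionSketch {
  // Non-negative hashCode modulo the partition count.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Split a keyed dataset into numPartitions buckets by key.
  def partitionBy[K, V](data: Seq[(K, V)], numPartitions: Int): Vector[Seq[(K, V)]] = {
    val grouped = data.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty))
  }

  // Join partition i of the left side only with partition i of the right side.
  // This is correct only because both sides used the same partitioning.
  def joinCoPartitioned[K, A, B](left: Vector[Seq[(K, A)]], right: Vector[Seq[(K, B)]]): Seq[(K, (A, B))] =
    left.zip(right).flatMap { case (leftPart, rightPart) =>
      val rightByKey = rightPart.groupBy(_._1)
      for {
        (k, a) <- leftPart
        (_, b) <- rightByKey.getOrElse(k, Seq.empty)
      } yield (k, (a, b))
    }
}
```

No data moves between partitions during the join itself; the shuffle cost is paid once, when both sides are partitioned.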
Re: Streaming problems running 24x7
You have a window operation; I have seen that behaviour before with window operations in Spark Streaming. My solution was to move away from window operations by using probabilistic data structures; that might not be an option for you.

2015-04-20 10:29 GMT+01:00 Marius Soutier mps@gmail.com: The processing speed displayed in the UI doesn't seem to take everything into account. I also had a low processing time, but had to increase the batch duration from 30 seconds to 1 minute because the number of waiting batches kept increasing. Now it runs fine.

On 17.04.2015, at 13:30, González Salgado, Miquel miquel.gonza...@tecsidel.es wrote: Hi, thank you for your response. I don't think it is because of the processing speed; in fact the delay is under 1 second, while the batch interval is 10 seconds… The data volume is low (10 lines/second). Changing to local[8] made the problem worse (CPU usage increased more quickly). By the way, I have seen some improvement after switching to this KafkaUtils call: KafkaUtils.createDirectStream. CPU usage is low and stable, but memory is slowly increasing… But at least the process lasts longer. Best regards, Miquel

*From:* bit1...@163.com *Sent:* Thursday, April 16, 2015 10:58 *To:* González Salgado, Miquel; user *Subject:* Re: Streaming problems running 24x7 From your description, it looks like the data processing speed is far behind the data receiving speed. Could you try increasing the number of cores when you submit the application, such as local[8]? -- bit1...@163.com

*From:* Miquel miquel.gonza...@tecsidel.es *Date:* 2015-04-16 16:39 *To:* user user@spark.apache.org *Subject:* Streaming problems running 24x7 Hello, I'm having problems running a Spark Streaming job for more than a few hours (3 or 4). It starts working OK, but it degrades until it fails.
Some of the symptoms:
- Consumed memory and CPU keep getting higher and higher, and finally an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated.
- The delay shown in the web UI also keeps increasing.
- After some hours, disk space is being consumed. There are a lot of directories with names like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c
The job basically reads information from a Kafka topic and calculates several topN tables for some key and value fields related to NetFlow data. Some of the parameters are:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes
The execution is standalone on one machine (16GB RAM, 12 cores), and the command used to run it is as follows:
/opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops --jars $JARS --class $APPCLASS --master local[2] $APPJAR
Does anyone have any clues about the problem? I don't know whether it is a configuration problem or some error in the code that is causing memory leaks. Thank you in advance! Miquel
PS: the code is basically this:
--
object NetflowTopn {
  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1
  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10
  var hm = Map[String,Int]()
  hm += ("unix_secs" -> 0)
  hm += ("unix_nsecs" -> 1)
  hm += ("sysuptime" -> 2)
  hm += ("exaddr" -> 3)
  hm += ("dpkts" -> 4)
  hm += ("doctets" -> 5)
  hm += ("first" -> 6)
  hm += ("last" -> 7)
  hm += ("engine_type" -> 8)
  hm += ("engine_id" -> 9)
  hm += ("srcaddr" -> 10)
  hm += ("dstaddr" -> 11)
  hm += ("nexthop" -> 12)
  hm += ("input" -> 13)
  hm += ("output" -> 14)
  hm += ("srcport" -> 15)
  hm += ("dstport" -> 16)
  hm += ("prot" -> 17)
  hm += ("tos" -> 18)
  hm += ("tcp_flags" -> 19)
  hm += ("src_mask" -> 20)
  hm += ("dst_mask" -> 21)
  hm += ("src_as" -> 22)
  hm += ("dst_as" -> 23)

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput(data: Array[(Long, String)], keycamps_str: String, csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1);
    val ftmpf = new File(f1 + ts);
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach{ t =>
Re: writing to hdfs on master node much faster
What machines are HDFS data nodes -- just your master? That would explain it. Otherwise, is it actually the write that's slow, or is something else you're doing much faster on the master for other reasons? For example, are you actually shipping data via the master first in some local computation, so that the master's executor has the result much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote: Hi all, I have a three-node cluster with identical hardware. I am trying a workflow that reads data from HDFS, repartitions it, runs a few map operations, and then writes the results back to HDFS. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except for the write back to HDFS, where the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
When is old data dropped from the cache?
Hi all, on https://spark.apache.org/docs/latest/programming-guide.html, under RDD Persistence, Removing Data, it states: "Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion." Can this be understood to mean that the cache will be automatically refreshed with new data? If yes, when and how? How does Spark determine which data is old? Regards.
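To make "least-recently-used" concrete, here is a small LRU cache in Scala built on `java.util.LinkedHashMap` in access order. It only illustrates the eviction policy the guide names; it is not Spark's block manager, and the names `LruCache` and `keysLeastToMostRecent` are hypothetical. In this sketch, eviction only removes the entry that was touched longest ago; it never reloads or refreshes anything.

```scala
// Illustrative LRU cache: a LinkedHashMap with accessOrder = true keeps entries
// ordered from least- to most-recently accessed, and removeEldestEntry drops
// the least-recently-used entry once the capacity is exceeded.
class LruCache[K, V](capacity: Int) {
  private val underlying: java.util.LinkedHashMap[K, V] =
    new java.util.LinkedHashMap[K, V](16, 0.75f, true) {
      override protected def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean =
        size() > capacity
    }

  // A successful get counts as an access and moves the key to the "recent" end.
  def get(key: K): Option[V] =
    if (underlying.containsKey(key)) Some(underlying.get(key)) else None

  def put(key: K, value: V): Unit = { underlying.put(key, value); () }

  def keysLeastToMostRecent: List[K] = {
    val it = underlying.keySet().iterator()
    val buf = scala.collection.mutable.ListBuffer.empty[K]
    while (it.hasNext) buf += it.next()
    buf.toList
  }
}
```

Reading a block keeps it alive; the block that nobody has read for the longest time is the one dropped when space runs out.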
Re: writing to hdfs on master node much faster
Not sure what would slow it down, as the repartition completes equally fast on all nodes, implying that the data is available on all of them; after that there are a few computation steps, none of them local to the master.

On Mon, Apr 20, 2015 at 12:57 PM, Sean Owen so...@cloudera.com wrote: What machines are HDFS data nodes -- just your master? That would explain it. Otherwise, is it actually the write that's slow, or is something else you're doing much faster on the master for other reasons? For example, are you actually shipping data via the master first in some local computation, so that the master's executor has the result much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote: Hi all, I have a three-node cluster with identical hardware. I am trying a workflow that reads data from HDFS, repartitions it, runs a few map operations, and then writes the results back to HDFS. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except for the write back to HDFS, where the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks,
RE: writing to hdfs on master node much faster
Check whether your partitioning results in balanced partitions, i.e. partitions of similar sizes. One possible reason for the performance differences you observed is that, after your explicit repartitioning, the partition on your master node is much smaller than the RDD partitions on the other 2 nodes.

-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, April 20, 2015 12:57 PM To: jamborta Cc: user@spark.apache.org Subject: Re: writing to hdfs on master node much faster What machines are HDFS data nodes -- just your master? That would explain it. Otherwise, is it actually the write that's slow, or is something else you're doing much faster on the master for other reasons? For example, are you actually shipping data via the master first in some local computation, so that the master's executor has the result much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote: Hi all, I have a three-node cluster with identical hardware. I am trying a workflow that reads data from HDFS, repartitions it, runs a few map operations, and then writes the results back to HDFS. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except for the write back to HDFS, where the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks,
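Following the suggestion to check for balanced partitions, one rough check is to count records per partition and compare the largest partition with the mean. The sketch below is plain Scala; in Spark the counts could come from something like `rdd.mapPartitions(it => Iterator(it.size)).collect()`. `PartitionBalanceSketch` and the skew ratio are illustrative assumptions, not a Spark metric.

```scala
object PartitionBalanceSketch {
  // Number of records in each partition (consumes the iterators).
  def recordCounts[A](partitions: Seq[Iterator[A]]): Seq[Long] =
    partitions.map(_.size.toLong)

  // Largest partition divided by the mean partition size.
  // 1.0 means perfectly even; larger values mean some partitions hold more data.
  def skew(counts: Seq[Long]): Double = {
    require(counts.nonEmpty, "need at least one partition")
    val mean = counts.sum.toDouble / counts.size
    if (mean == 0.0) 1.0 else counts.max / mean
  }
}
```

A ratio well above 1.0 would be consistent with the uneven-partition explanation; a ratio near 1.0 points back toward the other suggestions in the thread, such as data-node placement.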
Spark 1.3.1 - SQL Issues
Hi, I just upgraded to Spark 1.3.1. I am getting a warning:

Warning (from warnings module):
  File "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py", line 191
    warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
UserWarning: inferSchema is deprecated, please use createDataFrame instead

However, the documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section

Also, I am getting an error in the mllib ALS.train function when passing a DataFrame (do I need to convert the DF to an RDD?)

Code:
training = ssc.sql("select userId,movieId,rating from ratings where partitionKey 6").cache()
print type(training)
model = ALS.train(training,rank,numIter,lmbda)

Error:
<class 'pyspark.sql.dataframe.DataFrame'>
Rank:8 Lmbda:1.0 iteration:10
Traceback (most recent call last):
  File "D:\Project\Spark\code\movie_sql.py", line 109, in <module>
    bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
  File "D:\Project\Spark\code\movie_sql.py", line 54, in getBestModel
    model = ALS.train(trainingRDD,rank,numIter,lmbda)
  File "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py", line 139, in train
    model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations,
  File "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py", line 127, in _prepare
    assert isinstance(ratings, RDD), "ratings should be RDD"
AssertionError: ratings should be RDD

-- Best Regards, Ayan Guha
Re: Running spark over HDFS
I think the memory requested by your job, i.e. 2.0 GB, is higher than what is available. Please request 256 MB explicitly while creating the SparkContext and try again. Thanks and Regards, Suraj Sheth

On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com wrote: Please find attached a screenshot of my cluster UI. Thanks

On Monday 20 April 2015 02:27 PM, Akhil Das wrote: Are you seeing your task being submitted to the UI? Under completed or running tasks? How many resources are you allocating for your job? Can you share a screenshot of your cluster UI and the code snippet that you are trying to run? Thanks Best Regards

On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote: Hi, I did what you suggested, but now it is giving the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. The UI shows that the master is working. Thanks Madhvi

On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In Eclipse, when you create your SparkContext, set the master URI as shown in the top-left corner of the web UI, like spark://someIPorHost:7077, and it should be fine. Thanks Best Regards

On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote: Hi All, I am new to Spark and have installed a Spark cluster on my system, which also has a Hadoop cluster. I want to process data stored in HDFS through Spark. When I run code in Eclipse it gives the following warning repeatedly: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to the spark-env.sh file as below:
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g
I am running the Spark standalone cluster. The cluster UI shows all workers with allocated resources, but it is still not working.
What other configuration changes are needed? Thanks Madhvi Gupta
Re: Running spark over HDFS
There are a lot of similar problems shared and resolved by users on this same list, and I have been part of those discussions before. Please search for those, try the suggestions, and let us know if you still face problems. Thanks and Regards, Archit Thakur.

On Mon, Apr 20, 2015 at 3:05 PM, madhvi madhvi.gu...@orkash.com wrote: On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote: Hi Madhvi, I think the memory requested by your job, i.e. 2.0 GB, is higher than what is available. Please request 256 MB explicitly while creating the SparkContext and try again. Thanks and Regards, Suraj Sheth

Tried the same but still no luck :| Madhvi
Re: mapPartitions vs foreachPartition
The same as the difference between map and foreach: mapPartitions takes an iterator and returns an iterator; foreachPartition takes an iterator and returns Unit.

On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com wrote: What is the difference between mapPartitions and foreachPartition? When should each be used? Thanks, Arun
Re: Running spark over HDFS
On Monday 20 April 2015 03:18 PM, Archit Thakur wrote: There are a lot of similar problems shared and resolved by users on this same list, and I have been part of those discussions before. Please search for those, try the suggestions, and let us know if you still face problems. Thanks and Regards, Archit Thakur.

On Mon, Apr 20, 2015 at 3:05 PM, madhvi madhvi.gu...@orkash.com wrote: On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote: Hi Madhvi, I think the memory requested by your job, i.e. 2.0 GB, is higher than what is available. Please request 256 MB explicitly while creating the SparkContext and try again. Thanks and Regards, Suraj Sheth

Tried the same but still no luck :| Madhvi

Hi, it's still not working, and I don't understand where I am going wrong. These are the configurations in my spark-env.sh file:
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_EXECUTOR_MEMORY=256m
export SPARK_DRIVER_MEMORY=256m
I am running this command in the shell:
./bin/spark-submit --class Spark.testSpark.JavaWordCount --master yarn-client --num-executors 2 --driver-memory 256m --executor-memory 256m --executor-cores 1 lib/Untitled.jar
Madhvi
RE: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
The code I've written is simple: it just starts a thread that calls the store method of the Receiver class. I see this code, with its printlns, working fine when I try:

spark-submit --jars jar --class test.TestCustomReceiver jar

However, it does not work when I try the same command with --master spark://masterURL:

spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar

I also tried setting the master in the conf that I create, but that does not work either. I do not see the onStart println being printed when I use the --master option. Please advise. Also, the master I am attaching to has multiple workers across hosts with many threads available to it. The code is pasted below (classes: TestReceiver, TestCustomReceiver):

package test;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class TestReceiver extends Receiver<String> {
    public TestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
        System.out.println("Ankit: Created TestReceiver");
    }

    @Override
    public void onStart() {
        System.out.println("Start TestReceiver");
        new TestThread().start();
    }

    public void onStop() {}

    @SuppressWarnings("unused")
    private class TestThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    sleep((long) (Math.random() * 3000));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                store("Time: " + System.currentTimeMillis());
            }
        }
    }
}

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestCustomReceiver {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
        TestReceiver receiver = new TestReceiver();
        JavaReceiverInputDStream<String> stream = ssc.receiverStream(receiver);
        stream.map(new Function() {
            @Override
            public Object call(Object arg0) throws Exception {
                System.out.println("Received: " + arg0);
                return arg0;
            }
        }).foreachRDD(new Function() {
            @Override
            public Object call(Object arg0) throws Exception {
                System.out.println("Total Count: " + ((org.apache.spark.api.java.JavaRDD) arg0).count());
                return arg0;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

Thanks, Ankit

Date: Mon, 20 Apr 2015 12:22:03 +0530 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers From: ak...@sigmoidanalytics.com To: patel7...@hotmail.com CC: user@spark.apache.org It would be good if you could paste your custom receiver code and the code that you used to invoke it. Thanks Best Regards

On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel patel7...@hotmail.com wrote: I am experiencing a problem with Spark Streaming (Spark 1.2.0): the onStart method is never called on the CustomReceiver when calling spark-submit against a master node with multiple workers. However, Spark Streaming works fine with no master node set. Has anyone noticed this issue?
Re: history-server doesn't read logs which are on FS
Thanks, it helped. We can't use Spark 1.3 because Cassandra DSE doesn't support it.

2015-04-17 21:48 GMT+02:00 Imran Rashid iras...@cloudera.com: Are you calling sc.stop() at the end of your applications? The history server only displays completed applications, but if you don't call sc.stop(), it doesn't know that those applications have been stopped. Note that in Spark 1.3, the history server can also display running applications (including completed applications that it thinks are still running), which improves things a little bit.

On Fri, Apr 17, 2015 at 10:13 AM, Serega Sheypak serega.shey...@gmail.com wrote: Hi, I started the history server. Here is the UI output:
*Event log directory:* file:/var/log/spark/applicationHistory/
No completed applications found! Did you specify the correct logging directory? Please verify your setting of spark.history.fs.logDirectory and whether you have the permissions to access it. It is also possible that your application did not run to completion or did not stop the SparkContext.
Spark 1.2.0. I go to the node where the server runs and run:
ls -la /var/log/spark/applicationHistory/
total 44
drwxrwxrwx 11 root      root    4096 Apr 17 14:50 .
drwxrwxrwx  3 cassandra root    4096 Apr 16 15:31 ..
drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 10:06 app-20150417100630-
drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:01 app-20150417110140-0001
drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:12 app-20150417111216-0002
drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:14 app-20150417111441-0003
drwxrwx---  2 vagrant   vagrant 4096 Apr 17 11:20 *app-20150417112028-0004*
drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:17 *app-20150417141733-0005*
drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:32 *app-20150417143237-0006*
drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:49 *app-20150417144902-0007*
drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:50 *app-20150417145025-0008*
So there are logs, but the history server doesn't display them.
I've checked the workers; they also point to that directory. When I run an app, I see a new log. Here is the history server log output:
vagrant@dsenode01:/usr/lib/spark/logs$ cat spark-root-org.apache.spark.deploy.history.HistoryServer-1-dsenode01.out
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp ::/usr/lib/spark/sbin/../conf:/usr/lib/spark/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/lib/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/lib/spark/lib/datanucleus-core-3.2.10.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Dspark.history.fs.logDirectory=/var/log/spark/applicationHistory -Dspark.eventLog.enabled=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/17 09:55:21 INFO HistoryServer: Registered signal handlers for [TERM, HUP, INT]
15/04/17 09:55:21 INFO SecurityManager: Changing view acls to: root
15/04/17 09:55:21 INFO SecurityManager: Changing modify acls to: root
15/04/17 09:55:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/04/17 09:55:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/17 09:55:24 INFO Utils: Successfully started service on port 18080.
15/04/17 09:55:24 INFO HistoryServer: Started HistoryServer at http://dsenode01:18080
What could be wrong with it?
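Imran's point is that the history server only lists applications that stopped their SparkContext. One way to make sure `stop()` runs even when the job throws is a small loan-pattern helper. A minimal sketch: `withResource` is a hypothetical helper, and the usage in the comment assumes a `SparkConf` named `conf` and Spark on the classpath.

```scala
object StopAlwaysSketch {
  // Run `body` with the resource, then always call `release`, even if `body` throws.
  // Intended usage (assumption, not run here):
  //   StopAlwaysSketch.withResource(new SparkContext(conf))(_.stop()) { sc => /* job code */ }
  def withResource[R, T](resource: R)(release: R => Unit)(body: R => T): T =
    try body(resource) finally release(resource)
}
```

Taking `release` as a function rather than requiring a common interface means it works with `SparkContext` as-is, since that class already has a `stop()` method.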
Re: mapPartitions vs foreachPartition
True.

On Mon, Apr 20, 2015 at 4:14 PM, Arun Patel arunp.bigd...@gmail.com wrote: So mapPartitions is a transformation and foreachPartition is an action? Thanks Arun

On Mon, Apr 20, 2015 at 4:38 AM, Archit Thakur archit279tha...@gmail.com wrote: The same as the difference between map and foreach: mapPartitions takes an iterator and returns an iterator; foreachPartition takes an iterator and returns Unit.

On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com wrote: What is the difference between mapPartitions and foreachPartition? When should each be used? Thanks, Arun
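The distinction Archit describes comes down to the shape of the function each method takes. The plain-Scala model below (partitions as `Seq[Seq[A]]`, method names borrowed from Spark only for readability) shows that shape: `mapPartitions` turns each partition's iterator into a new iterator and gives a result back, while `foreachPartition` consumes the iterator for side effects and returns `Unit`. This model runs both eagerly; in Spark, as the thread confirms, `mapPartitions` is a lazy transformation and `foreachPartition` is an action. The per-partition "connection" in the test is a hypothetical placeholder for a resource such as a database connection.

```scala
object PartitionOpsSketch {
  // mapPartitions-style: Iterator[A] => Iterator[B]; produces a new dataset.
  def mapPartitions[A, B](partitions: Seq[Seq[A]])(f: Iterator[A] => Iterator[B]): Seq[Seq[B]] =
    partitions.map(p => f(p.iterator).toList)

  // foreachPartition-style: Iterator[A] => Unit; used only for side effects,
  // e.g. opening one connection per partition and writing every record through it.
  def foreachPartition[A](partitions: Seq[Seq[A]])(f: Iterator[A] => Unit): Unit =
    partitions.foreach(p => f(p.iterator))
}
```

A common rule of thumb: use `mapPartitions` when you need the results for further processing, and `foreachPartition` when the partition is written somewhere external and nothing needs to come back.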
SparkSQL performance
Hi all, I have a simple query, Select * from tableX where attribute1 between 0 and 5, that I run over a Kryo file with four partitions, which ends up being around 3.5 million rows in our case. If I run this query as a simple map().filter(), it takes around 9.6 seconds, but when I apply a schema, register the table in a SQLContext, and then run the query, it takes around 16 seconds. This is using Spark 1.2.1 with Scala 2.10.0. I am wondering why there is such a big performance gap if it is just a filter. Internally, the relation files are mapped to a JavaBean. Could this difference in data representation (JavaBeans vs. Spark SQL's internal representation) explain the gap? Is there anything I could do to bring the performance closer to the hard-coded option? Thanks in advance for any suggestions or ideas. Renato M.
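Before comparing the two timings, it is worth making sure the hand-coded `map().filter()` selects exactly the rows the SQL query does. In standard SQL, and in Spark SQL, `x BETWEEN lo AND hi` is inclusive at both ends. A minimal sketch; `Record` is a hypothetical stand-in for the poster's JavaBean, and the code only checks equivalence of the predicate, it does not explain the performance gap.

```scala
object BetweenFilterSketch {
  // Hypothetical stand-in for the JavaBean; only the filtered field matters here.
  final case class Record(attribute1: Int, payload: String)

  // SQL `x BETWEEN lo AND hi` means lo <= x AND x <= hi (both ends inclusive).
  def between(x: Int, lo: Int, hi: Int): Boolean = x >= lo && x <= hi

  // Hand-coded equivalent of: SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5
  def hardCodedQuery(rows: Seq[Record]): Seq[Record] =
    rows.filter(r => between(r.attribute1, 0, 5))
}
```

If the hand-coded version used strict comparisons, it would return fewer rows than the SQL query, and the timing comparison would not be like for like.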
[pyspark] Starting workers in a virtualenv
Hi all, I am running the Python process that communicates with Spark in a virtualenv. Is there any way I can make sure that the workers' Python processes are also started in a virtualenv? Currently I am getting ImportErrors when the worker tries to unpickle things that are not installed system-wide. For now, both the worker and the driver run on the same machine in local mode. Thanks in advance!
Re: Running spark over HDFS
Are you seeing your task being submitted to the UI? Under completed or running tasks? How many resources are you allocating for your job? Can you share a screenshot of your cluster UI and the code snippet that you are trying to run? Thanks Best Regards

On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote: Hi, I did what you suggested, but now it is giving the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. The UI shows that the master is working. Thanks Madhvi

On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In Eclipse, when you create your SparkContext, set the master URI as shown in the top-left corner of the web UI, like spark://someIPorHost:7077, and it should be fine. Thanks Best Regards

On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote: Hi All, I am new to Spark and have installed a Spark cluster on my system, which also has a Hadoop cluster. I want to process data stored in HDFS through Spark. When I run code in Eclipse it gives the following warning repeatedly: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to the spark-env.sh file as below:
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g
I am running the Spark standalone cluster. The cluster UI shows all workers with allocated resources, but it is still not working. What other configuration changes are needed? Thanks Madhvi Gupta
Re: NEWBIE/not able to connect to postgresql using jdbc
Try doing sc.addJar(path\to\your\postgres\jar) Thanks Best Regards

On Mon, Apr 20, 2015 at 12:26 PM, shashanksoni shashankso...@gmail.com wrote: I am using a Spark 1.3 standalone cluster on my local Windows machine and trying to load data from one of our servers. Below is my code:
import os
os.environ['SPARK_CLASSPATH'] = "C:\Users\ACERNEW3\Desktop\Spark\spark-1.3.0-bin-hadoop2.4\postgresql-9.2-1002.jdbc3.jar"
from pyspark import SparkContext, SparkConf, SQLContext
sc = SparkContext(appName = "SampleApp")
sqlContext = SQLContext(sc)
df = sqlContext.load(source="jdbc", url="jdbc:postgresql://54.189.136.218/reporting", dbtable="public.course_mast")
When I run it, it throws the error: No suitable driver found for jdbc:postgresql. Please help me out. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NEWBIE-not-able-to-connect-to-postgresql-using-jdbc-tp22569.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
writing to hdfs on master node much faster
Hi all, I have a three-node cluster with identical hardware. I am trying a workflow that reads data from HDFS, repartitions it, runs a few map operations, and then writes the results back to HDFS. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except for the write back to HDFS, where the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks,
Re: Running spark over HDFS
On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote: Hi Madhvi, I think the memory requested by your job, i.e. 2.0 GB is higher than what is available. Please request for 256 MB explicitly while creating Spark Context and try again. Thanks and Regards, Suraj Sheth Tried the same but still no luck:| Madhvi
Re: Spark-1.2.2-bin-hadoop2.4.tgz missing
I'm looking for that build too. -Conor

On Mon, Apr 20, 2015 at 9:18 AM, Marius Soutier mps@gmail.com wrote: Same problem here...

On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote: Hi all, it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on the mirror sites. Am I missing something? Regards, Zsolt
Spark-1.2.2-bin-hadoop2.4.tgz missing
Hi all, it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on the mirror sites. Am I missing something? Regards, Zsolt
RE: compilation error
Any pointers on this issue? Thanks Regards Brahma Reddy Battula From: Brahma Reddy Battula [brahmareddy.batt...@huawei.com] Sent: Monday, April 20, 2015 9:30 AM To: Sean Owen; Ted Yu Cc: user@spark.apache.org Subject: RE: compilation error Thanks a lot for your replies. @Ted, V100R001C00 is our internal Hadoop version, which is based on Hadoop 2.4.1. @Sean Owen, yes, you are correct... I just wanted to know what leads to this problem. Thanks Regards Brahma Reddy Battula From: Sean Owen [so...@cloudera.com] Sent: Monday, April 20, 2015 9:14 AM To: Ted Yu Cc: Brahma Reddy Battula; user@spark.apache.org Subject: Re: compilation error Brahma, since you can see the continuous integration builds are passing, it's got to be something specific to your environment, right? This is not even an error from Spark, but from Maven plugins. On Mon, Apr 20, 2015 at 4:42 AM, Ted Yu yuzhih...@gmail.com wrote: bq. -Dhadoop.version=V100R001C00 This is the first time I have seen the above Hadoop version. It doesn't look like an Apache release. I checked my local Maven repo but didn't find impl under ~/.m2/repository/com/ibm/icu FYI On Sun, Apr 19, 2015 at 8:04 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hey Todd, thanks a lot for your reply. Kindly check the following details: Spark version: 1.1.0, JDK: jdk1.7.0_60, command: mvn -Pbigtop-dist -Phive -Pyarn -Phadoop-2.4 -Dhadoop.version=V100R001C00 -DskipTests package Thanks Regards Brahma Reddy Battula From: Ted Yu [yuzhih...@gmail.com] Sent: Monday, April 20, 2015 8:07 AM To: Brahma Reddy Battula Cc: user@spark.apache.org Subject: Re: compilation error What JDK release are you using? Can you give the complete command you used? Which Spark branch are you working with? Cheers On Sun, Apr 19, 2015 at 7:25 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hi All, I am getting the following error when compiling Spark. What did I miss? I googled but did not find an exact solution for this.
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on project spark-assembly_2.10: Error creating shaded jar: Error in ASM processing class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class: 48188 - [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:2.2:shade (default) on project spark-assembly_2.10: Error creating shaded jar: Error in ASM processing class com/ibm/icu/impl/data/LocaleElements_zh__PINYIN.class at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59) at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320) Thanks Regards Brahma Reddy Battula
Re: Spark-1.2.2-bin-hadoop2.4.tgz missing
Same problem here... On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote: Hi all, it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on the mirror sites. Am I missing something? Regards, Zsolt
Re: sparksql - HiveConf not found during task deserialization
Can you try sc.addJar("/path/to/your/hive/jar")? I think it will resolve it. Thanks Best Regards On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com wrote: Akhil, but the first case (creating HiveConf on the executor inside map) works fine. Only the second case fails. I suspect some foul play with the classloaders. On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote: Looks like a missing jar. Try printing the classpath and make sure the Hive jar is present. Thanks Best Regards On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com wrote: I am using spark-1.3 with the hadoop-provided, hive-provided and hive-0.13.1 profiles. I am running a simple Spark job on a YARN cluster, adding all the hadoop2 and hive13 jars to the Spark classpaths. If I remove hive-provided while building Spark, I don't face any issue. But with hive-provided, I get a java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in the YARN executor. Code is below:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.hadoop.hive.conf.HiveConf

object Simple {
  def main(args: Array[String]) = {
    val sc = new SparkContext(new SparkConf())
    val sqlC = new org.apache.spark.sql.hive.HiveContext(sc)

    val x = sc.parallelize(1 to 2).map(x => { val h = new HiveConf; h.getBoolean("hive.test", false) })
    x.collect.foreach(x => println(s"- $x "))

    val result = sqlC.sql("select * from products_avro order by month, name, price")
    result.collect.foreach(println)
  }
}
The first job (involving map) runs fine: HiveConf is instantiated, the conf variable is looked up, and so on. But the second job (involving the select * query) throws the class-not-found exception. The task deserializer is the one throwing it, because it cannot find the class on its classpath. I am not sure what is different from the first job, which also involved HiveConf.
157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost): java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at
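Akhil's "print the classpath and make sure the Hive jar is present" step can be scripted. Below is a minimal sketch, assuming you have already captured the executor's classpath string (for example by logging System.getProperty("java.class.path") from inside a task). HiveConf normally ships in the hive-common jar, but confirm that for your own Hive build. The helper name find_jars is hypothetical.

```python
import glob
import os
from typing import List


def find_jars(classpath: str, needle: str, sep: str = os.pathsep) -> List[str]:
    """Return classpath entries whose file name contains `needle`.

    Entries ending in '*' (JVM wildcard entries) are expanded with glob,
    so jars inside such directories are checked too.
    """
    matches: List[str] = []
    for entry in filter(None, classpath.split(sep)):
        candidates = glob.glob(entry[:-1] + "*.jar") if entry.endswith("*") else [entry]
        matches.extend(c for c in candidates if needle in os.path.basename(c))
    return matches


if __name__ == "__main__":
    # Made-up classpath for illustration; paste the real executor classpath here.
    cp = ":".join([
        "/opt/hadoop/share/hadoop/common/hadoop-common-2.4.0.jar",
        "/opt/hive/lib/hive-common-0.13.1.jar",
    ])
    print(find_jars(cp, "hive-common", sep=":"))
```

An empty result for "hive-common" on the executor side would be consistent with the NoClassDefFoundError above.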
Re: Running spark over HDFS
Hi, I did what you suggested, but now it gives the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. The UI shows that the master is running. Thanks Madhvi On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In Eclipse, when you create your SparkContext, set the master URI to the one shown in the top-left corner of the web UI, e.g. spark://someIPorHost:7077, and it should be fine. Thanks Best Regards On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote: Hi All, I am new to Spark and have installed a Spark cluster on my system, which already has a Hadoop cluster. I want to process data stored in HDFS through Spark. When I run code in Eclipse, it repeatedly gives the following warning: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to the spark-env.sh file as below:
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g
I am running the Spark standalone cluster. The cluster UI shows all workers with allocated resources, but it is still not working. What other configurations need to be changed? Thanks Madhvi Gupta
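Akhil's advice is to use exactly the URL the master's web UI shows. In Spark 1.x standalone mode, a commonly reported cause of "All masters are unresponsive" is a mismatch such as an IP address where the master bound to a hostname. The sketch below compares the two URLs. It assumes only the spark://host:port form and the default standalone port 7077. The helper names and "master-host" are hypothetical.

```python
from typing import Tuple
from urllib.parse import urlsplit

DEFAULT_STANDALONE_PORT = 7077  # default port of a standalone Spark master


def parse_master_url(url: str) -> Tuple[str, int]:
    """Split a standalone master URL such as spark://host:7077 into (host, port)."""
    parts = urlsplit(url.strip())
    if parts.scheme != "spark" or not parts.hostname:
        raise ValueError(f"not a standalone master URL: {url!r}")
    return parts.hostname, parts.port or DEFAULT_STANDALONE_PORT


def same_master(configured: str, shown_in_ui: str) -> bool:
    """True if the URL passed to setMaster matches the one the web UI shows.

    An IP address and a hostname for the same machine deliberately compare
    as different, because that mismatch is what this check is meant to catch.
    """
    return parse_master_url(configured) == parse_master_url(shown_in_ui)


if __name__ == "__main__":
    print(same_master("spark://192.168.0.119:7077", "spark://master-host:7077"))  # False
```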
RE: shuffle.FetchFailedException in spark on YARN job
I don't think this problem is related to Netty or NIO; switching to NIO will not change the part of the code path that gets the index file for the sort-based shuffle reader. I think you could check your system from a couple of angles:
1. Is there any hardware problem, such as a full disk, that makes this file lost or non-existent? That can cause this exception.
2. Do you see any other exceptions besides this one? A shuffle fetch failure mainly means your job is already in an abnormal state, and other problems can also lead to this error.
Thanks Jerry From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, April 20, 2015 2:56 PM To: roy Cc: user@spark.apache.org Subject: Re: shuffle.FetchFailedException in spark on YARN job Which version of Spark are you using? Did you try using spark.shuffle.blockTransferService=nio? Thanks Best Regards On Sat, Apr 18, 2015 at 11:14 PM, roy rp...@njit.edu wrote: Hi, My Spark job is failing with the following error message: org.apache.spark.shuffle.FetchFailedException: /mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index (No such file or directory) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at
org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:89) at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.FileNotFoundException: /mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:235) at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:268) at org.apache.spark.storage.ShuffleBlockFetcherIterator.init(ShuffleBlockFetcherIterator.scala:115) at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:76) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) ... 7 more ) my job
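Jerry's first check (a full or missing disk behind the shuffle files) can be run on each node with the standard library. A minimal sketch, assuming the directories to inspect are the YARN NodeManager local dirs (the yarn.nodemanager.local-dirs setting). The 10 GB threshold and the helper name low_space_dirs are arbitrary choices for illustration.

```python
import os
import shutil
import tempfile
from typing import Iterable, List, Optional, Tuple


def low_space_dirs(dirs: Iterable[str], min_free_gb: float = 10.0) -> List[Tuple[str, Optional[float]]]:
    """Report local dirs that are missing (free = None) or have < min_free_gb free."""
    report: List[Tuple[str, Optional[float]]] = []
    for d in dirs:
        if not os.path.isdir(d):
            report.append((d, None))  # directory gone: shuffle files cannot exist
            continue
        free_gb = shutil.disk_usage(d).free / 1024 ** 3
        if free_gb < min_free_gb:
            report.append((d, round(free_gb, 2)))
    return report


if __name__ == "__main__":
    # Replace with the NodeManager local dirs configured on each node.
    print(low_space_dirs([tempfile.gettempdir(), "/mnt/ephemeral12/yarn/nm"]))
```

An empty list means every listed directory exists and has at least the requested free space.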
Re: Addition of new Metrics for killed executors.
Hi Twinkle, We have a use case where we want to debug how and why an executor got killed. It could be because of a stack overflow, GC, or any other unexpected scenario. The driver UI shows no information about killed executors, so I was curious how people usually debug such cases, apart from scanning the logs and interpreting them. The metrics we are planning to add are similar to what we have for non-killed executors [data per stage, specifically]: numFailedTasks, executorRunTime, inputBytes, memoryBytesSpilled, etc. Apart from that, we also intend to add all the information shown in the Executors tab for running executors. Thanks, Archit Thakur. On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi Archit, What is your use case and what kind of metrics are you planning to add? Thanks, Twinkle On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, We are planning to add new metrics in Spark for the executors that get killed during execution. I was just curious why this info is not already present. Is there some reason for not adding it? Any ideas are welcome. Thanks and Regards, Archit Thakur.
Re: Streaming problems running 24x7
The processing speed displayed in the UI doesn't seem to take everything into account. I also had a low processing time, but I had to increase the batch duration from 30 seconds to 1 minute because the number of waiting batches kept increasing. Now it runs fine. On 17.04.2015, at 13:30, González Salgado, Miquel miquel.gonza...@tecsidel.es wrote: Hi, Thank you for your response. I don't think it is because of the processing speed; in fact, the delay is under 1 second while the batch interval is 10 seconds… The data volume is low (10 lines/second). Changing to local[8] made the problem worse (CPU increased more quickly). By the way, I have seen some improvement after switching to this KafkaUtils call: KafkaUtils.createDirectStream. CPU usage is low and stable, but memory is slowly increasing… But at least the process lasts longer. Best regards, Miquel From: bit1...@163.com [mailto:bit1...@163.com] Sent: Thursday, April 16, 2015 10:58 To: González Salgado, Miquel; user Subject: Re: Streaming problems running 24x7 From your description, it looks like the data processing speed is far behind the data receiving speed. Could you try increasing the number of cores when you submit the application, such as local[8]? bit1...@163.com From: Miquel miquel.gonza...@tecsidel.es Date: 2015-04-16 16:39 To: user@spark.apache.org Subject: Streaming problems running 24x7 Hello, I'm having problems running a Spark Streaming job for more than a few hours (3 or 4). It begins working OK, but it degrades until failure. Some of the symptoms:
- Consumed memory and CPU keep getting higher and higher, and finally an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated.
- The delay shown in the web UI also keeps increasing.
- After some hours, disk space is being consumed.
There are a lot of directories with names like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c. The job basically reads information from a Kafka topic and calculates several top-N tables for some key and value fields related to NetFlow data. Some of the parameters are:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes
The execution is standalone on one machine (16 GB RAM, 12 cores), and it is run with the following command: /opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops --jars $JARS --class $APPCLASS --master local[2] $APPJAR
Does anyone have any clues about the problem? I don't know if it is a configuration problem or some error in the code that is causing memory leaks. Thank you in advance! Miquel
P.S.: the code is basically this:
import java.io.{File, PrintWriter}

object NetflowTopn {
  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1
  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10
  var hm = Map[String, Int]()
  hm += ("unix_secs" -> 0)
  hm += ("unix_nsecs" -> 1)
  hm += ("sysuptime" -> 2)
  hm += ("exaddr" -> 3)
  hm += ("dpkts" -> 4)
  hm += ("doctets" -> 5)
  hm += ("first" -> 6)
  hm += ("last" -> 7)
  hm += ("engine_type" -> 8)
  hm += ("engine_id" -> 9)
  hm += ("srcaddr" -> 10)
  hm += ("dstaddr" -> 11)
  hm += ("nexthop" -> 12)
  hm += ("input" -> 13)
  hm += ("output" -> 14)
  hm += ("srcport" -> 15)
  hm += ("dstport" -> 16)
  hm += ("prot" -> 17)
  hm += ("tos" -> 18)
  hm += ("tcp_flags" -> 19)
  hm += ("src_mask" -> 20)
  hm += ("dst_mask" -> 21)
  hm += ("src_as" -> 22)
  hm += ("dst_as" -> 23)

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput(data: Array[(Long, String)], keycamps_str: String, csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t => pw.println(t._2 + "," + t._1) }
    pw.close
    ftmpf.renameTo(f1f)
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
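Marius's observation (waiting batches kept growing even though each batch looked cheap) follows from simple queueing. This minimal sketch assumes batches are processed one after another, which is Spark Streaming's default behaviour as far as I know. It ignores receiver and GC effects, so treat it as an illustration rather than a model of the real scheduler. The function name is hypothetical.

```python
from typing import List, Sequence


def scheduling_delays(batch_interval_s: float, processing_s: Sequence[float]) -> List[float]:
    """Seconds each batch waits before it starts, if batches run one at a time.

    Batch i is generated at i * batch_interval_s and can start only after the
    previous batch has finished, so any processing time above the interval
    carries over and accumulates.
    """
    delays: List[float] = []
    previous_done = 0.0
    for i, p in enumerate(processing_s):
        generated = i * batch_interval_s
        start = max(generated, previous_done)
        delays.append(start - generated)
        previous_done = start + p
    return delays


if __name__ == "__main__":
    print(scheduling_delays(10, [8] * 5))   # keeps up: every delay stays at 0
    print(scheduling_delays(10, [12] * 5))  # falls behind: delay grows 2 s per batch
```

If processing time (including window computations) ever exceeds the batch interval, the delay grows without bound. A longer interval, as Marius used, restores the margin.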
Re: Addition of new Metrics for killed executors.
Hi Archit, What is your use case and what kind of metrics are you planning to add? Thanks, Twinkle On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com wrote: Hi, We are planning to add new metrics in Spark for the executors that get killed during execution. I was just curious why this info is not already present. Is there some reason for not adding it? Any ideas are welcome. Thanks and Regards, Archit Thakur.
Re: Running spark over HDFS
No, I don't see the task I am running on the UI. Also, I have set instances=1, but the UI shows 2 workers. I am running the Java word count code exactly, except that my text file is in HDFS. Following is the part of my code that makes the connection:
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
sparkConf.setMaster("spark://192.168.0.119:7077");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://192.168.0.119:9000");
FileSystem dfs = FileSystem.get(conf);
JavaRDD<String> lines = ctx.textFile(dfs.getWorkingDirectory() + "/spark.txt", 1);
Thanks On Monday 20 April 2015 02:27 PM, Akhil Das wrote: Are you seeing your task being submitted to the UI, under completed or running tasks? How many resources are you allocating for your job? Can you share a screenshot of your cluster UI and the code snippet that you are trying to run? Thanks Best Regards On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote: Hi, I did what you suggested, but now it gives the following error: ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. The UI shows that the master is running. Thanks Madhvi On Monday 20 April 2015 12:28 PM, Akhil Das wrote: In Eclipse, when you create your SparkContext, set the master URI to the one shown in the top-left corner of the web UI, e.g. spark://someIPorHost:7077, and it should be fine. Thanks Best Regards On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote: Hi All, I am new to Spark and have installed a Spark cluster on my system, which already has a Hadoop cluster. I want to process data stored in HDFS through Spark.
When I run code in Eclipse, it repeatedly gives the following warning: scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. I have made changes to the spark-env.sh file as below:
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g
I am running the Spark standalone cluster. The cluster UI shows all workers with allocated resources, but it is still not working. What other configurations need to be changed? Thanks Madhvi Gupta
mapPartitions vs foreachPartition
What is the difference between mapPartitions and foreachPartition? When should each be used? Thanks, Arun
Re: Running spark over HDFS
Hi Madhvi, I think the memory requested by your job, i.e. 2.0 GB, is higher than what is available. Please request 256 MB explicitly while creating the SparkContext and try again. Thanks and Regards, Suraj Sheth
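Suraj's diagnosis can be checked with simple arithmetic. In standalone mode an executor is launched only on a worker that can offer the requested executor memory. With SPARK_WORKER_MEMORY=1g, a 2.0 GB request cannot be placed anywhere, which is consistent with "Initial job has not accepted any resources". This minimal sketch ignores cores and memory already used by other applications. The helper names are hypothetical.

```python
import re

# Megabytes per JVM-style size suffix (k, m, g, t).
_FACTORS_MB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mb(size: str) -> float:
    """Convert a JVM-style size such as '1g', '256m' or '2.0 GB' to megabytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([kmgt])b?\s*", size.lower())
    if match is None:
        raise ValueError(f"unrecognised size: {size!r}")
    return float(match.group(1)) * _FACTORS_MB[match.group(2)]


def executor_fits(executor_memory: str, worker_memory: str) -> bool:
    """True if one executor of the requested size fits on an otherwise idle worker."""
    return to_mb(executor_memory) <= to_mb(worker_memory)


if __name__ == "__main__":
    print(executor_fits("2.0 GB", "1g"))  # False: the 2.0 GB request cannot be placed
    print(executor_fits("256m", "1g"))    # True: Suraj's suggested 256 MB fits
```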
Order of execution of tasks inside of a stage and computing the number of stages
Hello! I'm a newbie in Spark, and I would like to understand some basic mechanisms of how it works behind the scenes. I have attached the lineage of my RDD, and I have the following questions:
1. Why do I have 8 stages instead of 5? From the book Learning Spark (Chapter 8, http://bit.ly/1E0Hah7), I understood that RDDs that exist at the same level of indentation as their parents will be pipelined [into the same physical stage] during physical execution. Since I have 5 parents, I expected to have 5 stages. Still, the Spark UI stages view shows 8 stages. Also, what does the (8) in the debug string represent? Is there a bug in this function?
2. At the stage level, what is the execution order among the tasks? Can they all be executed in parallel (for example: test4spark.csv HadoopRDD[0] || test4spark.csv MappedRDD[1] || MapPartitionsRDD[4] || ZippedWithIndexRDD[6]), or does each task wait for the previous one to complete (test4spark.csv HadoopRDD[0] =completed=> test4spark.csv MappedRDD[1] =completed=> etc.)?
3. Between stages, the order is given by the execution plan, so each stage waits until the ones before it have completed. Is this a correct assumption?
I look forward to your answers.
Regards, Florin
(8) MappedRDD[21] at map at WAChunkSepvgFilterNewModel.scala:298 []
 |  MappedRDD[20] at map at WAChunkSepvgFilterNewModel.scala:182 []
 |  ShuffledRDD[19] at sortByKey at WAChunkSepvgFilterNewModel.scala:182 []
 +-(8) ShuffledRDD[16] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:182 []
    +-(8) FlatMappedRDD[15] at flatMap at WAChunkSepvgFilterNewModel.scala:174 []
       |  ZippedWithIndexRDD[14] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:174 []
       |  MappedRDD[13] at map at WAChunkSepvgFilterNewModel.scala:272 []
       |  MappedRDD[12] at map at WAChunkSepvgFilterNewModel.scala:161 []
       |  ShuffledRDD[11] at sortByKey at WAChunkSepvgFilterNewModel.scala:161 []
       +-(8) ShuffledRDD[8] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:161 []
          +-(8) FlatMappedRDD[7] at flatMap at WAChunkSepvgFilterNewModel.scala:153 []
             |  ZippedWithIndexRDD[6] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:153 []
             |  MappedRDD[5] at map at WAChunkSepvgFilterNewModel.scala:248 []
             |  MapPartitionsRDD[4] at mapPartitionsWithIndex at WAChunkSepvgFilterNewModel.scala:114 []
             |  test4spark.csv MappedRDD[1] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
             |  test4spark.csv HadoopRDD[0] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
Excerpt from the book: "The lineage output shown in Example 8-8 uses indentation levels to show where RDDs are going to be pipelined together into physical stages. RDDs that exist at the same level of indentation as their parents will be pipelined during physical execution."
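The book's rule can be applied mechanically. Each line starting with "+-" marks a shuffle dependency, i.e. a stage boundary, so a linear lineage has one more stage than it has "+-" markers. For the lineage above that gives 5, matching the expectation. As far as I know, the "(8)" is the partition count of that RDD. One plausible source of the extra stages in the UI: zipWithIndex (when the RDD has more than one partition) and sortByKey (which samples its input to build a range partitioner) each launch separate jobs of their own, and those stages are not part of this debug string. Treat that as a hypothesis to verify in the UI's Jobs tab. The function below is a hypothetical helper, not a Spark API.

```python
def count_stages(debug_string: str) -> int:
    """Estimate the number of stages in the job printed by RDD.toDebugString().

    A line whose content starts with '+-' marks a shuffle dependency (a stage
    boundary), so a linear lineage has one more stage than '+-' markers.
    """
    boundaries = sum(1 for line in debug_string.splitlines() if line.lstrip().startswith("+-"))
    return boundaries + 1


# Abridged copy of the lineage above (file/line suffixes dropped).
LINEAGE = """\
(8) MappedRDD[21] at map
 |  MappedRDD[20] at map
 |  ShuffledRDD[19] at sortByKey
 +-(8) ShuffledRDD[16] at aggregateByKey
    +-(8) FlatMappedRDD[15] at flatMap
       |  ZippedWithIndexRDD[14] at zipWithIndex
       |  MappedRDD[13] at map
       |  MappedRDD[12] at map
       |  ShuffledRDD[11] at sortByKey
       +-(8) ShuffledRDD[8] at aggregateByKey
          +-(8) FlatMappedRDD[7] at flatMap
             |  ZippedWithIndexRDD[6] at zipWithIndex
             |  MappedRDD[5] at map
             |  MapPartitionsRDD[4] at mapPartitionsWithIndex
             |  test4spark.csv MappedRDD[1] at textFile
             |  test4spark.csv HadoopRDD[0] at textFile
"""

if __name__ == "__main__":
    print(count_stages(LINEAGE))  # 5
```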
Re: mapPartitions vs foreachPartition
So mapPartitions is a transformation and foreachPartition is an action? Thanks Arun On Mon, Apr 20, 2015 at 4:38 AM, Archit Thakur archit279tha...@gmail.com wrote: It is the same as the difference between map and foreach: mapPartitions takes an iterator and returns an iterator, while foreachPartition takes an iterator and returns Unit. On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel arunp.bigd...@gmail.com wrote: What is the difference between mapPartitions and foreachPartition? When should each be used? Thanks, Arun
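Archit's distinction can be shown with a toy model in which a "dataset" is just a list of partitions. This is a hedged sketch, not PySpark: the real rdd.mapPartitions(f) is lazy and returns a new RDD, while rdd.foreachPartition(f) runs a job and returns nothing. This toy version evaluates both eagerly. The helper names are hypothetical. A typical reason to use the partition-level variants is per-partition setup, such as opening one connection per partition instead of one per record.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_partitions(partitions: List[List[T]],
                   f: Callable[[Iterator[T]], Iterable[U]]) -> List[List[U]]:
    """Transformation-like: call f once per partition and keep what it yields."""
    return [list(f(iter(part))) for part in partitions]


def foreach_partition(partitions: List[List[T]],
                      f: Callable[[Iterator[T]], None]) -> None:
    """Action-like: call f once per partition purely for its side effects."""
    for part in partitions:
        f(iter(part))


if __name__ == "__main__":
    data = [[1, 2], [3, 4, 5]]                          # two "partitions"
    print(map_partitions(data, lambda it: [sum(it)]))   # one sum per partition

    written = []                                        # stands in for an external sink

    def save(it):
        batch = list(it)  # e.g. open one DB connection here, write, close
        written.append(batch)

    foreach_partition(data, save)
    print(written)
```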
Custom Partitioning Spark
Hi, I aim to do custom partitioning on a text file. I first convert it into a pair RDD and then try to use my custom partitioner. However, somehow it is not working. My code snippet is given below:
val file = sc.textFile(filePath)
val locLines = file.map(line => line.split("\t")).map(line => ((line(2).toDouble, line(3).toDouble), line(5).toLong))
val ck = locLines.partitionBy(new HashPartitioner(50)) // new CustomPartitioner(50) -- neither way works here.
When reading the file, the textFile method partitions it automatically. However, when I explicitly try to partition the new RDD locLines, it doesn't appear to do anything, and the number of partitions is still the one created by sc.textFile(). Any help in this regard will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
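One thing worth checking: RDDs are immutable, so partitionBy does not repartition locLines in place. The 50 partitions belong to the returned RDD ck, so inspect ck.partitions.size rather than locLines.partitions.size. The toy Python model below illustrates this under stated assumptions. It mirrors the idea behind HashPartitioner (a non-negative key hash modulo the partition count), but Python's hash differs from the JVM's hashCode, so actual partition indices will differ from Spark's. The helper name partition_by is hypothetical.

```python
from typing import Hashable, List, Tuple, TypeVar

V = TypeVar("V")


def partition_by(pairs: List[Tuple[Hashable, V]],
                 num_partitions: int) -> List[List[Tuple[Hashable, V]]]:
    """Hash-partition key/value pairs into a NEW list of partitions.

    The input list is left untouched, just as partitionBy leaves the
    parent RDD unchanged and returns a new one.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    parts: List[List[Tuple[Hashable, V]]] = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # Python's % with a positive divisor is never negative.
        parts[hash(key) % num_partitions].append((key, value))
    return parts


if __name__ == "__main__":
    loc_lines = [((1.0, 2.0), 5), ((3.5, 4.5), 7), ((1.0, 2.0), 9)]
    ck = partition_by(loc_lines, 50)
    print(len(loc_lines), len(ck))  # original unchanged; ck has 50 partitions
```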
Understanding the build params for spark with sbt.
Hi. I only use Spark core and HDFS, so no Spark SQL, MLlib, or other components are involved. I saw the hint on http://spark.apache.org/docs/latest/building-spark.html, with a sample like: build/sbt -Pyarn -Phadoop-2.3 assembly (what is the -P for?). Fundamentally, I'd like sbt to compile and package only the core and the Hadoop support. I would also appreciate it if you could tell me which Scala file controls the logic of -Pyarn, so that I can dig into the build source and have finer control. Thanks. -- I Am a Cat. Homepage: http://introo.me
Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos
I implemented two kinds of DataSource: one loads data during buildScan, and the other returns my RDD class with partition information for later loading. My RDD's compute gets the actorSystem from SparkEnv.get.actorSystem and then uses Spray to interact with an HTTP endpoint, which is the same flow as loading data in buildScan. All the Spray dependencies are included in a jar, which is passed to spark-submit using --jars. The job is defined in Python. Both scenarios work when testing locally with --master local[4]. On Mesos, the non-partitioned loading works too, but the partitioned loading hits the following exception:
Traceback (most recent call last):
  File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in <module>
    for code in airportData.collect():
  File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py", line 293, in collect
    port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
  File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o60.javaToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage 36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'spray' at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164) at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218) at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224) at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33) at spray.can.HttpExt.init(Http.scala:143) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78) at scala.util.Try$.apply(Try.scala:161) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) at akka.actor.ExtensionKey.createExtension(Extension.scala:153) at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711) at akka.actor.ExtensionId$class.apply(Extension.scala:79) at akka.actor.ExtensionKey.apply(Extension.scala:149) at akka.io.IO$.apply(IO.scala:30) at spray.client.pipelining$.sendReceive(pipelining.scala:35) at com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118) at com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71) at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Is this due to some kind of classpath setup issue on the executor for the external jar used by the RDD? Thanks in advance for any suggestions on how to resolve this. Yang
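One way to test the classpath hypothesis is to check, from inside compute() on an executor, which reference.conf files a given class loader can see. Typesafe Config builds its reference configuration by merging every reference.conf that the loader exposes. If none of them defines spray, an error like the one above would be expected. Below is a minimal stdlib-only sketch. ConfigProbe and its methods are hypothetical names, and the spray check is a crude text match rather than a real HOCON parse.

```scala
import java.net.URL
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Hypothetical diagnostic helper (not part of Spark, Akka or Spray).
object ConfigProbe {
  // Every reference.conf visible through `loader`, in search order.
  def referenceConfs(loader: ClassLoader): List[URL] = {
    val found = ListBuffer.empty[URL]
    val urls = loader.getResources("reference.conf")
    while (urls.hasMoreElements) found += urls.nextElement()
    found.toList
  }

  // Crude text check: does this file mention "spray" at all?
  def mentionsSpray(url: URL): Boolean = {
    val src = Source.fromURL(url, "UTF-8")
    try src.mkString.contains("spray") finally src.close()
  }

  // The reference.conf files that appear to carry spray's defaults.
  def sprayConfs(loader: ClassLoader): List[URL] =
    referenceConfs(loader).filter(mentionsSpray)
}
```

For example, you could log ConfigProbe.sprayConfs(Thread.currentThread.getContextClassLoader) and ConfigProbe.sprayConfs(getClass.getClassLoader) from compute(). Comparing the Mesos output with the local[4] run might show whether the jar passed with --jars is visible to the loader in question.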
Re: Equal number of RDD Blocks
I also see that it's creating both receivers on the same executor, and that might be the reason one executor has more RDD blocks than the other. Can I make Spark create each receiver on a different executor? Regards, Laeeq On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote: And what is the message rate of each topic, mate? That was the other part of the required clarifications. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
Spark SQL vs map reduce tableInputOutput
Hi all, I am querying HBase, combining the results and using them in my Spark job. I query HBase with the HBase client API inside my Spark job. Can anybody tell me whether Spark SQL will be fast enough and support a range of queries? Regards, Jeetendra
RE: Equal number of RDD Blocks
What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
Fail to read files from s3 after upgrading to 1.3
Hi, Today I upgraded our code and cluster to Spark 1.3. We are using Spark 1.3 on Amazon EMR, AMI 3.6, including the history server and Ganglia. I also migrated all deprecated SchemaRDD usages to DataFrame. Now when I try to read Parquet files from S3, I get the exception below. It is not a problem in my code, because I get the same failure using the Spark shell. Any ideas? Thanks, Ophir 15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse last modified date: Wed, 04 Mar 2015 16:20:05 GMT java.lang.IllegalStateException: Joda-time 2.2 or later version is required, but found version: null at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147) at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195) at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73) at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115) at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32) at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25) at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975) at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702) at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004) at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException: Invalid format: Wed, 04 Mar 2015 16:20:05 GMT is malformed at GMT at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747) at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193) ... 39 more
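The warning reports "found version: null" for Joda-Time. It may therefore help to check which jar the Joda-Time classes are actually loaded from on the cluster, and what version metadata that jar carries. This is an assumption about the cause, not something the trace confirms. Below is a minimal stdlib-only sketch; JarProbe is a hypothetical name.

```scala
// Hypothetical diagnostic helper: where was a class loaded from, and what
// Implementation-Version does its package report?
object JarProbe {
  def describe(className: String, loader: ClassLoader = getClass.getClassLoader): Option[String] =
    try {
      val cls = Class.forName(className, false, loader)
      // Classes from the JDK bootstrap loader have no CodeSource.
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("bootstrap")
      // Taken from the jar manifest; absent if the manifest does not declare it.
      val version = Option(cls.getPackage).flatMap(p => Option(p.getImplementationVersion))
      Some(s"$className from $source, Implementation-Version=${version.getOrElse("null")}")
    } catch {
      case _: ClassNotFoundException => None
    }
}
```

For example, running JarProbe.describe("org.joda.time.DateTime") in the Spark shell on each AMI would show which Joda-Time jar is picked up and whether its manifest declares a version.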
Re: Equal number of RDD Blocks
They both have the same message rate, 300 records/sec. On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote: And what is the message rate of each topic, mate? That was the other part of the required clarifications. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
Re: Fail to read files from s3 after upgrading to 1.3
Interesting: removing the history server and the '-a' option, and using AMI 3.5, fixed the problem. Now the question is: what made the difference? I vote for the '-a', but I'll post an update... On Mon, Apr 20, 2015 at 5:43 PM, Ophir Cohen oph...@gmail.com wrote: Hi, Today I upgraded our code and cluster to Spark 1.3. We are using Spark 1.3 on Amazon EMR, AMI 3.6, including the history server and Ganglia. I also migrated all deprecated SchemaRDD usages to DataFrame. Now when I try to read Parquet files from S3, I get the exception below. It is not a problem in my code, because I get the same failure using the Spark shell. Any ideas? Thanks, Ophir 15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse last modified date: Wed, 04 Mar 2015 16:20:05 GMT java.lang.IllegalStateException: Joda-time 2.2 or later version is required, but found version: null at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147) at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195) at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73) at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115) at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32) at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25) at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975) at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702) at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004) at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException: Invalid format: Wed, 04 Mar 2015 16:20:05 GMT is malformed at GMT at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747) at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193) ... 39 more
Re: Configuring logging properties for executor
Rename your log4j_special.properties file to log4j.properties and place it at the root of your jar file, and you should be fine. If you are using Maven to build your jar, put the log4j.properties in the src/main/resources folder. However, note that if another jar on the classpath also contains a log4j.properties file, this might not work, since the first log4j.properties file that is loaded will be used. You can also use spark-submit --files log4j_special.properties …, which should transfer your log4j properties file to the worker nodes automatically without you copying it manually. Lan On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote: Hi all, I need to configure the Spark executor log4j.properties on a standalone cluster. It looks like placing the relevant properties file in the Spark configuration folder and setting spark.executor.extraJavaOptions from my application code: sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j_special.properties"); does the job, and the executor logs are written to the required place at the required level. As far as I understand, it works because the Spark configuration folder is on the classpath, so passing the parameter without a path works here. However, I would like to avoid deploying these properties to each worker's Spark configuration folder. I wonder: if I put the properties in my application jar, is there any way of telling the executor to load them? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
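Lan's caveat about other jars that also contain a log4j.properties can be checked directly. ClassLoader.getResource returns the first match in search order, while getResources lists every copy. log4j 1.x's default initialization looks log4j.properties up as a class-path resource, so the copy returned by getResource is typically the one that wins. Below is a minimal stdlib-only sketch; ResourceShadowing is a hypothetical name.

```scala
import java.net.URL
import scala.collection.mutable.ListBuffer

// Hypothetical helper for spotting shadowed copies of a class-path resource.
object ResourceShadowing {
  // The copy a plain lookup by name would get: the first match in search order.
  def winner(loader: ClassLoader, name: String): Option[URL] =
    Option(loader.getResource(name))

  // Every copy visible through the loader, in search order.
  def allCopies(loader: ClassLoader, name: String): List[URL] = {
    val found = ListBuffer.empty[URL]
    val urls = loader.getResources(name)
    while (urls.hasMoreElements) found += urls.nextElement()
    found.toList
  }
}
```

For example, calling ResourceShadowing.allCopies(getClass.getClassLoader, "log4j.properties") inside a task would list any copies from dependency jars that could shadow yours.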
Configuring logging properties for executor
Hi all, I need to configure the Spark executor log4j.properties on a standalone cluster. It looks like placing the relevant properties file in the Spark configuration folder and setting spark.executor.extraJavaOptions from my application code: sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j_special.properties"); does the job, and the executor logs are written to the required place at the required level. As far as I understand, it works because the Spark configuration folder is on the classpath, so passing the parameter without a path works here. However, I would like to avoid deploying these properties to each worker's Spark configuration folder. I wonder: if I put the properties in my application jar, is there any way of telling the executor to load them? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Equal number of RDD Blocks
Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
Equal number of RDD Blocks
Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
RE: Equal number of RDD Blocks
And what is the message rate of each topic, mate? That was the other part of the required clarifications. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
RE: Equal number of RDD Blocks
Well, Spark Streaming is supposed to create/distribute the receivers on different cluster nodes. If you are saying that your receivers are actually running on the same node, then that node is probably getting most of the data in order to minimize network transfer costs. If you want to distribute your data more evenly, you can partition it explicitly. Also, ask Databricks why the receivers are not being distributed across different cluster nodes. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:57 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks I also see that it's creating both receivers on the same executor, and that might be the reason one executor has more RDD blocks than the other. Can I make Spark create each receiver on a different executor? Regards, Laeeq On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote: And what is the message rate of each topic, mate? That was the other part of the required clarifications. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DStream receivers producing two different DStreams, consuming from two different Kafka topics, each with a different message rate; 2. One Kafka topic (hence only one message rate to consider) but with two different DStream receivers (i.e. running in parallel) giving rise to two different DStreams. From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from Kafka. How can I get an approximately equal number of RDD blocks on two executors? Please see the attachment: one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
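Evo's suggestion to partition the data explicitly can be illustrated with a simulation. A shuffle-based repartition assigns records to output partitions roughly round-robin, regardless of which receiver produced them, so the skew caused by receiver placement disappears. The plain-Scala sketch below is not Spark code. RoundRobinSketch is a hypothetical name, and it reuses the 1785 and 26 from the original post only as sample record counts.

```scala
// Plain-Scala simulation (not Spark's implementation): records from several
// sources are dealt to partitions in round-robin order, each source starting
// at a different partition.
object RoundRobinSketch {
  def assign(sources: Seq[Int], numPartitions: Int): Vector[Int] = {
    require(numPartitions > 0, "need at least one partition")
    val counts = Array.fill(numPartitions)(0)
    sources.zipWithIndex.foreach { case (recordCount, sourceIdx) =>
      var p = sourceIdx % numPartitions // per-source starting partition
      var i = 0
      while (i < recordCount) {
        counts(p) += 1
        p = (p + 1) % numPartitions
        i += 1
      }
    }
    counts.toVector
  }
}
```

For example, RoundRobinSketch.assign(Seq(1785, 26), 2) yields Vector(906, 905). In Spark, the corresponding step would be calling repartition on the received DStreams (possibly after unioning them) before the expensive processing. The price is an extra shuffle per batch.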
Shuffle files not cleaned up (Spark 1.2.1)
Hi all, I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread in the hope of catching someone's attention. We are experiencing an issue where shuffle files are left behind and not cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1: https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated. Thanks NB
Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Hi, I am getting this serialization exception and I am not sure what "Graph is unexpectedly null when DStream is being serialized" means. 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
The operation comes down to something like this:
dstream.map(tuple => {
  val w = StreamState.fetch[K, W](state.prefixKey, tuple._1)
  (tuple._1, (tuple._2, w))
})
And StreamState is a very simple standalone object:
object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
}
However, if I remove the context bounds from K in fetch, i.e. remove ClassTag and Ordering, then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
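One plausible explanation follows, offered as an assumption because the thread does not confirm it. Suppose the enclosing class declares K (and W) with context bounds. Then the ClassTag and Ordering evidence that fetch needs is stored as fields of that class. Referencing those fields inside the closure makes the closure capture `this`. If `this` also holds a DStream, serializing it would hit DStream.writeObject and the "Graph is unexpectedly null" error. Dropping the bounds from fetch removes those field references, which would explain why the problem disappears.

The stdlib-only sketch below reproduces the capture with plain Java serialization. Pipeline, capturingClosure, localClosure and SerializationCheck are hypothetical names. The non-serializable Pipeline stands in for the class holding the DStream.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.reflect.ClassTag

// Roughly what Spark's ClosureCleaner.ensureSerializable checks:
// can the closure be written with plain Java serialization?
object SerializationCheck {
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}

// Same shape as the object in the post.
object StreamState {
  def fetch[K: ClassTag: Ordering, V: ClassTag](prefixKey: String, key: K): Option[V] = None
}

// Hypothetical enclosing class; deliberately NOT Serializable.
class Pipeline[K: ClassTag: Ordering, W: ClassTag] {
  // The implicit ClassTag/Ordering arguments resolve to this class's
  // evidence fields, so the lambda captures `this`.
  def capturingClosure(prefixKey: String): ((K, Int)) => (K, (Int, Option[W])) =
    tuple => {
      val w = StreamState.fetch[K, W](prefixKey, tuple._1)
      (tuple._1, (tuple._2, w))
    }

  // Copy the evidence into local vals first; the lambda then captures only
  // these serializable values plus prefixKey.
  def localClosure(prefixKey: String): ((K, Int)) => (K, (Int, Option[W])) = {
    val kTag = implicitly[ClassTag[K]]
    val kOrd = implicitly[Ordering[K]]
    val wTag = implicitly[ClassTag[W]]
    tuple => {
      val w = StreamState.fetch[K, W](prefixKey, tuple._1)(kTag, kOrd, wTag)
      (tuple._1, (tuple._2, w))
    }
  }
}
```

In the streaming job, the analogous change would be to copy implicitly[ClassTag[K]], implicitly[Ordering[K]] and implicitly[ClassTag[W]] into local vals in the method that builds the dstream.map(...) closure, and pass them to fetch explicitly.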
Re: Shuffle files not cleaned up (Spark 1.2.1)
Write a cron job for this, like the one below:
12 * * * * find $SPARK_HOME/work -mindepth 1 -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name 'spark-*-*-*' -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name 'spark-*-*-*' -prune -exec rm -rf {} \+
On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread in the hope of catching someone's attention. We are experiencing an issue where shuffle files are left behind and not cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1: https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated. Thanks NB
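Where cron is not convenient, the third find entry can also be expressed in code. This is a minimal stdlib-only sketch with several assumptions:

- StaleSparkDirs is a hypothetical name.
- It compares last-modified time, whereas find -cmin uses the inode change time.
- String.matches with spark-.*-.*-.* approximates the spark-*-*-* glob.

Like the cron job, it cannot tell whether a running application still needs a directory. The age threshold should therefore comfortably exceed the longest window.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, LinkOption, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ListBuffer

object StaleSparkDirs {
  // Recursively deletes a directory tree without following symlinks (like rm -rf).
  private def deleteTree(root: Path): Unit =
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        Files.delete(file)
        FileVisitResult.CONTINUE
      }
      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
        if (exc != null) throw exc
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    })

  // Deletes direct child directories of `localDir` named like spark-*-*-*
  // whose last-modified time is older than `maxAgeMinutes`; returns them.
  def clean(localDir: Path, maxAgeMinutes: Long, nowMillis: Long = System.currentTimeMillis()): List[Path] = {
    val cutoff = nowMillis - TimeUnit.MINUTES.toMillis(maxAgeMinutes)
    val stale = ListBuffer.empty[Path]
    val stream = Files.newDirectoryStream(localDir)
    try {
      val it = stream.iterator()
      while (it.hasNext) {
        val child = it.next()
        if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS) &&
            child.getFileName.toString.matches("spark-.*-.*-.*") &&
            Files.getLastModifiedTime(child, LinkOption.NOFOLLOW_LINKS).toMillis < cutoff)
          stale += child
      }
    } finally stream.close()
    // Delete only after the directory listing has been closed.
    stale.foreach(deleteTree)
    stale.toList
  }
}
```

For example, StaleSparkDirs.clean(java.nio.file.Paths.get("/path/to/local/dir"), 1440) would mirror the 24-hour threshold of the cron entries. The path shown is a placeholder.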
Re: Did anybody run Spark-perf on powerpc?
This appears to be a problem with SSL. I'm facing the same issue. Did you get around this somehow? I'm running IBM Java 8 on Linux ppc64le. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329p22575.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuring logging properties for executor
Hi Lan,

Thanks for the fast response. It could be a solution if it works. I have more than one log4j properties file, for different run modes like debug/production, for the executor and for the application core. I think I would like to keep them separate. Then, I suppose I should give all the other properties files special names and keep the executor configuration under the default name? Can I conclude that, going this way, I will not be able to run several applications on the same cluster in parallel?

Regarding spark-submit, I am not using it now (I submit from the code), but I think I should consider this option.

Thanks.

On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:

Rename your log4j_special.properties file to log4j.properties and place it under the root of your jar file; you should be fine. If you are using Maven to build your jar, please put the log4j.properties in the src/main/resources folder. However, please note that if another dependency jar on the classpath also contains a log4j.properties file, this might not work, since the first log4j.properties file that is loaded will be used. You can also do spark-submit --files log4j_special.properties …, which should transfer your log4j properties file to the worker nodes automatically without you copying it manually.

Lan

On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:

Hi all,

I need to configure the Spark executor log4j.properties on a standalone cluster. It looks like placing the relevant properties file in the Spark configuration folder and setting spark.executor.extraJavaOptions from my application code:

    sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j_special.properties");

does the job, and the executor logs are written in the required place and at the required level. As far as I understand, it works because the Spark configuration folder is on the classpath, so passing the file name without a path works here. However, I would like to avoid deploying these properties to each worker's Spark configuration folder. If I put the properties in my application jar, is there any way of telling the executor to load them?

Thanks,

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
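Lan's caveat about duplicate log4j.properties files comes down to how classpath resources resolve: a lookup returns the first match in classpath order. The stdlib-only sketch below shows that behaviour with a URLClassLoader over two temporary directories standing in for jars. `FirstResourceWins` and its helpers are hypothetical names, and log4j 1.x's default initialization is assumed to perform a similar classpath lookup for log4j.properties.

```scala
import java.net.URLClassLoader
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

object FirstResourceWins {
  // Hypothetical helper: a fresh temp directory holding a log4j.properties whose body is `marker`.
  def dirWithConfig(marker: String): Path = {
    val dir = Files.createTempDirectory("cp-entry-")
    Files.write(dir.resolve("log4j.properties"), marker.getBytes(StandardCharsets.UTF_8))
    dir
  }

  // Looks `name` up through a class loader over `classpath` (searched in order) and
  // returns the contents of whichever copy it finds first. A null parent keeps the
  // JVM's own application classpath out of the lookup.
  def resolve(name: String, classpath: Seq[Path]): Option[String] = {
    val loader = new URLClassLoader(classpath.map(_.toUri.toURL).toArray, null)
    try {
      Option(loader.getResource(name)).map { url =>
        val src = Source.fromURL(url, "UTF-8")
        try src.mkString finally src.close()
      }
    } finally loader.close()
  }
}
```

Swapping the order of the two directories swaps which file wins, which is why a log4j.properties bundled in another dependency jar can shadow the one in the application jar.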
Re: Configuring logging properties for executor
Each application gets its own executor processes, so there should be no problem running them in parallel.

Lan

On Apr 20, 2015, at 10:25 AM, Michael Ryabtsev michael...@gmail.com wrote:

Hi Lan,

Thanks for the fast response. It could be a solution if it works. I have more than one log4j properties file, for different run modes like debug/production, for the executor and for the application core. I think I would like to keep them separate. Then, I suppose I should give all the other properties files special names and keep the executor configuration under the default name? Can I conclude that, going this way, I will not be able to run several applications on the same cluster in parallel?

Regarding spark-submit, I am not using it now (I submit from the code), but I think I should consider this option.

Thanks.

On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:

Rename your log4j_special.properties file to log4j.properties and place it under the root of your jar file; you should be fine. If you are using Maven to build your jar, please put the log4j.properties in the src/main/resources folder. However, please note that if another dependency jar on the classpath also contains a log4j.properties file, this might not work, since the first log4j.properties file that is loaded will be used. You can also do spark-submit --files log4j_special.properties …, which should transfer your log4j properties file to the worker nodes automatically without you copying it manually.

Lan

On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:

Hi all,

I need to configure the Spark executor log4j.properties on a standalone cluster. It looks like placing the relevant properties file in the Spark configuration folder and setting spark.executor.extraJavaOptions from my application code:

    sparkConf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j_special.properties");

does the job, and the executor logs are written in the required place and at the required level. As far as I understand, it works because the Spark configuration folder is on the classpath, so passing the file name without a path works here. However, I would like to avoid deploying these properties to each worker's Spark configuration folder. If I put the properties in my application jar, is there any way of telling the executor to load them?

Thanks,

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How can I customize external initialization when starting the Spark cluster?
Hi all,

I have a question about the Spark Thrift server. I want to load some tables when the Spark server starts. How can I configure external initialization in Spark? I guess Spark should have an interface that can be configured in spark-defaults.conf, so that we can implement an initialization function containing business logic such as caching tables, creating temporary tables, etc. Please help me verify whether this is correct, or whether there is another correct way. Thank you very much.

Best regards,
Jacky
Re: Fail to read files from s3 after upgrading to 1.3
And the winner is: ami 3.6. Apparently it does not work with it... ami 3.5 works great. Interesting: removing the history server and the '-a' option, and using ami 3.5, fixed the problem. Now the question is: what made the change?... I vote for the '-a', but let me update...

On Mon, Apr 20, 2015 at 5:43 PM, Ophir Cohen oph...@gmail.com wrote:

Hi,

Today I upgraded our code and cluster to 1.3. We are using Spark 1.3 in Amazon EMR, ami 3.6, including the history server and Ganglia. I also migrated all deprecated SchemaRDDs to DataFrames. Now when I try to read Parquet files from S3 I get the exception below. Actually it is not a problem with my code, because I get the same failures using the Spark shell. Any ideas?

Thanks,
Ophir

15/04/20 13:49:20 WARN internal.S3MetadataResponseHandler: Unable to parse last modified date: Wed, 04 Mar 2015 16:20:05 GMT
java.lang.IllegalStateException: Joda-time 2.2 or later version is required, but found version: null
    at com.amazonaws.util.DateUtils.handleException(DateUtils.java:147)
    at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:195)
    at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:73)
    at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
    at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
    at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3735)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1026)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1004)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:743)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1098)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:171)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:278)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid format: "Wed, 04 Mar 2015 16:20:05 GMT" is malformed at "GMT"
    at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:747)
    at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
    ... 39 more
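A different or conflicting copy of Joda-Time on the classpath is one possible cause of "Joda-time 2.2 or later version is required, but found version: null", though this thread does not confirm it. A quick way to compare the two AMIs is to ask the JVM where a class was loaded from and which Implementation-Version its jar manifest declares. `ClassOrigin` below is a hypothetical reflection probe; in the Spark shell you would call it with something like "org.joda.time.DateTime" on each AMI.

```scala
object ClassOrigin {
  // Loads without initializing, via the context class loader (what the Spark shell uses).
  private def load(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, Thread.currentThread().getContextClassLoader))
    catch { case _: ClassNotFoundException => None }

  // Jar or directory the class came from; None if the class is missing or the JVM
  // reports no code source (as it does for bootstrap classes such as java.lang.String).
  def locationOf(className: String): Option[String] =
    load(className)
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // Implementation-Version from the manifest of the jar defining the class's package, if any.
  def implementationVersion(className: String): Option[String] =
    load(className)
      .flatMap(c => Option(c.getPackage))
      .flatMap(p => Option(p.getImplementationVersion))
}
```

If the two AMIs report different jars or versions for the same class, that would point at a classpath difference rather than at the application code.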
Re: Spark SQL vs map reduce tableInputOutput
Thanks for the reply. Would using Phoenix inside Spark be useful? What is the best way to bring data from HBase into Spark in terms of application performance?

Regards,
Jeetendra

On 20 April 2015 at 20:49, Ted Yu yuzhih...@gmail.com wrote: To my knowledge, Spark SQL currently doesn't provide range scan capability against HBase. Cheers

On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi all, I am querying HBase, combining the results and using them in my Spark job. I am querying HBase using the HBase client API inside my Spark job. Can anybody tell me whether Spark SQL will be fast enough and support range queries? Regards, Jeetendra
Re: Spark SQL vs map reduce tableInputOutput
To my knowledge, Spark SQL currently doesn't provide range scan capability against HBase.

Cheers

On Apr 20, 2015, at 7:54 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi all, I am querying HBase, combining the results and using them in my Spark job. I am querying HBase using the HBase client API inside my Spark job. Can anybody tell me whether Spark SQL will be fast enough and support range queries? Regards, Jeetendra
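For readers unfamiliar with the term: a range scan reads only the rows whose keys fall between a start row (inclusive) and a stop row (exclusive), which is how the bounds of an HBase Scan work and why such a scan is cheap on a key-sorted store. The sketch below models those semantics with an in-memory sorted map; it is plain Scala, not the HBase API, and `RangeScanSketch` is a hypothetical name. HBase compares raw bytes while this compares Strings, so the orderings are assumed to agree only for ASCII keys.

```scala
import scala.collection.immutable.TreeMap

object RangeScanSketch {
  // Rows from startRow (inclusive) up to stopRow (exclusive), in key order.
  // The sorted map only visits the requested slice, like a bounded scan.
  def scan[V](table: TreeMap[String, V], startRow: String, stopRow: String): List[(String, V)] =
    table.range(startRow, stopRow).toList
}
```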
Unsupported types in org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType
So I am trying to pull data from an external database using JDBC:

    Map<String, String> options = new HashMap<String, String>();
    options.put("driver", driver);
    options.put("url", dburl);
    options.put("dbtable", "tmpTrunk");
    DataFrame tbTrunkInfo = sqlContext.load("jdbc", options);

And the following exception gets thrown:

java.sql.SQLException: Unsupported type -9
    at org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType(JDBCRDD.scala:76)
    at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:110)
    at org.apache.spark.sql.jdbc.JDBCRelation.<init>(JDBCRelation.scala:125)
    at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:114)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:667)

I looked at the schema for the table tmpTrunk and at the function org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType (JDBCRDD.scala:76), and it appears that the datatype nvarchar is not supported, among several other commonly used datatypes. Is it possible to request an expansion of this method to include more java.sql.Types?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unsupported-types-in-org-apache-spark-sql-jdbc-JDBCRDD-getCatalystType-tp22573.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
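The -9 in "Unsupported type -9" is a java.sql.Types code. On Java 8+, java.sql.JDBCType can translate such codes, which helps when checking which column types a driver reports for a table. A small sketch (`JdbcTypeCodes` is a hypothetical helper):

```scala
import java.sql.JDBCType

object JdbcTypeCodes {
  // Name of a java.sql.Types code, such as the -9 in "Unsupported type -9".
  def nameOf(code: Int): String =
    try JDBCType.valueOf(code).getName
    catch { case _: IllegalArgumentException => s"unknown ($code)" }
}
```

Here `JdbcTypeCodes.nameOf(-9)` gives "NVARCHAR", matching the datatype identified in the message.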
RE: Super slow caching in 1.3?
Now this is very important: “Normal RDDs” refers to “batch RDDs”. However, the default in-memory storage of RDDs which are part of a DStream is “serialized” rather than actual (hydrated) objects. The Spark documentation states that serialization is required for space and garbage-collection efficiency (but creates higher CPU load), which makes sense considering the large number of RDDs which get discarded in a streaming app.

So what does Databricks actually recommend as the object-oriented model for RDD elements used in Spark Streaming apps, flat or not, and can you provide a detailed description/spec of both?

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, April 16, 2015 7:23 PM
To: Evo Eftimov
Cc: Christian Perez; user
Subject: Re: Super slow caching in 1.3?

Here are the types that we specialize; other types will be much slower. This is only for Spark SQL; normal RDDs do not serialize data that is cached. I'll also note that until yesterday we were missing FloatType.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154

Christian, can you provide the schema of the fast and slow datasets?

On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Michael, what exactly do you mean by "flattened version/structure" here, e.g.:
1. An Object with only primitive data types as attributes
2. An Object with no more than one level of other Objects as attributes
3. An Array/List of primitive types
4. An Array/List of Objects

This question is about RDDs in general, not necessarily RDDs in the context of Spark SQL. When answering, can you also score how bad the performance of each of the above options is?

-----Original Message-----
From: Christian Perez [mailto:christ...@svds.com]
Sent: Thursday, April 16, 2015 6:09 PM
To: Michael Armbrust
Cc: user
Subject: Re: Super slow caching in 1.3?

Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching the same flat Parquet file. Caching other file formats of the same data was faster by up to a factor of ~2. Note that the Parquet file was created in Impala, but the other formats were written by Spark SQL.

Cheers,
Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:

Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (it's just falling back to Kryo, and even then there are some locking issues). If so, would it be possible to try caching a flattened version?

CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

Hi all,

Has anyone else noticed a very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed, node-local Parquet file on M2 EC2 instances. Or are my expectations way off...

Cheers,
Christian

-- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
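To make "flattened" concrete in plain Scala terms: the nested record below has a struct-valued field, while the flat version holds only primitive or String fields, which is the row shape Michael's CACHE TABLE ... AS SELECT suggestion would produce on the SQL side. The case classes are hypothetical and only illustrate the shape; nothing here measures caching performance.

```scala
// Hypothetical nested record: one field is itself a struct (Evo's option 2).
final case class Address(city: String, zip: String)
final case class Customer(id: Long, name: String, address: Address)

// Flattened equivalent: every field is a primitive or a String (Evo's option 1).
final case class FlatCustomer(id: Long, name: String, city: String, zip: String)

object Flattening {
  // Lifts the nested struct's fields up to the top level.
  def flatten(c: Customer): FlatCustomer =
    FlatCustomer(c.id, c.name, c.address.city, c.address.zip)
}
```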
Custom partitioning of DStream
Is the only way to implement custom partitioning of a DStream via the foreachRDD approach, so as to gain access to the actual RDDs comprising the DStream and hence their partitionBy method? DStream has only a repartition method, which accepts only the number of partitions but not the method of partitioning. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
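The partitioning rule itself is just a function from key to partition index. The sketch below is plain Scala with a hypothetical rule (group keys by the prefix before ':'). In Spark, this logic would presumably go into a subclass of org.apache.spark.Partitioner (overriding numPartitions and getPartition(key: Any)) and be applied per batch with something like `dstream.transform(rdd => rdd.partitionBy(new MyPartitioner(n)))`, which returns a new DStream instead of going through foreachRDD. `MyPartitioner` is a placeholder and that Spark wiring is not shown here.

```scala
object KeyPartitioning {
  // Like x % mod, but always in [0, mod) even when x (e.g. a hashCode) is negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hypothetical rule: partition by the part of the key before the first ':',
  // so "user1:a" and "user1:b" always land in the same partition.
  def partitionFor(key: String, numPartitions: Int): Int =
    nonNegativeMod(key.takeWhile(_ != ':').hashCode, numPartitions)
}
```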