Re: MatchError in JsonRDD.toLong
Hi again, On Fri, Jan 16, 2015 at 4:25 PM, Tobias Pfeiffer t...@preferred.jp wrote: Now I'm wondering where this comes from (I haven't touched this component in a while, nor upgraded Spark etc.) [...] So the reason that the error is showing up now is that suddenly data from a different dataset is showing up in my test dataset... don't ask me... anyway, this different dataset contains data like

{"Click":"nonclicked", "Impression":1, "DisplayURL":4401798909506983219, "AdId":21215341, ...}
{"Click":"nonclicked", "Impression":1, "DisplayURL":14452800566866169008, "AdId":10587781, ...}

and the DisplayURL seems to be too long for Long, while it is still inferred as a Long column. So, what to do about this? Is jsonRDD inherently incapable of handling those long numbers, or is it just an issue in the schema inference and I should file a JIRA issue? Thanks Tobias
RE: MatchError in JsonRDD.toLong
Hi Tobias, Can you provide how you create the JsonRDD? Thanks, Daoyuan
Re: MatchError in JsonRDD.toLong
Hi, On Fri, Jan 16, 2015 at 5:55 PM, Wang, Daoyuan daoyuan.w...@intel.com wrote: Can you provide how you create the JsonRDD? This should be reproducible in the Spark shell:

import org.apache.spark.sql._
val sqlc = new SQLContext(sc)

val rdd = sc.parallelize(
  """{"Click":"nonclicked", "Impression":1, "DisplayURL":4401798909506983219, "AdId":21215341}""" ::
  """{"Click":"nonclicked", "Impression":1, "DisplayURL":14452800566866169008, "AdId":10587781}""" :: Nil)

// works fine
val json = sqlc.jsonRDD(rdd)
json.registerTempTable("test")
sqlc.sql("SELECT * FROM test").collect

// -> MatchError
val json2 = sqlc.jsonRDD(rdd, 0.1)
json2.registerTempTable("test2")
sqlc.sql("SELECT * FROM test2").collect

I guess the issue in the latter case is that the column is inferred as Long when some rows actually are too big for Long... Thanks Tobias
RE: MatchError in JsonRDD.toLong
The second parameter of jsonRDD is the sampling ratio when we infer schema. Thanks, Daoyuan
RE: MatchError in JsonRDD.toLong
And you can use jsonRDD(json: RDD[String], schema: StructType) to specify your schema explicitly. For numbers larger than Long can hold, we can use DecimalType. Thanks, Daoyuan
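For reference, a minimal sketch of the explicit-schema approach suggested above. The column names are taken from the sample records earlier in the thread, and the exact spelling of the unbounded decimal type (DecimalType.Unlimited vs. DecimalType()) varies slightly between Spark versions:

import org.apache.spark.sql._

// Explicit schema: DisplayURL can exceed Long, so declare it as a decimal.
val schema = StructType(Seq(
  StructField("Click", StringType, nullable = true),
  StructField("Impression", IntegerType, nullable = true),
  StructField("DisplayURL", DecimalType.Unlimited, nullable = true),
  StructField("AdId", LongType, nullable = true)
))

// rdd is the RDD[String] of JSON records from the repro above.
val json = sqlc.jsonRDD(rdd, schema)
json.registerTempTable("test")
sqlc.sql("SELECT * FROM test").collect()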
MLib: How to set preferences for ALS implicit feedback in Collaborative Filtering?
I am trying to use Spark MLlib ALS with implicit feedback for collaborative filtering. Input data has only two fields `userId` and `productId`. I have **no product ratings**, just info on what products users have bought, that's all. So to train ALS I use:

def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel

( http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS$ )

This API requires a `Rating` object: Rating(user: Int, product: Int, rating: Double) On the other hand, the documentation on `trainImplicit` says: *Train a matrix factorization model given an RDD of 'implicit preferences' ratings given by users to some products, in the form of (userID, productID, **preference**) pairs.* When I set rating / preferences to `1` as in:

val ratings = sc.textFile(new File(dir, file).toString).map { line =>
  val fields = line.split(",")
  // format: (randomNumber, Rating(userId, productId, rating))
  (rnd.nextInt(100), Rating(fields(0).toInt, fields(1).toInt, 1.0))
}

val training = ratings.filter(x => x._1 < 60)
  .values
  .repartition(numPartitions)
  .cache()
val validation = ratings.filter(x => x._1 >= 60 && x._1 < 80)
  .values
  .repartition(numPartitions)
  .cache()
val test = ratings.filter(x => x._1 >= 80).values.cache()

And then train ALS:

val model = ALS.trainImplicit(ratings, rank, numIter)

I get RMSE 0.9, which is a big error in case of preferences taking 0 or 1 value:

val validationRmse = computeRmse(model, validation, numValidation)

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
  val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
  val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}

So my question is: to what value should I set `rating` in Rating(user: Int, product: Int, rating: Double) for implicit training (in the `ALS.trainImplicit` method)?

**Update** With:

val alpha = 40
val lambda = 0.01

I get:

Got 1895593 ratings from 17471 users on 462685 products. Training: 1136079, validation: 380495, test: 379019
RMSE (validation) = 0.7537217888106758 for the model trained with rank = 8 and numIter = 10.
RMSE (validation) = 0.7489005441881798 for the model trained with rank = 8 and numIter = 20.
RMSE (validation) = 0.7387672873747732 for the model trained with rank = 12 and numIter = 10.
RMSE (validation) = 0.7310003522283959 for the model trained with rank = 12 and numIter = 20.
The best model was trained with rank = 12, and numIter = 20, and its RMSE on the test set is 0.7302343904091481.
baselineRmse: 0.0 testRmse: 0.7302343904091481
The best model improves the baseline by -Infinity%.

Which is still a big error, I guess. Also I get a strange baseline improvement where the baseline model is simply mean (1).
Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
Hi all! In Spark SQL 1.2.0 I create a hive table with custom parquet inputformat and outputformat, like this:

CREATE TABLE test(
  id string,
  msg string)
CLUSTERED BY (id)
SORTED BY (id ASC) INTO 10 BUCKETS
ROW FORMAT SERDE 'com.a.MyParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.a.MyParquetInputFormat'
OUTPUTFORMAT 'com.a.MyParquetOutputFormat';

And in the spark shell the plan of select * from test is:

[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

Not HiveTableScan! So it doesn't execute my custom inputformat! Why? How can it execute my custom inputformat? Thanks!
terasort on spark
Hi all, I tried to run a terasort benchmark on my spark cluster, but I found it is hard to find a standard spark terasort program except a PR from rxin and ewan higgs: https://github.com/apache/spark/pull/1242 https://github.com/ehiggs/spark/tree/terasort The example which rxin provided comes without a validation test, so I tried higgs's example, but I sadly found I always get an error when validating: assertion failed: current partition min < last partition max It seems to require that the min array in partition 2 be bigger than the max array in partition 1, but the code here is confusing:

println(s"lastMax" + lastMax.toSeq.map(x => if (x < 0) 256 + x else x))
println(s"min" + min.toSeq.map(x => if (x < 0) 256 + x else x))
println(s"max" + max.toSeq.map(x => if (x < 0) 256 + x else x))

Has anyone ever run the terasort example successfully? Or where can I get a standard terasort application?
Re: How to define SparkContext with Cassandra connection for spark-jobserver?
Thank you Abhishek. The code works.
Fwd: Problems with TeraValidate
+spark-user

-- Forwarded message --
From: lonely Feb lonely8...@gmail.com
Date: 2015-01-16 19:09 GMT+08:00
Subject: Re: Problems with TeraValidate
To: Ewan Higgs ewan.hi...@ugent.be

thx a lot. btw, here is my output:

1. when dataset is 1000g:

num records: 100 checksum: 12aa5028310ea763e
part 0
lastMax ArrayBuffer(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
min ArrayBuffer(0, 4, 25, 150, 6, 136, 39, 39, 214, 164)
max ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)
part 1
lastMax ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)
min ArrayBuffer(0, 4, 25, 150, 6, 136, 39, 39, 214, 164)
max ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)

Exception in thread "main" java.lang.AssertionError: assertion failed: current partition min < last partition max
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.examples.terasort.TeraValidate$$anonfun$validate$3.apply(TeraValidate.scala:117)
at org.apache.spark.examples.terasort.TeraValidate$$anonfun$validate$3.apply(TeraValidate.scala:111)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.examples.terasort.TeraValidate$.validate(TeraValidate.scala:111)
at org.apache.spark.examples.terasort.TeraValidate$.main(TeraValidate.scala:59)
at org.apache.spark.examples.terasort.TeraValidate.main(TeraValidate.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

2.
when dataset is 200m:

num records: 200 checksum: ca93e5d2fad40
part 0
lastMax ArrayBuffer(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
min ArrayBuffer(82, 24, 27, 218, 62, 68, 174, 208, 69, 78)
max ArrayBuffer(146, 177, 217, 195, 175, 144, 239, 81, 29, 252)
part 1
lastMax ArrayBuffer(146, 177, 217, 195, 175, 144, 239, 81, 29, 252)
min ArrayBuffer(82, 24, 27, 218, 62, 68, 174, 208, 69, 78)
max ArrayBuffer(146, 177, 217, 195, 175, 144, 239, 81, 29, 252)

Exception in thread "main" java.lang.AssertionError: assertion failed: current partition min < last partition max
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.examples.terasort.TeraValidate$$anonfun$validate$3.apply(TeraValidate.scala:117)
at org.apache.spark.examples.terasort.TeraValidate$$anonfun$validate$3.apply(TeraValidate.scala:111)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.examples.terasort.TeraValidate$.validate(TeraValidate.scala:111)
at org.apache.spark.examples.terasort.TeraValidate$.main(TeraValidate.scala:59)
at org.apache.spark.examples.terasort.TeraValidate.main(TeraValidate.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I suspect something is wrong with the clone function.

2015-01-16 19:02 GMT+08:00 Ewan Higgs ewan.hi...@ugent.be:
Hi lonely, I am looking at this now. If you need to validate a terasort benchmark as soon as possible, I would use Hadoop's TeraValidate. I'll let you know when I have a fix. Yours, Ewan Higgs

On 16/01/15 09:47, lonely Feb wrote:
Hi, I run your terasort program on my spark cluster. When the dataset is small (below 1000g) everything goes fine, but when the dataset is over 1000g, TeraValidate always fails the assertion with: current partition min < last partition max. E.g. the output is:

num records: 100 checksum: 12aa5028310ea763e
part 0
lastMax ArrayBuffer(0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
min ArrayBuffer(0, 4, 25, 150, 6, 136, 39, 39, 214, 164)
max ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)
part 1
lastMax ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)
min ArrayBuffer(0, 4, 25, 150, 6, 136, 39, 39, 214, 164)
max ArrayBuffer(255, 255, 96, 244, 80, 50, 31, 158, 43, 113)

Exception in thread "main" java.lang.AssertionError: assertion failed: current partition min < last partition max at
Re: Spark SQL Parquet - data are reading very very slow
I have seen similar results. I have looked at the code and I think there are a couple of contributors: Encoding/decoding java Strings to UTF8 bytes is quite expensive. I'm not sure what you can do about that. But there are options for optimization due to the repeated decoding of the same String values.

As Spark queries process each row from Parquet they make a call to convert the Binary representation for each String column into a Java String. However in many (probably most) circumstances the underlying Binary classes from Parquet will have come from a Dictionary, for example when column cardinality is low. Therefore Spark is converting the same byte array to a copy of the same Java String over and over again. This is bad due to extra cpu, extra memory used for these strings, and probably results in more expensive grouping comparisons.

I tested a simple hack to cache the last Binary->String conversion per column and this led to a 25% performance improvement for the queries I used. Admittedly this was over a data set with lots of runs of the same Strings in the queried columns.

I haven't looked at the code to write Parquet files in Spark but I imagine similar duplicate String->Binary conversions could be happening. These costs are quite important for the type of data that I expect will be stored in Parquet, which will often have denormalized tables and probably lots of fairly low cardinality string columns.

It's possible that changes could be made to Parquet so that the encoding/decoding of Objects to Binary is handled on the Parquet side of the fence. Parquet could deal with Objects (Strings) as the client understands them and only use encoding/decoding to store/read from the underlying storage medium. Doing this I think Parquet could ensure that the encoding/decoding of each Object occurs only once.
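For illustration, a minimal sketch of the "cache the last Binary->String conversion per column" hack described above. This is not Spark's or Parquet's actual converter code; the class and method names are made up:

// One instance per string column. Dictionary-encoded columns tend to hand back
// the same underlying byte array instance repeatedly, so a reference-equality
// check on the raw bytes is enough to reuse the previously decoded String.
class CachedStringDecoder {
  private var lastBytes: Array[Byte] = null
  private var lastString: String = null

  def decode(bytes: Array[Byte]): String = {
    if (!(bytes eq lastBytes)) {   // eq = reference equality
      lastBytes = bytes
      lastString = new String(bytes, "UTF-8")
    }
    lastString
  }
}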
Re: MissingRequirementError with spark
Created JIRA issue https://issues.apache.org/jira/browse/SPARK-5281
Re: Executor vs Mapper in Hadoop
Thanks a lot for clarifying it. Then I have some follow-up questions:

1. If normally we have 1 executor per machine, and we have a cluster with different hardware capacity, for example one 8-core worker and one 4-core worker (ignoring the driver machine), then if we set executor-cores=4, the 8-core worker will run 2 executors, am I right? If we set executor-cores=8, then the worker with 4 cores can't start any executors. A similar idea also applies to the memory allocation, so should we always use the same hardware configuration for the spark cluster worker machines?

2. If I run spark on Yarn (actually EMR), where can I check/configure the default executor-cores?

Regards, Shuai

On Thu, Jan 15, 2015 at 11:13 PM, Sean Owen so...@cloudera.com wrote: An executor is specific to a Spark application, just as a mapper is specific to a MapReduce job. So a machine will usually be running many executors, and each is a JVM. A Mapper is single-threaded; an executor can run many tasks (possibly from different jobs within the application) at once. Yes, 5 executors with 4 cores should be able to process 20 tasks in parallel. In any normal case, you have 1 executor per machine per application. There are cases where you would make more than 1, but these are unusual.

On Thu, Jan 15, 2015 at 8:16 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am trying to clarify some executor behavior in spark. Because I come from a Hadoop background, I try to compare executors to the Mappers (or reducers) in hadoop. 1. Each node can have multiple executors, each running in its own process? This is the same as the mapper process. 2. I thought the spark executor will use multi-thread mode when there is more than 1 core allocated to it (for example: set executor-cores to 5). In this way, how many partitions can it process? For example, if the input is 20 partitions (similar to 20 splits as mapper input) and we have 5 executors, each with 4 cores, will all these partitions be processed at the same time (so each core processes one partition), or can one executor actually only run one partition at a time? I don't know whether my understanding is correct, please suggest. BTW: In general practice, should we always try to set executor-cores to a higher number? So we would favor 10 cores * 2 executors over 2 cores * 10 executors? Any suggestion here? Thanks! Regards, Shuai
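As context for question 2, a hedged example of where executor cores are usually set when submitting to YARN; the numbers, class name, and jar below are purely illustrative, not a recommendation for the cluster described above:

spark-submit \
  --master yarn-client \
  --num-executors 2 \
  --executor-cores 4 \
  --executor-memory 4g \
  --class com.example.MyApp \
  my-app.jar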
RE: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
I think you might need to set spark.sql.hive.convertMetastoreParquet to false if I understand that flag correctly.
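A minimal sketch of where that flag would be set, assuming an existing HiveContext named hiveContext (it can equally be passed as --conf spark.sql.hive.convertMetastoreParquet=false at submit time):

hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")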
Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?
Thanks yana! I will try it!

On 16 January 2015, at 20:51, yana yana.kadiy...@gmail.com wrote: I think you might need to set spark.sql.hive.convertMetastoreParquet to false if I understand that flag correctly.
RE: How to force parallel processing of RDD using multiple thread
Does parallel processing mean it is executed on multiple workers, or executed on one worker but in multiple threads? For example, if I have only one worker but my RDD has 4 partitions, will it be executed in parallel in 4 threads? The reason I am asking is to decide whether I need to configure spark to have multiple workers. By default, it just starts with one worker. Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, January 15, 2015 11:04 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to force parallel processing of RDD using multiple thread

Check the number of partitions in your input. It may be much less than the available parallelism of your small cluster. For example, input that lives in just 1 partition will spawn just 1 task. Beyond that, parallelism just happens. You can see the parallelism of each operation in the Spark UI.

On Thu, Jan 15, 2015 at 10:53 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Spark Standalone cluster. My program is running very slowly, and I suspect it is not doing parallel processing of the rdd. How can I force it to run in parallel? Is there any way to check whether it is processed in parallel? Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, January 15, 2015 4:29 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to force parallel processing of RDD using multiple thread

What is your cluster manager? For example on YARN you would specify --executor-cores. Read: http://spark.apache.org/docs/latest/running-on-yarn.html

On Thu, Jan 15, 2015 at 8:54 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have a standalone spark cluster with only one node which has 4 CPU cores. How can I force spark to do parallel processing of my RDD using multiple threads? For example I can do the following: Spark-submit --master local[4] However I really want to use the cluster as follows: Spark-submit --master spark://10.125.21.15:7070 In that case, how can I make sure the RDD is processed with multiple threads/cores? Thanks Ningjun
Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?
I have asked this question before but got no answer. Asking again. Can I save an RDD to the local file system and then read it back on a spark cluster with multiple nodes?

rdd.saveAsObjectFile("file:///home/data/rdd1")
val rdd2 = sc.objectFile("file:///home/data/rdd1")

This works if the cluster has only one node. But my cluster has 3 nodes and each node has a local dir called /home/data. Is the rdd saved to the local dir across the 3 nodes? If so, is sc.objectFile(...) smart enough to read the local dir on all 3 nodes and merge them into a single rdd? Ningjun
RE: How to force parallel processing of RDD using multiple thread
So one worker is enough and it will use all 4 cores? In what situation should I configure more workers in my single-node cluster? Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Friday, January 16, 2015 9:44 AM
To: Wang, Ningjun (LNG-NPV)
Cc: Sean Owen; user@spark.apache.org
Subject: Re: How to force parallel processing of RDD using multiple thread

Spark will use the number of cores available in the cluster. If your cluster is 1 node with 4 cores, Spark will execute up to 4 tasks in parallel. Setting your # of partitions to 4 will ensure an even load across cores. Note that this is different from saying threads - internally Spark uses many threads (data block sender/receiver, listeners, notifications, scheduler, ...). -kr, Gerard.
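A tiny sketch of the advice above, assuming an existing RDD named rdd on the 4-core single-node cluster being discussed: check how many partitions it has and repartition to match the available cores so all 4 can be kept busy.

println(rdd.partitions.size)         // current number of partitions
val rebalanced = rdd.repartition(4)  // one partition per available core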
How to 'Pipe' Binary Data in Apache Spark
I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe that binary data to an external program that will translate it to string/text data. Unfortunately, it seems that Spark is mangling the binary data before it gets passed to the external program. This code is representative of what I am trying to do. What am I doing wrong? How can I pipe binary data in Spark? Maybe it is getting corrupted when I read it in initially with 'textFile'?

bin = sc.textFile("binary-data.dat")
csv = bin.pipe("/usr/bin/binary-to-csv.sh")
csv.saveAsTextFile("text-data.csv")

Specifically, I am trying to use Spark to transform pcap (packet capture) data to text/csv so that I can perform an analysis on it. Thanks! -- Nick Allen n...@nickallen.org
Re: MLib: How to set preferences for ALS implicit feedback in Collaborative Filtering?
On Fri, Jan 16, 2015 at 9:58 AM, Zork Sail zorks...@gmail.com wrote: And then train ALSL: val model = ALS.trainImplicit(ratings, rank, numIter) I get RMSE 0.9, which is a big error in case of preferences taking 0 or 1 value: This is likely the problem. RMSE is not an appropriate evaluation metric when you have trained a model on implicit data. The factorization is not minimizing the same squared error loss that RMSE evaluates. Use metrics like AUC instead, for example. Rating value can be 1 if you have no information at all about the interaction other than that it exists. It should be thought of as a weight. 10 means it's 10 times more important to predict an interaction than one with weight 1. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
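For reference, a minimal sketch of evaluating an implicit-feedback model with AUC instead of RMSE, along the lines suggested above. It assumes the trained model, a held-out set of positive (user, product) interactions, and some sampled negative pairs; the variable names are illustrative only:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

def areaUnderROC(model: MatrixFactorizationModel,
                 positives: RDD[(Int, Int)],
                 negatives: RDD[(Int, Int)]): Double = {
  // Score both sets with the model and label them 1.0 / 0.0.
  val scored = model.predict(positives).map(r => (r.rating, 1.0))
    .union(model.predict(negatives).map(r => (r.rating, 0.0)))
  new BinaryClassificationMetrics(scored).areaUnderROC()
}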
Re: How to 'Pipe' Binary Data in Apache Spark
Well it looks like you're reading some kind of binary file as text. That isn't going to work, in Spark or elsewhere, as binary data is not even necessarily the valid encoding of a string. There are no line breaks to delimit lines and thus elements of the RDD. Your input has some record structure (or else it's not really useful to put it into an RDD). You can encode this as a SequenceFile and read it with objectFile. You could also write a custom InputFormat that knows how to parse pcap records directly.
Re: Scala vs Python performance differences
I was interested in this as I had some Spark code in Python that was too slow and wanted to know whether Scala would fix it for me. So I re-wrote my code in Scala. In my particular case the Scala version was 10 times faster. But I think that is because I did an awful lot of computation in my own code rather than in a library like numpy. (I put a bit more detail here http://tttv-engineering.tumblr.com/post/108260351966/spark-python-vs-scala in case you are interested) So there's one data point, if only an obvious one comparing computations in Scala to computations in pure Python.
Re: How to 'Pipe' Binary Data in Apache Spark
Per your last comment, it appears I need something like this: https://github.com/RIPE-NCC/hadoop-pcap Thanks a ton. That gets me oriented in the right direction. -- Nick Allen n...@nickallen.org
kinesis multiple records adding into stream
Hi Experts! I am using the kinesis dependency as follows:

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.10
version = 1.2.0

In this, aws sdk version 1.8.3 is being used, and in this sdk multiple records can not be put in a single request. Is it possible to put multiple records in a single request? thanks
Re: kinesis multiple records adding into stream
Sorry. I couldn't understand the issue. Are you trying to send data to kinesis from a spark batch/real time job? - Aniket
Re: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?
I'm not positive, but I think this is very unlikely to work. First, when you call sc.objectFile(...), I think the *driver* will need to know something about the file, eg to know how many tasks to create. But it won't even be able to see the file, since it only lives on the local filesystem of the cluster nodes. If you really wanted to, you could probably write out some small metadata about the files and write your own version of objectFile that uses it. But I think there is a bigger conceptual issue. You might not in general be sure that you are running on the same nodes when you save the file, as when you read it back in. So the file might not be present on the local filesystem for the active executors. You might be able to guarantee it for the specific cluster setup you have now, but it might limit you down the road. What are you trying to achieve? There might be a better way. I believe writing to hdfs will usually write one local copy, so you'd still be doing a local read when you reload the data. Imran On Jan 16, 2015 6:19 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have asked this question before but get no answer. Asking again. Can I save RDD to the local file system and then read it back on a spark cluster with multiple nodes? rdd.saveAsObjectFile(“file:///home/data/rdd1”) val rdd2 = sc.objectFile(“file:///home/data/rdd1”) This will works if the cluster has only one node. But my cluster has 3 nodes and each node has a local dir called /home/data. Is rdd saved to the local dir across 3 nodes? If so, does sc.objectFile(…) smart enough to read the local dir in all 3 nodes to merge them into a single rdd? Ningjun
remote Akka client disassociated - some timeout?
Hi, I believe this is some kind of timeout problem but can't figure out how to increase it. I am running spark 1.2.0 on yarn (all from cdh 5.3.0). I submit a python task which first loads a big RDD from hbase - I can see in the screen output all executors fire up, then no more logging output for the next two minutes, after which I get plenty of:

15/01/16 17:35:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 7 on node01: remote Akka client disassociated
15/01/16 17:35:16 INFO scheduler.TaskSetManager: Re-queueing tasks for 7 from TaskSet 1.0
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 1.0 (TID 17, node01): ExecutorLostFailure (executor 7 lost)
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 34.0 in stage 1.0 (TID 25, node01): ExecutorLostFailure (executor 7 lost)

this points to some timeout ~120secs while the nodes are loading the big RDD? any ideas how to get around it? FYI I already use the following options without any success:

spark.core.connection.ack.wait.timeout: 600
spark.akka.timeout: 1000

thanks, Antony.
RE: Determine number of running executors
Hi Tobias, Can you share more information about how you do that? I also have a similar question about this. Thanks a lot, Regards, Shuai

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, November 26, 2014 12:25 AM
To: Sandy Ryza
Cc: Yanbo Liang; user
Subject: Re: Determine number of running executors

Hi, Thanks for your help! Sandy, I had a bit of trouble finding the spark.executor.cores property. (It wasn't there although its value should have been 2.) I ended up throwing regular expressions on scala.util.Properties.propOrElse("sun.java.command", ""), which worked surprisingly well ;-) Thanks Tobias
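A rough sketch of the trick Tobias describes, purely to illustrate the idea; it is fragile, depends on how the application was launched, and is not a supported API:

// Inspect the JVM command line and pull out the executor-cores setting, if present.
val cmd = scala.util.Properties.propOrElse("sun.java.command", "")
val executorCores: Option[Int] =
  "--executor-cores[ =](\\d+)".r.findFirstMatchIn(cmd).map(_.group(1).toInt)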
Re: kinesis multiple records adding into stream
Are you referring to the PutRecords method, which was added in 1.9.9? (See http://aws.amazon.com/releasenotes/1369906126177804) If so, can't you just depend upon this later version of the SDK in your app even though spark-streaming-kinesis-asl is depending upon this earlier 1.9.3 version that does not yet have the PutRecords call? Also, could you please explain your use case more fully? spark-streaming-kinesis-asl is for *reading* data from Kinesis in Spark, not for writing data, so I would expect that the part of your code that would be writing to Kinesis would be a totally separate app anyway (unless you are reading from Kinesis using spark-streaming-kinesis-asl, transforming it somehow, then writing it back out to Kinesis). ~ Jonathan
RE: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?
I need to save an RDD to the file system and then restore my RDD from the file system in the future. I don't have any hdfs file system and don't want to go through the hassle of setting up an hdfs system. So how can I achieve this? The application needs to be run on a cluster with multiple nodes. Regards, Ningjun
Re: Failing jobs runs twice
FYI, I just confirmed with the latest Spark 1.3 snapshot that the spark.yarn.maxAppAttempts setting that SPARK-2165 refers to works perfectly. Great to finally get rid of this problem. Also caused an issue when the eventLogs were enabled since the spark-events/appXXX folder already exists the second time the app gets launched. On Thu, Jan 15, 2015 at 3:01 PM, Anders Arpteg arp...@spotify.com wrote: Found a setting that seems to fix this problem, but it does not seems to be available until Spark 1.3. See https://issues.apache.org/jira/browse/SPARK-2165 However, glad to see a work is being done with the issue. On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote: Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1 (thanks Sean), but with no luck. Any ideas? On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote: Hi Anders, are you using YARN by any chance? 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com: Since starting using Spark 1.2, I've experienced an annoying issue with failing apps that gets executed twice. I'm not talking about tasks inside a job, that should be executed multiple times before failing the whole app. I'm talking about the whole app, that seems to close the previous Spark context, start a new, and rerun the app again. This is annoying since it overwrite the log files as well and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |        |
                   V        V
                  RDD3  -> RDD4 -> Action!

In my experience the chance that RDD1 gets recalculated is volatile: sometimes it is computed once, sometimes twice. When calculation of this RDD is expensive (e.g. it involves using a RESTful service that charges me money), this compels me to persist RDD1, which takes extra memory, and in case the Action! doesn't always happen, I don't know when to unpersist it to free that memory. A related problem might be in SQLContext.jsonRDD(), since the source jsonRDD is used twice (once for schema inference, once for the data read). It almost guarantees that the source jsonRDD is calculated twice. Has this problem been addressed so far?
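For what it's worth, a small sketch of the persist/unpersist workaround mentioned above, with placeholder RDDs and functions (sourceRdd, callRestService, f2, f3, f4 are all hypothetical stand-ins for the real pipeline):

import org.apache.spark.storage.StorageLevel

val rdd1 = sourceRdd.map(callRestService)   // the expensive step to avoid recomputing
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

val rdd2 = rdd1.map(f2)
val rdd3 = rdd1.map(f3)
val rdd4 = rdd2.zip(rdd3).map { case (a, b) => f4(a, b) }  // stand-in for how RDD3/RDD4 combine

rdd4.count()       // the single action
rdd1.unpersist()   // release the cached blocks once the result is materialized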
Re: How to 'Pipe' Binary Data in Apache Spark
I just wanted to reiterate the solution for the benefit of the community. The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple of options to move forward.

1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen)
2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance as it prevents the use of more than one mapper on the file's data.

In my specific case, for #1 I can only find one project from RIPE-NCC (https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it appears to only support a limited set of network protocols. -- Nick Allen n...@nickallen.org
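A minimal sketch of option 2 above: read whole pcap files as byte arrays and turn each into CSV lines. parsePcapToCsv and the paths are hypothetical placeholders, not real library calls:

import org.apache.spark.input.PortableDataStream

def parsePcapToCsv(bytes: Array[Byte]): Iterator[String] = ???   // hypothetical decoder

// Each element is (file path, stream over the whole file's bytes).
val files = sc.binaryFiles("hdfs:///data/pcap/")
val csv = files.flatMap { case (path, stream: PortableDataStream) => parsePcapToCsv(stream.toArray()) }
csv.saveAsTextFile("hdfs:///data/pcap-csv/")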
Re: Scala vs Python performance differences
Hey Phil, Thank you for sharing this. The result didn't surprise me much; it's normal to prototype in Python, and once it gets stable and you really need the performance, rewriting part of it in C, or all of it in another language, does make sense and won't cost you much time. Davies On Fri, Jan 16, 2015 at 7:38 AM, philpearl p...@tanktop.tv wrote: I was interested in this as I had some Spark code in Python that was too slow and wanted to know whether Scala would fix it for me. So I re-wrote my code in Scala. In my particular case the Scala version was 10 times faster. But I think that is because I did an awful lot of computation in my own code rather than in a library like numpy. (I put a bit more detail here http://tttv-engineering.tumblr.com/post/108260351966/spark-python-vs-scala in case you are interested) So there's one data point, if only for the obvious case of comparing computations in Scala to computations in pure Python. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scala-vs-Python-performance-differences-tp4247p21190.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark java options
I want to add some Java options when submitting my application: --conf spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder But it looks like they don't get set. Where can I add them to make this work? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark java options
Hi Kane, What's the complete command line you're using to submit the app? Where do you expect these options to appear? On Fri, Jan 16, 2015 at 11:12 AM, Kane Kim kane.ist...@gmail.com wrote: I want to add some java options when submitting application: --conf spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder But looks like it doesn't get set. Where I can add it to make it working? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark java options
Hi Kane, Here's the command line you sent me privately: ./spark-1.2.0-bin-hadoop2.4/bin/spark-submit --class SimpleApp --conf spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder --master local simpleapp.jar ./test.log You're running the app in local mode. In that mode, the executor is run as a thread inside the same JVM as the driver, so all executor-related options are ignored. Try using local-cluster[1,1,512] as your master, that will run a single executor in a separate process, and then you should see your options take effect. On Fri, Jan 16, 2015 at 11:12 AM, Kane Kim kane.ist...@gmail.com wrote: I want to add some java options when submitting application: --conf spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder But looks like it doesn't get set. Where I can add it to make it working? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
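For completeness, a sketch of the same idea done programmatically, assuming a local-cluster master as suggested above (local-cluster mode is mainly meant for testing against a built Spark distribution, and the FlightRecorder flags need an Oracle JDK). The point is simply that the executor options must be in the SparkConf before the context is created, and the executor must live in its own JVM for them to matter:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("jfr-example")
  .setMaster("local-cluster[1,1,512]")  // 1 worker, 1 core, 512 MB: the executor gets its own JVM
  .set("spark.executor.extraJavaOptions",
    "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder")

val sc = new SparkContext(conf)
println(sc.parallelize(1 to 100).sum())  // any small job, just to launch an executor
sc.stop()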
Creating Apache Spark-powered “As Service” applications
The question is about ways to create a Windows desktop-based and/or web-based application client that can connect and talk, at run time, to a server running a Spark application (either a local or an on-premise cloud distribution). Any language/architecture may work. So far, I've seen two things that may help, but I'm not yet sure whether they are the best alternatives or how they work: Spark Job Server - https://github.com/spark-jobserver/spark-jobserver - defines a REST API for Spark Hue - http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ - uses item 1) Any advice would be appreciated. A simple toy example program (or steps) that shows, e.g., how to build such a client that simply creates a Spark Context on a local machine, reads a text file, and returns basic stats would be an ideal answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Apache-Spark-powered-As-Service-applications-tp21193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Creating Apache Spark-powered “As Service” applications
There's also an example of running a SparkContext in a java servlet container from Calrissian: https://github.com/calrissian/spark-jetty-server On Fri, Jan 16, 2015 at 2:31 PM, olegshirokikh o...@solver.com wrote: The question is about the ways to create a Windows desktop-based and/or web-based application client that is able to connect and talk to the server containing Spark application (either local or on-premise cloud distributions) in the run-time. Any language/architecture may work. So far, I've seen two things that may be a help in that, but I'm not so sure if they would be the best alternative and how they work yet: Spark Job Server - https://github.com/spark-jobserver/spark-jobserver - defines a REST API for Spark Hue - http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ - uses item 1) Any advice would be appreciated. Simple toy example program (or steps) that shows, e.g. how to build such client for simply creating Spark Context on a local machine and say reading text file and returning basic stats would be ideal answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Apache-Spark-powered-As-Service-applications-tp21193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Subscribe
Re: Subscribe
Send email to user-subscr...@spark.apache.org Cheers On Fri, Jan 16, 2015 at 11:51 AM, Andrew Musselman andrew.mussel...@gmail.com wrote:
Maven out of memory error
Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks! [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 --- [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes... [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported `DataReadMethod' is permanently hidden by definition of object DataReadMethod in package executor [WARNING] import org.apache.spark.executor.DataReadMethod [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/TaskState.scala:41: match may not be exhaustive. It would fail on the following input: TASK_ERROR [WARNING] def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:89: method isDirectory in class FileSystem is deprecated: see corresponding Javadoc for more information. [WARNING] if (!fileSystem.isDirectory(new Path(logBaseDir))) { [WARNING] ^ [ERROR] PermGen space - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
Re: Maven out of memory error
Can you try doing this before running mvn ? export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m What OS are you using ? Cheers On Fri, Jan 16, 2015 at 12:03 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks! [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 --- [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes... [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported `DataReadMethod' is permanently hidden by definition of object DataReadMethod in package executor [WARNING] import org.apache.spark.executor.DataReadMethod [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/TaskState.scala:41: match may not be exhaustive. It would fail on the following input: TASK_ERROR [WARNING] def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:89: method isDirectory in class FileSystem is deprecated: see corresponding Javadoc for more information. [WARNING] if (!fileSystem.isDirectory(new Path(logBaseDir))) { [WARNING] ^ [ERROR] PermGen space - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
Re: Creating Apache Spark-powered “As Service” applications
Hi, You can take a look at the Spark Kernel project: https://github.com/ibm-et/spark-kernel The Spark Kernel's goal is to serve as the foundation for interactive applications. The project provides a client library in Scala that abstracts connecting to the kernel (containing a Spark Context), which can be embedded into a web application. We demonstrated this at StataConf when we embedded the Spark Kernel client into a Play application to provide an interactive web application that communicates to Spark via the Spark Kernel (hosting a Spark Context). A getting started section can be found here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel If you have any other questions, feel free to email me or communicate over our mailing list: spark-ker...@googlegroups.com https://groups.google.com/forum/#!forum/spark-kernel Signed, Chip Senkbeil IBM Emerging Technology Software Engineer From: olegshirokikh o...@solver.com To: user@spark.apache.org Date: 01/16/2015 01:32 PM Subject:Creating Apache Spark-powered “As Service” applications The question is about the ways to create a Windows desktop-based and/or web-based application client that is able to connect and talk to the server containing Spark application (either local or on-premise cloud distributions) in the run-time. Any language/architecture may work. So far, I've seen two things that may be a help in that, but I'm not so sure if they would be the best alternative and how they work yet: Spark Job Server - https://github.com/spark-jobserver/spark-jobserver - defines a REST API for Spark Hue - http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ - uses item 1) Any advice would be appreciated. Simple toy example program (or steps) that shows, e.g. how to build such client for simply creating Spark Context on a local machine and say reading text file and returning basic stats would be ideal answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Apache-Spark-powered-As-Service-applications-tp21193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Maven out of memory error
Thanks Ted, got farther along but now have a failing test; is this a known issue? --- T E S T S --- Running org.apache.spark.JavaAPISuite Tests run: 72, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 123.462 sec FAILURE! - in org.apache.spark.JavaAPISuite testGuavaOptional(org.apache.spark.JavaAPISuite) Time elapsed: 106.5 sec ERROR! org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1199) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1188) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1187) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1187) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1399) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1360) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Running org.apache.spark.JavaJdbcRDDSuite Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.846 sec - in org.apache.spark.JavaJdbcRDDSuite Results : Tests in error: JavaAPISuite.testGuavaOptional » Spark Job aborted due to stage failure: Maste... On Fri, Jan 16, 2015 at 12:06 PM, Ted Yu yuzhih...@gmail.com wrote: Can you try doing this before running mvn ? export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m What OS are you using ? Cheers On Fri, Jan 16, 2015 at 12:03 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks! [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 --- [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes... 
[WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported `DataReadMethod' is permanently hidden by definition of object DataReadMethod in package executor [WARNING] import org.apache.spark.executor.DataReadMethod [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/TaskState.scala:41: match may not be exhaustive. It would fail on the following input: TASK_ERROR [WARNING] def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:89: method isDirectory in class FileSystem is deprecated: see corresponding Javadoc for more information. [WARNING] if (!fileSystem.isDirectory(new Path(logBaseDir))) { [WARNING] ^ [ERROR] PermGen space - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
Re: Maven out of memory error
Thanks Sean On Fri, Jan 16, 2015 at 12:06 PM, Sean Owen so...@cloudera.com wrote: Hey Andrew, you'll want to have a look at the Spark docs on building: http://spark.apache.org/docs/latest/building-spark.html It's the first thing covered there. The warnings are normal as you are probably building with newer Hadoop profiles and so old-Hadoop support code shows deprecation warnings on its use of old APIs. On Fri, Jan 16, 2015 at 8:03 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks! [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 --- [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes... [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported `DataReadMethod' is permanently hidden by definition of object DataReadMethod in package executor [WARNING] import org.apache.spark.executor.DataReadMethod [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/TaskState.scala:41: match may not be exhaustive. It would fail on the following input: TASK_ERROR [WARNING] def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:89: method isDirectory in class FileSystem is deprecated: see corresponding Javadoc for more information. [WARNING] if (!fileSystem.isDirectory(new Path(logBaseDir))) { [WARNING] ^ [ERROR] PermGen space - [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError
Performance issue
Hi, I observed some weird performance issue using Spark in combination with Theano, and I have no real explanation for that. To exemplify the issue I am using the pi.py example of spark that computes pi: When I modify the function from the example: #unmodified code def f(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add) # by adding a very simple dummy function that just computes the product of two floats, the execution slows down massively (about 100x slower). Here is the slow code: # define simple function in theano that computes the product x = T.dscalar() y = T.dscalar() dummyFun = theano.function([x,y],y * x) broadcast_dummyFun = sc.broadcast(dummyFun) def f(_): x = random() * 2 - 1 y = random() * 2 - 1 # compute product tmp = broadcast_dummyFun.value(x,y) return 1 if x ** 2 + y ** 2 < 1 else 0 Any idea why it slows down so much? Using a python function that computes the product (or lambda function) again gives full-speed. I would appreciate some help on that. -Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-tp21194.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
You can get the internal RDD using: schemaRDD.queryExecution.toRDD. This is used internally and does not copy. This is an unstable developer API. On Thu, Jan 15, 2015 at 11:26 PM, Nathan McCarthy nathan.mccar...@quantium.com.au wrote: Thanks Cheng! Is there any API I can get access too (e.g. ParquetTableScan) which would allow me to load up the low level/baseRDD of just RDD[Row] so I could avoid the defensive copy (maybe lose our on columnar storage etc.). We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the core RDD api (mapPartitions etc.). Any tips? Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartiton (e.g. Distinct). Does a defensive copy happen there too? Keen to get the best performance and the best blend of SparkSQL and functional Spark. Cheers, Nathan From: Cheng Lian lian.cs@gmail.com Date: Monday, 12 January 2015 1:21 am To: Nathan nathan.mccar...@quantium.com.au, Michael Armbrust mich...@databricks.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats? On 1/11/15 1:40 PM, Nathan McCarthy wrote: Thanks Cheng Michael! Makes sense. Appreciate the tips! Idiomatic scala isn't performant. I’ll definitely start using while loops or tail recursive methods. I have noticed this in the spark code base. I might try turning off columnar compression (via *spark.sql.inMemoryColumnarStorage.compressed=false *correct?) and see how performance compares to the primitive objects. Would you expect to see similar runtimes vs the primitive objects? We do have the luxury of lots of memory at the moment so this might give us an additional performance boost. Turning off compression should be faster, but still slower than directly using primitive objects. Because Spark SQL also serializes all objects within a column into byte buffers in a compact format. However, this radically reduces number of Java objects in the heap and is more GC friendly. When running large queries, cost introduced by GC can be significant. Regarding the defensive copying of row objects. Can we switch this off and just be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row object the most performant way to be flipping between SQL Scala user code? Is there anything else I could be doing? This can be very dangerous and error prone. Whenever an operator tries to cache row objects, turning off defensive copying can introduce wrong query result. For example, sort-based shuffle caches rows to do sorting. In some cases, sample operator may also cache row objects. This is very implementation specific and may change between versions. Cheers, ~N From: Michael Armbrust mich...@databricks.com Date: Saturday, 10 January 2015 3:41 am To: Cheng Lian lian.cs@gmail.com Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org user@spark.apache.org Subject: Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats? The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 2. The difference between 1 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects. On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote: Hey Nathan, Thanks for sharing, this is a very interesting post :) My comments are inlined below. 
Cheng On 1/7/15 11:53 AM, Nathan McCarthy wrote: Hi, I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via rdd.mapPartitions(…). Using the latest release 1.2.0. Simple example; load up some sample data from parquet on HDFS (about 380m rows, 10 columns) on a 7 node cluster. val t = sqlC.parquetFile(/user/n/sales-tran12m.parquet”) t.registerTempTable(test1”) sqlC.cacheTable(test1”) Now lets do some operations on it; I want the total sales quantities sold for each hour in the day so I choose 3 out of the 10 possible columns... sqlC.sql(select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour).collect().foreach(println) After the table has been 100% cached in memory, this takes around 11 seconds. Lets do the same thing but via a MapPartitions call (this isn’t production ready code but gets the job done). val try2 = sqlC.sql(select Hour, ItemQty, Sales from test1”) rddPC.mapPartitions { case hrs = val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for(r - hrs) { val hr = r.getInt(0) qtySum(hr) += r.getDouble(1) salesSum(hr) += r.getDouble(2) } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a,b) = (a._1 + b._1, a._2 + b._2)).collect().foreach(println) I believe the evil thing that makes this snippet much slower is the
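A sketch of the suggestion above against Spark 1.2, reusing the Hour/ItemQty/Sales query from the quoted thread (the table name and parquet path are taken from that message); the field is spelled queryExecution.toRdd in the 1.2 source, returns the internal RDD[Row] without defensive copies, and is an unstable developer API, so rows must not be cached or held onto in user code:

import org.apache.spark.SparkContext._   // pair RDD functions in pre-1.3 Spark
import org.apache.spark.sql.SQLContext

val sqlC = new SQLContext(sc)            // assumes an existing SparkContext `sc`
val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
t.registerTempTable("test1")

val query = sqlC.sql("select Hour, ItemQty, Sales from test1")
val raw = query.queryExecution.toRdd     // internal rows, no per-row copy

val perHour = raw.mapPartitions { rows =>
  val qty = new Array[Double](24)
  val sales = new Array[Double](24)
  rows.foreach { r =>
    val hr = r.getInt(0)
    qty(hr) += r.getDouble(1)
    sales(hr) += r.getDouble(2)
  }
  Iterator.tabulate(24)(h => (h, (qty(h), sales(h))))
}.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

perHour.collect().foreach(println)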
Re: Maven out of memory error
I got the same error: testGuavaOptional(org.apache.spark.JavaAPISuite) Time elapsed: 261.111 sec ERROR! org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1199) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1188) Looking under ore/target/surefire-reports/ , I don't see test output. Trying to figure out how test output can be generated. Cheers On Fri, Jan 16, 2015 at 12:26 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Thanks Ted, got farther along but now have a failing test; is this a known issue? --- T E S T S --- Running org.apache.spark.JavaAPISuite Tests run: 72, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 123.462 sec FAILURE! - in org.apache.spark.JavaAPISuite testGuavaOptional(org.apache.spark.JavaAPISuite) Time elapsed: 106.5 sec ERROR! org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1199) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1188) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1187) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1187) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1399) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1360) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Running org.apache.spark.JavaJdbcRDDSuite Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.846 sec - in org.apache.spark.JavaJdbcRDDSuite Results : Tests in error: JavaAPISuite.testGuavaOptional » Spark Job aborted due to stage failure: Maste... On Fri, Jan 16, 2015 at 12:06 PM, Ted Yu yuzhih...@gmail.com wrote: Can you try doing this before running mvn ? export MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m What OS are you using ? 
Cheers On Fri, Jan 16, 2015 at 12:03 PM, Andrew Musselman andrew.mussel...@gmail.com wrote: Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks! [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 --- [WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile [INFO] Using incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes... [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported `DataReadMethod' is permanently hidden by definition of object DataReadMethod in package executor [WARNING] import org.apache.spark.executor.DataReadMethod [WARNING] ^ [WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/TaskState.scala:41: match may not be exhaustive. It would fail on the following input: TASK_ERROR [WARNING] def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { [WARNING] ^ [WARNING]
Cluster Aware Custom RDD
Hello all, I have a custom RDD for fast loading of data from a non-partitioned source. The partitioning happens in the RDD implementation by pushing data from the source into queues picked up by the currently active partitions in worker threads. This works great on a multi-threaded single host (say, with the master set to local[x]), but I'd like to run it distributed. However, I need to know not only which slice my partition is, but also which host (by sequence) it's on, so I can divide up the source by worker (host) and then run it multi-threaded. In other words, I need what effectively amounts to a 2-tier slice identifier. I know this is probably unorthodox, but is there some way to get this information in the compute method or the deserialized Partition objects? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cluster-Aware-Custom-RDD-tp21196.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark random forest - string data
Hi, I have been playing around with the new version of the Spark MLlib Random Forest implementation and, in the process, tried it with a file with string features. While training, it fails with: java.lang.NumberFormatException: For input string. Is MLlib's Random Forest able to run only on numeric data? Thanks
RE: Creating Apache Spark-powered “As Service” applications
Thanks a lot, Robert – I’ll definitely investigate this and probably would come back with questions. P.S. I’m new to this Spark forum. I’m getting responses through emails but they are not appearing as “replies” in the thread – it’s kind of inconvenient. Is it something that I should tweak? Thanks, Oleg From: Robert C Senkbeil [mailto:rcsen...@us.ibm.com] Sent: Friday, January 16, 2015 12:21 PM To: Oleg Shirokikh Cc: user@spark.apache.org Subject: Re: Creating Apache Spark-powered “As Service” applications Hi, You can take a look at the Spark Kernel project: https://github.com/ibm-et/spark-kernel The Spark Kernel's goal is to serve as the foundation for interactive applications. The project provides a client library in Scala that abstracts connecting to the kernel (containing a Spark Context), which can be embedded into a web application. We demonstrated this at StataConf when we embedded the Spark Kernel client into a Play application to provide an interactive web application that communicates to Spark via the Spark Kernel (hosting a Spark Context). A getting started section can be found here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel If you have any other questions, feel free to email me or communicate over our mailing list: spark-ker...@googlegroups.commailto:spark-ker...@googlegroups.com https://groups.google.com/forum/#!forum/spark-kernel Signed, Chip Senkbeil IBM Emerging Technology Software Engineer [Inactive hide details for olegshirokikh ---01/16/2015 01:32:43 PM---The question is about the ways to create a Windows desktop-]olegshirokikh ---01/16/2015 01:32:43 PM---The question is about the ways to create a Windows desktop-based and/or web-based application client From: olegshirokikh o...@solver.commailto:o...@solver.com To: user@spark.apache.orgmailto:user@spark.apache.org Date: 01/16/2015 01:32 PM Subject: Creating Apache Spark-powered “As Service” applications The question is about the ways to create a Windows desktop-based and/or web-based application client that is able to connect and talk to the server containing Spark application (either local or on-premise cloud distributions) in the run-time. Any language/architecture may work. So far, I've seen two things that may be a help in that, but I'm not so sure if they would be the best alternative and how they work yet: Spark Job Server - https://github.com/spark-jobserver/spark-jobserver - defines a REST API for Spark Hue - http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ - uses item 1) Any advice would be appreciated. Simple toy example program (or steps) that shows, e.g. how to build such client for simply creating Spark Context on a local machine and say reading text file and returning basic stats would be ideal answer! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Apache-Spark-powered-As-Service-applications-tp21193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
Re: Spark random forest - string data
The implementation accepts an RDD of LabeledPoint only, so you couldn't feed in strings from a text file directly. LabeledPoint is a wrapper around double values rather than strings. How were you trying to create the input then? No, it only accepts numeric values, although you can encode categorical values as 0, 1, 2 ... and tell the implementation which of your features are categorical. On Fri, Jan 16, 2015 at 9:25 PM, Asaf Lahav asaf.la...@gmail.com wrote: Hi, I have been playing around with the new version of Spark MLlib Random forest implementation, and while in the process, tried it with a file with String Features. While training, it fails with: java.lang.NumberFormatException: For input string. Is MBLib Random forest adapted to run on top of numeric data only? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
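A sketch of that encoding with made-up data: each string category is mapped to an index (0, 1, 2, ...) and the feature is declared in categoricalFeaturesInfo so the trees treat it as unordered categories rather than ordered numbers (an existing SparkContext `sc` is assumed):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest

val colorIndex = Map("red" -> 0.0, "green" -> 1.0, "blue" -> 2.0)

// (label, color, numeric feature) -> LabeledPoint with the color encoded as an index
val data = sc.parallelize(Seq(
  (1.0, "red", 3.5), (0.0, "green", 1.2), (1.0, "blue", 7.8), (0.0, "red", 0.4)))
  .map { case (label, color, x) =>
    LabeledPoint(label, Vectors.dense(colorIndex(color), x))
  }

// Feature 0 is categorical with 3 distinct values; feature 1 stays continuous.
val model = RandomForest.trainClassifier(
  data,
  numClasses = 2,
  categoricalFeaturesInfo = Map(0 -> 3),
  numTrees = 10,
  featureSubsetStrategy = "auto",
  impurity = "gini",
  maxDepth = 4,
  maxBins = 32,
  seed = 42)

println(model.predict(Vectors.dense(colorIndex("green"), 2.0)))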
Re: Using Spark SQL with multiple (avro) files
I'd open an issue on the github to ask us to allow you to use hadoops glob file format for the path. On Thu, Jan 15, 2015 at 4:57 AM, David Jones letsnumsperi...@gmail.com wrote: I've tried this now. Spark can load multiple avro files from the same directory by passing a path to a directory. However, passing multiple paths separated with commas didn't work. Is there any way to load all avro files in multiple directories using sqlContext.avroFile? On Wed, Jan 14, 2015 at 3:53 PM, David Jones letsnumsperi...@gmail.com wrote: Should I be able to pass multiple paths separated by commas? I haven't tried but didn't think it'd work. I'd expected a function that accepted a list of strings. On Wed, Jan 14, 2015 at 3:20 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: If the wildcard path you have doesn't work you should probably open a bug -- I had a similar problem with Parquet and it was a bug which recently got closed. Not sure if sqlContext.avroFile shares a codepath with .parquetFile...you can try running with bits that have the fix for .parquetFile or look at the source... Here was my question for reference: http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3ccaaswr-5rfmu-y-7htluj2eqqaecwjs8jh+irrzhm7g1ex7v...@mail.gmail.com%3E On Wed, Jan 14, 2015 at 4:34 AM, David Jones letsnumsperi...@gmail.com wrote: Hi, I have a program that loads a single avro file using spark SQL, queries it, transforms it and then outputs the data. The file is loaded with: val records = sqlContext.avroFile(filePath) val data = records.registerTempTable(data) ... Now I want to run it over tens of thousands of Avro files (all with schemas that contain the fields I'm interested in). Is it possible to load multiple avro files recursively from a top-level directory using wildcards? All my avro files are stored under s3://my-bucket/avros/*/DATE/*.avro, and I want to run my task across all of these on EMR. If that's not possible, is there some way to load multiple avro files into the same table/RDD so the whole dataset can be processed (and in that case I'd supply paths to each file concretely, but I *really* don't want to have to do that). Thanks David
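Until avroFile accepts globs or multiple paths, one workaround is to load each directory separately and union the results. A sketch with made-up S3 paths, assuming the spark-avro package that provides sqlContext.avroFile and compatible schemas across all directories:

import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._   // adds avroFile to SQLContext

val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`

val paths = Seq(
  "s3://my-bucket/avros/a/2015-01-16",
  "s3://my-bucket/avros/b/2015-01-16")

// Union the per-directory SchemaRDDs into one table.
val all = paths.map(p => sqlContext.avroFile(p)).reduce(_ unionAll _)
all.registerTempTable("data")
sqlContext.sql("SELECT count(*) FROM data").collect().foreach(println)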
Re: Spark random forest - string data
An alternative approach would be to translate your categorical variables into dummy variables. If your strings represent N classes/categories you would generate N-1 dummy variables containing 0/1 values. Auto-magically creating dummy variables from categorical data definitely comes in handy. I assume this is what SPARK-1216 is referring to, but I am not sure from the description. https://issues.apache.org/jira/browse/SPARK-1216 Auto-magically doing the scheme that Sean mentioned is referenced in SPARK-4081, I believe. https://issues.apache.org/jira/browse/SPARK-4081 On Fri, Jan 16, 2015 at 4:45 PM, Sean Owen so...@cloudera.com wrote: The implementation accepts an RDD of LabeledPoint only, so you couldn't feed in strings from a text file directly. LabeledPoint is a wrapper around double values rather than strings. How were you trying to create the input then? No, it only accepts numeric values, although you can encode categorical values as 0, 1, 2 ... and tell the implementation about your categorical features to use categorical features. On Fri, Jan 16, 2015 at 9:25 PM, Asaf Lahav asaf.la...@gmail.com wrote: Hi, I have been playing around with the new version of Spark MLlib Random forest implementation, and while in the process, tried it with a file with String Features. While training, it fails with: java.lang.NumberFormatException: For input string. Is MBLib Random forest adapted to run on top of numeric data only? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nick Allen n...@nickallen.org
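A sketch of that dummy-variable encoding in Scala, with made-up categories: one level is the baseline (all zeros) and the remaining N-1 levels become 0/1 indicator features appended to the numeric ones (an existing SparkContext `sc` is assumed):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val levels = Seq("red", "green", "blue")          // N = 3 categories
val dummyIndex = levels.tail.zipWithIndex.toMap   // "red" is the baseline

def dummies(category: String): Array[Double] = {
  val v = Array.fill(levels.size - 1)(0.0)
  dummyIndex.get(category).foreach(i => v(i) = 1.0)
  v
}

// (label, category, numeric feature) -> LabeledPoint with N-1 indicator columns
val data = sc.parallelize(Seq((1.0, "green", 2.5), (0.0, "red", 1.0), (1.0, "blue", 4.2)))
  .map { case (label, category, x) =>
    LabeledPoint(label, Vectors.dense(dummies(category) :+ x))
  }

data.collect().foreach(println)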
Re: Spark random forest - string data
Hi Asaf, featurestream [1] is an internal project I'm playing with that includes support for some of this, in particular: * 1-pass random forest construction * schema inference * native support for text fields Would this be of interest? It's not open source, but if there's sufficient demand I can get access to it. [1] https://github.com/featurestream On 16 January 2015 at 13:59, Nick Allen n...@nickallen.org wrote: An alternative approach would be to translate your categorical variables into dummy variables. If your strings represent N classes/categories you would generate N-1 dummy variables containing 0/1 values. Auto-magically creating dummy variables from categorical data definitely comes in handy. I assume this is what SPARK-1216 is referring to, but I am not sure from the description. https://issues.apache.org/jira/browse/SPARK-1216 Auto-magically doing the scheme that Sean mentioned is referenced in SPARK-4081, I believe. https://issues.apache.org/jira/browse/SPARK-4081 On Fri, Jan 16, 2015 at 4:45 PM, Sean Owen so...@cloudera.com wrote: The implementation accepts an RDD of LabeledPoint only, so you couldn't feed in strings from a text file directly. LabeledPoint is a wrapper around double values rather than strings. How were you trying to create the input then? No, it only accepts numeric values, although you can encode categorical values as 0, 1, 2 ... and tell the implementation about your categorical features to use categorical features. On Fri, Jan 16, 2015 at 9:25 PM, Asaf Lahav asaf.la...@gmail.com wrote: Hi, I have been playing around with the new version of Spark MLlib Random forest implementation, and while in the process, tried it with a file with String Features. While training, it fails with: java.lang.NumberFormatException: For input string. Is MBLib Random forest adapted to run on top of numeric data only? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Nick Allen n...@nickallen.org
spark 1.2 compatibility
Is Spark 1.2 compatible with HDP 2.1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-compatibility-tp21197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Processing .wav files in PySpark
I think you can not use textFile() or binaryFile() or pickleFile() here, it's different format than wav. You could get a list of paths for all the files, then sc.parallelize(), and foreach(): def process(path): # use subprocess to launch a process to do the job, read the stdout as result files = [] # a list of path of wav files sc.parallelize(files, len(files)).foreach(process) On Fri, Jan 16, 2015 at 2:11 PM, Venkat, Ankam ankam.ven...@centurylink.com wrote: I need to process .wav files in Pyspark. If the files are in local file system, I am able to process them. Once I store them on HDFS, I am facing issues. For example, I run a sox program on a wav file like this. sox ext2187854_03_27_2014.wav -n stats -- works fine sox hdfs://xxx:8020/user/ab00855/ext2187854_03_27_2014.wav -n stats -- Does not work as sox cannot read HDFS file. So, I do like this. hadoop fs -cat hdfs://xxx:8020/user/ab00855/ext2187854_03_27_2014.wav | sox -t wav - -n stats -- This works fine But, I am not able to do this in PySpark. wavfile = sc.textFile('hdfs://xxx:8020/user/ab00855/ext2187854_03_27_2014.wav') wavfile.pipe(subprocess.call(['sox', '-t' 'wav', '-', '-n', 'stats'])) I tried different options like sc.binaryFiles and sc.pickleFile. Any thoughts? Regards, Venkat Ankam This communication is the property of CenturyLink and may contain confidential or privileged information. Unauthorized use of this communication is strictly prohibited and may be unlawful. If you have received this communication in error, please immediately notify the sender by reply e-mail and destroy all copies of the communication and any attachments. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Row similarities
What's a good way to calculate similarities between all vector-rows in a matrix or RDD[Vector]? I'm seeing RowMatrix has a columnSimilarities method but I'm not sure I'm going down a good path to transpose a matrix in order to run that.
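If the number of rows is modest, a brute-force pairwise comparison sidesteps the transpose question entirely. A sketch with made-up vectors and an existing SparkContext `sc` (for large matrices, transposing and using columnSimilarities scales much better):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.5, 1.0, 0.0),
  Vectors.dense(1.0, 1.0, 1.0)))

val indexed = rows.zipWithIndex().map(_.swap)   // (rowId, vector)

def cosine(a: Vector, b: Vector): Double = {
  val (x, y) = (a.toArray, b.toArray)
  val dot = x.zip(y).map { case (u, v) => u * v }.sum
  val norm = math.sqrt(x.map(v => v * v).sum) * math.sqrt(y.map(v => v * v).sum)
  if (norm == 0) 0.0 else dot / norm
}

val sims = indexed.cartesian(indexed)
  .filter { case ((i, _), (j, _)) => i < j }      // upper triangle only
  .map { case ((i, a), (j, b)) => ((i, j), cosine(a, b)) }

sims.collect().foreach(println)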
RE: spark 1.2 compatibility
Yes. It's compatible with HDP 2.1 -Original Message- From: bhavyateja [mailto:bhavyateja.potin...@gmail.com] Sent: Friday, January 16, 2015 3:17 PM To: user@spark.apache.org Subject: spark 1.2 compatibility Is spark 1.2 is compatibly with HDP 2.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-compatibility-tp21197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: spark 1.2 compatibility
Should clarify on this. I personally have used HDP 2.1 + Spark 1.2 and have not seen a problem. However officially HDP 2.1 + Spark 1.2 is not a supported scenario. -Original Message- From: Judy Nash Sent: Friday, January 16, 2015 5:35 PM To: 'bhavyateja'; user@spark.apache.org Subject: RE: spark 1.2 compatibility Yes. It's compatible with HDP 2.1 -Original Message- From: bhavyateja [mailto:bhavyateja.potin...@gmail.com] Sent: Friday, January 16, 2015 3:17 PM To: user@spark.apache.org Subject: spark 1.2 compatibility Is spark 1.2 is compatibly with HDP 2.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-compatibility-tp21197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.2 compatibility
The Apache Spark project should work with it, but I'm not sure you can get support from HDP (if you have that). Matei On Jan 16, 2015, at 5:36 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Should clarify on this. I personally have used HDP 2.1 + Spark 1.2 and have not seen a problem. However officially HDP 2.1 + Spark 1.2 is not a supported scenario. -Original Message- From: Judy Nash Sent: Friday, January 16, 2015 5:35 PM To: 'bhavyateja'; user@spark.apache.org Subject: RE: spark 1.2 compatibility Yes. It's compatible with HDP 2.1 -Original Message- From: bhavyateja [mailto:bhavyateja.potin...@gmail.com] Sent: Friday, January 16, 2015 3:17 PM To: user@spark.apache.org Subject: spark 1.2 compatibility Is spark 1.2 is compatibly with HDP 2.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-compatibility-tp21197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HDFS Namenode in safemode when I turn off my EC2 instance
Hello Everyone, I am encountering trouble running Spark applications when I shut down my EC2 instances. Everything else seems to work except Spark. When I try running a simple Spark application, like sc.parallelize() I get the message that hdfs name node is in safemode. Has anyone else had this issue? Is there a proper protocol I should be following to turn off my spark nodes? Thank you!
Re: Spark SQL Custom Predicate Pushdown
Hao, Thanks so much for the links! This is exactly what I'm looking for. If I understand correctly, I can extend PrunedFilteredScan, PrunedScan, and TableScan and I should be able to support all the sql semantics? I'm a little confused about the Array[Filter] that is used with the Filtered scan. I have the ability to perform pretty robust seeks in the underlying data sets in Accumulo. I have an inverted index and I'm able to do intersections as well as unions- and rich predicates which form a tree of alternating intersections and unions. If I understand correctly- the Array[Filter] is to be treated as an AND operator? Do OR operators get propagated through the API at all? I'm trying to do as much pairing down of the dataset as possible on the individual tablet servers so that the data loaded into the spark layer is minimal- really used to perform joins, groupBys, sortBys and other computations that would require the relations to be combined in various ways. Thanks again for pointing me to this. On Fri, Jan 16, 2015 at 2:07 AM, Cheng, Hao hao.ch...@intel.com wrote: The Data Source API probably work for this purpose. It support the column pruning and the Predicate Push Down: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala Examples also can be found in the unit test: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources *From:* Corey Nolet [mailto:cjno...@gmail.com] *Sent:* Friday, January 16, 2015 1:51 PM *To:* user *Subject:* Spark SQL Custom Predicate Pushdown I have document storage services in Accumulo that I'd like to expose to Spark SQL. I am able to push down predicate logic to Accumulo to have it perform only the seeks necessary on each tablet server to grab the results being asked for. I'm interested in using Spark SQL to push those predicates down to the tablet servers. Where wouldI begin my implementation? Currently I have an input format which accepts a query object that gets pushed down. How would I extract this information from the HiveContext/SQLContext to be able to push this down?
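A sketch of what such a relation could look like against the Spark 1.2 data sources API; the Accumulo-specific parts are stubs and all names are made up. The Array[Filter] handed to buildScan is a conjunction (AND) of simple predicates, and as far as I can tell OR trees are not translated into sources.Filter objects in 1.2, so that kind of pruning would stay inside your own query-object pushdown. Spark SQL re-evaluates the original predicates on top of whatever the relation returns, so it is safe to ignore filters you cannot serve natively:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

case class AccumuloRelation(table: String)(@transient val sqlContext: SQLContext)
  extends PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(
    StructField("docId", StringType, nullable = false),
    StructField("score", IntegerType, nullable = true)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Translate the predicates you can serve into Accumulo ranges / iterator settings here.
    filters.foreach {
      case EqualTo(attribute, value)     => () // seek the exact range for attribute = value
      case GreaterThan(attribute, value) => () // open-ended range starting after value
      case _                             => () // leave anything else for Spark to re-check
    }
    // Stub: a real implementation would return an RDD backed by the Accumulo scan,
    // projecting only requiredColumns, in that order.
    val sample = Map("docId" -> "doc-1", "score" -> 42)
    sqlContext.sparkContext.parallelize(Seq(Row(requiredColumns.map(sample): _*)))
  }
}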
Re: Low Level Kafka Consumer for Spark
Hi Dib, For our usecase I want my spark job1 to read from hdfs/cache and write to kafka queues. Similarly spark job2 should read from kafka queues and write to kafka queues. Is writing to kafka queues from spark job supported in your code ? Thanks Deb On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote: There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing few lines of configurations. Thanks Best Regards On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with Receiver.Store method . It is able to fetch messages form Kafka. Can you cross check other configurations in your setup like Kafka broker IP , topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No , I have not tried yet with Spark 1.2 yet. I will try this out and let you know how this goes. By the way, is there any change in Receiver Store method happened in Spark 1.2 ? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS.trainImplicit running out of mem when using higher rank
although this helped to improve it significantly I still run into this problem despite increasing the spark.yarn.executor.memoryOverhead vastly: export SPARK_EXECUTOR_MEMORY=24Gspark.yarn.executor.memoryOverhead=6144 yet getting this:2015-01-17 04:47:40,389 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=30211,containerID=container_1421451766649_0002_01_115969] is running beyond physical memory limits. Current usage: 30.1 GB of 30 GB physical memory used; 33.0 GB of 63.0 GB virtual memory used. Killing container. is there anything more I can do? thanks,Antony. On Monday, 12 January 2015, 8:21, Antony Mayi antonym...@yahoo.com wrote: this seems to have sorted it, awesome, thanks for great help.Antony. On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote: I would expect the size of the user/item feature RDDs to grow linearly with the rank, of course. They are cached, so that would drive cache memory usage on the cluster. This wouldn't cause executors to fail for running out of memory though. In fact, your error does not show the task failing for lack of memory. What it shows is that YARN thinks the task is using a little bit more memory than it said it would, and killed it. This happens sometimes with JVM-based YARN jobs since a JVM configured to use X heap ends up using a bit more than X physical memory if the heap reaches max size. So there's a bit of headroom built in and controlled by spark.yarn.executor.memoryOverhead (http://spark.apache.org/docs/latest/running-on-yarn.html) You can try increasing it to a couple GB. On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: the question really is whether this is expected that the memory requirements grow rapidly with the rank... as I would expect memory is rather O(1) problem with dependency only on the size of input data. if this is expected is there any rough formula to determine the required memory based on ALS input and parameters? thanks, Antony. On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com wrote: the actual case looks like this: * spark 1.1.0 on yarn (cdh 5.2.1) * ~8-10 executors, 36GB phys RAM per host * input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache()) * using pyspark yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to be: SPARK_EXECUTOR_CORES=6 SPARK_EXECUTOR_INSTANCES=9 SPARK_EXECUTOR_MEMORY=30G when using higher rank (above 20) for ALS.trainImplicit the executor runs after some time (~hour) of execution out of the yarn limit and gets killed: 2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container. thanks for any ideas, Antony. On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com wrote: the memory requirements seem to be rapidly growing hen using higher rank... I am unable to get over 20 without running out of memory. is this expected? thanks, Antony. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
My code handles the Kafka Consumer part. But writing to Kafka may not be a big challenge which you can easily do in your driver code. dibyendu On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com wrote: Hi Dib, For our usecase I want my spark job1 to read from hdfs/cache and write to kafka queues. Similarly spark job2 should read from kafka queues and write to kafka queues. Is writing to kafka queues from spark job supported in your code ? Thanks Deb On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote: There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing few lines of configurations. Thanks Best Regards On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with Receiver.Store method . It is able to fetch messages form Kafka. Can you cross check other configurations in your setup like Kafka broker IP , topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No , I have not tried yet with Spark 1.2 yet. I will try this out and let you know how this goes. By the way, is there any change in Receiver Store method happened in Spark 1.2 ? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
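For the writing side, a sketch of the usual pattern with the Kafka 0.8 producer API (broker list, topic, and input path are placeholders): create the producer inside foreachPartition so it never has to be serialized into the closure, send each record, and close it when the partition is done:

import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

val lines = sc.textFile("hdfs:///data/input")   // hypothetical HDFS source, `sc` assumed to exist

lines.foreachPartition { records =>
  val props = new Properties()
  props.put("metadata.broker.list", "broker1:9092,broker2:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))
  try {
    records.foreach(r => producer.send(new KeyedMessage[String, String]("output-topic", r)))
  } finally {
    producer.close()
  }
}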
Re: Maven out of memory error
I tried the following but still didn't see test output :-(

diff --git a/pom.xml b/pom.xml
index f4466e5..dae2ae8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1131,6 +1131,7 @@
         <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
       </systemProperties>
       <failIfNoTests>false</failIfNoTests>
+      <redirectTestOutputToFile>true</redirectTestOutputToFile>
     </configuration>
   </plugin>
   <!-- Scalatest runs all Scala tests -->

On Fri, Jan 16, 2015 at 12:41 PM, Ted Yu yuzhih...@gmail.com wrote:
I got the same error:

testGuavaOptional(org.apache.spark.JavaAPISuite) Time elapsed: 261.111 sec ERROR!
org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1199)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1188)

Looking under core/target/surefire-reports/ I don't see test output. Trying to figure out how test output can be generated. Cheers

On Fri, Jan 16, 2015 at 12:26 PM, Andrew Musselman andrew.mussel...@gmail.com wrote:
Thanks Ted, got farther along but now have a failing test; is this a known issue?

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.spark.JavaAPISuite
Tests run: 72, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 123.462 sec FAILURE! - in org.apache.spark.JavaAPISuite
testGuavaOptional(org.apache.spark.JavaAPISuite) Time elapsed: 106.5 sec ERROR!
org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1199)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1188)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1187)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1187)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1399)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1360)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Running org.apache.spark.JavaJdbcRDDSuite
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.846 sec - in org.apache.spark.JavaJdbcRDDSuite
Results :
Tests in error:
  JavaAPISuite.testGuavaOptional » Spark Job aborted due to stage failure: Maste...

On Fri, Jan 16, 2015 at 12:06 PM, Ted Yu yuzhih...@gmail.com wrote:
Can you try doing this before running mvn?
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
What OS are you using? Cheers

On Fri, Jan 16, 2015 at 12:03 PM, Andrew Musselman andrew.mussel...@gmail.com wrote:
Just got the latest from Github and tried running `mvn test`; is this error common and do you have any advice on fixing it? Thanks!

[INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-core_2.10 ---
[WARNING] Zinc server is not available at port 3030 - reverting to normal incremental compile
[INFO] Using incremental compilation
[INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)
[INFO] Compiling 400 Scala sources and 34 Java sources to /home/akm/spark/core/target/scala-2.10/classes...
[WARNING] /home/akm/spark/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala:22: imported
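Putting Ted's two suggestions together, one plausible way to rerun just the failing suite with the larger memory settings is sketched below. The flags follow the Spark build documentation of that era; selecting only org.apache.spark.JavaAPISuite here is an assumption for illustration, not something confirmed in the thread.

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
# Skip the ScalaTest suites and run only the failing Java suite via surefire.
mvn test -DwildcardSuites=none -Dtest=org.apache.spark.JavaAPISuite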
Re: Row similarities
You can use K-means https://spark.apache.org/docs/latest/mllib-clustering.html with a suitably large k. Each cluster should correspond to rows that are similar to one another.

On Fri, Jan 16, 2015 at 5:18 PM, Andrew Musselman andrew.mussel...@gmail.com wrote:
What's a good way to calculate similarities between all vector-rows in a matrix or RDD[Vector]? I see that RowMatrix has a columnSimilarities method, but I'm not sure transposing the matrix in order to run that is a good path.
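To make the K-means suggestion concrete, here is a minimal sketch against the MLlib API of that era, runnable in the Spark shell where sc already exists. The toy vectors and the choice of k are made up for illustration; in practice k has to be large enough that each cluster really does group only near-duplicate rows.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// rows: the RDD[Vector] whose rows we want to group by similarity
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(1.1, 0.1, 1.9),
  Vectors.dense(9.0, 8.0, 7.0)
)).cache()

val k = 2                                  // number of clusters; tune to the data
val model = KMeans.train(rows, k, 20)      // 20 iterations
val assignments = model.predict(rows)      // cluster id per row; similar rows share an id
assignments.collect().foreach(println)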
Futures timed out during unpersist
Hi experts, I got an error while unpersisting an RDD. Any ideas?

java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103)
  at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951)
  at org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)
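No reply is recorded here, but two workarounds commonly suggested for this symptom are sketched below, runnable in the Spark shell where sc already exists. Treat them as assumptions rather than a confirmed fix, since the thread does not say which Spark version or how loaded the cluster was.

// Toy cached RDD standing in for whatever the poster was unpersisting.
val rdd = sc.parallelize(1 to 1000000).cache()
rdd.count()                        // materialize the cached blocks

// Non-blocking unpersist: fires the remove request and returns immediately,
// instead of awaiting the BlockManagerMaster acknowledgement that timed out above.
rdd.unpersist(blocking = false)

// If blocking removal is needed, the 30 s in the stack trace matches the default
// Akka ask timeout in Spark 1.x; it can be raised (value in seconds) when the
// context is created, e.g. with --conf spark.akka.askTimeout=120 on spark-submit.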