RE: Spark cluster set up on EC2 customization
Thanks!

Date: Thu, 26 Feb 2015 12:51:21 +0530
Subject: Re: Spark cluster set up on EC2 customization
From: ak...@sigmoidanalytics.com
To: ssti...@live.com
CC: user@spark.apache.org

You can easily add a function (say setup_pig) inside the function setup_cluster in this script.

Thanks
Best Regards

On Thu, Feb 26, 2015 at 7:08 AM, Sameer Tilak ssti...@live.com wrote:

Hi, I was looking at the documentation for deploying a Spark cluster on EC2: http://spark.apache.org/docs/latest/ec2-scripts.html We are using Pig to build the data pipeline and then use MLLib for analytics. I was wondering whether anyone has experience including additional tools/services such as Pig/Hadoop in the above deployment script.
Spark cluster set up on EC2 customization
Hi, I was looking at the documentation for deploying a Spark cluster on EC2: http://spark.apache.org/docs/latest/ec2-scripts.html We are using Pig to build the data pipeline and then use MLLib for analytics. I was wondering whether anyone has experience including additional tools/services such as Pig/Hadoop in the above deployment script.
RE: Interpreting MLLib's linear regression o/p
Hi,

It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

This was the confusing part in the documentation: where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.

Let us say that I have 40 features, so I create an index file like this:

Feature, index number:
F1 1
F2 2
F3 3
...
F40 40

I then create my feature vectors in the libsvm format, something like:

1 10:1 20:0 8:1 4:0 24:1
1 1:1 40:0 2:1 8:0 9:1 23:1
0 23:1 18:0 13:1

I run regression and get back model.weights, which are 40 weights. Say I get:

0.11
0.3445
0.5
...

In that case, does the first weight (0.11) correspond to index 1/F1, or does it correspond to index 2/F2, since the input is 1-based and the o/p is 0-based? Or is 0-based indexing only for the internal representation, and what you get back at the end of regression is essentially 1-based like your input, so that 0.11 maps onto F1 and so on?

Date: Mon, 22 Dec 2014 16:31:57 -0800
Subject: Re: Interpreting MLLib's linear regression o/p
From: men...@gmail.com
To: ssti...@live.com
CC: user@spark.apache.org

Did you check the indices in the LIBSVM data and the master file? Do they match? -Xiangrui

On Sat, Dec 20, 2014 at 8:13 AM, Sameer Tilak ssti...@live.com wrote:

Hi All, I use the LIBSVM format to specify my input feature vectors, which uses 1-based indices. When I run regression, the o/p is 0-index based. I have a master lookup file that maps these indices back to what they stand for. However, I need to add an offset of 2, and not 1, to the regression outcome during the mapping. So, for example, to map index 800 from the regression output file, I look for 802 in my master lookup file and then things make sense. I can understand adding an offset of 1, but I am not sure why adding an offset of 2 works. Have others seen something like this as well?
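To make the indexing question concrete, here is a minimal plain-Scala sketch. It assumes that loading simply subtracts 1 from every LIBSVM index, so the weight at 0-based position i belongs to the feature written as i + 1 in the input file. The object and function names (IndexMapping, positionOf, libsvmIndexOf, nameWeights) are hypothetical helpers for illustration, not MLlib API.

```scala
// Hypothetical helper, not part of MLlib: maps between the 1-based indices
// used in a LIBSVM file and the 0-based positions of a weights array.
object IndexMapping {
  // Assumption: loading converts LIBSVM index k to position k - 1.
  def positionOf(libsvmIndex: Int): Int = libsvmIndex - 1

  def libsvmIndexOf(position: Int): Int = position + 1

  // Pair every weight with the feature name listed for its LIBSVM index.
  // Unknown indices fall back to a generic "index N" label.
  def nameWeights(weights: Array[Double], names: Map[Int, String]): Seq[(String, Double)] =
    weights.toSeq.zipWithIndex.map { case (w, pos) =>
      val idx = libsvmIndexOf(pos)
      (names.getOrElse(idx, s"index $idx"), w)
    }
}
```

Under that assumption the first weight lines up with F1, so a lookup file keyed by the LIBSVM index needs an offset of 1 from the 0-based position. If an offset of 2 appears to be needed, it may be worth checking whether the lookup file itself has a header line or starts counting from a different base.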
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
MLlib: Libsvm: Loss was due to java.lang.ArrayIndexOutOfBoundsException
Hi All,

When I am running LinearRegressionWithSGD, I get the following error. Any help on how to debug this further will be highly appreciated.

14/12/10 20:26:02 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 150323
    at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:231)
    at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:216)
    at breeze.linalg.operators.BinaryRegistry$class.apply(BinaryOp.scala:60)
    at breeze.linalg.VectorOps$$anon$178.apply(Vector.scala:391)
    at breeze.linalg.NumericOps$class.dot(NumericOps.scala:83)
    at breeze.linalg.DenseVector.dot(DenseVector.scala:47)
    at org.apache.spark.mllib.optimization.LeastSquaresGradient.compute(Gradient.scala:125)
    at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:180)
    at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:179)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
    at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
    at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Best regards,
--Sameer.
MLLIb: Linear regression: Loss was due to java.lang.ArrayIndexOutOfBoundsException
Hi All,

I was able to run LinearRegressionWithSGD for a larger dataset (2 GB, sparse). I have now filtered the data and am running regression on a subset of it (~200 MB). I see this error, which is strange since it ran fine with the superset data. Is this a formatting issue (which I doubt), or is it some other issue in data preparation? I confirmed that there is no empty line in my dataset. Any help with this will be highly appreciated.

14/12/08 20:32:03 WARN TaskSetManager: Lost TID 5 (task 3.0:1)
14/12/08 20:32:03 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 150323
    at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:231)
    at breeze.linalg.operators.DenseVector_SparseVector_Ops$$anon$129.apply(SparseVectorOps.scala:216)
    at breeze.linalg.operators.BinaryRegistry$class.apply(BinaryOp.scala:60)
    at breeze.linalg.VectorOps$$anon$178.apply(Vector.scala:391)
    at breeze.linalg.NumericOps$class.dot(NumericOps.scala:83)
    at breeze.linalg.DenseVector.dot(DenseVector.scala:47)
    at org.apache.spark.mllib.optimization.LeastSquaresGradient.compute(Gradient.scala:125)
    at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:180)
    at org.apache.spark.mllib.optimization.GradientDescent$$anonfun$runMiniBatchSGD$1$$anonfun$1.apply(GradientDescent.scala:179)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
    at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
    at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
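The trace shows the exception inside a dot product between a dense vector and a sparse vector, so one thing that can be checked is whether every feature index in the filtered file fits inside the number of features the weight vector was sized for. The sketch below is a plain-Scala diagnostic, not MLlib code. The names (LibsvmDimensionCheck, maxFeatureIndex, linesExceeding) are hypothetical, and whether a dimension mismatch is the actual cause here is an assumption that the trace alone does not confirm.

```scala
// Hypothetical diagnostic: scans LIBSVM-formatted lines for the largest
// feature index and reports lines whose indices exceed an expected size.
object LibsvmDimensionCheck {
  // Largest 1-based feature index on one line, or 0 if the line has no features.
  def maxIndexOnLine(line: String): Int = {
    val tokens = line.trim.split("\\s+").drop(1) // drop the label
    tokens.filter(_.contains(":")).map(_.split(":")(0).toInt).foldLeft(0)(_ max _)
  }

  // Largest feature index across all lines.
  def maxFeatureIndex(lines: Seq[String]): Int =
    lines.map(maxIndexOnLine).foldLeft(0)(_ max _)

  // 1-based line numbers of the lines that use an index above numFeatures.
  def linesExceeding(lines: Seq[String], numFeatures: Int): Seq[Int] =
    lines.zipWithIndex.collect {
      case (line, i) if maxIndexOnLine(line) > numFeatures => i + 1
    }
}
```

Running `maxFeatureIndex` over both the full file and the filtered subset would show whether the two disagree about the feature dimension.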
MLLib: loading saved model
Hi All,

I am using LinearRegressionWithSGD and then I save the model weights and intercept. The file that contains the weights has this format:

1.20455
0.1356
0.000456
..

The intercept is 0, since I am using train without setting the intercept, so it can be ignored for the moment. I would now like to initialize a new model object using these saved weights from the above file. We are using CDH 5.1. Something along these lines:

val weights = sc.textFile("linear-weights");
val model = new LinearRegressionWithSGD(weights);

and then use it as:

val valuesAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Any pointers on how to do that?
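As a rough sketch of what reusing saved weights amounts to, the plain-Scala code below parses a weights file with one number per line (an assumed layout) and computes a linear prediction by hand. The names SavedWeights, parse, and predict are hypothetical. In a Spark job the parsed array could presumably be wrapped in an MLlib vector and passed to a LinearRegressionModel, if that constructor is accessible in the Spark version bundled with your distribution; that step is an assumption and is not shown.

```scala
// Hypothetical helper: rebuilds a linear model from saved weights without Spark.
object SavedWeights {
  // Parse a weights file with one number per line; blank lines are skipped.
  def parse(lines: Seq[String]): Array[Double] =
    lines.map(_.trim).filter(_.nonEmpty).map(_.toDouble).toArray

  // Linear prediction for a dense feature vector: w . x + intercept.
  def predict(weights: Array[Double], intercept: Double, features: Array[Double]): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    var sum = intercept
    var i = 0
    while (i < weights.length) {
      sum += weights(i) * features(i)
      i += 1
    }
    sum
  }
}
```

Note that the weights must be read back in the same order in which they were written, since position i in the array corresponds to feature position i.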
Model characterization
Hi All, I have been using the LinearRegression model of MLLib and am very pleased with its scalability and robustness. Right now, we are just calculating the MSE of our model. We would like to characterize the performance of our model further. Is support for computing things such as confidence intervals on the roadmap? Will graphical things such as ROC curves be supported by MLLib or other parts of the ecosystem, or is this something for which other statistical packages are recommended?
LinearRegression and model prediction threshold
Hi All, I am using LinearRegression and have a question about the details of the model.predict method. Basically, it predicts the variable y given an input vector x. Can someone point me to the documentation on the threshold used in the predict method? Can it be changed? I am assuming that the i/p vector essentially gets mapped to a number, which is compared against a threshold value, and then y is set to either 0 or 1 based on those two numbers. Another question: if I want to save the model to HDFS for later reuse, is there a recommended way of doing that?

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
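For what it is worth, a linear regression prediction is normally just the real-valued score w . x + intercept, with no threshold applied; as far as I know that is also what MLlib's linear regression model returns. The plain-Scala sketch below illustrates this under that assumption. The names RegressionOutput, rawScore, toBinary, and meanSquaredError are hypothetical, and the cutoff in toBinary is something the caller would have to choose.

```scala
// Hypothetical illustration: regression output is a raw score; any 0/1
// decision is a separate step applied by the caller.
object RegressionOutput {
  // The regression prediction itself: dot product plus intercept.
  def rawScore(weights: Array[Double], intercept: Double, x: Array[Double]): Double =
    weights.zip(x).map { case (w, v) => w * v }.sum + intercept

  // Turn a score into a 0/1 label using a caller-chosen threshold.
  def toBinary(score: Double, threshold: Double): Double =
    if (score > threshold) 1.0 else 0.0

  // Mean squared error over (label, prediction) pairs.
  def meanSquaredError(valuesAndPreds: Seq[(Double, Double)]): Double =
    valuesAndPreds.map { case (v, p) => (v - p) * (v - p) }.sum / valuesAndPreds.size
}
```

If the labels really are 0/1, a classification model such as logistic regression may be a better fit than thresholding a regression score by hand.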
MLLib: libsvm - default value initialization
Hi All,

I have my sparse data in libsvm format.

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")

I am running linear regression. Let us say that my data has the following entry:

1 1:0 4:1

I think it will assume 0 for indices 2 and 3, right? I would like the default value to be 0.5 instead of 0. Is that possible? If not, I will have to switch to dense data, which will significantly increase the data size for me.
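One workaround that keeps the data sparse is to shift every stored value by the desired default, so that an absent entry (implicitly 0 after the shift) stands for 0.5 in the original scale. For a linear model this only changes the intercept, so it assumes the model is fitted with an intercept term. The plain-Scala sketch below rewrites one LIBSVM line this way; DefaultShift and shiftLine are hypothetical names, and this is an illustration rather than an MLlib feature.

```scala
// Hypothetical helper: rewrites stored values as (value - default) so that
// missing entries, which are implicitly 0, represent `default` in the original scale.
object DefaultShift {
  def shiftLine(line: String, default: Double): String = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head
    val shifted = tokens.tail
      .map { t =>
        val Array(idx, value) = t.split(":")
        (idx, value.toDouble - default)
      }
      .filter { case (_, v) => v != 0.0 } // entries equal to the default can be dropped
      .map { case (idx, v) => s"$idx:$v" }
    (label +: shifted).mkString(" ")
  }
}
```

With a default of 0.5, the entry `1 1:0 4:1` becomes `1 1:-0.5 4:0.5`, and indices 2 and 3 stay absent.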
MLLib libsvm format
Hi All,

I have a question regarding the ordering of indices. The document says that the indices are one-based and in ascending order. However, do the indices within a row need to be sorted in ascending order?

Sparse data
It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.

For example, I have indices ranging from 1 to 1000. Is this OK as a libsvm data file?

1 110:1.0 80:0.5 310:0.0
0 890:0.5 20:0.0 200:0.5 400:1.0 82:0.0

and so on. OR do I need to sort them as:

1 80:0.5 110:1.0 310:0.0
0 20:0.0 82:0.0 200:0.5 400:1.0 890:0.5
RE: MLLib libsvm format
Great, I will sort them.

Original message
From: Xiangrui Meng men...@gmail.com
Date: 10/21/2014 3:29 PM (GMT-08:00)
To: Sameer Tilak ssti...@live.com
Cc: user@spark.apache.org
Subject: Re: MLLib libsvm format

Yes. where the indices are one-based and **in ascending order**. -Xiangrui

On Tue, Oct 21, 2014 at 1:10 PM, Sameer Tilak ssti...@live.com wrote:

Hi All, I have a question regarding the ordering of indices. The document says that the indices are one-based and in ascending order. However, do the indices within a row need to be sorted in ascending order?

Sparse data
It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 ...

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.

For example, I have indices ranging from 1 to 1000. Is this OK as a libsvm data file?

1 110:1.0 80:0.5 310:0.0
0 890:0.5 20:0.0 200:0.5 400:1.0 82:0.0

and so on. OR do I need to sort them as:

1 80:0.5 110:1.0 310:0.0
0 20:0.0 82:0.0 200:0.5 400:1.0 890:0.5
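Sorting the index:value pairs within each row can be done as a preprocessing step before the file is loaded. The plain-Scala sketch below shows one way to do it for a single line; LibsvmSort and sortLine are hypothetical names, and the code assumes every token after the label has the form index:value with an integer index.

```scala
// Hypothetical preprocessing helper: sorts the index:value pairs of one
// LIBSVM line by index, leaving the label in first position.
object LibsvmSort {
  def sortLine(line: String): String = {
    val tokens = line.trim.split("\\s+")
    val sorted = tokens.tail.sortBy(_.split(":")(0).toInt)
    (tokens.head +: sorted).mkString(" ")
  }
}
```

Applied to `1 110:1.0 80:0.5 310:0.0`, this yields `1 80:0.5 110:1.0 310:0.0`.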
RE: MLLib Linear regression
Hi Xiangrui,

Changing the default step size to 0.01 made a huge difference. The results make sense when I use A + B + C + D. MSE is ~0.07 and the outcome matches the domain knowledge. Is there any documentation on the parameters and on when/how to vary them?

Date: Tue, 7 Oct 2014 15:11:39 -0700
Subject: Re: MLLib Linear regression
From: men...@gmail.com
To: ssti...@live.com
CC: user@spark.apache.org

Did you test different regularization parameters and step sizes? In the combinations that work, I don't see A + D. Did you test that combination? Is there any linear dependency between A's columns and D's columns? -Xiangrui

On Tue, Oct 7, 2014 at 1:56 PM, Sameer Tilak ssti...@live.com wrote:

BTW, one detail:

When the number of iterations is 100, all weights are zero or below and the indices are only from set A.
When the number of iterations is 150, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets. However, MSE is high (5.xxx) and the result does not match the domain knowledge.
When the number of iterations is 400, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets. However, MSE is high (6.xxx) and the result does not match the domain knowledge.

Any help will be highly appreciated.

From: ssti...@live.com
To: user@spark.apache.org
Subject: MLLib Linear regression
Date: Tue, 7 Oct 2014 13:41:03 -0700

Hi All, I have the following classes of features:

class A: 15000 features
class B: 170 features
class C: 900 features
class D: 6000 features

I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes:

1. A + B + C
2. B + C + D
3. A + B
4. A + C
5. B + D
6. C + D
7. D

Unfortunately, when I use A + B + C + D (all the features), I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get an MSE of around 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that.)
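To illustrate why the step size can matter so much, the plain-Scala sketch below runs full-batch gradient descent on a one-dimensional least-squares problem with a constant step size. This is a deliberate simplification and not MLlib's optimizer (which works on mini-batches and, as far as I recall, decays the step over iterations); StepSizeDemo and fit are hypothetical names.

```scala
// Hypothetical toy example: constant-step gradient descent for y ~ w * x.
object StepSizeDemo {
  def fit(xs: Array[Double], ys: Array[Double], stepSize: Double, iterations: Int): Double = {
    var w = 0.0
    for (_ <- 1 to iterations) {
      // Gradient of the mean of 0.5 * (w * x - y)^2 with respect to w.
      val grad = xs.zip(ys).map { case (x, y) => (w * x - y) * x }.sum / xs.length
      w -= stepSize * grad
    }
    w
  }
}
```

On data generated by y = 2x with x in {1, 2, 3}, a step size of 0.01 moves w steadily toward 2, while a step size of 1.0 overshoots further on every iteration and diverges. Adding more features with larger values tends to shrink the range of step sizes that converge, which is consistent with a smaller step fixing the A + B + C + D case.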
MLLib Linear regression
Hi All,

I have the following classes of features:

class A: 15000 features
class B: 170 features
class C: 900 features
class D: 6000 features

I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes:

1. A + B + C
2. B + C + D
3. A + B
4. A + C
5. B + D
6. C + D
7. D

Unfortunately, when I use A + B + C + D (all the features), I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get an MSE of around 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that.)
RE: MLLib Linear regression
BTW, one detail:

When the number of iterations is 100, all weights are zero or below and the indices are only from set A.
When the number of iterations is 150, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets. However, MSE is high (5.xxx) and the result does not match the domain knowledge.
When the number of iterations is 400, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets. However, MSE is high (6.xxx) and the result does not match the domain knowledge.

Any help will be highly appreciated.

From: ssti...@live.com
To: user@spark.apache.org
Subject: MLLib Linear regression
Date: Tue, 7 Oct 2014 13:41:03 -0700

Hi All, I have the following classes of features:

class A: 15000 features
class B: 170 features
class C: 900 features
class D: 6000 features

I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes:

1. A + B + C
2. B + C + D
3. A + B
4. A + C
5. B + D
6. C + D
7. D

Unfortunately, when I use A + B + C + D (all the features), I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get an MSE of around 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that.)
RE: MLLib: Missing value imputation
Thanks, Xiangrui and Debasish, for your input.

Date: Wed, 1 Oct 2014 08:35:51 -0700
Subject: Re: MLLib: Missing value imputation
From: debasish.da...@gmail.com
To: men...@gmail.com
CC: ssti...@live.com; user@spark.apache.org

If the missing values are 0, then you can also look into implicit formulation...

On Tue, Sep 30, 2014 at 12:05 PM, Xiangrui Meng men...@gmail.com wrote:

We don't handle missing value imputation in the current version of MLlib. In future releases, we can store feature information in the dataset metadata, which may store the default value to replace missing values. But no one is committed to work on this feature. For now, you can filter out examples containing missing values and use the rest for training. -Xiangrui

On Tue, Sep 30, 2014 at 11:26 AM, Sameer Tilak ssti...@live.com wrote:

Hi All, Can someone please point me to the documentation that describes how missing value imputation is done in MLLib? Also, any information on how this fits into the overall roadmap would be great.
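The filtering suggested in the thread, and a simple alternative, can be sketched in plain Scala as follows. The encoding of a missing value as None is an assumption made for this example, as are the names MissingValues, dropIncomplete, and imputeWithMean. Mean imputation is shown only as one common option; it is not something the thread proposes.

```scala
// Hypothetical sketch: rows are dense feature arrays, None marks a missing value.
object MissingValues {
  type Row = Array[Option[Double]]

  // Option 1 (suggested in the thread): keep only rows without missing values.
  def dropIncomplete(rows: Seq[Row]): Seq[Array[Double]] =
    rows.filter(_.forall(_.isDefined)).map(_.map(_.get))

  // Option 2 (an assumption, not from the thread): replace each missing value
  // with the mean of the values present in that column (0.0 if none are present).
  def imputeWithMean(rows: Seq[Row]): Seq[Array[Double]] = {
    val numCols = rows.head.length
    val means = (0 until numCols).map { c =>
      val present = rows.flatMap(r => r(c))
      if (present.isEmpty) 0.0 else present.sum / present.size
    }
    rows.map(r => r.zipWithIndex.map { case (v, c) => v.getOrElse(means(c)) })
  }
}
```

Dropping rows is the safer choice when few examples are affected; imputation keeps more data but changes the feature distribution.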
MLLib: Missing value imputation
Hi All, Can someone please point me to the documentation that describes how missing value imputation is done in MLLib? Also, any information on how this fits into the overall roadmap would be great.
RE: MLUtils.loadLibSVMFile error
Hi Liquan,

Thanks. I was running this in spark-shell. I was able to resolve this issue by creating an app and then submitting it via spark-submit in yarn-client mode. I have seen this happen before as well -- submitting via spark-shell has memory issues. The same code then works fine when submitted as an app in spark-submit yarn-client mode. I am not sure whether this is due to the difference between spark-shell and spark-submit or between yarn and non-yarn mode.

Date: Wed, 24 Sep 2014 22:13:35 -0700
Subject: Re: MLUtils.loadLibSVMFile error
From: liquan...@gmail.com
To: ssti...@live.com
CC: so...@cloudera.com; user@spark.apache.org

Hi Sameer, I think there are two things that you can do:
1) What is your current driver-memory or executor-memory? You can try to increase driver-memory or executor-memory to see if that solves your problem.
2) How many features are in your data? Too many features may create a large number of temp objects, which may also cause GC to happen.
Hope this helps!
Liquan

On Wed, Sep 24, 2014 at 9:50 PM, Sameer Tilak ssti...@live.com wrote:

Hi All, I was able to solve this formatting issue. However, I have another question. When I do the following,

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")

I get a java.lang.OutOfMemoryError: GC overhead limit exceeded error. Is it possible to specify the number of partitions explicitly? I want to add that this dataset is sparse and is fairly small -- ~250 MB.

Error log:

14/09/24 21:41:02 ERROR Executor: Exception in task ID 0
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.regex.Pattern.compile(Pattern.java:1655)
    at java.util.regex.Pattern.init(Pattern.java:1337)
    at java.util.regex.Pattern.compile(Pattern.java:1022)
    at java.lang.String.split(String.java:2313)
    at java.lang.String.split(String.java:2355)
    at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
    at scala.collection.immutable.StringOps.split(StringOps.scala:31)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
14/09/24 21:41:02 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.regex.Pattern.compile(Pattern.java:1655)
    at java.util.regex.Pattern.init(Pattern.java:1337)
    at java.util.regex.Pattern.compile(Pattern.java:1022)
    at java.lang.String.split(String.java:2313)
    at java.lang.String.split(String.java:2355)
    at scala.collection.immutable.StringLike$class.split(StringLike.scala:201)
    at scala.collection.immutable.StringOps.split(StringOps.scala:31)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:80)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach
MLUtils.loadLibSVMFile error
Hi All,

When I try to load a dataset using MLUtils.loadLibSVMFile, I have the following problem. Any help will be greatly appreciated!

Code snippet:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")

stacktrace:

14/09/24 15:00:49 ERROR Executor: Exception in task ID 0
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:82)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/09/24 15:00:49 ERROR Executor: Exception in task ID 1
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:82)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/09/24 15:00:49 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/24 15:00:49 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:82)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at
RE: MLUtils.loadLibSVMFile error
From: so...@cloudera.com
Date: Wed, 24 Sep 2014 23:07:27 +0100
Subject: Re: MLUtils.loadLibSVMFile error
To: ssti...@live.com

Well, why not show some of the file? It's pretty certain there is a problem with the format. The large repeated stack trace doesn't say anything more.

On Wed, Sep 24, 2014 at 11:02 PM, Sameer Tilak ssti...@live.com wrote:

Hi All, When I try to load a dataset using MLUtils.loadLibSVMFile, I have the following problem. Any help will be greatly appreciated!

Code snippet:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "structured/results/data.txt")

stacktrace:

14/09/24 15:00:49 ERROR Executor: Exception in task ID 0
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:82)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/09/24 15:00:49 ERROR Executor: Exception in task ID 1
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:82)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4$$anonfun$5.apply(MLUtils.scala:79)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51
RE: MLLib: LIBSVM issue
Thanks, Burak. Yes, the tab was the issue, and I was able to get it working after replacing the tabs with spaces.
Date: Wed, 17 Sep 2014 21:11:00 -0700 From: bya...@stanford.edu To: ssti...@live.com CC: user@spark.apache.org Subject: Re: MLLib: LIBSVM issue
Hi, The spacing between the inputs should be a single space, not a tab. I feel like your inputs have tabs between them instead of a single space. Therefore the parser cannot parse the input. Best, Burak
- Original Message - From: Sameer Tilak ssti...@live.com To: user@spark.apache.org Sent: Wednesday, September 17, 2014 7:25:10 PM Subject: MLLib: LIBSVM issue
Hi All, We have a fairly large amount of sparse data. I was following these instructions in the manual:
Sparse data: It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format: label index1:value1 index2:value2 ...
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
I believe that I have formatted my data as per the required LIBSVM format. Here is a snippet of that: 1122:11693:11771:11974:12334:1 2378:12562:1 1118:11389:11413:11454:1 1780:12562:15051:15417:15548:1 5798:15862:1 0150:1214:1468:11013:1 1078:11092:11117:11489:11546:1 1630:11635:11827:12024:12215:12478:1 2761:15985:16115:16218:1 0251:1 5578:1
However, when I use MLUtils.loadLibSVMFile(sc, path-to-data-file) I get the following error messages in my spark-shell. Can someone please point me in the right direction?
java.lang.NumberFormatException: For input string: 150:1214:1 468:11013:11078:11092:11117:11489:1 1546:11630:11635:11827:12024:1 2215:12478:12761:15985:16115:16218:1
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
at java.lang.Double.parseDouble(Double.java:540)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
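The tab-versus-space fix described in this thread can be sketched in plain Scala. This is only an illustration, not the MLlib parser: the object name `LibSvmLines` and both helpers are hypothetical, and the check assumes the `label index:value ...` layout quoted from the manual.

```scala
object LibSvmLines {
  /** Collapse every run of tabs or spaces into one space and trim the ends. */
  def normalize(line: String): String =
    line.trim.split("\\s+").mkString(" ")

  /** True when the line looks like `label index:value index:value ...`
    * with single spaces between tokens. */
  def isWellFormed(line: String): Boolean = {
    val tokens = line.split(' ')
    scala.util.Try(tokens.head.toDouble).isSuccess &&
      tokens.tail.forall { token =>
        token.split(':') match {
          case Array(index, value) =>
            scala.util.Try(index.toInt).isSuccess &&
              scala.util.Try(value.toDouble).isSuccess
          case _ => false // missing or extra ':' in the token
        }
      }
  }
}
```

On a cluster the same function could be applied per line, for example with `sc.textFile(path).map(LibSvmLines.normalize)`, before the cleaned file is handed to `MLUtils.loadLibSVMFile`.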
RE: MLLib: LIBSVM issue
Thanks, will try it out today.
Date: Wed, 17 Sep 2014 23:04:31 -0700 Subject: Re: MLLib: LIBSVM issue From: debasish.da...@gmail.com To: bya...@stanford.edu CC: ssti...@live.com; user@spark.apache.org
We dump fairly big libsvm files to compare against liblinear/libsvm. The following code dumps out libsvm format from a SparseVector:
def toLibSvm(features: SparseVector): String = {
  // LIBSVM indices are one-based, SparseVector indices are zero-based
  val indices = features.indices.map(_ + 1)
  val values = features.values
  // "(i,v) (j,w)" becomes "i:v j:w"
  indices.zip(values).mkString(" ").replace(',', ':').replace("(", "").replace(")", "")
}
On Wed, Sep 17, 2014 at 9:11 PM, Burak Yavuz bya...@stanford.edu wrote:
Hi, The spacing between the inputs should be a single space, not a tab. I feel like your inputs have tabs between them instead of a single space. Therefore the parser cannot parse the input. Best, Burak
- Original Message - From: Sameer Tilak ssti...@live.com To: user@spark.apache.org Sent: Wednesday, September 17, 2014 7:25:10 PM Subject: MLLib: LIBSVM issue
Hi All, We have a fairly large amount of sparse data. I was following these instructions in the manual:
Sparse data: It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format: label index1:value1 index2:value2 ...
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
I believe that I have formatted my data as per the required LIBSVM format. Here is a snippet of that: 1122:11693:11771:11974:12334:1 2378:12562:1 1118:11389:11413:11454:1 1780:12562:15051:15417:15548:1 5798:15862:1 0150:1214:1468:11013:1 1078:11092:11117:11489:11546:11630:1 1635:11827:12024:12215:12478:1 2761:15985:16115:16218:1 0251:15578:1
However, when I use MLUtils.loadLibSVMFile(sc, path-to-data-file) I get the following error messages in my spark-shell. Can someone please point me in the right direction?
java.lang.NumberFormatException: For input string: 150:1214:1 468:11013:11078:11092:11117:11489:1 1546:11630:11635:11827:12024:12215:1 2478:12761:15985:16115:16218:1
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
at java.lang.Double.parseDouble(Double.java:540)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
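For readers who want the LIBSVM dump without relying on tuple `toString`, here is a minimal sketch that works on raw index and value arrays. It is not the code posted in the thread: the object name `LibSvmFormat`, the explicit `label` parameter, and the array-based signature are assumptions made so the example runs without Spark on the classpath.

```scala
object LibSvmFormat {
  /** Format a label plus zero-based sparse entries as one LIBSVM line.
    * LIBSVM indices are one-based, hence the `i + 1`. */
  def toLibSvm(label: Double, indices: Array[Int], values: Array[Double]): String = {
    require(indices.length == values.length, "indices and values must align")
    val pairs = indices.zip(values).map { case (i, v) => s"${i + 1}:$v" }
    (label.toString +: pairs).mkString(" ")
  }
}
```

The tokens are joined with a single space, which matches the separator the replies in this thread say the loader expects.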
MLLib regression model weights
Hi All, I am able to run LinearRegressionWithSGD on a small sample dataset (~60 MB LIBSVM file of sparse data) with 6700 features. val model = LinearRegressionWithSGD.train(examples, numIterations) At the end I get a model for which model.weights.size returns res6: Int = 6699. I am assuming each entry in the model is the weight for the corresponding feature/index. However, if I want to get the top 10 most important features, or all features with weights higher than a certain threshold, is that functionality available out of the box? I can implement that on my own, but it seems like a common feature that most people will need when they are working with high-dimensional datasets.
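The thread does not say whether MLlib offers this out of the box, so the following is only a sketch of the "implement it on my own" route. The object `TopWeights` and its two helpers are hypothetical names, and ranking by absolute value is an assumption about what "most important" means; it is only meaningful when the features are on comparable scales.

```scala
object TopWeights {
  /** The k (featureIndex, weight) pairs with the largest absolute weight. */
  def topK(weights: Array[Double], k: Int): List[(Int, Double)] =
    weights.zipWithIndex
      .map { case (w, i) => (i, w) }
      .sortBy { case (_, w) => -math.abs(w) }
      .take(k)
      .toList

  /** All (featureIndex, weight) pairs whose absolute weight exceeds the threshold. */
  def above(weights: Array[Double], threshold: Double): List[(Int, Double)] =
    weights.zipWithIndex
      .collect { case (w, i) if math.abs(w) > threshold => (i, w) }
      .toList
}
```

With a trained model, the input array would typically come from `model.weights.toArray`; the indices returned are positions in that array.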
MLLib: LIBSVM issue
Hi All, We have a fairly large amount of sparse data. I was following these instructions in the manual:
Sparse data: It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format: label index1:value1 index2:value2 ...
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
I believe that I have formatted my data as per the required LIBSVM format. Here is a snippet of that: 1122:11693:11771:11974:12334:1 2378:12562:1 1118:11389:11413:11454:1 1780:12562:15051:15417:15548:1 5798:15862:1 0150:1214:1468:11013:1 1078:11092:11117:11489:11546:11630:1 1635:11827:12024:12215:12478:1 2761:15985:16115:16218:1 0251:15578:1
However, when I use MLUtils.loadLibSVMFile(sc, path-to-data-file) I get the following error messages in my spark-shell. Can someone please point me in the right direction?
java.lang.NumberFormatException: For input string: 150:1214:1 468:11013:11078:11092:11117:11489:1 1546:11630:11635:11827:12024:12215:1 2478:12761:15985:16115:16218:1
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
at java.lang.Double.parseDouble(Double.java:540)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
RDD projection and sorting
Hi All, I have data in the following format: the 1st column is the userid, and the second column onward are class ids for various products. I want to save this in LIBSVM format, and an intermediate step is to sort the class ids in ascending order. For example:
I/P:
uid1 1243 3580 2670 122 2593 1782 1526
uid2 121 285 2447 1516 343 385 1200 912 1430 5824 1451 8931 1271 1088 2584 1664 5481
Desired O/P:
uid1 122 1243 1526 1782 2593 2670 3580
uid2 121 285 343 385 912 1088 1200 1271 1430 1451 1516 1664 2447 2584 5481 5824 8931
Can someone please point me in the right direction? If I use val data = sc.textFile(..), how do I project column 1 to the end (not including column 0) and then sort these projected columns?
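One way to approach the projection and sort asked about here is a per-line function, sketched below in plain Scala. The name `SortClassIds.sortLine` is hypothetical, and the sketch assumes whitespace-separated columns with integer class ids.

```scala
object SortClassIds {
  /** Keep column 0 (the user id) and sort columns 1..n numerically. */
  def sortLine(line: String): String = {
    val cols = line.trim.split("\\s+")
    val sortedIds = cols.tail.map(_.toInt).sorted // projection: everything after column 0
    (cols.head +: sortedIds.map(_.toString)).mkString(" ")
  }
}
```

Applied to an RDD of lines this would be `sc.textFile(..).map(SortClassIds.sortLine)`, since each row is sorted independently of the others.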
MLLib sparse vector
Hi All, I have transformed the data into the following format: the first column is the user id, and all the other columns are class ids. For a given user, only the class ids that appear in that user's row have value 1; all others are 0. I need to create a sparse vector from this. Does the API for creating a sparse vector directly support this format? User id, Product class ids: 2622572 145447 162013421 28565 285556 293 455367261 130 3646167118806 183576 328651715 57671 57476
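A possible preprocessing step for this row layout is sketched below. It only builds the index and value arrays; the object name `SparseRow` and the sample row in the usage note are hypothetical, and treating class ids as ready-to-use vector indices is an assumption (they may need an offset or a lookup table in practice).

```scala
object SparseRow {
  /** Parse `userId classId classId ...` into
    * (userId, ascending distinct indices, a value of 1.0 per index). */
  def parse(line: String): (String, Array[Int], Array[Double]) = {
    val cols = line.trim.split("\\s+")
    val indices = cols.tail.map(_.toInt).distinct.sorted
    (cols.head, indices, Array.fill(indices.length)(1.0))
  }
}
```

For a row such as `262 2572 145 447` this yields indices `145, 447, 2572`. The two arrays have the shape that MLlib's `Vectors.sparse(size, indices, values)` takes, where `size` would be the total number of classes.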
RE: MLLib decision tree: Weights
Dear Xiangrui, Thanks for your reply. We will use sampling for now. However, just to let you know, we believe that it is not the best fit for our problems for two reasons: (1) the high dimensionality of the data (600 features) and (2) its highly skewed distribution. Do you have any idea when MLlib v1.2 will be released? We can plan things accordingly.
Date: Tue, 2 Sep 2014 23:15:09 -0700 Subject: Re: MLLib decision tree: Weights From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org
This is not supported in MLlib. Hopefully, we will add support for weighted examples in v1.2. If you want to train weighted instances with the current tree implementation, please try importance sampling first to adjust the weights. For instance, an example with weight 0.3 is sampled with probability 0.3. And if it is sampled, its weight becomes 1. -Xiangrui
On Tue, Sep 2, 2014 at 1:05 PM, Sameer Tilak ssti...@live.com wrote:
Hi everyone, We are looking to apply a weight to each training example; this weight should be used when computing the penalty of a misclassified example. For instance, without weighting, each example is penalized 1 point when evaluating the model of a classifier, such as a decision tree. We would like to customize this penalty for each training example, such that we could apply a penalty of W for a misclassified example, where W is a weight associated with the given training example. Is this something that is supported directly in MLlib? I would appreciate it if someone could point me in the right direction.
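The sampling suggestion in the reply above can be illustrated with a few lines of plain Scala. This is a sketch under stated assumptions, not an MLlib API: `WeightSampling.sample` is a hypothetical helper, and it assumes the weights have already been scaled into the range 0 to 1 (for example by dividing by the largest weight).

```scala
import scala.util.Random

object WeightSampling {
  /** Keep each example with probability equal to its weight.
    * Surviving examples are then treated as having weight 1. */
  def sample[A](weighted: Seq[(A, Double)], rng: Random): Seq[A] =
    weighted.collect { case (example, w) if rng.nextDouble() < w => example }
}
```

An example with weight 0.3 survives about 30% of the time, so with skewed weights many low-weight rows are dropped, which is consistent with the concern raised at the top of this message.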
MLLib decision tree: Weights
Hi everyone, We are looking to apply a weight to each training example; this weight should be used when computing the penalty of a misclassified example. For instance, without weighting, each example is penalized 1 point when evaluating the model of a classifier, such as a decision tree. We would like to customize this penalty for each training example, such that we could apply a penalty of W for a misclassified example, where W is a weight associated with the given training example. Is this something that is supported directly in MLlib? I would appreciate it if someone could point me in the right direction.
Amplab: big-data-benchmark
Hi All, I am planning to run the AMPLab benchmark suite to evaluate the performance of our cluster. I looked at https://amplab.cs.berkeley.edu/benchmark/ and it mentions data availability at: s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix] where /tiny/, /1node/ and /5nodes/ are options for suffix. However, I am not able to download these datasets directly. I read that they can be used directly by doing: sc.textFile(s3:/). However, I wanted to make sure that my understanding is correct. Here is what I see at http://s3.amazonaws.com/big-data-benchmark/ I do not see anything for sequence or text-deflate. I see the sequence-snappy dataset:
<Contents><Key>pavlo/sequence-snappy/5nodes/crawl/000738_0</Key><LastModified>2013-05-27T21:26:40.000Z</LastModified><ETag>a978d18721d5a533d38a88f558461644</ETag><Size>42958735</Size><StorageClass>STANDARD</StorageClass></Contents>
For text, I get the following error:
<Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>pavlo/text/1node/crawl</Key><RequestId>166D239D38399526</RequestId><HostId>4Bg8BHomWqJ6BXOkx/3fQZhN5Uw1TtCn01uQzm+1qYffx2s/oPV+9sGoAWV2thCI</HostId></Error>
Please let me know if there is a way to readily download the dataset and view it.
RE: Amplab: big-data-benchmark
Hi Burak, Thanks, I will then start benchmarking the cluster.
Date: Wed, 27 Aug 2014 11:52:05 -0700 From: bya...@stanford.edu To: ssti...@live.com CC: user@spark.apache.org Subject: Re: Amplab: big-data-benchmark
Hi Sameer, I've faced this issue before. They don't show up on http://s3.amazonaws.com/big-data-benchmark/. But you can directly use: `sc.textFile("s3n://big-data-benchmark/pavlo/text/tiny/crawl")` The gotcha is that you also need to supply which dataset you want: crawl, uservisits, or rankings, in lower case, after the format and size you want them in. They should be there. Best, Burak
- Original Message - From: Sameer Tilak ssti...@live.com To: user@spark.apache.org Sent: Wednesday, August 27, 2014 11:42:28 AM Subject: Amplab: big-data-benchmark
Hi All, I am planning to run the AMPLab benchmark suite to evaluate the performance of our cluster. I looked at https://amplab.cs.berkeley.edu/benchmark/ and it mentions data availability at: s3n://big-data-benchmark/pavlo/[text|text-deflate|sequence|sequence-snappy]/[suffix] where /tiny/, /1node/ and /5nodes/ are options for suffix. However, I am not able to download these datasets directly. I read that they can be used directly by doing: sc.textFile(s3:/). However, I wanted to make sure that my understanding is correct. Here is what I see at http://s3.amazonaws.com/big-data-benchmark/ I do not see anything for sequence or text-deflate. I see the sequence-snappy dataset:
<Contents><Key>pavlo/sequence-snappy/5nodes/crawl/000738_0</Key><LastModified>2013-05-27T21:26:40.000Z</LastModified><ETag>a978d18721d5a533d38a88f558461644</ETag><Size>42958735</Size><StorageClass>STANDARD</StorageClass></Contents>
For text, I get the following error:
<Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>pavlo/text/1node/crawl</Key><RequestId>166D239D38399526</RequestId><HostId>4Bg8BHomWqJ6BXOkx/3fQZhN5Uw1TtCn01uQzm+1qYffx2s/oPV+9sGoAWV2thCI</HostId></Error>
Please let me know if there is a way to readily download the dataset and view it.
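The path layout described in this thread (format, then size suffix, then lower-case dataset name) can be captured in a small helper. The object `BenchmarkPaths` is hypothetical; the allowed values are just the ones named in the messages above, and nothing here checks that a given combination actually exists in the bucket.

```scala
object BenchmarkPaths {
  val formats  = Set("text", "text-deflate", "sequence", "sequence-snappy")
  val sizes    = Set("tiny", "1node", "5nodes")
  val datasets = Set("crawl", "uservisits", "rankings")

  /** Build s3n://big-data-benchmark/pavlo/<format>/<size>/<dataset>. */
  def path(format: String, size: String, dataset: String): String = {
    require(formats(format), s"unknown format: $format")
    require(sizes(size), s"unknown size: $size")
    require(datasets(dataset), s"unknown dataset: $dataset")
    s"s3n://big-data-benchmark/pavlo/$format/$size/$dataset"
  }
}
```

The result can be passed to `sc.textFile(...)` as in the reply; leaving out the dataset name is what produced the `NoSuchKey` style of confusion described in the original question.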
RE: Execution time increasing with increase of cluster size
Can you tell which nodes were doing the computation in each case?
Date: Wed, 27 Aug 2014 20:29:38 +0530 Subject: Execution time increasing with increase of cluster size From: sarathchandra.jos...@algofusiontech.com To: user@spark.apache.org
Hi, I've written a simple Scala program which reads a file on HDFS (a delimited file with 100 fields and 1 million rows), splits each row on the delimiter, computes the hashcode of each field, makes new rows from these hashcodes, and writes these rows back to HDFS. Code attached. When I run this on a Spark cluster of 2 nodes (these 2 nodes also act as the HDFS cluster) it takes about 35 sec to complete. Then I increased the cluster to 4 nodes (the additional nodes are not part of the HDFS cluster) and submitted the same job. I was expecting a decrease in the execution time, but instead it took about 3 times longer (1.6 min) to complete. Attached are snapshots of the execution summary. Both times I set executor memory to 6 GB, which is available on all the nodes. What am I missing here? Do I need to do any additional configuration when increasing the cluster size? ~Sarath
MLBase status
Hi All, Could someone please tell me the status of MLbase and its roadmap in terms of software releases? We are very interested in exploring it for our applications.
MLlib: issue with increasing maximum depth of the decision tree
Hi All, My dataset is fairly small: a CSV file with around half a million rows and 600 features. Everything works when I set the maximum depth of the decision tree to 5 or 6. However, I get this error for larger values of that parameter, for example when I set it to 10. Have others encountered a similar issue?
14/08/20 10:27:26 INFO TaskSetManager: Serialized task 5.0:390 as 400933 bytes in 1 ms
14/08/20 10:27:26 WARN TaskSetManager: Lost TID 1194 (task 5.0:399)
14/08/20 10:27:26 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 178
at org.apache.spark.mllib.linalg.DenseVector.apply(Vectors.scala:163)
at org.apache.spark.mllib.tree.DecisionTree$.findBin$1(DecisionTree.scala:444)
at org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findBinsForLevel$1(DecisionTree.scala:529)
at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
RE: How to set KryoRegistrator class in spark-shell
Hi Wang, Have you tried doing this in your application?
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "yourpackage.MyKryoRegistrator")
You then don't need to specify it via the command line.
Date: Wed, 20 Aug 2014 12:25:14 -0700 Subject: How to set KryoRegistrator class in spark-shell From: bewang.t...@gmail.com To: user@spark.apache.org
I want to use opencsv's CSVParser to parse csv lines using a script like the one below in spark-shell:
import au.com.bytecode.opencsv.CSVParser
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.fs.{Path, FileSystem}
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[CSVParser])
  }
}
val outDir = "/tmp/dmc-out"
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(outDir), true)
val largeLines = sc.textFile("/tmp/dmc-03-08/*.gz")
val parser = new CSVParser('|', '')
largeLines.map(parser.parseLine(_).toList).saveAsTextFile(outDir, classOf[org.apache.hadoop.io.compress.GzipCodec])
If I start spark-shell with spark.kryo.registrator like this:
SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=MyKryoRegistrator" spark-shell
it complains that MyKryoRegistrator is not found when I run :load my_script in spark-shell.
14/08/20 12:14:01 ERROR KryoSerializer: Failed to run spark.kryo.registrator
java.lang.ClassNotFoundException: MyKryoRegistrator
What's wrong?
FW: Decision tree: categorical variables
From: ssti...@live.com To: men...@gmail.com Subject: RE: Decision tree: categorical variables Date: Wed, 20 Aug 2014 12:09:52 -0700
Hi Xiangrui, My data is in the following format:
0,1,5,A,8,1,M
0,1,5,B,4,1,M
1,0,2,B,7,0,U
0,1,3,C,8,0,M
0,0,5,C,1,0,M
1,1,5,C,8,0,U
0,0,5,B,8,0,M
1,0,3,B,2,1,M
0,1,5,B,8,0,F
1,0,2,B,4,0,F
0,1,5,A,8,0,F
I can create a map like this: val catmap = Map(3 -> 3, 6 -> 2) However, I am not sure what I should do when I parse the data. In the default case, I parse it like:
val parsedData = data.map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
Do I need to explicitly do something for columns 3 and 6, or will just specifying the map suffice?
Date: Tue, 19 Aug 2014 16:45:35 -0700 Subject: Re: Decision tree: categorical variables From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org
The categorical features must be encoded into indices starting from 0: 0, 1, ..., numCategories - 1. Then you can provide the categoricalFeatureInfo map to specify which columns contain categorical features and the number of categories in each. Joseph is updating the user guide. But if you want to try something now, you can take a look at the docs of DecisionTree.trainClassifier and trainRegressor: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala#L360 -Xiangrui
On Tue, Aug 19, 2014 at 4:24 PM, Sameer Tilak ssti...@live.com wrote:
Hi All, Is there any example of the MLlib decision tree handling categorical variables? My dataset includes a few categorical variables (20 out of 100 features), so I was interested in knowing how I can use the current version of the decision tree implementation to handle this situation. I looked at LabeledData and am not sure if that is the way to go.
RE: Decision tree: categorical variables
Was able to resolve the parsing issue. Thanks!
From: ssti...@live.com To: user@spark.apache.org Subject: FW: Decision tree: categorical variables Date: Wed, 20 Aug 2014 12:48:10 -0700
From: ssti...@live.com To: men...@gmail.com Subject: RE: Decision tree: categorical variables Date: Wed, 20 Aug 2014 12:09:52 -0700
Hi Xiangrui, My data is in the following format:
0,1,5,A,8,1,M
0,1,5,B,4,1,M
1,0,2,B,7,0,U
0,1,3,C,8,0,M
0,0,5,C,1,0,M
1,1,5,C,8,0,U
0,0,5,B,8,0,M
1,0,3,B,2,1,M
0,1,5,B,8,0,F
1,0,2,B,4,0,F
0,1,5,A,8,0,F
I can create a map like this: val catmap = Map(3 -> 3, 6 -> 2) However, I am not sure what I should do when I parse the data. In the default case, I parse it like:
val parsedData = data.map { line =>
  val parts = line.split(',').map(_.toDouble)
  LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
Do I need to explicitly do something for columns 3 and 6, or will just specifying the map suffice?
Date: Tue, 19 Aug 2014 16:45:35 -0700 Subject: Re: Decision tree: categorical variables From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org
The categorical features must be encoded into indices starting from 0: 0, 1, ..., numCategories - 1. Then you can provide the categoricalFeatureInfo map to specify which columns contain categorical features and the number of categories in each. Joseph is updating the user guide. But if you want to try something now, you can take a look at the docs of DecisionTree.trainClassifier and trainRegressor: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala#L360 -Xiangrui
On Tue, Aug 19, 2014 at 4:24 PM, Sameer Tilak ssti...@live.com wrote:
Hi All, Is there any example of the MLlib decision tree handling categorical variables? My dataset includes a few categorical variables (20 out of 100 features), so I was interested in knowing how I can use the current version of the decision tree implementation to handle this situation. I looked at LabeledData and am not sure if that is the way to go.
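The thread does not show how the parsing issue was resolved, so the following is only one plausible sketch of the encoding step Xiangrui describes (categories mapped to 0, 1, ..., numCategories - 1). The object `CategoricalEncoding` and both helpers are hypothetical names; the sample row comes from the data quoted above.

```scala
object CategoricalEncoding {
  /** Assign each distinct category an index 0, 1, ..., numCategories - 1
    * (sorted first so the assignment is deterministic). */
  def indexOf(categories: Seq[String]): Map[String, Int] =
    categories.distinct.sorted.zipWithIndex.toMap

  /** Turn one CSV row into (label, features). `encoders` is keyed by raw
    * column number; all other columns are parsed as plain doubles. */
  def parseRow(line: String, encoders: Map[Int, Map[String, Int]]): (Double, Array[Double]) = {
    val parts = line.split(',').zipWithIndex.map { case (field, col) =>
      encoders.get(col).map(_(field).toDouble).getOrElse(field.toDouble)
    }
    (parts.head, parts.tail) // column 0 is the label
  }
}
```

Two details are worth checking against the real data. Because the label in column 0 is dropped, raw columns 3 and 6 end up at feature positions 2 and 5, and the categorical map is keyed by feature position. Also, the sample rows show three distinct values in each of those columns (A/B/C and M/U/F), so under that reading the map would be `Map(2 -> 3, 5 -> 3)` rather than `Map(3 -> 3, 6 -> 2)`.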
Decision tree: categorical variables
Hi All, Is there any example of the MLlib decision tree handling categorical variables? My dataset includes a few categorical variables (20 out of 100 features), so I was interested in knowing how I can use the current version of the decision tree implementation to handle this situation. I looked at LabeledData and am not sure if that is the way to go.
RE: spark kryo serilizable exception
Hi, I was able to resolve this issue by setting this parameter in my application: set("spark.kryoserializer.buffer.mb", "256") Please let me know if this helps.
Date: Mon, 18 Aug 2014 21:50:02 +0800 From: dujinh...@hzduozhun.com To: user@spark.apache.org Subject: spark kryo serilizable exception
Hi all, In an RDD map, I invoke an object that is serialized with standard Java serialization, and I get this exception:
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 13
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
mlib model viewing and saving
Hi All, I have an MLlib model:
val model = DecisionTree.train(parsedData, Regression, Variance, maxDepth)
I see that model has the following methods: algo asInstanceOf isInstanceOf predict toString topNode
model.topNode outputs:
org.apache.spark.mllib.tree.model.Node = id = 0, isLeaf = false, predict = 0.5, split = Some(Feature = 87, threshold = 0.7931471805599453, featureType = Continuous, categories = List()), stats = Some(gain = 0.89, impurity = 0.35, left impurity = 0.12, right impurity = 0.00, predict = 0.50)
I was wondering what the best way to look at the model is. We want to see what the decision tree looks like: which features are selected, the details of the splits, what the depth is, etc. Is there an easy way to see that? I can traverse it recursively using topNode.leftNode and topNode.rightNode. However, I was wondering if there is any way to look at the model and also to save it on HDFS for later use.
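The recursive traversal mentioned at the end of this message might look like the sketch below. To keep it runnable without Spark it uses a hypothetical stand-in type (`Tree`, `Leaf`, `Split`) rather than MLlib's `Node` class, and it assumes the left branch is taken when the feature value is at or below the threshold; both are assumptions to check against the real model.

```scala
object TreePrinter {
  // Hypothetical stand-in for a tree node; the real MLlib Node carries more fields.
  sealed trait Tree
  case class Leaf(prediction: Double) extends Tree
  case class Split(feature: Int, threshold: Double, left: Tree, right: Tree) extends Tree

  /** Render the tree as indented if/else lines, one list entry per line. */
  def render(tree: Tree, indent: Int = 0): List[String] = {
    val pad = "  " * indent
    tree match {
      case Leaf(p) => List(s"${pad}predict $p")
      case Split(f, t, l, r) =>
        (s"${pad}if feature $f <= $t" :: render(l, indent + 1)) :::
          (s"${pad}else" :: render(r, indent + 1))
    }
  }

  /** Number of splits on the longest root-to-leaf path. */
  def depth(tree: Tree): Int = tree match {
    case Leaf(_)           => 0
    case Split(_, _, l, r) => 1 + math.max(depth(l), depth(r))
  }
}
```

The same pattern should carry over to the real model by matching on `isLeaf` and following `leftNode` and `rightNode` from `model.topNode`, the members named in the message above.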
RE: java.lang.UnknownError: no bin was found for continuous variable.
Hi Yanbo,I think it was happening because some of the rows did not have all the columns. We are cleaning up the data and will let you know once we confirm this. Date: Thu, 14 Aug 2014 22:50:58 +0800 Subject: Re: java.lang.UnknownError: no bin was found for continuous variable. From: yanboha...@gmail.com To: ssti...@live.com Can you supply the detail code and data you used.From the log, it looks like can not find the bin for specific feature.The bin for continuous feature is a unit that covers a specific range of the feature. 2014-08-14 7:43 GMT+08:00 Sameer Tilak ssti...@live.com: Hi All, I am using the decision tree algorithm and I get the following error. Any help would be great! java.lang.UnknownError: no bin was found for continuous variable. at org.apache.spark.mllib.tree.DecisionTree$.findBin$1(DecisionTree.scala:492) at org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findBinsForLevel$1(DecisionTree.scala:529) at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653) at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157) at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201) at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at 
org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)14/08/13 16:36:06 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main] java.lang.UnknownError: no bin was found for continuous variable. at org.apache.spark.mllib.tree.DecisionTree$.findBin$1(DecisionTree.scala:492) at org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findBinsForLevel$1(DecisionTree.scala:529) at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653) at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157) at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201) at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
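The suspected cause in this thread is rows that lack some columns. A quick way to test that hypothesis before training is to separate complete rows from incomplete ones, as in this sketch. The object `RowCleaning` is hypothetical, and a comma delimiter is assumed.

```scala
object RowCleaning {
  /** Split rows into (complete, incomplete). A row is complete when it has
    * exactly `expected` non-empty fields; the -1 limit keeps trailing empty fields. */
  def partitionByWidth(lines: Seq[String], expected: Int): (Seq[String], Seq[String]) =
    lines.partition { line =>
      val fields = line.split(",", -1)
      fields.length == expected && fields.forall(_.trim.nonEmpty)
    }
}
```

On an RDD the same predicate could be used with `filter` to keep only complete rows, and counting the rejected rows gives a quick sense of how much of the data is affected.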
java.lang.UnknownError: no bin was found for continuous variable.
Hi All,

I am using the decision tree algorithm and I get the following error. Any help would be great!

java.lang.UnknownError: no bin was found for continuous variable.
  at org.apache.spark.mllib.tree.DecisionTree$.findBin$1(DecisionTree.scala:492)
  at org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findBinsForLevel$1(DecisionTree.scala:529)
  at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
  at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
  at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
  at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
  at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
  at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
  at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
14/08/13 16:36:06 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.UnknownError: no bin was found for continuous variable.
  at org.apache.spark.mllib.tree.DecisionTree$.findBin$1(DecisionTree.scala:492)
  at org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findBinsForLevel$1(DecisionTree.scala:529)
  at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
  at org.apache.spark.mllib.tree.DecisionTree$$anonfun$3.apply(DecisionTree.scala:653)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
  at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
  at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
  at org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838)
  at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
  at org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
Random Forest implementation in MLib
Hi All,

I read on the mailing list that a random forest implementation was on the roadmap. I wanted to check on its status. We are currently using Weka and would like to move over to MLlib for performance.
java.lang.IllegalArgumentException: Unable to create serializer com.esotericsoftware.kryo.serializers.FieldSerializer
Hi All,

I am trying to move away from spark-shell to spark-submit and have been making some code changes. However, I am now having a problem with serialization. It used to work fine before the code update, and I am not sure what I did wrong. Here is the code.

JaccardScore.scala:

  package approxstrmatch

  class JaccardScore {
    val mjc = new Jaccard() with Serializable

    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("ApproxStrMatch").set("spark.storage.memoryFraction", "0.0")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
      val sc = new SparkContext(conf)
      // More code here…
      score.calculateSortedJaccardScore(srcFile, distFile)
      sc.stop()
    }

    def calculateSortedJaccardScore(sourcerdd: RDD[String], destrdd: RDD[String]) {
      // Code over here…
    }
  }

MyRegistrator.scala (this is the central place for registering all the classes):

  package approxstrmatch

  import com.wcohen.ss.BasicStringWrapper;
  import com.wcohen.ss.Jaccard;
  import com.wcohen.ss.Level2MongeElkan;
  import com.wcohen.ss.Levenstein;
  import com.wcohen.ss.ScaledLevenstein;
  import com.wcohen.ss.Jaro;
  import com.wcohen.ss.JensenShannonDistance;

  import com.esotericsoftware.kryo.Kryo
  // import org.apache.spark.serializer.KryoRegistrator
  import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}

  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[approxstrmatch.JaccardScore])
      kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
      kryo.register(classOf[com.wcohen.ss.Jaccard])
      // Bunch of other registrations here.
    }
  }

I run it as:

  spark-submit --class approxstrmatch.JaccardDriver --master local --executor-memory 8G /apps/sameert/software/approxstrmatch/target/scala-2.10/classes/approxstrmatch_2.10-1.0.jar

I get the following error message:

java.lang.IllegalArgumentException: Unable to create serializer com.esotericsoftware.kryo.serializers.FieldSerializer for class: approxstrmatch.JaccardScore
  at com.esotericsoftware.kryo.Kryo.newSerializer(Kryo.java:335)
  at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:314)
  at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:49)
  at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:307)
  at com.esotericsoftware.kryo.Kryo.register(Kryo.java:351)
  at approxstrmatch.MyRegistrator.registerClasses(MyRegistrator.scala:18)
  at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:77)
  at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:73)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:73)
  at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:130)
  at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:92)
  at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:995)
  at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:80)
  at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:66)
  at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:847)
  at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:205)
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
  at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
  at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
  at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
  at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
java.lang.OutOfMemoryError: Java heap space
Hi everyone,

I have the following configuration. I am currently running my app in local mode.

  val conf = new SparkConf().setMaster("local[2]").setAppName("ApproxStrMatch").set("spark.executor.memory", "3g").set("spark.storage.memoryFraction", "0.1")

I am getting the following error. I tried setting spark.executor.memory and the memory fraction; however, my UI does not show the increase and I still get these errors. I am loading a TSV file from HDFS (around 5 GB). Does this mean I should update these settings and add more memory, or is it something else? The Spark master has 24 GB of physical memory and the workers have 16 GB, but we are running other services (CDH 5.1) on these nodes as well.

14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:17 ERROR Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 WARN TaskSetManager: Lost TID 5 (task 1.0:0)
14/07/31 09:48:17 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
14/07/31 09:48:17 INFO TaskSchedulerImpl: Cancelling stage 1
14/07/31 09:48:17 INFO DAGScheduler: Failed to run collect at ComputeScores.scala:76
14/07/31 09:48:17 INFO Executor: Executor is trying to kill task 6
14/07/31 09:48:17 INFO TaskSchedulerImpl: Stage 1 was cancelled
Spark SQL and Hive tables
Hi All,

I am trying to load data from Hive tables using Spark SQL. I am using spark-shell. Here is what I see:

  val trainingDataTable = sql("SELECT prod.prod_num, demographics.gender, demographics.birth_year, demographics.income_group FROM prod p JOIN demographics d ON d.user_id = p.user_id")

14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
java.lang.RuntimeException: Table Not Found: prod.

I have these tables in Hive. I used the show tables command to confirm this. Can someone please let me know how to make them accessible here?
RE: Spark SQL and Hive tables
Hi Jerry,

Thanks for your reply. I was following the steps in this programming guide. It does not mention anything about creating a HiveContext or using HQL explicitly.

http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html

  Users(userId INT, name String, email STRING, age INT, latitude: DOUBLE, longitude: DOUBLE, subscribed: BOOLEAN)
  Events(userId INT, action INT)

  Given the data stored in these tables, one might want to build a model that will predict which users are good targets for a new campaign, based on users that are similar.

  // Data can easily be extracted from existing sources,
  // such as Apache Hive.
  val trainingDataTable = sql("""
    SELECT e.action u.age, u.latitude, u.logitude
    FROM Users u
    JOIN Events e ON u.userId = e.userId""")

Date: Fri, 25 Jul 2014 17:27:17 -0400
Subject: Re: Spark SQL and Hive tables
From: chiling...@gmail.com
To: user@spark.apache.org

Hi Sameer,

Maybe this page will help you: https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Best Regards,
Jerry

On Fri, Jul 25, 2014 at 5:25 PM, Sameer Tilak ssti...@live.com wrote:

Hi All,

I am trying to load data from Hive tables using Spark SQL. I am using spark-shell. Here is what I see:

  val trainingDataTable = sql("SELECT prod.prod_num, demographics.gender, demographics.birth_year, demographics.income_group FROM prod p JOIN demographics d ON d.user_id = p.user_id")

14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
java.lang.RuntimeException: Table Not Found: prod.

I have these tables in Hive. I used the show tables command to confirm this. Can someone please let me know how to make them accessible here?
RE: Spark SQL and Hive tables
Thanks, Jerry.

Date: Fri, 25 Jul 2014 17:48:27 -0400
Subject: Re: Spark SQL and Hive tables
From: chiling...@gmail.com
To: user@spark.apache.org

Hi Sameer,

The blog post you referred to is about Spark SQL. I don't think the article is meant to guide you through reading data from Hive via Spark SQL, so don't worry too much about the blog post. The programming guide I referred to demonstrates how to read data from Hive using Spark SQL. It is a good starting point.

Best Regards,
Jerry

On Fri, Jul 25, 2014 at 5:38 PM, Sameer Tilak ssti...@live.com wrote:

Hi Michael,

Thanks. I am not creating a HiveContext, I am creating a SQLContext. I am using CDH 5.1. Can you please let me know which conf/ directory you are talking about?

From: mich...@databricks.com
Date: Fri, 25 Jul 2014 14:34:53 -0700
Subject: Re: Spark SQL and Hive tables
To: user@spark.apache.org

In particular, have you put your hive-site.xml in the conf/ directory? Also, are you creating a HiveContext instead of a SQLContext?

On Fri, Jul 25, 2014 at 2:27 PM, Jerry Lam chiling...@gmail.com wrote:

Hi Sameer,

Maybe this page will help you: https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Best Regards,
Jerry

On Fri, Jul 25, 2014 at 5:25 PM, Sameer Tilak ssti...@live.com wrote:

Hi All,

I am trying to load data from Hive tables using Spark SQL. I am using spark-shell. Here is what I see:

  val trainingDataTable = sql("SELECT prod.prod_num, demographics.gender, demographics.birth_year, demographics.income_group FROM prod p JOIN demographics d ON d.user_id = p.user_id")

14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
java.lang.RuntimeException: Table Not Found: prod.

I have these tables in Hive. I used the show tables command to confirm this. Can someone please let me know how to make them accessible here?
error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
Hi everyone,

I was using Spark 1.0 from the Apache site and I was able to compile my code successfully using:

  scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140630.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar ComputeScores.scala

Last week I moved to CDH 5.1 and I am trying to compile the same code with the command below. However, I am getting the following errors. Any help with this would be great!

  scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar:/opt/cloudera/parcels/CDH/lib/spark/core/lib/spark-core_2.10-1.0.0-cdh5.1.0.jar:/opt/cloudera/parcels/CDH/lib/spark/lib/kryo-2.21.jar:/opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-io-2.4.jar JaccardScore.scala

JaccardScore.scala:37: error: bad symbolic reference. A signature in SparkContext.class refers to term io
in package org.apache.hadoop which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling SparkContext.class.
  val mjc = new Jaccard() with Serializable
  ^
JaccardScore.scala:39: error: bad symbolic reference. A signature in SparkContext.class refers to term io
in package org.apache.hadoop which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling SparkContext.class.
  val conf = new SparkConf().setMaster("spark://pzxnvm2021:7077").setAppName("ApproxStrMatch")
  ^
JaccardScore.scala:51: error: bad symbolic reference. A signature in SparkContext.class refers to term io
in package org.apache.hadoop which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling SparkContext.class.
  var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
Hi Sean,

Thanks for the quick reply. I moved to an sbt-based build and I was able to build the project successfully.

In my /apps/sameert/software/approxstrmatch I see the following:

  jar -tf target/scala-2.10/approxstrmatch_2.10-1.0.jar
  META-INF/MANIFEST.MF
  approxstrmatch/
  approxstrmatch/MyRegistrator.class
  approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$4.class
  approxstrmatch/JaccardScore$$anon$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$3.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1$$anonfun$2.class
  approxstrmatch/JaccardScore.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$5.class
  approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1$$anonfun$1.class

However, when I start my spark shell:

  spark-shell --jars /apps/sameert/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar /apps/sameert/software/approxstrmatch/target/scala-2.10/approxstrmatch_2.10-1.0.jar

and type the following interactively, I get an error. I am not sure what I am missing now. This used to work before.

  val srcFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/target-sentences.csv")
  val distFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/sameer_sentence_filter.tsv")
  val score = new approxstrmatch.JaccardScore()
  error: not found: value approxstrmatch

From: so...@cloudera.com
Date: Wed, 23 Jul 2014 18:11:34 +0100
Subject: Re: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
To: user@spark.apache.org

The issue is that you don't have Hadoop classes in your compiler classpath. In the first example, you are getting Hadoop classes from the Spark assembly, which packages everything together. In the second example, you are referencing Spark .jars as deployed in a Hadoop cluster. They no longer contain a copy of Hadoop classes. So you would also need to add the Hadoop .jars in the cluster to your classpath. It may be much easier to manage this as a project with SBT or Maven and let it sort out dependencies.

On Wed, Jul 23, 2014 at 6:01 PM, Sameer Tilak ssti...@live.com wrote:
RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
I was able to resolve this. In my spark-shell command I forgot to add a comma between the two jar files.

From: ssti...@live.com
To: user@spark.apache.org
Subject: RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
Date: Wed, 23 Jul 2014 11:29:03 -0700

Hi Sean,

Thanks for the quick reply. I moved to an sbt-based build and I was able to build the project successfully.

In my /apps/sameert/software/approxstrmatch I see the following:

  jar -tf target/scala-2.10/approxstrmatch_2.10-1.0.jar
  META-INF/MANIFEST.MF
  approxstrmatch/
  approxstrmatch/MyRegistrator.class
  approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$4.class
  approxstrmatch/JaccardScore$$anon$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$3.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1.class
  approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1$$anonfun$2.class
  approxstrmatch/JaccardScore.class
  approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$5.class
  approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1$$anonfun$1.class

However, when I start my spark shell:

  spark-shell --jars /apps/sameert/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar /apps/sameert/software/approxstrmatch/target/scala-2.10/approxstrmatch_2.10-1.0.jar

and type the following interactively, I get an error. I am not sure what I am missing now. This used to work before.

  val srcFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/target-sentences.csv")
  val distFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/sameer_sentence_filter.tsv")
  val score = new approxstrmatch.JaccardScore()
  error: not found: value approxstrmatch
RE: How should I add a jar?
Hi Nicholas, I am using Spark 1.0 and I use this method to specify the additional jars. The first jar is the dependency and the second one is my application. Hope this works for you.

./spark-shell --jars /apps/software/secondstring/secondstring/dist/lib/secondstring-20140630.jar,/apps/software/scala-approsstrmatch/approxstrmatch.jar

Date: Wed, 9 Jul 2014 11:44:27 -0700
From: nicholas.cham...@gmail.com
To: u...@spark.incubator.apache.org
Subject: How should I add a jar?

I’m just starting to use the Scala version of Spark’s shell, and I’d like to add in a jar I believe I need to access Twitter data live, twitter4j. I’m confused over where and how to add this jar in. SPARK-1089 mentions two environment variables, SPARK_CLASSPATH and ADD_JARS. SparkContext also has an addJar method and a jars property, the latter of which does not have an associated doc. What’s the difference between all these jar-related things, and what do I need to do to add this Twitter jar in correctly?

Nick
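
For a standalone application rather than the shell, a rough programmatic counterpart to the --jars flag above is SparkConf.setJars, which records the jars to ship to the executors. The sketch below is only an illustration: JarShipping and confWithJars are hypothetical names, and the jar paths passed in would be whatever your build produces. Note the assumption that the driver's own classpath is handled separately (for example by the launcher); setJars only covers distribution to the cluster.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not part of Spark: builds a SparkConf that lists jars to ship.
object JarShipping {
  def confWithJars(appName: String, jars: Seq[String]): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .setJars(jars) // stored as a comma-separated list under "spark.jars"
}
```

A jar can also be added after the context exists with sc.addJar(path), which is the addJar method the question mentions; it makes the jar available to tasks submitted from then on.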
CoarseGrainedExecutorBackend: Driver Disassociated
Hi, This time, instead of manually starting the worker node using ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT, I used the start-slaves script on the master node. I also enabled the -v (verbose) flag in ssh. Here is the output that I see. The log file for the worker node was not created. I will switch back to the manual process for starting the cluster.

bash-4.1$ ./start-slaves.sh
172.16.48.44: OpenSSH_5.3p1, OpenSSL 1.0.0-fips 29 Mar 2010
172.16.48.44: debug1: Reading configuration data /users/userid/.ssh/config
172.16.48.44: debug1: Reading configuration data /etc/ssh/ssh_config
172.16.48.44: debug1: Applying options for *
172.16.48.44: debug1: Connecting to 172.16.48.44 [172.16.48.44] port 22.
172.16.48.44: debug1: Connection established.
172.16.48.44: debug1: identity file /users/p529444/.ssh/identity type -1
172.16.48.44: debug1: identity file /users/p529444/.ssh/identity-cert type -1
172.16.48.44: debug1: identity file /users/p529444/.ssh/id_rsa type 1
172.16.48.44: debug1: identity file /users/p529444/.ssh/id_rsa-cert type -1
172.16.48.44: debug1: identity file /users/p529444/.ssh/id_dsa type -1
172.16.48.44: debug1: identity file /users/p529444/.ssh/id_dsa-cert type -1
172.16.48.44: debug1: Remote protocol version 2.0, remote software version OpenSSH_5.2p1_q17.gM-hpn13v6
172.16.48.44: debug1: match: OpenSSH_5.2p1_q17.gM-hpn13v6 pat OpenSSH*
172.16.48.44: debug1: Enabling compatibility mode for protocol 2.0
172.16.48.44: debug1: Local version string SSH-2.0-OpenSSH_5.3
172.16.48.44: debug1: SSH2_MSG_KEXINIT sent
172.16.48.44: debug1: SSH2_MSG_KEXINIT received
172.16.48.44: debug1: kex: server->client aes128-ctr hmac-md5 none
172.16.48.44: debug1: kex: client->server aes128-ctr hmac-md5 none
172.16.48.44: debug1: SSH2_MSG_KEX_DH_GEX_REQUEST(1024<1024<8192) sent
172.16.48.44: debug1: expecting SSH2_MSG_KEX_DH_GEX_GROUP
172.16.48.44: debug1: SSH2_MSG_KEX_DH_GEX_INIT sent
172.16.48.44: debug1: expecting SSH2_MSG_KEX_DH_GEX_REPLY
172.16.48.44: debug1: Host '172.16.48.44' is known and matches the RSA host key.
172.16.48.44: debug1: Found key in /users/p529444/.ssh/known_hosts:6
172.16.48.44: debug1: ssh_rsa_verify: signature correct
172.16.48.44: debug1: SSH2_MSG_NEWKEYS sent
172.16.48.44: debug1: expecting SSH2_MSG_NEWKEYS
172.16.48.44: debug1: SSH2_MSG_NEWKEYS received
172.16.48.44: debug1: SSH2_MSG_SERVICE_REQUEST sent
172.16.48.44: debug1: SSH2_MSG_SERVICE_ACCEPT received
172.16.48.44:
172.16.48.44: This is a private computer system. Access to and use requires
172.16.48.44: explicit current authorization and is limited to business use.
172.16.48.44: All users express consent to monitoring by system personnel to
172.16.48.44: detect improper use of or access to the system, system personnel
172.16.48.44: may provide evidence of such conduct to law enforcement
172.16.48.44: officials and/or company management.
172.16.48.44:
172.16.48.44: UAM R2 account support: http://ussweb.crdc.kp.org/UAM/
172.16.48.44:
172.16.48.44: For password resets, please call the Helpdesk 888-457-4872
172.16.48.44:
172.16.48.44: debug1: Authentications that can continue: gssapi-keyex,gssapi-with-mic,publickey,password,keyboard-interactive
172.16.48.44: debug1: Next authentication method: gssapi-keyex
172.16.48.44: debug1: No valid Key exchange context
172.16.48.44: debug1: Next authentication method: gssapi-with-mic
172.16.48.44: debug1: Unspecified GSS failure. Minor code may provide more information
172.16.48.44: Cannot find KDC for requested realm
172.16.48.44:
172.16.48.44: debug1: Unspecified GSS failure. Minor code may provide more information
172.16.48.44: Cannot find KDC for requested realm
172.16.48.44:
172.16.48.44: debug1: Unspecified GSS failure. Minor code may provide more information
172.16.48.44:
172.16.48.44:
172.16.48.44: debug1: Authentications that can continue: gssapi-keyex,gssapi-with-mic,publickey,password,keyboard-interactive
172.16.48.44: debug1: Next authentication method: publickey
172.16.48.44: debug1: Trying private key: /users/userid/.ssh/identity
172.16.48.44: debug1: Offering public key: /users/userid/.ssh/id_rsa
172.16.48.44: debug1: Server accepts key: pkalg ssh-rsa blen 277
172.16.48.44: debug1: read PEM private key done: type RSA
172.16.48.44: debug1: Authentication succeeded (publickey).
172.16.48.44: debug1: channel 0: new [client-session]
172.16.48.44: debug1: Requesting no-more-sessions@openssh.com
172.16.48.44: debug1: Entering interactive session.
172.16.48.44: debug1: Sending environment.
172.16.48.44: debug1: Sending env LANG = en_US.UTF-8
172.16.48.44: debug1: Sending command: cd /apps/software/spark-1.0.0-bin-hadoop1/sbin/.. ; /apps/software/spark-1.0.0-bin-hadoop1/sbin/start-slave.sh 1
Spark: All masters are unresponsive!
Hi All, I am having a few issues with stability and scheduling. When I use spark shell to submit my application. I get the following error message and spark shell crashes. I have a small 4-node cluster for PoC. I tried both manual and scripts-based cluster set up. I tried using FQDN as well for specifying the master node, but no luck. 14/07/07 23:44:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[6] at map at JaccardScore.scala:83)14/07/07 23:44:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks14/07/07 23:44:35 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)14/07/07 23:44:35 INFO TaskSetManager: Serialized task 1.0:0 as 2322 bytes in 0 ms14/07/07 23:44:35 INFO TaskSetManager: Starting task 1.0:1 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)14/07/07 23:44:35 INFO TaskSetManager: Serialized task 1.0:1 as 2322 bytes in 0 ms14/07/07 23:44:35 INFO Executor: Running task ID 114/07/07 23:44:35 INFO Executor: Running task ID 214/07/07 23:44:35 INFO BlockManager: Found block broadcast_1 locally14/07/07 23:44:35 INFO BlockManager: Found block broadcast_1 locally14/07/07 23:44:35 INFO HadoopRDD: Input split: hdfs://pzxnvm2018:54310/data/sameer_7-2-2014_3mm_sentences.tsv:0+9723938914/07/07 23:44:35 INFO HadoopRDD: Input split: hdfs://pzxnvm2018:54310/data/sameer_7-2-2014_3mm_sentences.tsv:97239389+9723939014/07/07 23:44:54 INFO AppClient$ClientActor: Connecting to master spark://pzxnvm2018:7077...14/07/07 23:45:14 INFO AppClient$ClientActor: Connecting to master spark://pzxnvm2018:7077...14/07/07 23:45:35 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.14/07/07 23:45:35 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! 
Giving up.14/07/07 23:45:35 WARN HadoopRDD: Exception in RecordReader.close()java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264) at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:74) at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.close(DFSClient.java:2135) at java.io.FilterInputStream.close(FilterInputStream.java:181) at org.apache.hadoop.util.LineReader.close(LineReader.java:83) at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168) at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:208)at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) at org.apache.spark.rdd.HadoopRDD$$anon$1$$anonfun$1.apply$mcV$sp(HadoopRDD.scala:193) at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63) at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContext.executeOnCompleteCallbacks(TaskContext.scala:63) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:113) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722)14/07/07 23:45:35 ERROR Executor: Exception in task ID 2java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264) at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:74) at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:2213)at java.io.DataInputStream.read(DataInputStream.java:100) at 
org.apache.hadoop.util.LineReader.readLine(LineReader.java:134) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
Further details on spark cluster set up
Hi All, I used IP addresses in my scripts (spark-env.sh), and the slaves file contains the IP addresses of the master and slave nodes respectively. However, I still have no luck. Here is the relevant log file snippet:

Master node log:
14/07/08 10:56:19 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@172.16.48.41:7077] -> [akka.tcp://spark@localhost:35797]: Error [Association failed with [akka.tcp://spark@localhost:35797]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@localhost:35797]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:35797]
14/07/08 10:56:19 INFO Master: akka.tcp://spark@localhost:35797 got disassociated, removing it.
14/07/08 10:56:19 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@172.16.48.41:7077] -> [akka.tcp://spark@localhost:35797]: Error [Association failed with [akka.tcp://spark@localhost:35797]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@localhost:35797]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:35797]

Worker node log:
14/07/08 10:56:11 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:35797/user/CoarseGrainedScheduler 6 pzxnvm2023.x.y.name.org 4 akka.tcp://sparkwor...@pzxnvm2023.x.y.name.or:34222/user/Worker app-20140708105602-
14/07/08 10:56:11 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@pzxnvm2023.x.y.name.org:34222] -> [akka.tcp://sparkexecu...@pzxnvm2023.x.y.name.org:52485]: Error [Association failed with [akka.tcp://sparkexecu...@pzxnvm2023.x.y.name.org:52485]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@pzxnvm2023.dcld.pldc.kp.org:52485]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: pzxnvm2023.x.y.name.org/172.16.48.51:52485]
14/07/08 10:56:13 INFO Worker: Executor app-20140708105602-/6 finished with state FAILED message Command exited with code 1 exitStatus 1
14/07/08 10:56:13 INFO Worker: Asked to launch executor app-20140708105602-/8 for ApproxStrMatch
CoarseGrainedExecutorBackend: Driver Disassociated
Dear All, When I look inside the following directory on my worker node: $SPARK_HOME/work/app-20140708110707-0001/3 I see the following error message:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444
14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444)
14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started
14/07/08 11:07:12 INFO Remoting: Starting remoting
14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679]
14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679]
14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler
14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker
14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] -> [akka

I am not sure what the problem is, but it is preventing me from getting the 4-node test cluster up and running.
RE: CoarseGrainedExecutorBackend: Driver Disassociated
Hi Aaron, I would really appreciate your help if you can point me to the documentation. Is this something that I need to do with /etc/hosts on each of the worker machines? Or do I set SPARK_PUBLIC_DNS (if yes, what is the format?) or something else? I have the following setup:

master node: pzxnvm2018.x.y.org
worker nodes: pzxnvm2022.x.y.org pzxnvm2023.x.y.org pzxnvm2024.x.y.org

From: ilike...@gmail.com
Date: Tue, 8 Jul 2014 11:59:54 -0700
Subject: Re: CoarseGrainedExecutorBackend: Driver Disassociated
To: user@spark.apache.org

Hmm, looks like the Executor is trying to connect to the driver on localhost, from this line:
14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler

What is your setup? Standalone mode with 4 separate machines? Are you configuring the driver public dns name somewhere?

On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote:

Dear All, When I look inside the following directory on my worker node: $SPARK_HOME/work/app-20140708110707-0001/3 I see the following error message:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444) 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started14/07/08 11:07:12 INFO Remoting: Starting remoting14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679] 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka I am not sure what the problem is but it is preventing me to get the 4 node test cluster up and running.
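
One possible reading of the observation in this thread is that the driver advertises itself as localhost, so executors on other machines cannot connect back to it. A minimal sketch of one way to override the advertised address follows, assuming the application builds its own SparkConf and that the chosen hostname resolves from every worker. DriverAddress and confWithDriverHost are hypothetical names; spark.driver.host is a standard Spark configuration property. Whether this is the actual cause on this cluster is not confirmed by the thread.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not part of Spark: pins the address the driver advertises.
object DriverAddress {
  def confWithDriverHost(master: String, appName: String, driverHost: String): SparkConf =
    new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      // Executors connect back to this host instead of whatever the driver
      // would otherwise resolve for itself (here, apparently "localhost").
      .set("spark.driver.host", driverHost)
}
```

For example, DriverAddress.confWithDriverHost("spark://pzxnvm2018:7077", "ApproxStrMatch", "pzxnvm2018.x.y.org") would use the master's name from this thread as the driver host, assuming the driver runs on that machine. When the shell builds the configuration for you, setting SPARK_LOCAL_IP in conf/spark-env.sh on the driver machine is another knob worth checking, as is an /etc/hosts entry that maps the machine's hostname to 127.0.0.1.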
Error while launching spark cluster manually
Hi All,I am having the following issue -- may be fqdn/ip resolution issue, but not sure, any help with this will be great! On the master node I get the following error:I start master using ./start-master.shstarting org.apache.spark.deploy.master.Master, logging to /apps/software/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-p529444-org.apache.spark.deploy.master.Master-1-pzxnvm2018.out bash-4.1$ tail /apps/software/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-p529444-org.apache.spark.deploy.master.Master-1-pzxnvm2018.out 14/07/07 11:03:16 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties14/07/07 11:03:16 INFO SecurityManager: Changing view acls to: userid14/07/07 11:03:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(userid)14/07/07 11:03:16 INFO Slf4jLogger: Slf4jLogger started14/07/07 11:03:16 INFO Remoting: Starting remoting14/07/07 11:03:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@pzxnvm2018:7077]14/07/07 11:03:17 INFO Master: Starting Spark master at spark://pzxnvm2018:707714/07/07 11:03:17 INFO MasterWebUI: Started MasterWebUI at http://pzxnvm2018.a.b.org(masterfqdn):808014/07/07 11:03:17 INFO Master: I have been elected leader! 
New state: ALIVE 14/07/07 11:07:53 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140707110744-/10 on hostPort worker3fqdn:54921 with 4 cores, 512.0 MB RAM14/07/07 11:07:53 INFO AppClient$ClientActor: Executor updated: app-20140707110744-/10 is now RUNNING14/07/07 11:07:54 INFO AppClient$ClientActor: Executor updated: app-20140707110744-/8 is now FAILED (Command exited with code 1)14/07/07 11:07:54 INFO SparkDeploySchedulerBackend: Executor app-20140707110744-/8 removed: Command exited with code 114/07/07 11:07:54 INFO AppClient$ClientActor: Executor added: app-20140707110744-/11 on worker-20140707110701-worker3fqdn-49287 (worker3fqdn:49287) with 4 cores On the worker node I get the following errorI start the workers node with the following command: ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://pzxnvm2018:7077 worker4ip 4 akka.tcp://sparkWorker@worker4ip:49287/user/Worker app-20140707110744-14/07/07 11:07:49 INFO Worker: Executor app-20140707110744-/3 finished with state FAILED message Command exited with code 1 exitStatus 114/07/07 11:07:49 INFO Worker: Asked to launch executor app-20140707110744-/6 for ApproxStrMatch14/07/07 11:07:49 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:59792/user/CoarseGrainedScheduler 6 pzxnvm2024.dcld.pldc.kp.org 4 akka.tcp://sparkwor...@pzxnvm2024.dcld.pldc.kp.org:49287/user/Worker app-20140707110744-14/07/07 11:07:51 INFO Worker: Executor app-20140707110744-/6 finished with state FAILED message Command exited with code 1 exitStatus 1
RDD join: composite keys
Hi everyone, Is it possible to join RDDs using composite keys? I would like to join these two RDDs on RDD1.id1 = RDD2.id1 and RDD1.id2 = RDD2.id2:

RDD1 (id1, id2, scoretype1)
RDD2 (id1, id2, scoretype2)

I want the result to be ResultRDD = (id1, id2, (score1, score2)). I would really appreciate it if you could point me in the right direction.
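
A common way to approach this kind of join is to re-key both RDDs by the tuple (id1, id2) and use the ordinary pair-RDD join, since a Scala tuple works as a key. The sketch below assumes String ids and Double scores, which the question does not specify; CompositeKeyJoin and joinOnBothIds are hypothetical names.

```scala
import org.apache.spark.SparkContext._ // brings in the pair-RDD implicits on Spark 1.0 to 1.2
import org.apache.spark.rdd.RDD

// Hypothetical helper; the element types are assumptions made for illustration.
object CompositeKeyJoin {
  def joinOnBothIds(
      rdd1: RDD[(String, String, Double)],
      rdd2: RDD[(String, String, Double)]): RDD[(String, String, (Double, Double))] = {
    // Turn each (id1, id2, score) triple into ((id1, id2), score)
    val left = rdd1.map { case (id1, id2, score1) => ((id1, id2), score1) }
    val right = rdd2.map { case (id1, id2, score2) => ((id1, id2), score2) }
    // Inner join on the composite key, then flatten back to the requested shape
    left.join(right).map { case ((id1, id2), scores) => (id1, id2, scores) }
  }
}
```

This is an inner join, so rows whose (id1, id2) pair appears in only one RDD are dropped; leftOuterJoin or cogroup would be the alternatives if those rows need to be kept.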
RE: Serialization of objects
Hi everyone, I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper:

public class BasicStringWrapper implements Serializable

However, I am still curious to know how to get around the issue when you don't have access to the code and you are using a 3rd-party jar.

From: ssti...@live.com
To: u...@spark.incubator.apache.org
Subject: Serialization of objects
Date: Thu, 26 Jun 2014 09:30:31 -0700

Hi everyone, Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd-party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to) using Kryo for serialization. I am still facing the serialization issue. I get

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper

Any help with this will be great.
Scala code:

    package approxstrmatch

    import com.wcohen.ss.BasicStringWrapper;
    import com.wcohen.ss.Jaccard;

    import java.util.Iterator;

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    import org.apache.spark.rdd;
    import org.apache.spark.rdd.RDD;

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[approxstrmatch.JaccardScore])
        kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
        kryo.register(classOf[com.wcohen.ss.Jaccard])
      }
    }

    class JaccardScore {
      val mjc = new Jaccard() with Serializable
      val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
      val sc = new SparkContext(conf)

      def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String]) {
        val jc_ = this.mjc
        var i: Int = 0
        for (sentence <- sourcerdd.toLocalIterator) {
          val str1 = new BasicStringWrapper (sentence)
          var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
          val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
          scorevector.saveAsTextFile(fileName)
          i += 1
        }
      }
    }

Here is the script:

    val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt");
    val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt");
    val score = new approxstrmatch.JaccardScore()
    score.calculateScoreSecond(srcFile, distFile)

O/P:

14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at <console>:12), which has no missing parents
14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at <console>:12)
14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
14/06/25 12:32:05 INFO Executor: Running task ID 0
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
14/06/25 12:32:06 INFO Executor: Finished task ID 0
14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)
14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s
14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s
14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
14/06/25
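
On the open question in this thread of what to do when a third-party class cannot be modified: one widely used pattern is to avoid shipping the object at all and construct it on the executor, inside mapPartitions, so that only serializable values (here a plain String) are captured by the closure. The sketch below illustrates the pattern only. TokenJaccard is a hypothetical stand-in for a non-serializable library class such as com.wcohen.ss.Jaccard, and its scoring logic is a simplified assumption, not SecondString's implementation.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for a third-party class that does NOT implement Serializable.
class TokenJaccard {
  def score(a: String, b: String): Double = {
    val x = a.split("\\s+").toSet
    val y = b.split("\\s+").toSet
    val union = x | y
    if (union.isEmpty) 0.0 else (x & y).size.toDouble / union.size
  }
}

object ExecutorSideScoring {
  // Only `sentence` (a String) is captured by the closure; the scorer is
  // created once per partition on the executor and is never serialized.
  def scoreAgainst(sentence: String, dest: RDD[String]): RDD[Double] =
    dest.mapPartitions { lines =>
      val scorer = new TokenJaccard()
      lines.map(line => scorer.score(sentence, line))
    }
}
```

Applied to the loop in the posted code, this would mean capturing sentence rather than str1 and building the BasicStringWrapper and Jaccard instances inside the closure. Registering classes with Kryo does not help with this particular exception as far as I know, because closures are serialized with Java serialization regardless of spark.serializer.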
Serialization of objects
Hi everyone, Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd party library namely instances of com.wcohen.ss.Jaccard, and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to) using Kryo for serialization. I am still facing the serialization issue. I get org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper Any help with this will be great. Scala code: package approxstrmatch import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard; import java.util.Iterator; import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.SparkConf import org.apache.spark.rdd;import org.apache.spark.rdd.RDD; import com.esotericsoftware.kryo.Kryoimport org.apache.spark.serializer.KryoRegistrator class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[approxstrmatch.JaccardScore]) kryo.register(classOf[com.wcohen.ss.BasicStringWrapper]) kryo.register(classOf[com.wcohen.ss.Jaccard]) }} class JaccardScore { val mjc = new Jaccard() with Serializable val conf = new SparkConf().setMaster(spark://pzxnvm2018:7077).setAppName(ApproxStrMatch) conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer) conf.set(spark.kryo.registrator, approxstrmatch.MyRegistrator) val sc = new SparkContext(conf) def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String]) { val jc_ = this.mjc var i: Int = 0 for (sentence - sourcerdd.toLocalIterator) {val str1 = new BasicStringWrapper (sentence)var scorevector = destrdd.map(x = jc_.score(str1, new BasicStringWrapper(x)))val fileName = new String(/apps/software/scala-approsstrmatch-sentence + i) scorevector.saveAsTextFile(fileName)i += 1 } } Here is the script: val distFile = sc.textFile(hdfs://serverip:54310/data/dummy/sample.txt); val 
srcFile = sc.textFile(hdfs://serverip:54310/data/dummy/test.txt); val score = new approxstrmatch.JaccardScore() score.calculateScoreSecond(srcFile, distFile) O/P: 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12)14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms14/06/25 12:32:05 INFO Executor: Running task ID 014/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 140372470156414/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 140372470156214/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+14014/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 71714/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver14/06/25 12:32:06 INFO Executor: Finished task ID 014/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)14/06/25 12:32:06 INFO DAGScheduler: Completed 
ResultTask(0, 0)14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 114/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at JaccardScore.scala:5214/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at JaccardScore.scala:52) with 2 output partitions (allowLocal=false)14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at JaccardScore.scala:52)14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()14/06/25 12:32:06 INFO DAGScheduler:
Worker nodes: Error messages
Hi All,

I see the following error messages on my worker nodes. Are they due to improper cleanup or wrong configuration? Any help with this would be great!

14/06/25 12:30:55 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/06/25 12:30:55 INFO SecurityManager: Changing view acls to: userid
14/06/25 12:30:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444)
14/06/25 12:30:56 INFO Slf4jLogger: Slf4jLogger started
14/06/25 12:30:56 INFO Remoting: Starting remoting
14/06/25 12:30:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@worker1ip:60276]
14/06/25 12:30:57 INFO Worker: Starting Spark worker worker1ip:60276 with 1 cores, 2.7 GB RAM
14/06/25 12:30:57 INFO Worker: Spark home: /apps/software/spark-1.0.0-bin-hadoop1
14/06/25 12:30:57 INFO WorkerWebUI: Started WorkerWebUI at http://worker1ip:8081
14/06/25 12:30:57 INFO Worker: Connecting to master spark://serverip:7077...
14/06/25 12:30:57 INFO Worker: Successfully registered with master spark://serverip:7077
14/06/25 12:32:05 INFO Worker: Asked to launch executor app-20140625123205-/2 for ApproxStrMatch
14/06/25 12:32:05 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 2 p worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:09 INFO Worker: Executor app-20140625123205-/2 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 12:32:09 INFO Worker: Asked to launch executor app-20140625123205-/5 for ApproxStrMatch
14/06/25 12:32:09 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 5 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:12 INFO Worker: Executor app-20140625123205-/5 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 12:32:12 INFO Worker: Asked to launch executor app-20140625123205-/9 for ApproxStrMatch
14/06/25 12:32:12 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:56569/user/CoarseGrainedScheduler 9 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625123205-
14/06/25 12:32:16 INFO Worker: Asked to kill executor app-20140625123205-/9
14/06/25 12:32:16 INFO ExecutorRunner: Runner thread for executor app-20140625123205-/9 interrupted
14/06/25 12:32:16 INFO ExecutorRunner: Killing process!
14/06/25 12:32:16 INFO Worker: Executor app-20140625123205-/9 finished with state KILLED
14/06/25 13:28:44 INFO Worker: Asked to launch executor app-20140625132844-0001/2 for ApproxStrMatch
14/06/25 13:28:44 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:46648/user/CoarseGrainedScheduler 2 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker app-20140625132844-0001
14/06/25 13:28:48 INFO Worker: Executor app-20140625132844-0001/2 finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/25 13:28:48 INFO Worker: Asked to launch executor app-20140625132844-0001/5 for ApproxStrMatch
14/06/25 13:28:48 INFO ExecutorRunner: Launch command: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.9.x86_64/jre/bin/java -cp ::/apps/software/spark-1.0.0-bin-hadoop1/conf:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/apps/hadoop/hadoop-conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:46648/user/CoarseGrainedScheduler 5 worker1ip 1 akka.tcp://sparkWorker@ worker1ip:60276/user/Worker
RE: DAGScheduler: Failed to run foreach
From: ilike...@gmail.com Date: Mon, 23 Jun 2014 18:00:27 -0700 Subject: Re: DAGScheduler: Failed to run foreach To: user@spark.apache.org CC: u...@spark.incubator.apache.org

Please note that this:

for (sentence <- sourcerdd) { ... }

is actually Scala syntactic sugar which is converted into

sourcerdd.foreach { sentence => ... }

What this means is that the loop body will actually run on the cluster, which is probably not what you want if you're trying to print the sentences. Try this instead:

for (sentence <- sourcerdd.toLocalIterator) { ... }

(By the way, the reason this was throwing a NotSerializableException is that you were trying to pass printScoreCanndedString as part of the job's closure. In Java, class methods have an implicit reference to this, so Spark tried to serialize the class CalculateScore, which is presumably not marked as Serializable.)
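The desugaring described in the reply can be observed without a cluster. The following is a minimal sketch in plain Scala; FakeRdd is a hypothetical stand-in that only mimics the two RDD methods named above and records which one the compiler ends up calling. It is not Spark's RDD.

```scala
// Hypothetical stand-in for an RDD: it offers only foreach and
// toLocalIterator and records which of the two was invoked.
class FakeRdd[A](items: Seq[A]) {
  var calls: List[String] = Nil

  def foreach(f: A => Unit): Unit = {
    calls = calls :+ "foreach"
    items.foreach(f)
  }

  def toLocalIterator: Iterator[A] = {
    calls = calls :+ "toLocalIterator"
    items.iterator
  }
}

object DesugarDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new FakeRdd(Seq("first sentence", "second sentence"))

    // The compiler rewrites this loop to rdd.foreach(s => println(s)).
    for (s <- rdd) println(s)

    // Here the loop runs over the Iterator returned by toLocalIterator,
    // so FakeRdd.foreach is not involved.
    for (s <- rdd.toLocalIterator) println(s)

    // Shows the order in which the two methods were called.
    println(rdd.calls)
  }
}
```

With a real RDD the first form ships the loop body to the executors, while the second pulls the elements back to the driver and runs the body there.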
On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak ssti...@live.com wrote: The subject should be: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: and not DAGScheduler: Failed to run foreach If I call printScoreCanndedString with a hard-coded string and identical 2nd parameter, it works fine. However for my application that is not sufficient.
RE: Basic Scala and Spark questions
Hi there,

Here is how I specify it during compilation:

scalac -classpath /apps/software/abc.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar Score.scala

Then I generate a jar file out of it, say myapp. Finally, to run this I do the following:

./spark-shell --jars /apps/software/abc.jar,/apps/software/myapp/myapp.jar

Hope this helps.

From: vmuttin...@ebay.com To: user@spark.apache.org; u...@spark.incubator.apache.org Subject: RE: Basic Scala and Spark questions Date: Tue, 24 Jun 2014 20:06:04 +

Hello Tilak,

1. I get error Not found: type RDD error. Can someone please tell me which jars do I need to add as external jars and what should I add under import statements so that this error will go away.

Do you not see any issues with the import statements? Add the spark-assembly-1.0.0-hadoop2.2.0.jar file as a dependency. You can download Spark from here (http://spark.apache.org/downloads.html). You'll find the above-mentioned jar in the lib folder.

Import statement: import org.apache.spark.rdd.RDD

From: Sameer Tilak [mailto:ssti...@live.com] Sent: Monday, June 23, 2014 10:38 AM To: u...@spark.incubator.apache.org Subject: Basic Scala and Spark questions

Hi All,

I am new to Scala and Spark. I have a basic question. I have the following import statements in my Scala program. I want to pass my function (printScore) to Spark. It will compare a string

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/* import thirdparty jars */

I have the following method in my Scala class:

class DistanceClass {
  val ta = new textAnalytics();
  def printScore(sourceStr: String, rdd: RDD[String]) {
    // Third party jars have StringWrapper
    val str1 = new StringWrapper (sourceStr)
    val ta_ = this.ta;
    rdd.map(str1, x => ta_.score(str1, StringWrapper(x))
  }

I am using Eclipse for development. I have the following questions:

1. I get error Not found: type RDD error. Can someone please tell me which jars do I need to add as external jars and what should I add under import statements so that this error will go away.

2. Also, including StringWrapper(x) inside map, will that be OK? rdd.map(str1, x => ta_.score(str1, StringWrapper(x))
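To make the advice above concrete, here is a minimal sketch of the class from the question with the RDD import in place and a map call that compiles. StringWrapper and TextAnalytics are hypothetical stand-ins for the third-party classes (their real API is not shown in the thread), the scoring rule is invented for illustration, and the local[2] master is only an assumption for trying it out with the Spark assembly jar on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD // this import resolves "not found: type RDD"

// Hypothetical stand-ins for the third-party classes in the question.
// Both are serializable because they are used inside the map closure.
case class StringWrapper(value: String)

class TextAnalytics extends Serializable {
  // Toy score: number of positions where both strings hold the same character.
  def score(a: StringWrapper, b: StringWrapper): Double =
    a.value.zip(b.value).count { case (x, y) => x == y }.toDouble
}

class DistanceClass {
  val ta = new TextAnalytics()

  def printScore(sourceStr: String, rdd: RDD[String]): RDD[Double] = {
    val str1 = StringWrapper(sourceStr)
    val ta_ = this.ta // local copy, so the closure does not capture `this`
    // map takes a single function; wrapping x inside it is fine.
    rdd.map(x => ta_.score(str1, StringWrapper(x)))
  }
}

object DistanceDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distance-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val candidates = sc.parallelize(Seq("spork", "shark", "pig"))
      new DistanceClass().printScore("spark", candidates).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Creating a StringWrapper per element inside map is ordinary Scala; the objects are built on the executors, so only str1 and ta_ travel with the closure.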
Basic Scala and Spark questions
Hi All,

I am new to Scala and Spark. I have a basic question. I have the following import statements in my Scala program. I want to pass my function (printScore) to Spark. It will compare a string

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/* import thirdparty jars */

I have the following method in my Scala class:

class DistanceClass {
  val ta = new textAnalytics();
  def printScore(sourceStr: String, rdd: RDD[String]) {
    // Third party jars have StringWrapper
    val str1 = new StringWrapper (sourceStr)
    val ta_ = this.ta;
    rdd.map(str1, x => ta_.score(str1, StringWrapper(x))
  }

I am using Eclipse for development. I have the following questions:

1. I get a "not found: type RDD" error. Can someone please tell me which jars I need to add as external jars and what I should add under the import statements so that this error goes away?

2. Also, will including StringWrapper(x) inside map be OK? rdd.map(str1, x => ta_.score(str1, StringWrapper(x))
DAGScheduler: Failed to run foreach
Hi All,

I am using Spark for text analysis. I have a source file that has a few thousand sentences and a dataset of tens of millions of statements. I want to compare each statement from the source file with each statement from the dataset and generate a score. I am having the following problem and would really appreciate help.

Here is what I do within spark-shell:

// Source file with few thousand sentences
val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt");
// Dataset with tens of millions of statements.
val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt");
// Initialize the score variable.
val score = new mypackage.Score()
// Generate score.
score.calculateScore(srcFile, destFile);

Here is the snippet from my Scala class (Score.scala):

def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String]) {
  for (sentence <- sourcerdd) {
    println("Source String is: " + sentence + " Data Type is: " + sentence.getClass.getSimpleName)
    printScoreCanndedString(sentence, destrdd);
  }
}

def printScoreCanndedString(sourceStr: String, rdd: RDD[String]) : RDD[Double] = {
  // Do the analysis here.
}

The print statement displays the data correctly, along with the data type String as expected. However, I get the following error message when it tries to execute the printScoreCanndedString method. Any help with this will be great.

14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at calculateScore.scala:51
14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach at CalculateScore.scala:51)
14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch.CalculateScore
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
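One way to express the "compare every source sentence with every dataset statement" goal as a single distributed job is RDD.cartesian, sketched below. This is not something proposed in the thread: it is offered as an assumption about what the poster wants, since the snippet above passes destrdd into a loop over sourcerdd, and RDD operations cannot be invoked from inside another RDD's foreach. The score function, the sample strings, and the local[2] master are all hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PairwiseScores {
  // Hypothetical scoring rule (word-set overlap). It lives on a standalone
  // object so the closure below drags no enclosing class instance along.
  def score(a: String, b: String): Double = {
    val wa = a.split("\\s+").toSet
    val wb = b.split("\\s+").toSet
    if (wa.isEmpty && wb.isEmpty) 0.0
    else (wa & wb).size.toDouble / (wa | wb).size
  }

  // Pairs each source sentence with each dataset statement and scores the pair.
  def scoreAll(src: RDD[String], dest: RDD[String]): RDD[((String, String), Double)] =
    src.cartesian(dest).map { case (s, d) => ((s, d), score(s, d)) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pairwise-scores").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val src = sc.parallelize(Seq("spark on yarn", "pig on spark"))
      val dest = sc.parallelize(Seq("pig on spark", "hadoop on yarn"))
      scoreAll(src, dest).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The cartesian product of a few thousand sentences with tens of millions of statements is large, so in practice the result would be filtered or aggregated on the cluster rather than collected as it is in this toy run.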
RE: DAGScheduler: Failed to run foreach
The subject should be "org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:" and not "DAGScheduler: Failed to run foreach".

If I call printScoreCanndedString with a hard-coded string and an identical second parameter, it works fine. However, for my application that is not sufficient.

From: ssti...@live.com To: u...@spark.incubator.apache.org Subject: DAGScheduler: Failed to run foreach Date: Mon, 23 Jun 2014 17:05:03 -0700

Hi All,

I am using Spark for text analysis. I have a source file that has a few thousand sentences and a dataset of tens of millions of statements. I want to compare each statement from the source file with each statement from the dataset and generate a score. I am having the following problem and would really appreciate help.

Here is what I do within spark-shell:

// Source file with few thousand sentences
val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt");
// Dataset with tens of millions of statements.
val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt");
// Initialize the score variable.
val score = new mypackage.Score()
// Generate score.
score.calculateScore(srcFile, destFile);

Here is the snippet from my Scala class (Score.scala):

def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String]) {
  for (sentence <- sourcerdd) {
    println("Source String is: " + sentence + " Data Type is: " + sentence.getClass.getSimpleName)
    printScoreCanndedString(sentence, destrdd);
  }
}

def printScoreCanndedString(sourceStr: String, rdd: RDD[String]) : RDD[Double] = {
  // Do the analysis here.
}

The print statement displays the data correctly, along with the data type String as expected. However, I get the following error message when it tries to execute the printScoreCanndedString method. Any help with this will be great.

14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded
14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1
14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at calculateScore.scala:51
14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false)
14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach at CalculateScore.scala:51)
14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List()
14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List()
14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch.CalculateScore
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
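The "Task not serializable" failure named in the corrected subject can be reproduced without Spark, using plain Java serialization. The sketch below is illustrative only: Scorer plays the role of the non-serializable CalculateScore class from the stack trace, and Scoring, serializes, capturing and standalone are hypothetical names introduced for the demonstration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureDemo {
  // Returns true if Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Not marked Serializable, like CalculateScore in the stack trace.
  class Scorer {
    def score(s: String): Double = s.length.toDouble

    // Calling an instance method inside the function makes it capture `this`.
    def capturing: String => Double = s => score(s)
  }

  // The same logic on a standalone object: the function captures no instance.
  object Scoring {
    def score(s: String): Double = s.length.toDouble
  }

  def standalone: String => Double = s => Scoring.score(s)

  def main(args: Array[String]): Unit = {
    // Expected to be rejected, because the captured Scorer is not serializable.
    println(serializes(new Scorer().capturing))
    // Expected to be accepted, because nothing non-serializable is captured.
    println(serializes(standalone))
  }
}
```

Two common ways out, under the same assumptions, are to move the helper onto an object as shown, or to mark the enclosing class Serializable when all of its fields can be serialized.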
Running Spark alongside Hadoop
Dear Spark users,

I have a small 4-node Hadoop cluster. Each node is a VM with 4 virtual cores, 8 GB of memory and 500 GB of disk. I am currently running Hadoop on it. I would like to run Spark (in standalone mode) alongside Hadoop on the same nodes. Given the configuration of my nodes, will that work? Does anyone have any experience with the stability and performance of running Spark and Hadoop on somewhat resource-constrained nodes?

I was looking at the Spark documentation, and there is a way to configure memory and cores for the worker nodes and memory for the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to share resources between Hadoop and Spark?
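As a rough illustration of how the three variables relate to a node's capacity, the sketch below computes a per-node split and prints the corresponding spark-env.sh lines. The reserved amounts (2 cores and 4 GB left for Hadoop and the OS, 512m for the Spark daemons) are placeholder assumptions chosen for the example, not recommendations from the thread; Node and sparkEnv are hypothetical names.

```scala
object SparkEnvSketch {
  final case class Node(cores: Int, memoryGb: Int)

  // Gives Spark whatever is left on a node after reserving a share for
  // Hadoop and the operating system, as spark-env.sh variable assignments.
  def sparkEnv(node: Node,
               reservedCores: Int,
               reservedMemoryGb: Int,
               daemonMemory: String): Seq[String] = {
    require(reservedCores >= 0 && reservedCores < node.cores,
      "reserved cores must leave at least one core for Spark")
    require(reservedMemoryGb >= 0 && reservedMemoryGb < node.memoryGb,
      "reserved memory must leave some memory for Spark")
    Seq(
      s"SPARK_WORKER_CORES=${node.cores - reservedCores}",
      s"SPARK_WORKER_MEMORY=${node.memoryGb - reservedMemoryGb}g",
      s"SPARK_DAEMON_MEMORY=$daemonMemory"
    )
  }

  def main(args: Array[String]): Unit = {
    // Node size taken from the message above; the reservations are assumptions.
    val lines = sparkEnv(Node(cores = 4, memoryGb = 8),
      reservedCores = 2, reservedMemoryGb = 4, daemonMemory = "512m")
    lines.foreach(println)
  }
}
```

SPARK_WORKER_CORES and SPARK_WORKER_MEMORY cap what all executors on a worker may use in total, while SPARK_DAEMON_MEMORY sizes the master and worker daemon processes themselves, so the Hadoop side needs matching limits for the split to hold.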
Spark and Hadoop cluster
Hi everyone,

We are planning to set up Spark. The documentation mentions that it is possible to run Spark in standalone mode on a Hadoop cluster. Does anyone have any comments on the stability and performance of this mode?
RE: Pig on Spark
Hi Mayur,

We are planning to upgrade our distribution from MR1 to MR2 (YARN), and the goal is to get Spork set up next month. I will keep you posted. Can you please keep me informed about your progress as well?

From: mayur.rust...@gmail.com Date: Mon, 10 Mar 2014 11:47:56 -0700 Subject: Re: Pig on Spark To: user@spark.apache.org

Hi Sameer,

Did you make any progress on this? My team is also trying it out and would love to know some details of your progress.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak ssti...@live.com wrote:

Hi Aniket,

Many thanks! I will check this out.
Pig on Spark
Hi everyone,

We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark at https://github.com/dvryaboy/pig and am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can benefit significantly from using Spark, but we would like to keep using our existing Pig scripts.
RE: Pig on Spark
Hi Aniket,

Many thanks! I will check this out.

Date: Thu, 6 Mar 2014 13:46:50 -0800 Subject: Re: Pig on Spark From: aniket...@gmail.com To: user@spark.apache.org; tgraves...@yahoo.com

There is some work to make this work on YARN at https://github.com/aniket486/pig. (So, compile Pig with ant -Dhadoopversion=23.) You can look at https://github.com/aniket486/pig/blob/spork/pig-spark to find out what sort of env variables you need (sorry, I haven't been able to clean this up; it is in progress). There are a few known issues with this, and I will work on fixing them soon.

Known issues:
1. Limit does not work (spork-fix)
2. Foreach requires turning off schema-tuple-backend (should be a pig-jira)
3. Algebraic UDFs don't work (spork-fix in-progress)
4. Group by rework (to avoid OOMs)
5. UDF Classloader issue (requires SPARK-1053, then you can put pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf jars)

~Aniket

On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves tgraves...@yahoo.com wrote:

I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look for spork.

Basically Matei said:

Yup, that was it, though I believe people at Twitter picked it up again recently. I'd suggest asking Dmitriy if you know him. I've seen interest in this from several other groups, and if there's enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and already had most of Pig's operators working. (I helped out with this prototype over Twitter's hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one.

Tom

On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote:

Hi everyone,

We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark at https://github.com/dvryaboy/pig and am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can benefit significantly from using Spark, but we would like to keep using our existing Pig scripts.

-- ...:::Aniket:::... Quetzalco@tl