Spark preserve timestamp
Is there an option to tell Spark to preserve the timestamp while creating a struct?

Regards,
Sudhir
Timestamp changing while writing
Hello,

I am using createDataFrame and passing a Java row RDD and a schema, but the timestamp value changes when I write that DataFrame to a Parquet file. Can anyone help?

Thank you,
Sudhir
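Parquet stores timestamps normalized to UTC, so an apparent "change" on write/read is often just the session time zone applied when the value is rendered. A minimal sketch (Spark 2.2+; the path is hypothetical, and javaRowRDD/schema stand in for the objects from the post) that pins the session time zone so the round-trip is consistent:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("timestamp-roundtrip").getOrCreate()

// Timestamps are displayed in this zone; pinning it makes write/read symmetric.
spark.conf.set("spark.sql.session.timeZone", "UTC")

val df = spark.createDataFrame(javaRowRDD, schema) // as in the post
df.write.parquet("/tmp/out")
spark.read.parquet("/tmp/out").show(truncate = false)
```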
Re: Custom line/record delimiter
Thanks for the update Kwon.

Regards,

On Mon, Jan 1, 2018 at 7:54 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
> Hi,
>
> There's a PR - https://github.com/apache/spark/pull/18581 and JIRA - SPARK-21289
>
> Alternatively, you could check out multiLine option for CSV and see if applicable.
>
> Thanks.
>
> 2017-12-30 2:19 GMT+09:00 sk skk <spark.s...@gmail.com>:
>> Hi,
>>
>> Do we have an option to write a csv or text file with a custom record/line separator through spark ?
>>
>> I could not find any ref on the api. I have a issue while loading data into a warehouse as one of the column on csv have a new line character and the warehouse is not letting to escape that new line character .
>>
>> Thank you ,
>> Sk
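The multiLine option mentioned in the reply can be sketched as follows: it lets the CSV parser treat a quoted field containing embedded newlines as part of one logical record (Spark 2.2+). The file path and options shown are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("multiline-csv").getOrCreate()

// With multiLine, quoted fields may span several physical lines instead of
// the parser breaking each record on '\n'.
val df = spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("/data/input.csv")
```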
Custom line/record delimiter
Hi,

Do we have an option to write a CSV or text file with a custom record/line separator through Spark? I could not find any reference in the API. I have an issue while loading data into a warehouse: one of the columns in the CSV contains a newline character, and the warehouse does not let us escape that newline character.

Thank you,
Sk
Sparkcontext on udf
I have registered a UDF with the SQLContext. When I try to read another Parquet file using the SQLContext inside that same UDF, it throws a NullPointerException. Any help on how to access the SQLContext inside a UDF?

Regards,
Sk
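Spark contexts and sessions exist only on the driver, so a UDF (which runs on executors) cannot use them; the NullPointerException is the usual symptom. The common restructuring is to read the second Parquet file on the driver and join instead. A minimal sketch, with hypothetical paths and a hypothetical join key:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("no-context-in-udf").getOrCreate()

val main   = spark.read.parquet("/data/main")
val lookup = spark.read.parquet("/data/lookup") // read on the driver, not in a UDF

// Join (broadcast if the lookup side is small) instead of querying inside a UDF.
val joined = main.join(broadcast(lookup), Seq("key"), "left_outer")
```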
Appending column to a parquet
Hi,

I have two Parquet files with different schemas. Based on a unique key, I have to fetch one column's value and append it to all rows of the other Parquet file. I tried a join, but I guess it is not working due to the differing schemas. I can use withColumn, but can we get the single value of a column and assign it to a literal? If I register the file as a temp table and fetch that column value into a string, it returns a Row rather than a literal. Is there a better way to handle this, or a way to get a literal value from a temporary table?

Thank you,
Sk
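One approach is to collect the single value to the driver as a plain Scala value and then append it with lit(). A sketch under assumed names (paths, id value, and column names are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().appName("append-scalar-column").getOrCreate()

val target = spark.read.parquet("/data/target")
val source = spark.read.parquet("/data/source")

// Collect the single value to the driver as a plain Scala value...
val value: String = source
  .filter(source("id") === "some-unique-id")
  .select("colToCopy")
  .first()
  .getString(0)

// ...then append it to every row as a literal column.
val result = target.withColumn("copied", lit(value))
```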
Java Rdd of String to dataframe
Can we create a DataFrame from a Java pair RDD of String? I don't have a schema, as it will be dynamic JSON. I tried Encoders.STRING. Any help is appreciated!!

Thanks,
SK
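Assuming the values of the pair RDD hold the JSON documents, one option is to let Spark infer the schema with read.json rather than supplying one. A hedged sketch (pairRdd is a stand-in for the RDD from the post):

```scala
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("json-rdd").getOrCreate()

// Suppose pairRdd: JavaPairRDD[String, String], where the value is a JSON string.
val jsonRdd = pairRdd.rdd.map(_._2)

// read.json infers a schema by scanning the (possibly varying) JSON documents;
// fields missing from a given record simply come back as null.
val df = spark.read.json(jsonRdd)
df.printSchema()
```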
how to fetch schema from a dynamic nested JSON
Hi,

I have a requirement where I have to read a dynamic, nested JSON document that describes a schema, and check data quality against that schema. That is, the JSON tells me things like "column 1 should be a string of a given length". Since the schema JSON is dynamic and nested, traditionally I would have to loop over the JSON object and fetch all the details.

For the data, I have to read a JSON array where each object should be checked against the schema above; e.g., the first column of the first object in the array should satisfy the string/length rule. Looping over the schema JSON and, inside that, looping over the data array will hurt performance. Do we have any options or a better way to handle this?

Thanks in advance,
sk
Parsing nested json objects with variable structure
Hi,

I need to parse a JSON input file where the nested objects take on a different structure based on the typeId field, as follows:

  { "d": { "uid": "12345",
           "contents": [{"info": {"eventId": "event1"}, "typeId": 19}] } }
  { "d": { "uid": "56780",
           "contents": [{"info": {"id": "1"}, "typeId": 1003},
                        {"info": {"id": "27"}, "typeId": 13}] } }

In the above, the "contents" field takes on a different structure for typeId 13 and 19. My code is currently as follows:

  logs = sqlc.read.json(sys.argv[1])
  logs.registerTempTable("logs")
  features = sqlc.sql("SELECT d.uid, d.contents.typeId FROM logs")

I also need to extract the fields in d.contents.info. How can I extract these fields, given that they have different names depending on the typeId? I am using PySpark in Spark version 1.4.1. Any guidance in Python or Scala would be helpful.

thanks
sudha

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-nested-json-objects-with-variable-structure-tp24526.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
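Because JSON schema inference merges the two shapes of info into one struct (info: {eventId, id}, with null for whichever field a record lacks), one approach is to explode contents and select both variants. A sketch in Scala for illustration (the poster asked for Python or Scala); LATERAL VIEW may require a HiveContext on Spark 1.4:

```scala
// Each exploded element c has c.typeId and a merged c.info struct; the field
// that does not apply to a given typeId is simply null.
val features = sqlc.sql(
  """SELECT d.uid, c.typeId, c.info.eventId, c.info.id
    |FROM logs
    |LATERAL VIEW explode(d.contents) t AS c""".stripMargin)
```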
RandomForest in Pyspark (version 1.4.1)
Hi,

I tried to develop a RandomForest model for my data in PySpark as follows:

  rf_model = RandomForest.trainClassifier(train_idf, 2, {}, numTrees=15, seed=144)
  print "RF: Num trees = %d, Num nodes = %d\n" % (rf_model.numTrees(), rf_model.totalNumNodes())
  pred_label = test_idf.map(lambda p: (float(rf_model.predict(p.features)), p.label))
  print pred_label.take(5)  ## exception

I am getting the following error at the highlighted statement:

  Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I have used the same set of statements for linear models (LogisticRegression and SVM) in PySpark and was able to get the predictions and print them. I am not sure why I am getting the above exception, since I am not using the SparkContext directly in any of the above statements. I would appreciate your help.

thanks
filter expression in API document for DataFrame
The following statement appears in the Scala API example at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

  people.filter(age > 30)

I tried this example and it gave a compilation error. I think this needs to be changed to:

  people.filter(people("age") > 30)

Also, it would be good to add some examples for the new equality operator for columns (e.g. people("age") === 30). The programming guide does not have an example for this in the DataFrame Operations section, and it was not very obvious that we need to be using a different equality operator for columns.

thanks
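The column expressions discussed above can be sketched as follows (the input path is a hypothetical placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("column-exprs").getOrCreate()
val people = spark.read.json("/data/people.json")

// Comparisons on columns use Column operators (or expression strings),
// not plain Scala operators on bare identifiers:
people.filter(people("age") > 30).show()
people.filter(people("age") === 30).show()  // === for column equality
people.filter("age > 30").show()            // equivalent expression string
```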
column expression in left outer join for DataFrame
Hi,

I am trying to port some code that was working in Spark 1.2.0 to the latest version, Spark 1.3.0. This code involves a left outer join between two SchemaRDDs, which I am now trying to change to a left outer join between two DataFrames. I followed the example for the left outer join of DataFrames at https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

Here's my code, where df1 and df2 are the two DataFrames I am joining on the country field:

  val join_df = df1.join(df2, df1.country == df2.country, "left_outer")

But I got a compilation error that value country is not a member of sql.DataFrame. I also tried the following:

  val join_df = df1.join(df2, df1("country") == df2("country"), "left_outer")

I got a compilation error that it is a Boolean whereas a Column is required. So what is the correct Column expression I need to provide for joining the two DataFrames on a specific field?

thanks
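The question above resolves to using the Column === operator, which builds a Column predicate, whereas Scala's == compares the two Column references themselves and yields a Boolean that join() cannot accept. A sketch:

```scala
// === returns a Column expression that join() can evaluate per row.
val join_df = df1.join(df2, df1("country") === df2("country"), "left_outer")
```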
MLLib: feature standardization
Hi,

I have a dataset in CSV format and I am trying to standardize the features before using k-means clustering. The data does not have any labels but has the following format:

  s1, f12,f13,...
  s2, f21,f22,...

where s is a string id, and f is a floating-point feature value. To perform feature standardization, I need to compute the mean and variance/std deviation of the feature values in each element of the RDD (i.e. each row). However, the summary statistics library in Spark MLlib provides only a colStats() method that gives column-wise mean and variance. I tried to compute the mean and variance per row using the code below, but got a compilation error that there is no mean() or variance() method for a tuple or Vector object. Is there a Spark library to compute the row-wise mean and variance for an RDD, where each row (i.e. element) of the RDD is a Vector or tuple of N feature values?

thanks

My code for standardization is as follows:

  // read the data
  val data = sc.textFile(file_name)
               .map(_.split(","))

  // extract the features. For this example I am using only 2 features,
  // but the data has more features
  val features = data.map(d => Vectors.dense(d(1).toDouble, d(2).toDouble))

  val std_features = features.map(f => {
    val fmean = f.mean()                      // Error: no mean() for a Vector or tuple
    val fstd = scala.math.sqrt(f.variance())  // Error: no variance() for a Vector or tuple
    for (i <- 0 to f.length) {                // standardize the features
      var fs = 0.0
      if (fstd > 0.0)
        fs = (f(i) - fmean)/fstd
      fs
    }
  })
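There is no per-row mean()/variance() on an MLlib Vector, but both are easy to compute directly from the vector's values. A minimal sketch of the row-wise standardization (population variance; rows with zero deviation are mapped to zeros):

```scala
import org.apache.spark.mllib.linalg.Vectors

val std_features = features.map { f =>
  val values   = f.toArray
  val mean     = values.sum / values.length
  val variance = values.map(v => math.pow(v - mean, 2)).sum / values.length
  val std      = math.sqrt(variance)
  // Standardize each feature within the row; guard against std == 0.
  Vectors.dense(values.map(v => if (std > 0.0) (v - mean) / std else 0.0))
}
```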
K-Means final cluster centers
Hi,

I am trying to get the final cluster centers after running the KMeans algorithm in MLlib, in order to characterize the clusters. But the KMeansModel does not appear to have any public method to retrieve this info; there appears to be only a private method called clusterCentersWithNorm. I guess I could call predict() to get the final cluster assignment for the dataset and write my own code to compute the means based on this final assignment, but I would like to know if there is a way to get this info directly from the MLlib API after running KMeans.

thanks
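For reference, KMeansModel does expose the centers publicly as clusterCenters (an Array[Vector]) in at least later MLlib releases; a sketch, assuming data is an RDD[Vector]:

```scala
import org.apache.spark.mllib.clustering.KMeans

val model = KMeans.train(data, k = 5, maxIterations = 20)

// Public accessor for the final cluster centers:
model.clusterCenters.zipWithIndex.foreach { case (center, i) =>
  println(s"cluster $i center: $center")
}
```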
save a histogram to a file
Hi,

histogram() returns an object that is a pair of Arrays, and there appears to be no saveAsTextFile() for this paired object. Currently I am using the following to save the output to files:

  val hist = a.histogram(10)
  val arr1 = sc.parallelize(hist._1).saveAsTextFile("file1")
  val arr2 = sc.parallelize(hist._2).saveAsTextFile("file2")

Is there a simpler way to save the histogram() result to a file?

thanks
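One way to keep the buckets and counts together in a single output is to zip the two arrays first. Note that the first array (bucket edges) has one more element than the second (counts), so zip pairs each bucket's lower edge with its count and drops the final upper edge. A sketch (the output path is hypothetical):

```scala
val hist = a.histogram(10) // (bucketEdges: Array[Double], counts: Array[Long])

// Pair each bucket's lower edge with its count and write one file, not two.
val pairs = hist._1.zip(hist._2).map { case (edge, count) => s"$edge\t$count" }
sc.parallelize(pairs.toSeq).saveAsTextFile("histogram_out")
```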
Spark SQL performance and data size constraints
Hi,

I use the following code to read in data and extract the unique users using Spark SQL. The data is 1.2 TB and I am running this on a cluster with 3 TB memory. It appears that there is enough memory, but the program just freezes after some time, at the point where it maps the RDD to the case class Play. (If I don't use the Spark SQL portion — i.e. don't map to the case class, register the table, etc. — and merely load the data (the first three lines of the code below), then the program completes.) I tried spark.storage.memoryFraction=0.5 and 0.6 (the default) as suggested in the tuning guide, but that did not help.

According to the logs, the total number of partitions/tasks is 38688, and the size of each RDD partition for the mapping to the case class is around 31 MB. So the total RDD size is 38688*31 MB = 1.2 TB, which is less than the 3 TB memory on the cluster. At the time the program stops, the total number of tasks is close to 38688, with some of them appearing as failed; there are no details on why the tasks failed.

Are there any maximum data size constraints in Spark SQL or table creation that might be causing the program to hang? Are there any performance optimizations I could try with Spark SQL that might allow the task to complete?

  val data = sc.textFile("shared_dir/*.dat")
               .map(_.split("\t"))
               .persist(MEMORY_AND_DISK_SER)

  val play = data.map(f => Play(f(0).trim, f(1).trim, f(2).trim, f(3).trim))
                 .persist(MEMORY_AND_DISK_SER)

  // register the RDD as a table
  play.registerTempTable("play")

  val ids = sql_cxt.sql("SELECT DISTINCT id FROM play")
  println("Number of unique account ID = %d".format(ids.count()))
  println("Number of RDDs = %d".format(play.count()))

thanks
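Since only the distinct id count is needed, one mitigation worth trying is to avoid persisting the full split rows and the case-class RDD at all, and project out just the id field. A sketch, assuming (as the case class suggests) that the id is the first tab-separated field:

```scala
// Project only the id column before the shuffle; this avoids materializing
// every field of every row of the 1.2 TB dataset.
val ids = sc.textFile("shared_dir/*.dat")
  .map(_.split("\t")(0).trim)
  .distinct()

println("Number of unique account ID = %d".format(ids.count()))
```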
Spark performance optimization examples
Hi,

Is there any document that provides guidelines, with examples, that illustrate when the different performance optimizations would be useful? I am interested in the guidelines for using optimizations like cache(), persist(), repartition(), coalesce(), and broadcast variables. I studied the online programming guide, but I would like some more details (something along the lines of Aaron Davidson's talk at the Spark Summit, which illustrated the use of repartition() with an example).

In particular, I have a dataset that is about 1.2 TB (about 30 files) that I am trying to load using sc.textFile on a cluster with a total memory of 3 TB (170 GB per node). But I am not able to complete the loading: the program is continuously active in the mapPartitions task but does not get past it, even after a long time. I have tried some of the above optimizations, but that has not helped, and I am not sure whether I am using these optimizations the right way or which of them would be most appropriate to this problem. So I would appreciate any guidelines.

thanks
Spark failing when loading large amount of data
Hi,

I am using sc.textFile("shared_dir/*") to load all the files in a directory on a shared partition. The total size of the files in this directory is 1.2 TB. We have a 16-node cluster with 3 TB memory (1 node is the driver, 15 nodes are workers). But the loading fails after around 1 TB of data is read (in the mapPartitions stage); basically, there is no progress in mapPartitions after 1 TB of input.

It seems that the cluster has sufficient memory, so I am not sure why the program gets stuck. 1.2 TB of data divided across 15 worker nodes would require each node to hold about 80 GB. Every node in the cluster is allocated around 170 GB of memory. According to the Spark documentation, the default storage fraction for RDDs is 60% of the allocated memory; I have increased that to 0.8 (by setting --conf spark.storage.memoryFraction=0.8), so each node should have around 136 GB of memory for storing RDDs. So I am not sure why the program is failing in the mapPartitions stage, where it seems to be loading the data. I don't have a good idea of the Spark internals and would appreciate any help in fixing this issue.

thanks
Streaming: getting total count over all windows
Hi,

I am using the following code to generate the (score, count) pairs for each window:

  val score_count_by_window = topic.map(r => r._2)  // r._2 is the integer score
                                   .countByValue()
  score_count_by_window.print()

Example output for a window is as follows, which means that within the DStream for that window there are 2 RDD elements with score 0, 3 with score 1, and 1 with score -1:

  (0, 2)
  (1, 3)
  (-1, 1)

I would like to get the aggregate count for each score over all windows, until the program terminates. I tried countByValueAndWindow(), but the result is the same as countByValue() (i.e. it produces only per-window counts). reduceByWindow also does not produce the result I am expecting. What is the correct way to sum up the counts over multiple windows?

thanks
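Running totals across all windows are usually kept with Spark Streaming's stateful operations rather than windowing. A sketch using updateStateByKey, which requires checkpointing to be enabled (the checkpoint path is a hypothetical placeholder):

```scala
ssc.checkpoint("/tmp/checkpoint") // required for stateful transformations

val score_count_by_window = topic.map(_._2).countByValue()

// Fold each window's per-score counts into a running total per score.
val totals = score_count_by_window.updateStateByKey[Long] {
  (newCounts: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + newCounts.sum)
}
totals.print()
```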
filtering out non English tweets using TwitterUtils
Hi,

Is there a way to extract only the English-language tweets when using TwitterUtils.createStream()? The filters argument specifies strings that must be contained in the tweets, but I am not sure how this can be used to specify the language.

thanks
Re: filtering out non English tweets using TwitterUtils
Thanks for the response. I tried the following:

  tweets.filter(_.getLang()="en")

I get a compilation error: value getLang is not a member of twitter4j.Status. But getLang() is one of the methods of twitter4j.Status since version 3.0.6, according to the doc at http://twitter4j.org/javadoc/twitter4j/Status.html#getLang--

What version of twitter4j does Spark Streaming use?

thanks
Re: filtering out non English tweets using TwitterUtils
Small typo in my code in the previous post. That should be:

  tweets.filter(_.getLang()=="en")
groupBy for DStream
Hi,

1) I don't see a groupBy() method for a DStream object, and I'm not sure why it is not supported. Currently I am using filter() to separate out the different groups. I would like to know if there is a way to convert a DStream object to a regular RDD so that I can apply RDD methods like groupBy.

2) The count() method for a DStream object returns a DStream[Long] instead of a simple Long (as RDD does). How can I extract the simple Long count value? I tried dstream(0) but got a compilation error that it does not take parameters. I also tried dstream[0], but that also resulted in a compilation error. I am not able to use the head() or take(0) methods on a DStream either.

thanks
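Both questions come down to the fact that a DStream is a sequence of RDDs; the RDD-level API is reachable per batch through transform (RDD-to-RDD) and foreachRDD. A sketch, where keyOf is a hypothetical grouping function:

```scala
// 1) Apply RDD-only methods such as groupBy per batch via transform:
val grouped = dstream.transform(rdd => rdd.groupBy(record => keyOf(record)))

// 2) count() yields a DStream of per-batch counts; to get a plain Long,
//    count the batch's RDD inside foreachRDD:
dstream.foreachRDD { rdd =>
  val n: Long = rdd.count()
  println(s"records in this batch: $n")
}
```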
specifying sort order for sort by value
Hi,

I am using rdd.sortBy(_._2) to get an RDD sorted by value. The default order is ascending; how can I get it sorted in descending order? I could not find an option to specify the order. I need to get the top K elements of the list sorted in descending order. If there is no option for descending order, I would like to know if there is a way to get the last K elements of the list sorted in ascending order: take(k) gets the first k elements, but is there an option to get the last K elements of an RDD?

thanks
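sortBy takes an ascending flag, and top(k) with a custom Ordering returns the largest K elements without fully sorting the RDD. A sketch for an RDD of (key, value) pairs:

```scala
// Descending sort by value:
val desc = rdd.sortBy(_._2, ascending = false)

// Top K by value, without sorting the whole RDD:
val topK = rdd.top(10)(Ordering.by { case (_, v) => v })
```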
Re: specifying sort order for sort by value
Thanks. I was looking at older RDD documentation that did not mention the ordering option.
Using SQL statements vs. SchemaRDD methods
SchemaRDD supports some SQL-like functionality such as groupBy(), distinct(), and select(). However, Spark SQL also supports SQL statements that provide the same functionality. In terms of future support and performance, is it better to use SQL statements or the SchemaRDD methods that provide equivalent functionality?

thanks
Re: SparkSQL: Nested Query error
Hi,

I am getting an error in the query plan when I use the SQL statement exactly as you have suggested. Is that the exact SQL statement I should be using? (I am not very familiar with SQL syntax.)

I also tried using the SchemaRDD's subtract method to perform this query: usersRDD.subtract(deviceRDD).count(). The count comes out to be 1, but there are many UIDs in tusers that are not in device, so the result is not correct. I would like to know the right way to frame this query in Spark SQL.

thanks
SparkSQL: Nested Query error
Hi,

I am using Spark 1.1.0. I have the following SQL statement, where I am trying to count the number of UIDs that are in the tusers table but not in the device table:

  val users_with_no_device = sql_cxt.sql("SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)")

I am getting the following error:

  Exception in thread "main" java.lang.RuntimeException: [1.61] failure: string literal expected
  SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)

I was not sure whether every subquery has to be a string, so I also tried enclosing the subquery in its own string literal, but that resulted in a compilation error. What is the right way to frame the above query in Spark SQL?

thanks
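Spark 1.1's SQL parser does not support NOT IN subqueries, but the same question can be expressed as a left outer join with a null check on the right side. A sketch using the tables from the post:

```scala
// Users with no matching device row: the outer join leaves d_uid null.
val users_with_no_device = sql_cxt.sql(
  """SELECT COUNT(tusers.u_uid)
    |FROM tusers LEFT OUTER JOIN device ON tusers.u_uid = device.d_uid
    |WHERE device.d_uid IS NULL""".stripMargin)
```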
Re: Spark Streaming: Sentiment Analysis of Twitter streams
You are right. Creating the StreamingContext from the SparkContext instead of the SparkConf helped. Thanks for the help.
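The fix described above can be sketched as follows: build one SparkContext and derive the StreamingContext from it, instead of constructing both from the same SparkConf (which attempts to create a second context):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("TweetSentiment")
val sc  = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60)) // reuse sc; don't pass sparkConf
```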
Getting the value from DStream[Int]
Hi,

As a result of a reduction operation, the resultant value score is a DStream[Int]. How can I get the simple Int value? I tried score[0] and score._1, but neither worked, and I can't find a getValue() in the DStream API.

thanks
Spark Streaming: Sentiment Analysis of Twitter streams
Hi,

I am trying to implement simple sentiment analysis of Twitter streams in Spark/Scala. I am getting an exception, and it appears when I combine SparkContext with StreamingContext in the same program. When I read the positive and negative words using only SparkContext.textFile (without creating a StreamingContext) and analyze static text files, the program works. Likewise, when I just create the Twitter stream using StreamingContext (and don't create a SparkContext for the vocabulary), the program works. The exception appears when I combine both in the same program, and I am not sure whether we are allowed to have both simultaneously; all the examples in the streaming module contain only the StreamingContext. The error transcript and my code appear below. I would appreciate your guidance in fixing this error, the right way to read static files and streams in the same program, or any pointers to relevant examples. Thanks.

-- Error transcript --

  Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net): java.io.IOException: unexpected exception type
    java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

-- My code below --

  object TweetSentiment {
    def main(args: Array[String]) {
      val filters = args
      val sparkConf = new SparkConf().setAppName("TweetSentiment")
      val sc = new SparkContext(sparkConf)

      // get the list of positive words
      val pos_list = sc.textFile("positive-words.txt")
                       .filter(line => !line.isEmpty())
                       .collect()
                       .toSet

      // get the list of negative words
      val neg_list = sc.textFile("negative-words.txt")
                       .filter(line => !line.isEmpty())
                       .collect()
                       .toSet

      // create twitter stream
      val ssc = new StreamingContext(sparkConf, Seconds(60))
      val stream = TwitterUtils.createStream(ssc, None, filters)
      val tweets = stream.map(r => r.getText)
      tweets.print()  // print tweet text
      ssc.start()
      ssc.awaitTermination()
      sc.stop()  // I tried commenting this, but the exception still appeared.
    }
  }
Re: getting tweets for a specified handle
Thanks. I made the change and ran the code, but I don't get any tweets for my handle, although I do see the tweets when I search for it on Twitter. Does Spark allow us to get tweets from the past (say the last 100 tweets, or tweets that appeared in the last 10 minutes)?

thanks
getting tweets for a specified handle
Hi,

I am using Spark 1.1.0. Is there a way to get the complete tweets corresponding to a handle (e.g. @Delta)? I tried adapting the following example, which extracts just the hashtags, replacing the # with @. I need the complete tweet and not just the tags.

  // val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
  // replaced the above with the following:
  val tweets = stream.map(status => status.getText.filter(_.contains("@Delta")))

But I get an error: value contains is not a member of Char.

I am trying to find out if there is a better Spark API to get the tweets for a handle, so that we don't have to do the filtering ourselves. Something along the lines of the searchTwitter(topic, number_of_tags) API provided by the twitteR package in R would be appropriate.

thanks
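The "not a member of Char" error arises because filter is being called on the tweet's text (a String, which iterates over Chars); the filtering should happen on the stream of statuses instead. A sketch:

```scala
// Keep whole tweets whose text mentions the handle, then extract the text.
val tweets = stream
  .filter(status => status.getText.contains("@Delta"))
  .map(_.getText)
tweets.print()
```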
Re: Shuffle files
- We set ulimit to 50. But I still get the same "too many open files" warning.
- I tried setting consolidateFiles to true, but that did not help either.

I am using a Mesos cluster. Does Mesos have any limit on the number of open files?

thanks
Application details for failed and teminated jobs
Hi,

Currently the history server provides application details only for successfully completed jobs (where the APPLICATION_COMPLETE file is generated). However, (long-running) jobs that we terminate manually, or failed jobs where APPLICATION_COMPLETE may not be generated, don't show up on the history server page. They do show up on the 4040 interface as long as they are running. Is it possible to save those logs and load them into the history server (even when APPLICATION_COMPLETE is not present)? This would allow us to troubleshoot failed and terminated jobs without holding up the cluster.

thanks
Spark SQL: ArrayIndexOutofBoundsException
Hi, I am trying to extract the number of distinct users from a file using Spark SQL, but I am getting the following error:

ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15) java.lang.ArrayIndexOutOfBoundsException: 1

I am following the code in examples/sql/RDDRelation.scala. My code is as follows; the error appears when it executes the SQL statement. I am new to Spark SQL and would like to know how I can fix this issue. Thanks for your help.

val sql_cxt = new SQLContext(sc)
import sql_cxt._
// read the data using the schema and create a schema RDD
val tusers = sc.textFile(inp_file)
  .map(_.split("\t"))
  .map(p => TUser(p(0), p(1).trim.toInt))
// register the RDD as a table
tusers.registerTempTable("tusers")
// get the number of unique users
val unique_count = sql_cxt.sql("SELECT COUNT(DISTINCT userid) FROM tusers").collect().head.getLong(0)
println(unique_count)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html
Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException
Thanks for the help. Yes, I did not realize that the first header line has a different separator. By the way, is there a way to drop the first line that contains the header? Something along the following lines:

sc.textFile(inp_file)
  .drop(1) // or tail() to drop the header line
  .map // rest of the processing

I could not find a drop() function, or a way to take the bottom (n) elements, for an RDD. Alternatively, a way to create the case class schema from the header line of the file and use the rest for the data would be useful - just as a suggestion. Currently I am just deleting this header line manually before processing the file in Spark. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639p15642.html
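(A sketch, untested against a real cluster, assuming the Spark 1.x RDD API: RDDs indeed have no drop(), but mapPartitionsWithIndex can skip the first line of the first partition, which is where the header of a single text file lands.)

```scala
// Sketch (untested): drop only the first line of partition 0.
// For a single input file the header always lands in partition 0.
val noHeader = sc.textFile(inp_file).mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}
// noHeader can now be mapped/split as usual, header excluded.
```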
spark.cleaner.ttl
Hi, I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite as per the online docs. Since a lot of shuffle files are generated in /tmp/spark-local* and the disk is running out of space, we tested with a smaller value of ttl. However, even when the job has completed and the timer has expired, the files remain; instead of being deleted, the timestamps of the files keep changing. How can we automatically delete these shuffle files, say after every 24 hours? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-cleaner-ttl-tp15574.html
processing large number of files
Hi, I am trying to compute the number of unique users from a year's worth of data. There are about 300 files and each file is quite large (~GB). I first tried this without a loop by reading all the files in the directory using the glob pattern sc.textFile("dir/*"). But the tasks were stalling and I was getting a "Too many open files" warning, even though I increased the nofile limit to 500K. More than 200K shuffle tasks were being created, all generating shuffle files. Setting consolidateFiles to true did not help.

So now I am reading the files in a loop, as shown in the code below, and I no longer run into the "Too many open files" issue. But the countByKey is taking a really long time (more than 15 hours and still ongoing), and it appears from the logs that this operation is happening on a single node. From the logs, I am not able to figure out why it is taking so long. Each node has 16 GB memory and the Mesos cluster has 16 nodes. I have set spark.serializer to KryoSerializer. I am not running into any out-of-memory errors so far. Is there some way to improve the performance? Thanks.

for (i <- 1 to 300) {
  var f = file + i  // name of the file
  val user_time = sc.textFile(f)
    .map(line => {
      val fields = line.split("\t")
      (fields(11), fields(6))
    })            // extract (year-month, user_id)
    .distinct()
    .countByKey   // group by with year as the key
  // now convert the Map object to an RDD in order to output results
  val ut_rdd = sc.parallelize(user_time.toSeq)
  // convert to array to extract the count. Need to find out if there is an easier way to do this.
  var ar = ut_rdd.toArray()
  // aggregate the count for the year
  ucount = ucount + ar(0)._2
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/processing-large-number-of-files-tp15429.html
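(As a side note on the loop body above: countByKey already returns a driver-side Map, so the parallelize/toArray round-trip just to read the count is avoidable. A sketch, untested, assuming the same field layout:)

```scala
// Sketch (untested): countByKey returns a driver-side Map[String, Long],
// so the per-file count can be added to the running total directly.
val user_time = sc.textFile(f)
  .map { line =>
    val fields = line.split("\t")
    (fields(11), fields(6))   // (year-month, user_id)
  }
  .distinct()
  .countByKey()
ucount += user_time.values.sum   // sum over all keys seen in this file
```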
Shuffle files
Hi, I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a directory (I am using sc.textFile("dir/*") to read in the files). I am getting the following warning:

WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99, mesos12-dev.sccps.net): java.io.FileNotFoundException: /tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open files)

Basically, I think a lot of shuffle files are being created.

1) The tasks eventually fail and the job just hangs (after taking a very long time, more than an hour). If I read these 30 files in a for loop, the same job completes in a few minutes. However, I need to specify the file names, which is not convenient. I am assuming that sc.textFile("dir/*") creates one large RDD for all 30 files. Is there a way to make the operation on this large RDD efficient so as to avoid creating too many shuffle files?

2) Also, I am finding that the shuffle files for my other completed jobs are not being automatically deleted even after days. I thought that sc.stop() clears the intermediate files. Is there some way to programmatically delete these temp shuffle files upon job completion?

thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185.html
HdfsWordCount only counts some of the words
Hi, I tried out the HdfsWordCount program in the Streaming module on a cluster. Based on the output, I find that it counts only a few of the words. How can I have it count all the words in the text? I have only one text file in the directory. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HdfsWordCount-only-counts-some-of-the-words-tp14929.html
RE: HdfsWordCount only counts some of the words
I execute it as follows:

$SPARK_HOME/bin/spark-submit --master <master url> --class org.apache.spark.examples.streaming.HdfsWordCount target/scala-2.10/spark_stream_examples-assembly-1.0.jar <hdfsdir>

After I start the job, I add a new test file to the directory. It is a large text file which I will not be able to copy here, but it probably has at least 100 distinct words. However, the streaming output has only about 5-6 words along with their counts, as follows. I then stop the job after some time.

Time ... (word1, cnt1) (word2, cnt2) (word3, cnt3) (word4, cnt4) (word5, cnt5) Time ... Time ...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HdfsWordCount-only-counts-some-of-the-words-tp14929p14967.html
Streaming: HdfsWordCount does not print any output
Hi, I tried running the HdfsWordCount program from the streaming examples in Spark 1.1.0. I provided a directory in the distributed filesystem as input; this directory has one text file. However, the only thing the program keeps printing is the time, but not the word counts. I have not used the streaming module much, so I wanted to find out how I can get the output. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-HdfsWordCount-does-not-print-any-output-tp14849.html
Re: Streaming: HdfsWordCount does not print any output
This issue is resolved. The file needs to be created after the program has started to execute. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-HdfsWordCount-does-not-print-any-output-tp14849p14852.html
mllib performance on mesos cluster
Hi, I have a program similar to the BinaryClassifier example that I am running on my data (which is fairly small), for 100 iterations. I observed the following runtimes:

Standalone mode cluster with 10 nodes (Spark 1.0.2): 5 minutes
Standalone mode cluster with 10 nodes (Spark 1.1.0): 8.9 minutes
Mesos cluster with 10 nodes (Spark 1.1.0): 17 minutes

1) According to the documentation, Spark 1.1.0 has better performance, so I would like to understand why the runtime is longer on Spark 1.1.0. 2) Why is the runtime on Mesos significantly higher than in standalone mode? I just wanted to find out if anyone else has observed poor performance for MLlib-based programs on a Mesos cluster. I looked through the application detail logs and found that some of the scheduler delay values were higher on Mesos compared to standalone mode (40 ms vs. 10 ms). Is the Mesos scheduler slower? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-mesos-cluster-tp14692.html
Spark Streaming compilation error: algebird not a member of package com.twitter
Hi, I am using the latest release, Spark 1.1.0. I am trying to build the streaming examples (under examples/streaming) as a standalone project with the following streaming.sbt file. When I run sbt assembly, I get an error stating that object algebird is not a member of package com.twitter. I tried adding the dependency spark-streaming-algebird, but that was not recognized. What dependency should I be including for algebird?

import AssemblyKeys._

assemblySettings

name := "spark_stream_examples"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-zeromq" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"

// libraryDependencies += "org.apache.spark" %% "spark-streaming-algebird" % "1.1.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-compilation-error-algebird-not-a-member-of-package-com-twitter-tp14709.html
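(For what it's worth: Algebird is a Twitter library rather than a Spark module, so there is no spark-streaming-algebird artifact; the streaming examples that reference com.twitter.algebird compile against algebird-core. A sketch of the sbt line - the version number below is an illustrative assumption and should be checked against Maven Central:)

```scala
// streaming.sbt addition (sketch; the version is a placeholder - check
// Maven Central for the release the Spark 1.1.0 examples were built against)
libraryDependencies += "com.twitter" %% "algebird-core" % "0.1.11"
```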
Unable to load app logs for MLLib programs in history server
Hi, The default log files for the MLlib examples use a rather long naming convention that includes special characters like parentheses and commas. For example, one of my log files is named binaryclassifier-with-params(input.txt,100,1.0,svm,l2,0.1)-1410566770032. When I click on the program on the history server page (at port 18080) to view the detailed application logs, the history server crashes and I need to restart it. I am using Spark 1.1 on a Mesos cluster. I renamed the log file by removing the special characters and then it loads correctly. I am not sure which program creates the log files. Can it be changed so that the default log file naming convention does not include special characters? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-load-app-logs-for-MLLib-programs-in-history-server-tp14627.html
Spark 1.1.0: Cannot load main class from JAR
Hi, I am using the Spark 1.1.0 version that was released yesterday. I recompiled my program to use the latest version using sbt assembly after modifying project.sbt to use the 1.1.0 version. The compilation goes through and the jar is built. When I run the jar using spark-submit, I get an error: Cannot load main class from JAR. This program was working with version 1.0.2. The class does have a main method. So far I have never had problems recompiling and running the jar when I have upgraded to new versions. Is there anything different I need to do for 1.1.0? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-Cannot-load-main-class-from-JAR-tp14123.html
Re: Spark 1.1.0: Cannot load main class from JAR
This issue is resolved. Looks like in the new spark-submit, the jar path has to be at the end of the options. Earlier I could specify this path in any order on the command line. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-Cannot-load-main-class-from-JAR-tp14123p14124.html
History server: ERROR ReplayListenerBus: Exception in parsing Spark event log
Hi, I am using Spark 1.0.2 on a Mesos cluster. After I run my job, when I try to look at the detailed application stats using the history server at port 18080, the stats don't show up for some of the jobs, even though the job completed successfully and the event logs are written to the log folder. The log from the history server execution is attached below - it looks like it encounters a parsing error when reading the EVENT_LOG file (I have not modified this file). Basically, the line that says "Malformed line" seems to be truncating the first path (instead of amd64, it shows up as d64). Does the history server have any string buffer limitations that would be causing this problem? Also, I want to point out that this problem does not happen all the time - during some runs the app details do show up, but it is quite unpredictable. The same job, when I ran it using Spark 1.0.1 in standalone mode (i.e. without using a history server), showed up on the application details page. I am not sure if this is a problem with the history server or specifically with version 1.0.2. Is it possible to fix this problem, as I would like to use the application details?
thanks 14/09/11 20:50:55 ERROR ReplayListenerBus: Exception in parsing Spark event log file:/mapr/applogs_spark_mesos/spark_test-1410468489529/EVENT_LOG_1 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'd64': was expecting at [Source: java.io.StringReader@2d51a56a; line: 1, column: 4] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2042) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1412) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679) at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091) at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19) at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at 
org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$$renderSparkUI(HistoryServer.scala:182) at org.apache.spark.deploy.history.HistoryServer$$anonfun$checkForLogs$3.apply(HistoryServer.scala:149) at org.apache.spark.deploy.history.HistoryServer$$anonfun$checkForLogs$3.apply(HistoryServer.scala:146) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.deploy.history.HistoryServer.checkForLogs(HistoryServer.scala:146) at org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply$mcV$sp(HistoryServer.scala:77) at org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply(HistoryServer.scala:74) at org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply(HistoryServer.scala:74) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) at org.apache.spark.deploy.history.HistoryServer$$anon$1.run(HistoryServer.scala:73) ReplayListenerBus: Malformed line: d64/jre/lib/jsse.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/rhino.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jfr.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/classes,file.encoding:ISO-8859-1,user.timezone:Etc/UTC,java.specification.vendor:Oracle Corporation,sun.java.launcher:SUN_STANDARD,os.version:3.13.0-32-generic,sun.os.patch.level:unknown,java.vm.specification.vendor:Oracle
Spark Web UI in Mesos mode
Hi, I am running Spark 1.0.2 on a cluster in Mesos mode. I am not able to access the Spark master web UI at port 8080, but am able to access it at port 5050. Is 5050 the standard port? Also, in standalone mode there is a link to the application detail UI directly from the master UI. I don't see that link on the master UI page in Mesos mode; instead, I have to explicitly go to port 18080 to access the application detail. I have set up the history server. Is there some way to access the application detail link in Mesos mode directly from the master UI page (like the standalone UI)? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Web-UI-in-Mesos-mode-tp13746.html
mllib performance on cluster
Hi, I evaluated the runtime performance of some of the MLlib classification algorithms on a local machine and on a cluster with 10 nodes. I used standalone mode and Spark 1.0.1 in both cases. Here are the results for the total runtime:

                      Local     Cluster
Logistic regression   138 sec   336 sec
SVM                   138 sec   336 sec
Decision tree          50 sec   132 sec

My dataset is quite small and my programs are very similar to the MLlib examples included in the Spark distribution. Why is the runtime on the cluster significantly higher (almost 3 times) than on the local machine, even though the former uses more memory and more nodes? Is it because of the communication overhead on the cluster? I would like to know if there is something I need to do to optimize performance on the cluster, or if others have also been getting similar results. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290.html
Re: mllib performance on cluster
Num iterations: for LR and SVM, I am using the default value of 100. For all the other parameters I am also using the default values; I am pretty much reusing the code from BinaryClassification.scala. For DecisionTree, I don't see any parameter for the number of iterations in the example code, so I did not specify any. I am running each algorithm on my dataset 100 times and taking the average runtime. My dataset is very dense (hardly any zeros). The labels are 1 and 0. I did not explicitly specify the number of partitions; I did not see any code for this in the MLlib examples for BinaryClassification and DecisionTree.

Hardware:
Local: Intel Core i7 with 12 cores and 7.8 GB, of which I am allocating 4 GB for the executor memory. According to the application detail stats in the Spark UI, the total memory consumed is around 1.5 GB.
Cluster: 10 nodes with a total of 320 cores, with 16 GB per node. According to the application detail stats in the Spark UI, the total memory consumed is around 95.5 GB.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13299.html
Re: mllib performance on cluster
The dataset is quite small: 5.6 KB. It has 200 rows and 3 features, and 1 column of labels. From this dataset, I split 80% for the training set and 20% for the test set. The features are integer counts and the labels are binary (1/0). thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html
Re: Too many open files
Hi, I am having the same problem reported by Michael. I am trying to open 30 files. ulimit -n shows the limit is 1024, so I am not sure why the program is failing with a "Too many open files" error. The total size of all 30 files is 230 GB. I am running the job on a cluster with 10 nodes, each having 16 GB. The error appears to be happening at the distinct() stage. Here is my program. In the following code, are all 10 nodes trying to open all of the 30 files, or are the files distributed among the nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the files one by one if that helps, but I am not sure if there is a more efficient solution. The following is the error transcript:

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent failure: Exception failure in TID 902 on host 192.168.13.11: java.io.FileNotFoundException: /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open files) java.io.FileOutputStream.open(Native Method) java.io.FileOutputStream.init(FileOutputStream.java:221) org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116) org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177) org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161) org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158) scala.collection.Iterator$class.foreach(Iterator.scala:727) org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Re: Spark webUI - application details page
I was recently able to solve this problem for standalone mode. For this mode, I did not use a history server. Instead, I set spark.eventLog.dir (in conf/spark-defaults.conf) to a directory in HDFS (basically, this directory should be in a place that is writable by the master and globally accessible to all the nodes). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p13055.html
Re: OutofMemoryError when generating output
Hi, Thanks for the response. I tried to use countByKey, but I am not able to write the output to the console or to a file. Neither collect() nor saveAsTextFile() works on the Map object that is generated by countByKey().

val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6)) // extract (month, user_id)
}.distinct().countByKey()
x.saveAsTextFile(...) // does not work; generates an error that saveAsTextFile is not defined for a Map object

Is there a way to convert the Map object to an object that I can output to the console and to a file? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847p13056.html
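(A sketch of one workaround, untested, assuming the Spark 1.x API: countByKey materializes a plain scala.collection.Map on the driver, so it can either be printed directly or re-parallelized into an RDD before saveAsTextFile. The output path below is a hypothetical placeholder.)

```scala
// Sketch (untested): x is the scala.collection.Map[String, Long]
// returned by countByKey() above.
x.foreach { case (month, cnt) => println(s"$month\t$cnt") }  // console output

// or convert back to an RDD in order to write a text file:
sc.parallelize(x.toSeq)
  .map { case (month, cnt) => s"$month\t$cnt" }
  .saveAsTextFile("unique_users_per_month")  // hypothetical output path
```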
Memory statistics in the Application detail UI
Hi, I am using a cluster where each node has 16 GB (this is the executor memory). After I complete an MLlib job, the executor tab shows the following: Memory: 142.6 KB Used (95.5 GB Total), and individual worker nodes show Memory Used values such as 17.3 KB / 8.6 GB (different for different nodes). What does the second number signify (i.e. 8.6 GB and 95.5 GB)? If 17.3 KB was used out of the total memory of the node, should it not be 17.3 KB / 16 GB? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-statistics-in-the-Application-detail-UI-tp13082.html
Re: Memory statistics in the Application detail UI
Hi, Thanks for the responses. I understand that the second values in the Memory Used column for the executors add up to 95.5 GB and the first values add up to 17.3 KB. If 95.5 GB is the memory used to store the RDDs, then what is the 17.3 KB? Is that the memory used for shuffle operations? For non-MLlib applications I get 0.0 for the first number - i.e. memory used is 0.0 (95.5 GB Total). Is the total memory used the sum of the two numbers, or is the first number included in the second (i.e. is the 17.3 KB included in the 95.5 GB)? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-statistics-in-the-Application-detail-UI-tp13082p13095.html
Re: Spark webUI - application details page
I have already tried setting up the history server and accessing it at master-url:18080 as per the link, but the page does not list any completed applications. As I mentioned in my previous mail, I am running Spark in standalone mode on the cluster (as well as on my local machine). According to the link, it appears that the history server is required only in Mesos or YARN mode, not in standalone mode. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12834.html
OutofMemoryError when generating output
Hi, I have the following piece of code that I am running on a cluster with 10 nodes and 2 GB memory per node. The tasks seem to complete, but at the point where the output is generated (saveAsTextFile), the program freezes after some time and reports an out-of-memory error (error transcript attached below). I also tried using collect() and printing the output to the console instead of a file, but got the same error. The program reads some logs for a month and extracts the number of unique users during the month. The reduced output is not very large, so I am not sure why the memory error occurs. I would appreciate any help in fixing this memory error to get the output. Thanks.

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("App")
  val sc = new SparkContext(conf)
  // get the number of users per month
  val user_time = sc.union(sc.textFile(baseFile))
    .map(line => {
      val fields = line.split("\t")
      (fields(11), fields(6))
    })                                  // extract (month, user_id)
    .groupByKey                         // group by month as the key
    .map(g => (g._1, g._2.toSet.size))  // get the unique id count per month
  // .collect()
  // user_time.foreach(f => println(f))
  user_time.map(f => "%s, %s".format(f._1, f._2)).saveAsTextFile(app_output)
  sc.stop()
}

14/08/26 15:21:15 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:121) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
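One way to see why this pipeline is memory-hungry: groupByKey materializes every (month, user) value per key before the toSet dedup, whereas deduplicating the pairs first keeps only the distinct entries. A minimal Spark-free sketch of the two equivalent shapes on plain Scala collections (the sample data is made up; in Spark the second shape corresponds to deduplicating with distinct() and then counting per key with reduceByKey, which typically shuffles far less):

```scala
// Hypothetical sample of (month, user_id) pairs
val pairs = List(("2013-04", "u1"), ("2013-04", "u2"),
                 ("2013-04", "u1"), ("2013-05", "u3"))

// Shape 1: group everything per key, then deduplicate
// (all values for a key are held in memory at once)
val grouped = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).toSet.size) }

// Shape 2: deduplicate the pairs first, then count per key
val deduped = pairs.distinct.groupBy(_._1).map { case (k, vs) => (k, vs.size) }

assert(grouped == deduped) // both yield Map(2013-04 -> 2, 2013-05 -> 1)
```

The two shapes compute the same unique-user counts; the second avoids ever building the full per-key value collection.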
Re: Spark webUI - application details page
Hi, I am able to access the Application details web page from the master UI page when I run Spark in standalone mode on my local machine. However, I am not able to access it when I run Spark on our private cluster. The Spark master runs on one of the nodes in the cluster. I am able to access the Spark master UI at http://master-url:8080. It shows the listing of all the running and completed apps. When I click on a completed app and follow the Application detail link, the link points to: master-url/app/?appId=app-idvalue When I view the page's HTML source, the href attribute is blank. However, on my local machine, when I click the Application detail link for a completed app, it correctly points to master-url/history/app-id, and when I view the page's HTML source, the href attribute points to /history/app-id. On the cluster, I have set spark.eventLog.enabled to true in $SPARK_HOME/conf/spark-defaults.conf on the master node as well as all the slave nodes. I am using Spark 1.0.1 on the cluster. I am not sure why I am able to access the application details for completed apps when the app runs on my local machine but not for the apps that run on our cluster, although in both cases I am using Spark 1.0.1 in standalone mode. Do I need to do any additional configuration to enable this history on the cluster? thanks
Re: Processing multiple files in parallel
Without the sc.union, my program crashes with the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

thanks
Extracting unique elements of an ArrayBuffer
Hi, I have a piece of code in which the result of a groupByKey operation is as follows:

(2013-04, ArrayBuffer(s1, s2, s3, s1, s2, s4))

The first element is a String value representing a date and the ArrayBuffer consists of (non-unique) strings. I want to extract the unique elements of the ArrayBuffer, so I am expecting the result to be:

(2013-04, ArrayBuffer(s1, s2, s3, s4))

I tried the following:

.groupByKey
.map(g => (g._1, g._2.distinct))

But I get the following runtime error:

value distinct is not a member of Iterable[String]
[error] .map(g => (g._1, g._2.distinct))

I also tried g._2.distinct(), but got the same error. I looked at the Scala ArrayBuffer documentation and it supports the distinct() and count() operations. I am using Spark 1.0.1 and Scala 2.10.4. I would like to know how to extract the unique elements of the ArrayBuffer above. thanks
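For reference, the values groupByKey hands back are typed as Iterable, and Iterable[String] has no distinct method; converting to a Seq (or Set) first resolves the error. A small self-contained sketch with made-up values:

```scala
// groupByKey yields an Iterable per key, and Iterable[String] has no
// `distinct`; converting to a Seq (or Set) first resolves the error.
// The values below are made up for illustration.
val grouped: (String, Iterable[String]) =
  ("2013-04", Iterable("s1", "s2", "s3", "s1", "s2", "s4"))
val unique = (grouped._1, grouped._2.toSeq.distinct)
// unique == ("2013-04", Seq("s1", "s2", "s3", "s4"))
```

In the original pipeline this would be `.map(g => (g._1, g._2.toSeq.distinct))`; toSeq.distinct preserves first-occurrence order, while toSet would also deduplicate but drop ordering.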
Processing multiple files in parallel
Hi, I have a piece of code that reads all the (csv) files in a folder. For each file, it parses each line, extracts the first 2 elements from each row of the file, groups the tuple by the key, and finally outputs the number of unique values for each key.

val conf = new SparkConf().setAppName("App")
val sc = new SparkContext(conf)
// union of all files in the directory
val user_time = sc.union(sc.textFile("/directory/*"))
  .map(line => {
    val fields = line.split(",")
    (fields(1), fields(0)) // extract first 2 elements
  })
  .groupByKey // group by timestamp
  .map(g => (g._1, g._2.toSet.size)) // get the number of unique ids per timestamp

I have a lot of files in the directory (several hundred). The program takes a long time. I am not sure if the union operation is preventing the files from being processed in parallel. Is there a better way to parallelize the above code? For example, the first two operations (reading each file and extracting the first 2 columns from each file) can be done in parallel, but I am not sure if that is how Spark schedules the above code. thanks
Re: Spark webUI - application details page
Hi, Ok, I was specifying --master local. I changed that to --master spark://localhostname:7077 and am now able to see the completed applications. It provides summary stats about runtime and memory usage, which is sufficient for me at this time. However, it doesn't seem to archive the info in the application detail UI that lists detailed stats about the completed stages of the application - which would be useful for identifying bottleneck steps in a large application. I guess we need to capture the application detail UI screen before the app run completes, or find a way to extract this info by parsing the JSON log file in /tmp/spark-events. thanks
Re: Spark webUI - application details page
Hi, I am using Spark 1.0.1. But I am still not able to see the stats for completed apps on port 4040 - only for running apps. Is this feature supported or is there a way to log this info to some file? I am interested in stats about the total # of executors, total runtime, and total memory used by my Spark program. thanks
Re: Spark webUI - application details page
I set spark.eventLog.enabled to true in $SPARK_HOME/conf/spark-defaults.conf and also configured the logging to a file as well as the console in log4j.properties. But I am not able to get the log of the statistics in a file. On the console there are a lot of log messages along with the stats, so it is hard to separate the stats. I prefer the online format that appears on localhost:4040 - it is clearer. I am running the job in standalone mode on my local machine. Is there some way to recreate the stats online after the job has completed? thanks
Re: [MLLib]:choosing the Loss function
Hi, Thanks for the reference to the LBFGS optimizer. I tried to use the LBFGS optimizer, but I am not able to pass it as an input to the LogisticRegression model for binary classification. After studying the code in mllib/classification/LogisticRegression.scala, it appears that the only implementation of LogisticRegression uses GradientDescent as a fixed optimizer. In other words, I don't see a setOptimizer() function that I can use to change the optimizer to LBFGS. I tried to follow the code in https://github.com/dbtsai/spark-lbfgs-benchmark/blob/master/src/main/scala/org/apache/spark/mllib/benchmark/BinaryLogisticRegression.scala that makes use of LBFGS, but it is not clear to me where the LogisticRegression model with LBFGS is being returned that I can use for the classification of the test dataset. If someone has sample code that uses LogisticRegression with LBFGS instead of GradientDescent as the optimization algorithm, it would be helpful if you can post it. thanks
Re: scopt.OptionParser
I was using sbt package when I got this error. Then I switched to using sbt assembly and that solved the issue. To run sbt assembly, you need to have a file called plugins.sbt in the project root/project directory containing the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

This is in addition to the project name.sbt file I mentioned in the earlier mail.
Re: Naive Bayes parameters
I followed the example in examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala. In this file Params is defined as follows:

case class Params(
  input: String = null,
  minPartitions: Int = 0,
  numFeatures: Int = -1,
  lambda: Double = 1.0)

In the main function, the option parser accepts numFeatures as an option. But I looked at the code in more detail just now and found the following:

val model = new NaiveBayes().setLambda(params.lambda).run(training)

So it looks like only the lambda parameter is used at the time of creation. Perhaps the example needs to be cleaned up during the next release. I am currently using Spark version 1.0.1. thanks
Re: Regularization parameters
Hi, I am following the code in examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala. For setting the parameters and parsing the command line options, I am just reusing that code. Params is defined as follows:

case class Params(
  input: String = null,
  numIterations: Int = 100,
  stepSize: Double = 1.0,
  algorithm: Algorithm = LR,
  regType: RegType = L2,
  regParam: Double = 0.1)

I use the command line option --regType to choose L1 or L2, and --regParam to set it to 0.0. The option parser code in the example above parses the options and creates the LogisticRegression object. It calls setRegParam(regParam) to set the regularization parameter and calls the updater to set the regType. To run LR, I am again using the code in the example above (algorithm.run(training).clearThreshold()). The code in the above example computes AUC. To compute the accuracy of the test data classification, I map the class to 0 if prediction < 0.5, else it is mapped to class 1. Then I compare the predictions with the corresponding labels, and the number of matches is given by correctCount:

val accuracy = correctCount.toDouble / predictionAndLabel.count

thanks
Re: Regularization parameters
What is the definition of regParam and what is the range of values it is allowed to take? thanks
[MLLib]:choosing the Loss function
Hi, According to the MLlib guide, there seems to be support for different loss functions. But I could not find a command line parameter to choose the loss function; I only found regType to choose the regularization. Does MLlib support a parameter to choose the loss function? thanks
Naive Bayes parameters
1) How is the minPartitions parameter in the NaiveBayes example used? What is the default value?
2) Why is numFeatures specified as a parameter? Can this not be obtained from the data? This parameter is not specified for the other MLlib algorithms.
thanks
Re: Extracting an element from the feature vector in LabeledPoint
I am using 1.0.1. It does not matter to me whether it is the first or second element. I would like to know how to extract the i-th element in the feature vector (not the label). data.features(i) gives the following error: method apply in trait Vector cannot be accessed in org.apache.spark.mllib.linalg.Vector
correct upgrade process
Hi, I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use some of the features advertised in 1.0.1. However, I get some compilation errors in some cases, and based on user responses these errors have been addressed in the 1.0.1 version, so I should not be getting them. So I want to make sure I followed the correct upgrade process as below (I am running Spark on a single machine in standalone mode - so no cluster deployment):
- set SPARK_HOME to the new version
- run sbt assembly in SPARK_HOME to build the new Spark jars
- in the project sbt file, point the libraryDependencies for spark-core and the other libraries to the 1.0.1 version and run sbt assembly to build the project jar.
Is there anything else I need to do to ensure that no old jars are being used? For example, do I need to manually delete any old jars? thanks
Re: correct upgrade process
Hi, So I again ran sbt clean, followed by all of the steps listed above, to rebuild the jars after cleaning. My compilation error still persists. Specifically, I am trying to extract an element from the feature vector that is part of a LabeledPoint as follows: data.features(i) This gives the following error: method apply in trait Vector cannot be accessed in org.apache.spark.mllib.linalg.Vector Based on a related post, this bug has been fixed in version 1.0.1, so I am not sure why I am still getting this error. I noticed that sbt clean only removes the classes and jar files. However, there is a .ivy2 directory where things get downloaded. That does not seem to get cleaned, and I am not sure if any old dependencies from there are being used when sbt assembly is run. So do I need to manually remove this directory before running sbt clean and rebuilding the jars for the new version? thanks
Re: evaluating classification accuracy
I am using 1.0.1 and I am running locally (I am not providing any master URL). But the zip() does not produce the correct count as I mentioned above. So not sure if the issue has been fixed in 1.0.1. However, instead of using zip, I am now using the code that Sean has mentioned and am getting the correct count. So the issue is resolved. thanks.
Re: Decision Tree requires regression LabeledPoint
I have also used labeledPoint or libSVM format (for sparse data) for DecisionTree. When I had categorical labels (not features), I mapped the categories to numerical data as part of the data transformation step (i.e. before creating the LabeledPoint).
evaluating classification accuracy
Hi, In order to evaluate the ML classification accuracy, I am zipping up the prediction and test labels as follows and then comparing the pairs in predictionAndLabel:

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

However, I am finding that predictionAndLabel.count() has fewer elements than test.count(). For example, my test vector has 43 elements, but predictionAndLabel has only 38 pairs. I have tried other samples and always get fewer elements after zipping. Does zipping the two vectors cause any compression? or is this because of the distributed nature of the algorithm (I am running it in local mode on a single machine). In order to get the correct accuracy, I need the above comparison to be done by a single node on the entire test data (my data is quite small). How can I ensure that? thanks
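Since the data is small, one Spark-free way to sidestep any partition-alignment effects of RDD zip is to collect both sides to the driver first and zip plain arrays. A hedged sketch with made-up prediction and label values:

```scala
// Driver-side accuracy computation on plain arrays (e.g. after collect()),
// which avoids RDD zip's same-partitioning / same-count requirements.
// The values below are made up for illustration.
val predictions = Array(1.0, 0.0, 1.0, 1.0)
val labels      = Array(1.0, 0.0, 0.0, 1.0)

val predictionAndLabel = predictions.zip(labels)
val accuracy =
  predictionAndLabel.count { case (p, l) => p == l }.toDouble / labels.length
// accuracy == 0.75 (3 of 4 match)
```

With a real model this would be `model.predict(test.map(_.features)).collect()` zipped against `test.map(_.label).collect()`; the zip then trivially preserves all pairs.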
Re: Decision tree classifier in MLlib
Yes, the output is continuous, so I used a threshold to get binary labels. If prediction < threshold, then the class is 0, else 1. I then use this binary label to compute the accuracy. Even with this binary transformation, the accuracy with the decision tree model is low compared to LR or SVM (for the specific dataset I used).
Kmeans: set initial centers explicitly
Hi, The mllib.clustering.kmeans implementation supports a random or parallel initialization mode to pick the initial centers. Is there a way to specify the initial centers explicitly? It would be useful to have a setCenters() method where we can explicitly specify the initial centers. (R, for example, allows us to specify the initial centers.) thanks
Re: jsonRDD: NoSuchMethodError
I am running this in standalone mode on a single machine. I built the spark jar from scratch (sbt assembly) and then included that in my application (the same process I have done for earlier versions). thanks.
Re: jsonRDD: NoSuchMethodError
The problem is resolved. Thanks.
Help with Json array parsing
Hi, I have a json file where the object definition in each line includes an array component obj that contains 0 or more elements, as shown by the example below.

{"name": "16287e9cdf", "obj": [{"min": 50, "max": 59}, {"min": 20, "max": 29}]},
{"name": "17087e9cdf", "obj": [{"min": 30, "max": 39}, {"min": 10, "max": 19}, {"min": 60, "max": 69}]},
{"name": "18287e0cdf"}

I need to extract all the min values from the JSON definition in each line for further processing. I used the following Spark code to parse the file and extract the min fields, but I am getting a runtime error. I would like to know what is the right way to extract the 0 or more min values from the array above.

val inp = sc.textFile(args(0))
val res = inp.map(line => { parse(line) })
  .map(json => {
    implicit lazy val formats = org.json4s.DefaultFormats
    val name = (json \ "name").extract[String]
    val min_vals = (json \ "obj" \ "min").extract[Array[Int]]
    (name, min_vals)
  })

Thanks for your help.
Re: Help with Json array parsing
To add to my previous post, the error at runtime is the following:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.json4s.package$MappingException: Expected collection but got JInt(20) for root JInt(20) and mapping int[][int, int]

thanks
jsonRDD: NoSuchMethodError
Hi, I am using Spark 1.0.1. I am using the following piece of code to parse a json file. It is based on the code snippet in the SparkSQL programming guide. However, the compiler outputs an error stating:

java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD;

I get a similar error for jsonFile() as well. I have included the spark-sql 1.0.1 jar when building my program using sbt. What is the right library to import for jsonRDD and jsonFile? thanks

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.json

object SQLExample {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("JsonExample")
    val sc = new SparkContext(sparkConf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    val jrdd = sc.textFile(args(0)).filter(r => r.trim != "")
    val data = sqlc.jsonRDD(jrdd)
    data.printSchema()
  }
}
Streaming training@ Spark Summit 2014
Hi, I tried out the streaming program on the Spark training web page. I created a Twitter app as per the instructions (pointing to http://www.twitter.com). When I run the program, my credentials get printed out correctly, but thereafter my program just keeps waiting. It does not print out the hashtag count etc. My code appears below (essentially the same as what is on the training web page). I would like to know why I am not able to get a continuous stream and the hashtag count. thanks

// relevant code snippet
TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val tweets = TwitterUtils.createStream(ssc, None)
val statuses = tweets.map(status => status.getText())
statuses.print()
ssc.checkpoint(checkpointDir)
val words = statuses.flatMap(status => status.split(" "))
val hashtags = words.filter(word => word.startsWith("#"))
hashtags.print()
val counts = hashtags.map(tag => (tag, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
counts.print()
val sortedCounts = counts.map { case (tag, count) => (count, tag) }
  .transform(rdd => rdd.sortByKey(false))
sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
ssc.start()
ssc.awaitTermination()
// end code snippet
Re: Streaming training@ Spark Summit 2014
I don't get any exceptions or error messages. I tried it both with and without VPN and had the same outcome. But I can try again without VPN later today and report back. thanks.
Re: Streaming training@ Spark Summit 2014
I don't have a proxy server. thanks.
ML classifier and data format for dataset with variable number of features
Hi, I need to perform binary classification on an image dataset. Each image is a data point described by a Json object. The feature set for each image is a set of feature vectors, each feature vector corresponding to a distinct object in the image. For example, if an image has 5 objects, its feature set will have 5 feature vectors, whereas an image that has 3 objects will have a feature set consisting of 3 feature vectors. So the number of feature vectors may be different for different images, although each feature vector has the same number of attributes. The classification depends on the features of the individual objects, so I cannot aggregate them all into a flat vector. I have looked through the Mllib examples and it appears that the libSVM data format and the LabeledData format that Mllib uses, require all the points to have the same number of features and they read in a flat feature vector. I would like to know if any of the Mllib supervised learning classifiers can be used with json data format and whether they can be used to classify points with different number of features as described above. thanks
incorrect labels being read by MLUtils.loadLabeledData()
Hi, I have a csv data file, which I have organized in the following format to be read as a LabeledPoint (following the example in mllib/data/sample_tree_data.csv):

1,5.1,3.5,1.4,0.2
1,4.9,3,1.4,0.2
1,4.7,3.2,1.3,0.2
1,4.6,3.1,1.5,0.2

The first column is the binary label (1 or 0) and the remaining columns are features. I am using the Logistic Regression classifier in MLlib to create a model based on the training data and predict the (binary) class of the test data. I use MLUtils.loadLabeledData to read the data file. My prediction accuracy is quite low (compared to the results I got for the same data from R), so I tried to debug by first verifying that the labeled data is being read correctly. I find that some of the labels are not read correctly. For example, the first 40 points of the training data have a class of 1, whereas the training data read by loadLabeledData has label 0 for point 12 and point 14. I would like to know if this is because of the distributed algorithm that MLlib uses or if there is something wrong with the format I have above. thanks
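As a sanity check on the format itself, each line should split into a leading label plus the feature columns. A minimal hypothetical parser for one such line (an illustrative sketch, not the MLUtils implementation):

```scala
// Hypothetical per-line parser mirroring the expected on-disk format:
// leading binary label, then comma-separated features.
def parseLine(line: String): (Double, Array[Double]) = {
  val fields = line.split(',').map(_.trim.toDouble)
  (fields.head, fields.tail)
}

val (label, features) = parseLine("1,5.1,3.5,1.4,0.2")
// label == 1.0; features contains 5.1, 3.5, 1.4, 0.2
```

Reading a few suspect lines (e.g. points 12 and 14) through a parser like this, and comparing against what loadLabeledData returns for the same lines, would show whether the file or the loader is at fault.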
scopt.OptionParser
Hi, I tried to develop some code to use Logistic Regression, following the code in BinaryClassification.scala in examples/mllib. My code compiles, but at runtime complains that the scopt/OptionParser class cannot be found. I have the following import statement in my code:

import scopt.OptionParser

My sbt file contains the following dependencies:

scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Is there anything else I need to do to include the OptionParser? thanks
Error when running unit tests
I am using Spark 1.0.0. I am able to successfully run sbt package. However, when I run sbt test or sbt test-only class, I get the following error: [error] error while loading root, zip file is empty scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found. The stacktrace seems to indicate that the error is originating from the scala runtime and not my program. I tried sbt-0.13.5 as well as sbt-0.13.2. I would like to know how to fix this error. Thanks for your help.
Unit test failure: Address already in use
Hi, I have 3 unit tests (independent of each other) in the src/test/scala folder. When I run each of them individually using `sbt test-only <test>`, all 3 pass. But when I run them all together using `sbt test`, they fail with the warning below. I am wondering if the binding exception causes the job to fail to run, thereby causing the test failure. If so, what can I do to address this binding exception? I am running these tests locally on a standalone machine (i.e. SparkContext("local", "test")).

14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address already in use
java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:174)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
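The BindException typically comes from each suite creating its own SparkContext whose embedded web UI tries to bind the same port while the previous context is still alive. Two hedged mitigations: call sc.stop() in each suite's teardown, and tell sbt not to run suites in parallel:

```scala
// build.sbt fragment (sbt 0.13 syntax): run test suites sequentially so
// each SparkContext's UI port is released before the next suite binds it
parallelExecution in Test := false
```

With both in place, each suite gets exclusive use of the port for its lifetime, so the binding conflict should disappear.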
Set comparison
Hi, I have a Spark method that returns RDD[String], which I am converting to a set and then comparing to the expected output, as shown in the following code.

1. val expected_res = Set("ID1", "ID2", "ID3") // expected output
2. val result: RDD[String] = getData(input) // method returns RDD[String]
3. val set_val = result.collect().toSet // convert to set
4. println(set_val) // prints: Set("ID1", "ID2", "ID3")
5. println(expected_res) // prints: Set(ID1, ID2, ID3)
// verify output
6. if (set_val == expected_res)
7.   println(true) // this does not get printed

The value returned by the method is almost the same as the expected output, but the verification is failing. I am not sure why expected_res in Line 5 does not print the quotes even though Line 1 has them. Could that be the reason the comparison is failing? What is the right way to do the above comparison?

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-comparison-tp7696.html
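The symptom is consistent with the RDD's strings carrying literal quote characters: an element like "\"ID1\"" prints as "ID1", so the collected set looks quoted in the console while a plain Set("ID1", ...) prints unquoted, and the two are not equal. A small self-contained sketch of the pitfall and one way to normalize before comparing:

```scala
object SetCompare {
  def main(args: Array[String]): Unit = {
    // Strings that *contain* quote characters print as if quoted,
    // so these two sets can look alike in the console but differ.
    val fromData = Set("\"ID1\"", "\"ID2\"", "\"ID3\"") // embedded quotes
    val expected = Set("ID1", "ID2", "ID3")
    println(fromData)             // Set("ID1", "ID2", "ID3")
    println(expected)             // Set(ID1, ID2, ID3)
    println(fromData == expected) // false
    // Strip the embedded quotes to compare the actual identifiers:
    val normalized = fromData.map(_.replace("\"", ""))
    println(normalized == expected) // true
  }
}
```

Normalizing the data (or quoting the expectation, as in the follow-up below this thread) makes the comparison behave as intended.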
Re: Set comparison
In Line 1, I had expected_res as a set of strings with quotes, so I thought it would include the quotes during comparison. Anyway, I modified it to expected_res = Set("\"ID1\"", "\"ID2\"", "\"ID3\"") and that seems to work. thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-comparison-tp7696p7699.html
Re: specifying fields for join()
I used groupBy to create the keys for both RDDs, and then did the join. I think, though, it would be useful if in the future Spark allowed us to specify the fields on which to join, even when the keys are different. Scalding allows this feature.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7591.html
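The groupBy workaround can also be expressed with keyBy, which lifts a chosen field into the key so that join can use it. The pattern is easiest to see on plain Scala collections — the User/Order types and field names below are made up for illustration:

```scala
object FieldJoin {
  case class User(id: Int, name: String)
  case class Order(uid: Int, item: String)

  // Key each side by its join field, then match on the key -- the same
  // shape that rddA.keyBy(_.id).join(rddB.keyBy(_.uid)) gives on Spark RDDs.
  def joinOnField(users: Seq[User], orders: Seq[Order]): Seq[(String, String)] = {
    val nameById = users.map(u => u.id -> u.name).toMap
    for (o <- orders; name <- nameById.get(o.uid)) yield (name, o.item)
  }

  def main(args: Array[String]): Unit = {
    val out = joinOnField(Seq(User(1, "ann")),
                          Seq(Order(1, "book"), Order(2, "pen")))
    println(out) // List((ann,book))
  }
}
```

The unmatched Order(2, "pen") drops out, mirroring an inner join's behavior.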
convert List to RDD
Hi, I have a List[(String, Int, Int)] that I would like to convert to an RDD. I tried to use sc.parallelize and sc.makeRDD, but in each case the original order of items in the List gets modified. Is there a simple way to convert a List to an RDD without using SparkContext?

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/convert-List-to-RDD-tp7606.html
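For what it's worth, sc.parallelize does preserve element order when you collect(); what can look like reordering is per-partition actions (e.g. foreach with println) interleaving across workers. If an explicit position is needed anyway, pairing each item with its index before parallelizing makes the order recoverable later — the Spark calls in the comment are a hedged sketch:

```scala
object PreserveOrder {
  def main(args: Array[String]): Unit = {
    val data = List(("a", 1, 1), ("b", 2, 2), ("c", 3, 3))
    // Attach each element's original position before handing it to Spark.
    val indexed = data.zipWithIndex
    println(indexed)
    // With a SparkContext:
    //   sc.parallelize(indexed).sortBy(_._2).map(_._1).collect()
    // recovers the original order regardless of partitioning.
  }
}
```

Note that an RDD cannot be created without a SparkContext; the context is what distributes the data.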