Spark preserve timestamp

2018-01-12 Thread sk skk
Is there an option to tell Spark to preserve the timestamp while creating a
struct?

Regards,
Sudhir


Timestamp changing while writing

2018-01-11 Thread sk skk
Hello,

I am using createDataFrame and passing a Java Row RDD and a schema, but the
time value changes when I write that DataFrame to a Parquet file.

Can anyone help?
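For reference, one knob worth checking (an assumption on my part, not a confirmed diagnosis): in Spark 2.2+ the session time zone controls how timestamp values are interpreted, and a mismatch between it and the JVM default can make values written to Parquet look shifted. A minimal sketch, assuming a SparkSession named spark and an illustrative output path:

// "UTC" is only an example value; pick the zone the data was produced in
spark.conf.set("spark.sql.session.timeZone", "UTC")
df.write.parquet("/tmp/out")   // hypothetical DataFrame and path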

Thank you,
Sudhir


Re: Custom line/record delimiter

2018-01-01 Thread sk skk
Thanks for the update Kwon.

Regards,


On Mon, Jan 1, 2018 at 7:54 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Hi,
>
>
> There's a PR - https://github.com/apache/spark/pull/18581 and JIRA
> - SPARK-21289
>
> Alternatively, you could check out multiLine option for CSV and see if
> applicable.
>
>
> Thanks.
>
>
> 2017-12-30 2:19 GMT+09:00 sk skk <spark.s...@gmail.com>:
>
>> Hi,
>>
>> Do we have an option to write a csv or text file with a custom
>> record/line separator through spark ?
>>
>> I could not find any ref on the api. I have a issue while loading data
>> into a warehouse as one of the column on csv have a new line character and
>> the warehouse is not letting to escape that new line character .
>>
>> Thank you ,
>> Sk
>>
>
>
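For reference, a minimal sketch of the multiLine CSV read that Kwon mentions (assuming Spark 2.2+ and a SparkSession named spark; the path and header option are illustrative):

val df = spark.read
  .option("header", "true")
  .option("multiLine", "true")   // quoted fields may contain embedded newlines
  .option("escape", "\"")
  .csv("/path/to/input.csv")     // hypothetical path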


Custom line/record delimiter

2017-12-29 Thread sk skk
Hi,

Is there an option to write a CSV or text file with a custom record/line
separator through Spark?

I could not find any reference in the API. I have an issue while loading data
into a warehouse: one of the columns in the CSV contains a newline character,
and the warehouse does not let me escape that newline character.

Thank you ,
Sk


Sparkcontext on udf

2017-10-18 Thread sk skk
I have registered a UDF with the SQLContext. When I try to read another
Parquet file using the SQLContext inside that same UDF, it throws a
NullPointerException.

Is there any way to access the SQLContext inside a UDF?
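The SQLContext/SparkSession only exists on the driver, so it cannot be called from inside a UDF running on executors. For reference, a minimal sketch of the usual workaround - read the second Parquet file on the driver, broadcast it, and look values up inside the UDF (paths, column positions, and the df/"key" names are assumptions):

import org.apache.spark.sql.functions.udf

val lookup = spark.read.parquet("/path/to/other.parquet")   // hypothetical path
  .collect()
  .map(r => r.getString(0) -> r.getString(1))               // assumes two string columns
  .toMap
val bLookup = spark.sparkContext.broadcast(lookup)

val lookupUdf = udf((key: String) => bLookup.value.getOrElse(key, ""))
val enriched  = df.withColumn("resolved", lookupUdf(df("key")))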

Regards,
Sk


Appending column to a parquet

2017-10-17 Thread sk skk
Hi ,

I have two Parquet files with different schemas. Based on a unique key, I have
to fetch one column's value and append it to all rows of the other Parquet
file.

I tried a join, but I guess it is not working due to the differing schemas. I
can use withColumn, but can I get the single value of a column and assign it to
a literal? If I register the file as a temp table, fetch that column value, and
assign it to a string, I get back a Row rather than a literal.

Is there a better way to handle this, or how can I get a literal value from a
temporary table?
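For reference, a minimal sketch of pulling a single value out of one DataFrame and appending it to every row of the other with lit() (the names df1, df2, and colA are assumptions):

import org.apache.spark.sql.functions.lit

val value  = df2.select("colA").first().getString(0)   // single value from the second file
val result = df1.withColumn("colA", lit(value))        // same literal on every row of df1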


Thank you ,
Sk


Java Rdd of String to dataframe

2017-10-11 Thread sk skk
Can we create a DataFrame from a Java pair RDD of String? I don't have a
schema, as the payload is dynamic JSON. I tried the Encoders.STRING class.
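For reference, a minimal sketch (in Scala, assuming Spark 2.2+) of letting Spark infer the schema from the JSON strings themselves; for a JavaPairRDD you would first map down to the JSON payload, here assumed to be the pair's value:

import org.apache.spark.sql.Encoders

val jsonStrings = pairRdd.map(_._2)                          // keep only the JSON payload (assumed)
val ds = spark.createDataset(jsonStrings)(Encoders.STRING)
val df = spark.read.json(ds)                                 // schema inferred from the data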

Any help is appreciated !!

Thanks,
SK


how to fetch schema from a dynamic nested JSON

2017-08-12 Thread sk skk
Hi,

I have a requirement where I have to read a dynamic, nested JSON document that
describes a schema, and then check data quality against that schema.

That is, I get the details from a JSON document, e.g. column 1 should be a
string of a given length. This JSON is dynamic and nested, so traditionally I
would have to loop over the JSON object to fetch all the details.

Coming to the data, I have to read a JSON array where each JSON object should
be checked against the schema above, i.e. the first column of each object in
the array should match the expected string type and length.

Looping over the schema JSON and, inside that, looping over the data array
would hurt performance. Are there any options or a better way to handle this?


Thanks in advance.
sk


Parsing nested json objects with variable structure

2015-08-31 Thread SK

Hi,

I need to parse a json input file where the nested objects take on a
different structure based on the typeId field, as follows: 

{ "d":
{  "uid" : "12345"
  "contents": [{"info": {"eventId": "event1"}, "typeId": 19}]
 }
}

{ "d":
{  "uid" :  "56780"
   "contents": [{"info": {"id": "1"}, "typeId": 1003}, {"info":
{"id": "27"}, "typeId": 13}]
 }
}

In the above, the "contents" field takes on a different structure for typeId
13 and 19. My code is currently as follows:

logs  = sqlc.read.json(sys.argv[1])
logs.registerTempTable("logs")

features = sqlc.sql("SELECT d.uid, d.contents.typeId FROM logs")

I also need to extract the fields in d.contents.info. How can I extract
these fields since they have different names depending on the typeId?  I am
using Pyspark in Spark version 1.4.1. Any guidance in python or scala would
be helpful.
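For reference, a minimal sketch in Scala of flattening contents with explode and then selecting the typeId-specific fields (assuming the explode function in org.apache.spark.sql.functions is available in 1.4; read.json merges both variants into one struct, so an absent field simply comes back null):

import org.apache.spark.sql.functions.explode

val contents = logs.select(logs("d.uid").as("uid"), explode(logs("d.contents")).as("c"))
val fields   = contents.select("uid", "c.typeId", "c.info.eventId", "c.info.id")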

thanks
sudha












RandomForest in Pyspark (version 1.4.1)

2015-07-31 Thread SK

Hi,

I tried to develop a RandomForest model for my data in PySpark as follows:

   rf_model = RandomForest.trainClassifier(train_idf, 2, {},
                                           numTrees=15, seed=144)
   print "RF: Num trees = %d, Num nodes = %d\n" % (rf_model.numTrees(),
                                                   rf_model.totalNumNodes())

   pred_label = test_idf.map(lambda p:
                             (float(rf_model.predict(p.features)), p.label))
   print pred_label.take(5)  ## exception

I am  getting the following error at the highlighted statement.
  
 Exception: It appears that you are attempting to reference
SparkContext from a broadcast variable, action, or   transforamtion.
SparkContext can only be used on the driver, not in code that it run on
workers. For more information, see SPARK-5063.

 I have used the same set of statements for linear models (LogisticRegression
and SVM) in PySpark and was able to get the predictions and print them. I am
not sure why I am getting the above exception. I am not using the SparkContext
directly in any of the above statements. I would appreciate your help.

thanks










filter expression in API document for DataFrame

2015-03-24 Thread SK


The following statement appears in the Scala API example at
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

people.filter("age" > 30).

I tried this example and it gave a compilation error. I think it needs to
be changed to people.filter(people("age") > 30).

Also, it would be good to add some examples for the new equality operator
for columns, e.g. (people("age") === 30). The programming guide does not
have an example for this in the DataFrame Operations section, and it was not
very obvious that we need to use a different equality operator for columns.
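For reference, the two forms written out as a small compilable sketch (assuming a DataFrame named people with an age column):

val adults = people.filter(people("age") > 30)    // column comparison
val thirty = people.filter(people("age") === 30)  // === rather than == for column equality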


thanks






column expression in left outer join for DataFrame

2015-03-24 Thread SK
Hi,

I am trying to port some code that was working in Spark 1.2.0 on the latest
version, Spark 1.3.0. This code involves a left outer join between two
SchemaRDDs which I am now trying to change to a left outer join between 2
DataFrames. I followed the example  for left outer join of DataFrame at
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

Here's my code, where df1 and df2 are the two DataFrames I am joining on the
country field:

 val join_df = df1.join(df2, df1.country == df2.country, "left_outer")

But I got a compilation error that value country is not a member of
sql.DataFrame.

I also tried the following:
 val join_df = df1.join(df2, df1("country") == df2("country"), "left_outer")

I got a compilation error that it is a Boolean whereas a Column is required.

So what is the correct Column expression I need to provide for joining the 2
dataframes on a specific field ?
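For reference, a minimal sketch of the column expression the join expects - the === operator yields a Column, whereas == yields a Boolean:

val join_df = df1.join(df2, df1("country") === df2("country"), "left_outer")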

thanks











MLLib: feature standardization

2015-02-06 Thread SK
Hi,

I have a dataset in csv format and I am trying to standardize the features
before using k-means clustering. The data does not have any labels but has
the following format:

s1, f12,f13,...
s2, f21,f22,...

where s is a string id, and f is a floating point feature value.
To perform feature standardization, I need to compute the mean and
variance/std deviation of the features values in each element of the RDD
(i.e each row). However, the summary Statistics library in Spark MLLib
provides only a colStats() method that provides column-wise mean and
variance. I tried to compute the mean and variance per row, using the code
below but got a compilation error that there is no mean() or variance()
method for a tuple or Vector object. Is there a Spark library to compute the
row-wise mean and variance for an RDD, where each row (i.e. element) of the
RDD is a Vector or tuple of N feature values?

thanks

My code for standardization is as follows:

//read the data
val data = sc.textFile(file_name)
             .map(_.split(","))

// extract the features. For this example I am using only 2 features, but
// the data has more features
val features = data.map(d => Vectors.dense(d(1).toDouble, d(2).toDouble))

val std_features = features.map(f => {
       val fmean = f.mean()                      // Error: no mean() for a Vector or Tuple object
       val fstd  = scala.math.sqrt(f.variance()) // Error: no variance() for a Vector or Tuple object
       for (i <- 0 to f.length)                  // standardize the features
       { var fs = 0.0
         if (fstd > 0.0)
           fs = (f(i) - fmean)/fstd
         fs
       }
     })
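For reference, a minimal sketch of computing the per-row mean and standard deviation with plain Scala collections (assuming each element is an MLlib Vector; this sidesteps the missing mean()/variance() methods noted above):

val std_features = features.map { f =>
  val vals = f.toArray
  val mean = vals.sum / vals.length
  val std  = math.sqrt(vals.map(v => (v - mean) * (v - mean)).sum / vals.length)
  if (std > 0.0) vals.map(v => (v - mean) / std) else vals   // standardized feature values
}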







K-Means final cluster centers

2015-02-05 Thread SK
Hi,

I am trying to get the final cluster centers after running the KMeans
algorithm in MLlib in order to characterize the clusters. But the
KMeansModel does not have any public method to retrieve this info. There
appears to be only  a private method called clusterCentersWithNorm. I guess
I could call predict() to get the final cluster assignment for the dataset
and write my own code to compute the means based on this final assignment.
But I would like to know if  there is a way to get this info from MLLib API
directly after running KMeans?
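For reference, a minimal sketch - assuming the version in use exposes KMeansModel's clusterCenters constructor field as a public member (the data and parameter names are illustrative), the centers can be read off the trained model directly:

val model = KMeans.train(parsedData, numClusters, numIterations)
model.clusterCenters.zipWithIndex.foreach { case (center, i) =>
  println(s"cluster $i center: $center")
}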

thanks 






save a histogram to a file

2015-01-22 Thread SK
Hi,
histogram() returns an object that is a  pair of Arrays. There appears to be
no saveAsTextFile() for this paired object.

Currently I am using the following to save the output to a file:

val hist = a.histogram(10)

val arr1 = sc.parallelize(hist._1).saveAsTextFile("file1")
val arr2 = sc.parallelize(hist._2).saveAsTextFile("file2")

Is there a simpler way to save the histogram() result to a file?
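For reference, a minimal sketch that zips the bucket boundaries with the counts and writes a single output (histogram(10) returns one more boundary than counts, hence the init/tail pairing; the output path is illustrative):

val (buckets, counts) = a.histogram(10)
val lines = buckets.init.zip(buckets.tail).zip(counts).map {
  case ((lo, hi), c) => s"$lo\t$hi\t$c"
}
sc.parallelize(lines).saveAsTextFile("histogram_out")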

thanks






Spark SQL performance and data size constraints

2014-11-26 Thread SK
Hi,

I use the following code to read in data and extract the unique users using
Spark SQL. The data is 1.2 TB and I am running this on a cluster with 3 TB of
memory. It appears that there is enough memory, but the program just freezes
after some time, where it maps the RDD to the case class Play. (If I don't use
the Spark SQL portion, i.e. don't map to the case class and register the table
etc., and merely load the data (first 3 lines of the code below), then the
program completes.)

I tried with spark.storage.memoryFraction=0.5 and 0.6 (the default), as
suggested in the Tuning guide, but that did not help.
According to the logs, the total number of partitions/tasks is 38688 and the
size of each RDD partition for the mapping to the case class is around 31 MB.
So the total RDD size is 38688*31 MB = 1.2 TB. This is less than the 3 TB of
memory on the cluster. At the time the program stops, the total number of
tasks is close to 38688, with some of them appearing as failed. There are no
details for why the tasks failed.

Are there any maximum data size constraints in Spark SQL or table creation
that might be causing the program to hang? Are there any performance
optimizations I could try with Spark SQL that might allow the completion of
the task?


 val data = sc.textFile("shared_dir/*.dat")
              .map(_.split("\t"))
              .persist(MEMORY_AND_DISK_SER)

 val play = data.map(f => Play(f(0).trim, f(1).trim, f(2).trim, f(3).trim))
                .persist(MEMORY_AND_DISK_SER)

 // register the RDD as a table
 play.registerTempTable("play")

 val ids = sql_cxt.sql("SELECT DISTINCT id FROM play")

 println("Number of unique account ID = %d".format(ids.count()))
 println("Number of RDDs = %d".format(play.count()))

thanks







Spark performance optimization examples

2014-11-24 Thread SK
Hi,

Is there any document that provides some guidelines with some examples that
illustrate when different performance optimizations would be useful? I am
interested in knowing the guidelines for using optimizations like cache(),
persist(), repartition(), coalesce(), and broadcast variables.  I studied
the online programming guide, but I would like some more details (something
along the lines of Aaron Davidson's talk which illustrates the use of
repartition() with an example during the Spark summit).

In particular, I have a dataset that is about 1.2TB (about 30 files) that I
am trying to load using sc.textFile on a cluster with a total memory of 3TB
(170 GB per node). But I am not able to successfully complete the loading.
The program is continuously active in the mapPartitions task but does not
get past that even after a long time. I have tried some of the above
optimizations. But that has not helped and I am not sure if I am using these
optimizations in the right way or which of the above optimizations would be
most appropriate to this problem.  So I would appreciate any guidelines. 

thanks








Spark failing when loading large amount of data

2014-11-20 Thread SK
Hi,

I am using sc.textFile("shared_dir/*") to load all the files in a directory
on a shared partition. The total size of the files in this directory is 1.2
TB. We have a 16-node cluster with 3 TB of memory (1 node is the driver, 15
nodes are workers). But the loading fails after around 1 TB of data is read
(in the mapPartitions stage). Basically, there is no progress in mapPartitions
after 1 TB of input. It seems that the cluster has sufficient memory, but I am
not sure why the program gets stuck.

1.2 TB of data divided across 15 worker nodes would require each node to
have about 80 GB of memory. Every node in the cluster is allocated around
170GB of memory. According to the spark documentation, the default storage
fraction for RDDs is 60% of the allocated memory. I have increased that to
0.8 (by setting --conf spark.storage.memorFraction=0.8) , so each node
should have around 136 GB of memory for storing RDDs. So I am not sure why
the program is failing in the mapPartitions stage where it seems to be 
loading the data. 

I dont have a good idea about the Spark internals and would appreciate any
help in fixing this issue. 

thanks
   






Streaming: getting total count over all windows

2014-11-13 Thread SK
Hi,

I am using the following code to generate the (score, count) for each
window:

val score_count_by_window = topic.map(r => r._2)   // r._2 is the integer score
                                 .countByValue()

score_count_by_window.print()

E.g. the output for a window is as follows, which means that within the
DStream for that window there are 2 records with score 0, 3 with score 1, and
1 with score -1.
(0, 2)
(1, 3)
(-1, 1)

I would like to get the aggregate count for each score over all windows, until
the program terminates. I tried countByValueAndWindow(), but the result is the
same as countByValue() (i.e. it produces only per-window counts).
reduceByWindow also does not produce the result I am expecting. What is the
correct way to sum up the counts over multiple windows?
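For reference, a minimal sketch of keeping a running total across batches with updateStateByKey (requires a checkpoint directory; names follow the snippet above, and ssc is assumed to be the StreamingContext):

import org.apache.spark.streaming.StreamingContext._   // pair-DStream ops on Spark 1.x

ssc.checkpoint("/tmp/streaming-checkpoint")   // illustrative path

val running_counts = score_count_by_window.updateStateByKey[Long] {
  (newCounts: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + newCounts.sum)
}
running_counts.print()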

thanks













filtering out non English tweets using TwitterUtils

2014-11-11 Thread SK
Hi,

Is there a way to extract only the English language tweets when using
TwitterUtils.createStream()? The filters argument specifies the strings
that need to be contained in the tweets, but I am not sure how this can be
used to specify the language.

thanks








Re: filtering out non English tweets using TwitterUtils

2014-11-11 Thread SK
Thanks for the response. I tried the following :

   tweets.filter(_.getLang()="en")

I get a compilation error:
   value getLang is not a member of twitter4j.Status

But getLang() is one of the methods of twitter4j.Status since version 3.0.6
according to the doc at:
   http://twitter4j.org/javadoc/twitter4j/Status.html#getLang--

What version of twitter4j does Spark Streaming use?

thanks






Re: filtering out non English tweets using TwitterUtils

2014-11-11 Thread SK

Small typo in my code in  the previous post. That should be: 
 tweets.filter(_.getLang() == "en")






groupBy for DStream

2014-11-11 Thread SK

Hi.

1) I don't see a groupBy() method for a DStream object. Not sure why that is
not supported. Currently I am using filter() to separate out the different
groups. I would like to know if there is a way to convert a DStream object
to a regular RDD so that I can apply RDD methods like groupBy.


2) The count() method for a DStream object returns a DStream[Long] instead
of a simple Long (as RDD does). How can I extract the simple Long count
value? I tried dstream(0) but got a compilation error that it does not take
parameters. I also tried dstream[0], but that also resulted in a compilation
error. I am not able to use the head() or take(0) method for DStream either.
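For reference, a minimal sketch of both points using foreachRDD, which hands you the plain RDD for each batch (the grouping key is an assumption):

dstream.foreachRDD { rdd =>
  val groups = rdd.groupBy(record => record)   // any RDD method is available here
  val n: Long = rdd.count()                    // a plain Long, unlike DStream.count()
  println(s"records in this batch: $n, groups: ${groups.count()}")
}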

thanks






specifying sort order for sort by value

2014-11-06 Thread SK
Hi,

I am using rdd.sortBy(_._2) to get an RDD sorted by value. The default order
is ascending. How can I get it sorted in descending order? I could not find
an option to specify the order.

I need to get the top K elements of the list sorted in descending order. If
there is no option to get the descending order, I would like to know if
there is a way to get the last K elements of the list sorted in ascending
order.  take(k) gets the first k elements, is there an option to get the
last K elements of an RDD ?
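For reference, a minimal sketch of both options, assuming an RDD[(String, Int)] sorted by the Int value (sortBy takes an ascending flag, and top(k) with an Ordering avoids a full sort):

val descending = rdd.sortBy(_._2, ascending = false)
val topK = rdd.top(k)(Ordering.by[(String, Int), Int](_._2))   // k largest by value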

thanks






Re: specifying sort order for sort by value

2014-11-06 Thread SK
Thanks. I was looking at an older RDD documentation that did not specify the
ordering option. 






Using SQL statements vs. SchemaRDD methods

2014-11-04 Thread SK
SchemaRDD  supports some of the SQL-like functionality like groupBy(),
distinct(), select(). However, SparkSQL also supports SQL statements which
provide this functionality. In terms of future support and performance, is
it better to use SQL statements or the SchemaRDD methods that provide
equivalent functionality? 

thanks






Re: SparkSQL: Nested Query error

2014-10-30 Thread SK
Hi,

 I am getting an error in the Query Plan when I use the SQL statement
exactly as you have suggested. Is that the exact SQL statement I should be
using (I am not very familiar with SQL syntax)?


I also tried using the SchemaRDD's subtract method to perform this query.
usersRDD.subtract(deviceRDD).count(). The count comes out to be  1, but
there are many UIDs in tusers that are not in device - so the result is
not correct. 

I would like to know the right way to frame this query in Spark SQL.

thanks






SparkSQL: Nested Query error

2014-10-29 Thread SK
Hi,

I am using Spark 1.1.0. I have the following SQL statement where I am trying
to count the number of UIDs that are in the tusers table but not in the
device table.

val users_with_no_device = sql_cxt.sql("SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)")

I am getting the following error:
Exception in thread "main" java.lang.RuntimeException: [1.61] failure: string literal expected
SELECT COUNT (u_uid) FROM tusers WHERE tusers.u_uid NOT IN (SELECT d_uid FROM device)

I am not sure if every subquery has to be a string, so I tried enclosing the
subquery itself as a string literal inside the outer query, but that resulted
in a compilation error.

What is the right way to frame the above query in Spark SQL?
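For reference, a minimal sketch of one common rewrite that avoids the NOT IN subquery (which the Spark SQL parser at the time did not accept) - a left outer join followed by a null filter, assuming the 1.1 parser's join and IS NULL support and the u_uid/d_uid columns from the post:

val users_with_no_device = sql_cxt.sql(
  "SELECT COUNT(tusers.u_uid) FROM tusers LEFT OUTER JOIN device " +
  "ON tusers.u_uid = device.d_uid WHERE device.d_uid IS NULL")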

thanks







Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread SK
You are right. Creating the StreamingContext from the SparkContext instead of
SparkConf helped. Thanks for the help.
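For reference, the pattern that resolved it, as a minimal sketch:

val sc  = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))   // reuse the existing SparkContext instead of passing the conf again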






Getting the value from DStream[Int]

2014-10-15 Thread SK
Hi,

As a result of a reduction operation, the resultant value "score" is a
DStream[Int]. How can I get the simple Int value?
I tried score[0] and score._1, but neither worked, and I can't find a
getValue() in the DStream API.
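For reference, a minimal sketch - the value only exists per batch, so it is read inside foreachRDD:

score.foreachRDD { rdd =>
  rdd.take(1).foreach { v => println(s"score for this batch: $v") }   // v is a plain Int
}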

thanks
 






Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-14 Thread SK
Hi,

I am trying to implement simple sentiment analysis of Twitter streams in
Spark/Scala. I am getting an exception, and it appears when I combine
SparkContext with StreamingContext in the same program. When I read the
positive and negative words using only SparkContext.textFile (without
creating a StreamingContext) and analyze static text files, the program
works. Likewise, when I just create the Twitter stream using StreamingContext
(and don't create a SparkContext to build the vocabulary), the program works.
The exception seems to appear when I combine both SparkContext and
StreamingContext in the same program, and I am not sure if we are allowed to
have both simultaneously. All the examples in the streaming module contain
only the StreamingContext. The error transcript and my code appear below. I
would appreciate your guidance in fixing this error and the right way to read
static files and streams in the same program, or any pointers to relevant
examples.
Thanks.


--Error transcript -
Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
java.io.IOException: unexpected exception type
   
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
   
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
-- My code below --
object TweetSentiment {
  def main(args: Array[String]) {

    val filters = args
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // get the list of positive words
    val pos_list = sc.textFile("positive-words.txt")
                     .filter(line => !line.isEmpty())
                     .collect()
                     .toSet

    // get the list of negative words
    val neg_list = sc.textFile("negative-words.txt")
                     .filter(line => !line.isEmpty())
                     .collect()
                     .toSet

    // create twitter stream
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    val tweets = stream.map(r => r.getText)
    tweets.print() // print tweet text

    ssc.start()
    ssc.awaitTermination()
    sc.stop()   // I tried commenting this out, but the exception still appeared.
  }
}









Re: getting tweets for a specified handle

2014-10-10 Thread SK
Thanks. I made the change and ran the code. But I don't get any tweets for my
handle, although I do see the tweets when I search for it on Twitter. Does
Spark allow us to get tweets from the past (say the last 100 tweets, or
tweets that appeared in the last 10 minutes)?

thanks






getting tweets for a specified handle

2014-10-09 Thread SK
Hi,
I am using Spark 1.1.0.  Is there a way to get the complete tweets
corresponding to a handle (for e.g. @Delta)? I tried using the following
example that extracts just the hashtags and replaced the # with @ as
follows.  I need the complete tweet and not just the tags. 

  //  val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

  // replaced the above with the following:
  val tweets = stream.map(status =>
    status.getText.filter(_.contains("@Delta")))

  But I get an error:   value contains is not a member of Char

  I am trying to find out if there is a better Spark API to get the tweets for
a handle so that we don't have to do the filtering - something along the lines
of the searchTwitter(topic, number_of_tags) API provided by the twitteR
package in R would be appropriate.
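For reference, a minimal sketch of the per-tweet filter - String.filter iterates characters, hence the Char error above; filtering the stream itself keeps whole tweets:

val deltaTweets = stream.map(_.getText).filter(_.contains("@Delta"))
deltaTweets.print()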

thanks











Re: Shuffle files

2014-10-07 Thread SK
- We set ulimit to 50. But I still get the same too many open files
warning. 

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks









Application details for failed and terminated jobs

2014-10-02 Thread SK
Hi,

Currently the history server provides application details only for
successfully completed jobs (where the APPLICATION_COMPLETE file is
generated). However, (long-running) jobs that we terminate manually, or failed
jobs where APPLICATION_COMPLETE may not be generated, don't show up on the
history server page. They do, however, show up on the 4040 interface as long
as they are running. Is it possible to save those logs and load them into the
history server (even when APPLICATION_COMPLETE is not present)? This would
allow us to troubleshoot failed and terminated jobs without holding up the
cluster.

thanks
 






Spark SQL: ArrayIndexOutofBoundsException

2014-10-02 Thread SK
Hi,

I am trying to extract the number of distinct users from a file using Spark
SQL, but  I am getting  the following error:


ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15)
java.lang.ArrayIndexOutOfBoundsException: 1


 I am following the code in examples/sql/RDDRelation.scala. My code is as
follows. The error is appearing when it executes the SQL statement. I am new
to  Spark SQL. I would like to know how I can fix this issue. 

thanks for your help. 


 val sql_cxt = new SQLContext(sc)
 import sql_cxt._

 // read the data using the schema and create a schema RDD
 val tusers = sc.textFile(inp_file)
                .map(_.split("\t"))
                .map(p => TUser(p(0), p(1).trim.toInt))

 // register the RDD as a table
 tusers.registerTempTable("tusers")

 // get the number of unique users
 val unique_count = sql_cxt.sql("SELECT COUNT (DISTINCT userid) FROM tusers")
                           .collect().head.getLong(0)

 println(unique_count)









Re: Fwd: Spark SQL: ArrayIndexOutofBoundsException

2014-10-02 Thread SK
Thanks for the help. Yes, I did not realize that the first header line has a
different separator.  

By the way, is there a way to drop the first line that contains the header?
Something along the following lines:

  sc.textFile(inp_file)
    .drop(1)   // or tail(), to drop the header line
    .map(...)  // rest of the processing

I could not find a drop() function or a way to take the bottom (n) elements of
an RDD. Alternatively, a way to create the case class schema from the header
line of the file and use the rest for the data would be useful - just a
suggestion. Currently I am just deleting this header line manually before
processing it in Spark.
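For reference, a minimal sketch of dropping the header without a drop() method (first() reads just one line; the mapPartitionsWithIndex variant avoids comparing every line):

val raw = sc.textFile(inp_file)
val header = raw.first()
val data = raw.filter(_ != header)   // drops the header (and any identical lines)

// or, dropping only the first line of the first partition:
val data2 = raw.mapPartitionsWithIndex { (i, it) => if (i == 0) it.drop(1) else it }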


thanks








spark.cleaner.ttl

2014-10-01 Thread SK
Hi,

I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite
as per the online docs. Since a lot of shuffle files are generated in
/tmp/spark-local* and the disk is running out of space, we tested with a
smaller value of ttl. However, even when the job has completed and the timer
expires, the files remain; instead of being deleted, the timestamps of the
files keep changing. How can we automatically delete these shuffle files,
say after every 24 hours?

thanks






processing large number of files

2014-09-30 Thread SK
Hi,

I am trying to compute the number of unique users from a year's worth of
data. There are about 300 files and each file is quite large (~GB). I first
tried this without a loop by reading all the files in the directory using the
glob pattern sc.textFile("dir/*"). But the tasks were stalling and I was
getting a "Too many open files" warning, even though I increased the nofile
limit to 500K. The number of shuffle tasks being created was more than 200K
and they were all generating shuffle files. Setting consolidateFiles to true
did not help.

So now I am reading the files in a loop, as shown in the code below. Now I
don't run into the "Too many open files" issue. But the countByKey is taking
a really long time (more than 15 hours and still ongoing). It appears from
the logs that this operation is happening on a single node. From the logs, I
am not able to figure out why it is taking so long. Each node has 16 GB of
memory and the Mesos cluster has 16 nodes. I have set spark.serializer to
KryoSerializer. I am not running into any out-of-memory errors so far. Is
there some way to improve the performance? Thanks.

for (i <- 1 to 300)
{
    var f = file + i   // name of the file
    val user_time = sc.textFile(f)
                      .map(line => {
                          val fields = line.split("\t")
                          (fields(11), fields(6))
                      })             // extract (year-month, user_id)
                      .distinct()
                      .countByKey    // group by with year as the key

    // now convert the Map object to an RDD in order to output results
    val ut_rdd = sc.parallelize(user_time.toSeq)

    // convert to an array to extract the count. Need to find if there is an easier way to do this.
    var ar = ut_rdd.toArray()

    // aggregate the count for the year
    ucount = ucount + ar(0)._2
}
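For reference, a minimal sketch of keeping the aggregation on the cluster with reduceByKey rather than collecting a countByKey map back to the driver for each file (the glob built from the file prefix, the field indices, and the output path are assumptions based on the post):

import org.apache.spark.SparkContext._   // pair-RDD ops (reduceByKey) on Spark 1.x

val counts = sc.textFile(file + "*")
  .map { line => val fields = line.split("\t"); (fields(11), fields(6)) }
  .distinct()                                    // unique (year-month, user) pairs
  .map { case (yearMonth, _) => (yearMonth, 1L) }
  .reduceByKey(_ + _)                            // users per year-month, computed on the cluster
counts.saveAsTextFile("unique_user_counts")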







Shuffle files

2014-09-25 Thread SK
Hi,

I am using Spark 1.1.0 on a cluster. My job takes as input 30 files in a
directory (I am using sc.textFile("dir/*") to read in the files). I am
getting the following warning:

WARN TaskSetManager: Lost task 99.0 in stage 1.0 (TID 99,
mesos12-dev.sccps.net): java.io.FileNotFoundException:
/tmp/spark-local-20140925215712-0319/12/shuffle_0_99_93138 (Too many open
files)

basically I think a lot of shuffle files are being created. 

1) The tasks eventually fail and the job just hangs (after taking very long,
more than an hour). If I read these 30 files in a for loop, the same job
completes in a few minutes. However, I then need to specify the file names,
which is not convenient. I am assuming that sc.textFile("dir/*") creates one
large RDD for all 30 files. Is there a way to make the operation on this
large RDD efficient so as to avoid creating too many shuffle files?


2) Also, I am finding that all the shuffle files for my other completed jobs
are not being automatically deleted even after days. I thought that
sc.stop() clears the intermediate files.  Is there some way to
programmatically delete these temp shuffle files upon job completion?


thanks








HdfsWordCount only counts some of the words

2014-09-23 Thread SK
Hi,

I tried out the HdfsWordCount program in the Streaming module on a cluster.
Based on the output, I find that it counts only a few of the words. How can
I have it count all the words in the text? I have only one text  file in the
directory. 

thanks






RE: HdfsWordCount only counts some of the words

2014-09-23 Thread SK

I execute it as follows:

$SPARK_HOME/bin/spark-submit --master <master url> --class \
  org.apache.spark.examples.streaming.HdfsWordCount \
  target/scala-2.10/spark_stream_examples-assembly-1.0.jar <hdfsdir>

After I start the job, I add a new test file to <hdfsdir>. It is a large text
file that I will not be able to copy here, but it probably has at least 100
distinct words. Yet the streaming output has only about 5-6 words along with
their counts, as follows. I then stop the job after some time.

Time ...

(word1, cnt1)
(word2, cnt2)
(word3, cnt3)
(word4, cnt4)
(word5, cnt5)

Time ...

Time ...







Streaming: HdfsWordCount does not print any output

2014-09-22 Thread SK
Hi,

I tried running the HdfsWordCount program in the streaming examples in Spark
1.1.0. I provided a directory in the distributed filesystem as input. This
directory has one text file. However, the only thing that the program keeps
printing is the time - but not the word count. I have not used the streaming
module much, so wanted to find out how I can get the output. 

thanks






Re: Streaming: HdfsWordCount does not print any output

2014-09-22 Thread SK
This issue is resolved. The file needs to be created after the program has
started to  execute.

thanks






mllib performance on mesos cluster

2014-09-19 Thread SK
Hi,

I have a program similar to the BinaryClassifier example that I am running
using my data (which is fairly small). I run this for 100 iterations. I
observed the following performance:

Standalone mode cluster with 10 nodes (with Spark 1.0.2):  5 minutes
Standalone mode cluster with 10 nodes (with Spark 1.1.0):  8.9 minutes
Mesos cluster with 10 nodes (with Spark 1.1.0): 17 minutes

1) According to the documentation, Spark 1.1.0 has better performance. So I
would like to understand why the runtime is longer on Spark 1.1.0. 

2) Why is the runtime on Mesos significantly higher than in standalone mode?
I just wanted to find out if anyone else has observed poor performance for
MLlib-based programs on a Mesos cluster. I looked through the application
detail logs and found that some of the scheduler delay values were higher on
Mesos compared to standalone mode (40 ms vs. 10 ms). Is the Mesos scheduler
slower?

thanks






Spark Streaming compilation error: algebird not a member of package com.twitter

2014-09-19 Thread SK
Hi,

I am using the latest release, Spark 1.1.0. I am trying to build the
streaming examples (under examples/streaming) as a standalone project with
the following streaming.sbt file. When I run sbt assembly, I get an error
stating that object algebird is not a member of package com.twitter. I tried
adding the dependency spark-streaming-algebird, but that was not recognized.
What dependency should I be including for algebird?

import AssemblyKeys._

assemblySettings

name := "spark_stream_examples"

version := "1.0"

scalaVersion := "2.10.4"


libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-zeromq" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"

//libraryDependencies += "org.apache.spark" %% "spark-streaming-algebird" % "1.1.0"


resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
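For reference - there is no spark-streaming-algebird artifact; the Algebird-based examples pull Algebird in directly from Twitter. A minimal sketch of the extra line (the version shown is illustrative, not verified against Spark 1.1's examples build):

libraryDependencies += "com.twitter" %% "algebird-core" % "0.8.0"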








Unable to load app logs for MLLib programs in history server

2014-09-18 Thread SK
Hi,

The default log files for the MLlib examples use a rather long naming
convention that includes special characters like parentheses and commas. For
example, one of my log files is named
binaryclassifier-with-params(input.txt,100,1.0,svm,l2,0.1)-1410566770032.

When I click on the program on the history server page (at port 18080), to
view the detailed application logs, the history server crashes and I need to
restart it. I am using Spark 1.1 on a mesos cluster. 

I renamed the  log file by removing the special characters and  then it
loads up correctly. I am not sure which program is creating the log files.
Can it be changed so that the default log file naming convention does not
include  special characters? 

thanks






Spark 1.1.0: Cannot load main class from JAR

2014-09-12 Thread SK
Hi,

I am using the Spark 1.1.0 version that was released yesterday. I recompiled
my program to use the latest version using sbt assembly after modifying
project.sbt to use the 1.1.0 version. The compilation goes through and the
jar is built. When I run the jar using spark-submit, I get an error: Cannot
load main class from JAR. This program was working with version 1.0.2. The
class does have a main method. So far I have never had problems recompiling
and running the jar, when I have upgraded to new versions. Is there anything
different I need to do for 1.1.0 ?

thanks






Re: Spark 1.1.0: Cannot load main class from JAR

2014-09-12 Thread SK
This issue is resolved. Looks like in the new spark-submit, the jar path has
to be at the end of the options. Earlier I could specify this path in any
order on the command line. 

thanks






History server: ERROR ReplayListenerBus: Exception in parsing Spark event log

2014-09-11 Thread SK

Hi,

I am using Spark 1.0.2 on a Mesos cluster. After I run my job, when I try to
look at the detailed application stats using a history server at port 18080,
the stats don't show up for some of the jobs even though the job completed
successfully and the event logs are written to the log folder. The log from
the history server execution is attached below - it looks like it is
encountering a parsing error when reading the EVENT_LOG file (I have not
modified this file). Basically, the line that says "Malformed line" seems to
be truncating the first path (instead of amd64, it shows up as d64). Does the
history server have any string buffer limitations that would be causing this
problem? Also, I want to point out that this problem does not happen all the
time - during some runs the app details do show up. However, this is quite
unpredictable.

The same job when I ran using Spark 1.0.1 in standalone mode (i.e without
using a history server), showed up on the application details page.  I am
not sure if this is a problem with the history server or specifically with
version 1.0.2. Is it possible to fix this problem, as I would like to  use
the application details?


thanks




14/09/11 20:50:55 ERROR ReplayListenerBus: Exception in parsing Spark event
log file:/mapr/applogs_spark_mesos/spark_test-1410468489529/EVENT_LOG_1
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'd64': was
expecting 
 at [Source: java.io.StringReader@2d51a56a; line: 1, column: 4]
at
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
at
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2042)
at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1412)
at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
at
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
at
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
at
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
at
org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$$renderSparkUI(HistoryServer.scala:182)
at
org.apache.spark.deploy.history.HistoryServer$$anonfun$checkForLogs$3.apply(HistoryServer.scala:149)
at
org.apache.spark.deploy.history.HistoryServer$$anonfun$checkForLogs$3.apply(HistoryServer.scala:146)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.deploy.history.HistoryServer.checkForLogs(HistoryServer.scala:146)
at
org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply$mcV$sp(HistoryServer.scala:77)
at
org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply(HistoryServer.scala:74)
at
org.apache.spark.deploy.history.HistoryServer$$anon$1$$anonfun$run$1.apply(HistoryServer.scala:74)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
at
org.apache.spark.deploy.history.HistoryServer$$anon$1.run(HistoryServer.scala:73)

ReplayListenerBus: Malformed line:
d64/jre/lib/jsse.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/rhino.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jfr.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/classes,file.encoding:ISO-8859-1,user.timezone:Etc/UTC,java.specification.vendor:Oracle
Corporation,sun.java.launcher:SUN_STANDARD,os.version:3.13.0-32-generic,sun.os.patch.level:unknown,java.vm.specification.vendor:Oracle

Spark Web UI in Mesos mode

2014-09-08 Thread SK
Hi,

I am running Spark 1.0.2 on a cluster in Mesos mode. I am not able to access
the Spark master Web UI at port 8080 but am able to access it at port 5050.
Is 5050 the standard port? 

Also, in standalone mode there is a link to the application detail UI
directly from the master UI. I don't see that link on the master UI page in
Mesos mode. Instead I have to explicitly go to port 18080 to access the
application detail. I have set up the history server. Is there some way to
access the application detail link in Mesos mode directly from the master UI
page (as in the standalone UI)?

thanks






mllib performance on cluster

2014-09-02 Thread SK
Hi,

I evaluated the runtime performance of some of the MLlib classification
algorithms on a local machine and a cluster with 10 nodes. I used standalone
mode and Spark 1.0.1 in both cases. Here are the results for the total
runtime:
                      Local     Cluster
Logistic regression   138 sec   336 sec
SVM                   138 sec   336 sec
Decision tree          50 sec   132 sec

My dataset is quite small and my programs are very similar to the mllib
examples that are included in the Spark distribution. Why is the runtime on
the cluster significantly higher (almost 3 times) than that on the local
machine even though the former uses more memory and more nodes? Is it
because of the communication overhead on the cluster? I would like to know
if there is something I need to be doing to optimize the performance on the
cluster or if others have also been getting similar results. 

thanks
   






Re: mllib performance on cluster

2014-09-02 Thread SK
Num iterations: for LR and SVM, I am using the default value of 100. For all
the other parameters I am also using the default values. I am pretty much
reusing the code from BinaryClassification.scala. For Decision Tree, I don't
see any parameter for the number of iterations in the example code, so I did not
specify any. I am running each algorithm on my dataset 100 times and taking
the average runtime.

My dataset is very dense (hardly any zeros). The labels are 1 and 0.

I did not explicitly specify the number of partitions. I did not see any code
for this in the MLlib examples for BinaryClassification and DecisionTree.

hardware: 
local: intel core i7 with 12 cores and 7.8 GB of which I am allocating 4GB
for the executor memory. According to the application detail stats in the
spark UI, the total memory consumed is around 1.5 GB.

cluster: 10 nodes with a total of 320 cores, with 16GB per node. According
to the application detail stats in the spark UI, the total memory consumed
is around 95.5 GB.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13299.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: mllib performance on cluster

2014-09-02 Thread SK
The dataset is quite small : 5.6 KB.  It has 200 rows and 3 features, and 1
column of labels.  From this dataset, I split 80% for training set and 20%
for test set. The features are integer counts and labels are binary (1/0).

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mllib-performance-on-cluster-tp13290p13311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many open files

2014-08-29 Thread SK
Hi,

I am having the same problem reported by Michael. I am trying to open 30
files. ulimit -n shows the limit is 1024, so I am not sure why the program
is failing with a "Too many open files" error. The total size of all the 30
files is 230 GB.
I am running the job on a cluster with 10 nodes, each having 16 GB. The
error appears to be happening at the distinct() stage.

Here is my program. In the following code, are all 10 nodes trying to
open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the
files one by one if that helps, but not sure if there is a more efficient
solution. 
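Two mitigations that usually help with shuffle-time "Too many open files" on the 1.x line, for what it's worth (a sketch; the 4096 limit is just an example value, and spark.shuffle.consolidateFiles is the experimental consolidation flag of that era): raise the per-process open-file limit on the worker machines (e.g. run ulimit -n 4096 before starting the workers), and/or consolidate shuffle output files:

import org.apache.spark.{SparkConf, SparkContext}

// fewer shuffle files open simultaneously per executor
val conf = new SparkConf()
  .setAppName("App")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)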

The following is the error transcript: 

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
failure: Exception failure in TID 902 on host 192.168.13.11:
java.io.FileNotFoundException:
/tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
files) 
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744) Driver stacktrace:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-28 Thread SK
I was able to recently solve this problem for standalone mode. For this mode,
I did not use a history server. Instead, I set spark.eventLog.dir (in
conf/spark-defaults.conf) to a directory in hdfs (basically this directory
should be in a place that is writable by the master and accessible globally
to all the nodes). 
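The same two settings can also be applied programmatically on the SparkConf, which is sometimes handier than editing spark-defaults.conf (a minimal sketch; the HDFS path is a placeholder to adapt):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("App")
  .set("spark.eventLog.enabled", "true")
  // placeholder: any directory writable by the driver and readable from all nodes
  .set("spark.eventLog.dir", "hdfs://namenode:8020/user/spark/eventlogs")
val sc = new SparkContext(conf)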



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p13055.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: OutofMemoryError when generating output

2014-08-28 Thread SK
Hi,
Thanks for the response. I tried to use countByKey. But I am not able to
write the output to console or to a file. Neither collect() nor
saveAsTextFile() work for the Map object that is generated after
countByKey(). 

val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6)) // extract (month, user_id)
}.distinct().countByKey()

x.saveAsTextFile(...)  // does not work: generates an error that
                       // saveAsTextFile is not defined for a Map object


Is there a way to convert the Map object to an object that I can output to
console and to a file?
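For reference, countByKey() returns a plain scala.collection.Map on the driver rather than an RDD, which is why saveAsTextFile is not available on it. A minimal sketch, reusing the x computed above (the output path is a placeholder):

// print on the driver
x.foreach { case (month, count) => println(s"$month, $count") }

// or turn it back into an RDD to write it out
val xrdd = sc.parallelize(x.toSeq)
xrdd.map { case (month, count) => s"$month, $count" }.saveAsTextFile("output_path")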

thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847p13056.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Memory statistics in the Application detail UI

2014-08-28 Thread SK
Hi,

I am using a cluster where each node has 16GB (this is the executor memory).
After I complete an MLlib job, the executor tab shows the following:

Memory: 142.6 KB Used (95.5 GB Total) 

and individual worker nodes have the Memory Used values as 17.3 KB / 8.6 GB 
(this is different for different nodes). What does the second number signify
(i.e.  8.6 GB and 95.5 GB)? If 17.3 KB was used out of the total memory of
the node, should it not be 17.3 KB/16 GB?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-statistics-in-the-Application-detail-UI-tp13082.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Memory statistics in the Application detail UI

2014-08-28 Thread SK
Hi,
Thanks for the responses. I understand that  the second values in the Memory
Used column for the executors add up to 95.5 GB and the first values add up
to 17.3 KB. If 95.5 GB is the memory used to store the RDDs, then what is 
17.3 KB ? is that the memory used for shuffling operations? For non MLlib
applications I get 0.0 for the first number  - i.e memory used is 0.0 (95.5
GB Total). 

Is the total memory used the sum of the two numbers or is the first number
included in the second number  (i.e is 17.3 KB included in the 95.5 GB)? 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-statistics-in-the-Application-detail-UI-tp13082p13095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-26 Thread SK
I have already tried setting the history server and accessing it on
master-url:18080 as per the link. But the page does not list any completed
applications. As I mentioned in my previous mail, I am running Spark in
standalone mode on the cluster  (as well as on my local machine). According
to the link, it appears that the history server is required only in mesos or
yarn mode, not in standalone mode. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12834.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



OutofMemoryError when generating output

2014-08-26 Thread SK
Hi,

I have the following piece of code that I am running on a cluster with 10
nodes with 2GB memory per node. The tasks seem to complete, but at the point
where it is generating output (saveAsTextFile), the program freezes after
some time and reports an out of memory error (error transcript attached
below). I also tried using collect() and printing the output to console
instead of a file, but got the same error. The program reads some logs for a
month and extracts the number of unique users during the month. The reduced
output is not very large, so not sure why the memory error occurs. I would
appreciate any help in fixing this memory error to get the output. Thanks.

def main(args: Array[String]) {

  val conf = new SparkConf().setAppName("App")
  val sc = new SparkContext(conf)

  // get the number of users per month
  val user_time = sc.union(sc.textFile(baseFile))
    .map(line => {
      val fields = line.split("\t")
      (fields(11), fields(6))
    })                                  // extract (month, user_id)
    .groupByKey                         // group by month as the key
    .map(g => (g._1, g._2.toSet.size))  // get the unique id count per month
  // .collect()
  // user_time.foreach(f => println(f))

  user_time.map(f => "%s, %s".format(f._1, f._2)).saveAsTextFile("app_output")
  sc.stop()
}
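One way to sidestep building a whole month's worth of user ids in memory is to deduplicate the (month, user_id) pairs before counting, so no per-key collection is ever materialized. A sketch, assuming the same tab-separated layout and the baseFile value from above:

val monthly_users = sc.textFile(baseFile)
  .map { line =>
    val fields = line.split("\t")
    (fields(11), fields(6))        // (month, user_id)
  }
  .distinct()                      // unique (month, user_id) pairs
  .countByKey()                    // small Map[month, count] on the driver

sc.parallelize(monthly_users.toSeq)
  .map { case (m, c) => s"$m, $c" }
  .saveAsTextFile("app_output")

countByKey() brings back only one (month, count) pair per month, which is tiny compared to the grouped id collections.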






14/08/26 15:21:15 WARN TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:121)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-25 Thread SK
Hi,

I am able to access the Application details web page from the master UI page
when I run Spark in standalone mode on my local machine.  However, I am not
able to access it when I run Spark on our private cluster. The Spark master
runs on one of the nodes in the cluster. I am able to access the spark
master UI at spark://master-url:8080. It shows the listing of all the
running and completed apps. When I click on the completed app, and access
the Application details link, the link points to:
master-url/app/?appId=app-idvalue

When I view the page source to view the html source, the href portion is
blank ().

However, on my local machine, when I click the  Application detail link for
a completed app, it correctly points to
master-url/history/app-id
and when I view the page's html source, the href portion points to
/history/app-id

On the cluster, I have set spark.eventLog.enabled to true in
$SPARK_HOME/conf/spark-defaults.conf on the master node as well as all the
slave nodes. I am using spark 1.0.1 on the cluster.

I am not sure why I am able to access the application details for completed
apps when the app runs on my local machine but not for the apps that run on
our cluster, although in both cases I am using spark 1.0.1 in standalone
mode.  Do I need to do any additional configuration to enable this history
on the cluster?

thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Processing multiple files in parallel

2014-08-19 Thread SK
Without the sc.union, my program crashes with the following error:

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Master removed our application: FAILED at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Processing-multiple-files-in-parallel-tp12336p12428.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Extracting unique elements of an ArrayBuffer

2014-08-18 Thread SK
Hi,

I have a piece of code in which the result of a  groupByKey operation is as
follows:

(2013-04, ArrayBuffer(s1, s2, s3, s1, s2, s4))

The first element is a String value representing a date and the ArrayBuffer
consists of (non-unique) strings. I want to extract the unique elements of
the ArrayBuffer. So I am expecting the result to be:

(2013-04, ArrayBuffer(s1, s2, s3, s4))

I tried the following:
  .groupByKey
  .map(g => (g._1, g._2.distinct))

But I get the following error:
value distinct is not a member of Iterable[String]
[error]    .map(g => (g._1, g._2.distinct))

I also  tried g._2.distinct(), but got the same error. 


I looked at the Scala ArrayBuffer documentation and it supports distinct()
and count() operations.  I am using Spark 1.0.1 and scala 2.10.4.  I would
like to know how to extract the unique elements of the ArrayBuffer above.
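In Spark 1.x the values produced by groupByKey are typed as Iterable[String] rather than ArrayBuffer, which is why distinct is not found; converting to a Seq (or a Set) first works. A self-contained sketch with made-up data:

import org.apache.spark.{SparkConf, SparkContext}

object DistinctPerKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistinctPerKey"))
    // hypothetical (date, station) pairs standing in for the real data
    val pairs = sc.parallelize(Seq(
      ("2013-04", "s1"), ("2013-04", "s2"), ("2013-04", "s3"),
      ("2013-04", "s1"), ("2013-04", "s2"), ("2013-04", "s4")))
    val uniquePerKey = pairs
      .groupByKey()                                    // values arrive as Iterable[String]
      .map { case (k, vs) => (k, vs.toSeq.distinct) }  // Iterable has no distinct; Seq does
    uniquePerKey.collect().foreach(println)
    sc.stop()
  }
}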

thanks








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-unique-elements-of-an-ArrayBuffer-tp12320.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Processing multiple files in parallel

2014-08-18 Thread SK

Hi,

I have a piece of code that reads all the (csv) files in a folder. For each
file, it parses each line, extracts the first 2 elements from each row of
the file, groups the tuple  by the key and finally outputs the  number of
unique values for each key.

val conf = new SparkConf().setAppName("App")
val sc = new SparkContext(conf)

val user_time = sc.union(sc.textFile("/directory/*"))  // union of all files in the directory
  .map(line => {
    val fields = line.split(",")
    (fields(1), fields(0))            // extract first 2 elements
  })
  .groupByKey                         // group by timestamp
  .map(g => (g._1, g._2.toSet.size))  // get the number of unique ids per timestamp

I have a lot of files in the directory (several hundreds). The program takes
a long time. I am not sure if the union operation is preventing the files
from being processed in parallel. Is there a better way to parallelize the
above code ? For example, the first two operations (reading each file and
extracting the first 2 columns from each file) can be done in parallel, but
I am not sure if that is how Spark schedules the above code.
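For comparison, a sketch that drops the union (the glob on its own already yields one distributed RDD covering every matching file) and requests more input splits explicitly; the 200 is an arbitrary example value and sc is the context created above:

val lines = sc.textFile("/directory/*", 200)   // second argument = minimum number of partitions
val user_time = lines
  .map { line =>
    val fields = line.split(",")
    (fields(1), fields(0))
  }
  .groupByKey()
  .map { case (ts, ids) => (ts, ids.toSet.size) }

With enough partitions, the per-file reads and the column extraction run in parallel across the cluster without any explicit per-file loop.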

thanks
  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Processing-multiple-files-in-parallel-tp12336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-15 Thread SK
Hi,

Ok, I was specifying --master local. I changed that to --master
spark://localhostname:7077 and am now  able to see the completed
applications. It provides summary stats about runtime and memory usage,
which is sufficient for me at this time. 

However it doesn't seem to archive the info in the application detail UI
that lists detailed stats about the completed stages of the application -
which would be useful for identifying bottleneck steps in a large
application. I guess we need to capture the application detail UI screen
before the app run completes or find a way to extract this info by  parsing
the Json log file in /tmp/spark-events.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12187.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-14 Thread SK
Hi,

I am using Spark 1.0.1. But I am still not able to see the stats for
completed apps on port 4040 - only for running apps. Is this feature
supported or is there a way to log this info to some file? I am interested
in stats about the total # of executors, total runtime, and total memory
used by my Spark program.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark webUI - application details page

2014-08-14 Thread SK
I set  spark.eventLog.enabled to true  in
$SPARK_HOME/conf/spark-defaults.conf and also configured the logging to a
file as well as console in log4j.properties. But I am not able to get the
log of the statistics in a file. On the console there is a lot of log
messages along with the stats - so hard to separate the stats. I prefer the
online format that appears on localhost:4040 - it is more clear. I am
running the job in standalone mode on my local machine. is there some way to
recreate the stats online after the job has completed?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [MLLib]:choosing the Loss function

2014-08-11 Thread SK
Hi,

Thanks for the reference to the LBFGS optimizer. 
I tried to use the LBFGS optimizer, but I am not able to pass it  as an
input to the LogisticRegression model for binary classification. After
studying the code in mllib/classification/LogisticRegression.scala, it
appears that the  only implementation of LogisticRegression uses
GradientDescent as a fixed optimizer. In other words, I dont see a
setOptimizer() function that I can use to change the optimizer to LBFGS.

I tried to follow the code in
https://github.com/dbtsai/spark-lbfgs-benchmark/blob/master/src/main/scala/org/apache/spark/mllib/benchmark/BinaryLogisticRegression.scala
that makes use of LBFGS, but it is not clear to me where  the
LogisticRegression  model with LBFGS is being returned that I can use for
the classification of the test dataset. 

If someone has sample code that uses LogisticRegression with LBFGS instead
of GradientDescent as the optimization algorithm, it would be helpful if you
could post it.
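Something along these lines may do it (a sketch following the optimization-guide pattern rather than a setOptimizer() call, assuming training and test are RDD[LabeledPoint]; the numeric settings are illustrative):

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val numFeatures = training.take(1)(0).features.size
// append a constant 1.0 so the intercept is learned as the last weight
val data = training.map(lp => (lp.label, MLUtils.appendBias(lp.features))).cache()

val (weightsWithIntercept, lossHistory) = LBFGS.runLBFGS(
  data,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,                                               // number of corrections
  1e-4,                                             // convergence tolerance
  100,                                              // max iterations
  0.1,                                              // regParam
  Vectors.dense(new Array[Double](numFeatures + 1)))

// split the solution back into weights and intercept
val w = weightsWithIntercept.toArray
val model = new LogisticRegressionModel(
  Vectors.dense(w.slice(0, w.length - 1)),
  w(w.length - 1))

// use it like the GradientDescent-trained model
val predictionAndLabel = test.map(lp => (model.predict(lp.features), lp.label))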

thanks 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-choosing-the-Loss-function-tp11738p11913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: scopt.OptionParser

2014-08-08 Thread SK
I was using "sbt package" when I got this error. Then I switched to using "sbt
assembly" and that solved the issue. To run "sbt assembly", you need to have
a file called plugins.sbt in the <project root>/project directory and it
has the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

This is in addition to the <project name>.sbt file I mentioned in the
earlier mail.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/scopt-OptionParser-tp8436p11800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Naive Bayes parameters

2014-08-07 Thread SK
I followed the example in
examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala.

IN this file Params is defined as follows: 

case class Params (
input: String = null,
minPartitions: Int = 0,
numFeatures: Int = -1,
lambda: Double = 1.0)

In the main function, the option parser accepts numFeatures as an option.
But I looked at the code in more detail just now and found the following:

  val model = new NaiveBayes().setLambda(params.lambda).run(training)

So looks like at the time of creation only the lambda parameter is used.
Perhaps the example needs to be cleaned up during the next release. I am
currently using Spark version 1.0.1.


thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-parameters-tp11592p11623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Regularization parameters

2014-08-07 Thread SK
Hi,

I am following the code in 
examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
For setting the parameters and parsing the command line options, I am just
reusing that code. Params is defined as follows:

case class Params(
  input: String = null,
  numIterations: Int = 100,
  stepSize: Double = 1.0,
  algorithm: Algorithm = LR,
  regType: RegType = L2,
  regParam: Double = 0.1)

I use the command line option --regType to choose L1 or L2, and --regParam
to set it to 0.0. The option parser code in the example above parses the
options and creates the LogisticRegression object. It calls
setRegParam(regParam) to set the regularization parameter and calls the
updater to set the regType. 
To run LR, I am again using the code in the example above
(algorithm.run(training).clearThreshold())

The code in the above example computes AUC. To compute the accuracy of the test
data classification, I map the class to 0 if prediction < 0.5, else it is
mapped to class 1. Then I compare the predictions with the corresponding
labels, and the number of matches is given by correctCount.

val accuracy = correctCount.toDouble / predictionAndLabel.count
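In code, that thresholding step looks roughly like this (a sketch, assuming model has had clearThreshold() called and test is the held-out RDD[LabeledPoint]):

val predictionAndLabel = test.map { lp =>
  val score = model.predict(lp.features)      // raw score, since the threshold is cleared
  (if (score < 0.5) 0.0 else 1.0, lp.label)
}
val correctCount = predictionAndLabel.filter { case (p, l) => p == l }.count()
val accuracy = correctCount.toDouble / predictionAndLabel.count()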

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601p11627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Regularization parameters

2014-08-07 Thread SK
What is the definition of regParam and what is the range of values it is
allowed to take? 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Regularization-parameters-tp11601p11737.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[MLLib]:choosing the Loss function

2014-08-07 Thread SK
Hi,

According to the MLLib guide, there seems to be support for different loss
functions. But I could not find a command line parameter to choose the loss
function but only found regType to choose the regularization. Does MLLib
support a parameter to choose  the loss function?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-choosing-the-Loss-function-tp11738.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Naive Bayes parameters

2014-08-06 Thread SK

1) How is the minPartitions parameter in NaiveBayes example used? What is
the default value?

2) Why is the  numFeatures specified as a parameter? Can this not be
obtained from the data? This parameter is not specified for the other MLlib
algorithms.  

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-parameters-tp11592.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Extracting an element from the feature vector in LabeledPoint

2014-08-01 Thread SK
I am using 1.0.1. It does not matter to me whether it is the first or second
element. I would like to know how to extract the i-th element in the feature
vector (not the label).

data.features(i) gives the following error:

method apply in trait Vector cannot be accessed in
org.apache.spark.mllib.linalg.Vector



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-an-element-from-the-feature-vector-in-LabeledPoint-tp0p11181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


correct upgrade process

2014-08-01 Thread SK

Hi,

I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use
some of the features advertised in 1.0.1. However, I get some compilation
errors in some cases and based on user response, these errors have been
addressed in the 1.0.1 version and so I should not be getting these errors.
So I want to make sure I followed the correct upgrade process as below (I am
running Spark on single machine in standalone mode - so no cluster
deployment):

- set SPARK_HOME to the new version

- run sbt assembly in SPARK_HOME to build the new Spark jars

- in the project sbt file point the libraryDependencies for spark-core and
other libraries to the 1.0.1 version and run sbt assembly to build the
project jar.

Is there anything else I need to do to ensure that no old jars are being
used? For example do I need to manually delete any old jars?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: correct upgrade process

2014-08-01 Thread SK
Hi,

So I again ran sbt clean followed by all of the steps listed above to
rebuild the jars after cleaning. My compilation error still persists.
Specifically, I am trying to extract an element from the feature vector that
is part of a LabeledPoint as follows:

data.features(i) 

This gives the following error:
method apply in trait Vector cannot be accessed in
org.apache.spark.mllib.linalg.Vector 

Based on a related post, this bug has been fixed in version 1.0.1, so I am not
sure why I am still getting this error.

I noticed that sbt clean only removes the classes and jar files. However,
there is a .ivy2 directory where things get downloaded. That does not seem
to get cleaned and I am not sure if there are any old dependencies from here
that are being used when sbt assembly is run. So do I need to manually
remove this directory before running sbt clean and rebuilding the jars for
the new version?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194p11213.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: evaluating classification accuracy

2014-07-30 Thread SK
I am using 1.0.1 and I am running locally (I am not providing any master
URL). But the zip() does not produce the correct count as I mentioned above.
So not sure if the issue has been fixed in 1.0.1. However, instead of using
zip, I am now using the code that Sean has mentioned and am getting the
correct count. So the issue is resolved.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822p10980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Decision Tree requires regression LabeledPoint

2014-07-30 Thread SK
I have also used labeledPoint or libSVM format (for sparse data) for
DecisionTree. When I had categorical labels (not features), I mapped the
categories to numerical data as part of the data transformation step (i.e.
before creating the LabeledPoint).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-Tree-requires-regression-LabeledPoint-tp10953p10981.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


evaluating classification accuracy

2014-07-28 Thread SK
Hi,

In order to evaluate the ML classification accuracy, I am zipping up the
prediction and test labels as follows and then comparing the pairs in
predictionAndLabel:

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))


However, I am finding that predictionAndLabel.count() has fewer elements
than test.count().  For example, my test vector has 43 elements, but
predictionAndLabel has only 38 pairs. I have tried other samples and always
get fewer elements after zipping. 

Does zipping the two vectors cause any compression? or is this because of
the distributed nature of the algorithm (I am running it in local mode on a
single machine). In order to get the correct accuracy, I need the above
comparison to be done by a single node on the entire test data (my data is
quite small). How can I ensure that?
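For what it's worth, the same pairs can be built without zip by carrying the label through a single map over the test set, which keeps each prediction and its label together by construction (a sketch, assuming the model and test values from the snippet above):

val predictionAndLabel = test.map(lp => (model.predict(lp.features), lp.label))
val accuracy = predictionAndLabel.filter { case (p, l) => p == l }.count().toDouble / test.count()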

thanks 






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Decision tree classifier in MLlib

2014-07-25 Thread SK
Yes, the output is continuous. So I used a threshold to get binary labels:
if prediction < threshold, then the class is 0, else 1. I then use this binary label
to compute the accuracy. Even with this binary transformation, the
accuracy with decision tree model is low compared to LR or SVM (for the
specific dataset I used). 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-classifier-in-MLlib-tp9457p10678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Kmeans: set initial centers explicitly

2014-07-24 Thread SK

Hi,

The mllib.clustering.KMeans implementation supports a random or parallel
initialization mode to pick the initial centers. Is there a way to specify
the initial centers explicitly? It would be useful to have a setCenters()
method where we can explicitly specify the initial centers. (For e.g. R
allows us to specify the initial centers.)  

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-set-initial-centers-explicitly-tp10609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: jsonRDD: NoSuchMethodError

2014-07-15 Thread SK
I am running this in standalone mode on a single machine. I built the spark
jar from scratch (sbt assembly) and then included that in my application
(the same process I have done for earlier versions). 

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688p9735.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: jsonRDD: NoSuchMethodError

2014-07-15 Thread SK
The problem is resolved. Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688p9742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Help with Json array parsing

2014-07-15 Thread SK
Hi,

I have a json file where the object definition in each line includes an
array component obj that contains 0 or more elements as shown by the
example below.

 {"name": "16287e9cdf", "obj": [{"min": 50, "max": 59}, {"min": 20, "max": 29}]},
 {"name": "17087e9cdf", "obj": [{"min": 30, "max": 39}, {"min": 10, "max": 19}, {"min": 60, "max": 69}]},
 {"name": "18287e0cdf"}

I need to extract all the min values from the JSON definition in each line
for further processing. I used the following Spark code to parse the file
and extract the min fields, but I am getting a runtime error. I would like
to know what is the right way to extract the 0 or more min values from the
array above.

val inp = sc.textFile(args(0))
val res = inp.map(line => parse(line))
  .map(json => {
    implicit lazy val formats = org.json4s.DefaultFormats
    val name = (json \ "name").extract[String]
    val min_vals = (json \ "obj" \ "min").extract[Array[Int]]
    (name, min_vals)
  })
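One alternative way to pull out the min values per record, sketched below: children iterates the elements of a JSON array and returns Nil for a missing field, so records without obj simply yield an empty list. This assumes the inp RDD from above and the jackson backend for parse (the native backend works the same way); it is not the original code:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

val res = inp.map { line =>
  implicit val formats = DefaultFormats
  val json = parse(line)
  val name = (json \ "name").extract[String]
  val min_vals = (json \ "obj").children.map(o => (o \ "min").extract[Int])
  (name, min_vals)
}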


Thanks for  your help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-Json-array-parsing-tp9807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Help with Json array parsing

2014-07-15 Thread SK
To add to my previous post, the error at runtime is the following:

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception
failure in TID 0 on host localhost: org.json4s.package$MappingException:
Expected collection but got JInt(20) for root JInt(20) and mapping
int[][int, int]

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-Json-array-parsing-tp9807p9820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


jsonRDD: NoSuchMethodError

2014-07-14 Thread SK
Hi,

I am using Spark 1.0.1. I am using the following piece of code to parse a
json file. It is based on the code snippet in the SparkSQL programming
guide. However, the compiler outputs an error stating: 

java.lang.NoSuchMethodError:
org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD;

I get a similar error for jsonFile() as well. I have included the spark-sql
1.0.1 jar when building my program using sbt. What is the right library to
import for jsonRDD and jsonFile?
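One common cause of a NoSuchMethodError at run time is a version mix-up between the jars the program was compiled against and the Spark assembly actually on the classpath; pinning every Spark module to the same release in the build file is worth double-checking (a sketch):

// build.sbt -- keep every Spark module on the same version
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1",
  "org.apache.spark" %% "spark-sql"  % "1.0.1"
)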

thanks

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.json

object SQLExample {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("JsonExample")
    val sc = new SparkContext(sparkConf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)

    val jrdd = sc.textFile(args(0)).filter(r => r.trim != "")
    val data = sqlc.jsonRDD(jrdd)

    data.printSchema()
  }
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
Hi,

I tried out the streaming program on the Spark training web page. I created
a Twitter app as per the instructions (pointing to http://www.twitter.com).
When I run the program, my credentials get printed out correctly but
thereafter, my program just keeps waiting. It does not print out the hashtag
count etc.  My code appears below (essentially same as what is on the
training web page). I would like to know why I am not able to get a
continuous stream and the hashtag count.

thanks

   // relevant code snippet

   TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

   val ssc = new StreamingContext(new SparkConf(), Seconds(1))
   val tweets = TwitterUtils.createStream(ssc, None)
   val statuses = tweets.map(status => status.getText())
   statuses.print()

   ssc.checkpoint(checkpointDir)

   val words = statuses.flatMap(status => status.split(" "))
   val hashtags = words.filter(word => word.startsWith("#"))
   hashtags.print()

   val counts = hashtags.map(tag => (tag, 1))
     .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
   counts.print()

   val sortedCounts = counts.map { case (tag, count) => (count, tag) }
     .transform(rdd => rdd.sortByKey(false))
   sortedCounts.foreach(rdd =>
     println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))

   ssc.start()
   ssc.awaitTermination()

   // end code snippet




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't get any exceptions or error messages.

I tried it both with and without VPN and had the same outcome. But  I can
try again without VPN later today and report back.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't have a proxy server.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


ML classifier and data format for dataset with variable number of features

2014-07-11 Thread SK
Hi,

I need to perform binary classification on an image dataset. Each image is a
data point described by a Json object. The feature set for each image is a
set of feature vectors, each feature vector corresponding to a distinct
object in the image. For example, if an image has 5 objects, its feature set
will have 5 feature vectors, whereas an image that has 3 objects will have a
feature set consisting of 3 feature vectors. So  the number of feature
vectors  may be different for different images, although  each feature
vector has the same number of attributes. The classification depends on the
features of the individual objects, so I cannot aggregate them all into a
flat vector.

I have looked through the Mllib examples and it appears that the libSVM data
format and the LabeledData format that Mllib uses, require  all the points
to have the same number of features and they read in a flat feature vector.
I would like to know if any of the Mllib supervised learning classifiers can
be used with json data format and whether they can be used to classify
points with different number of features as described above.

thanks
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


incorrect labels being read by MLUtils.loadLabeledData()

2014-07-10 Thread SK
Hi,

I have a csv data file, which I have organized  in the following format to
be read as a LabeledPoint(following the example in
mllib/data/sample_tree_data.csv):

1,5.1,3.5,1.4,0.2
1,4.9,3,1.4,0.2
1,4.7,3.2,1.3,0.2
1,4.6,3.1,1.5,0.2

The first column is the binary label (1 or 0) and the remaining columns are
features. I am using the Logistic Regression Classifier in MLLib to create a
model based on the training data and predict the (binary) class of the test
data. I use MLUtils.loadLabeledData to read the data file. My prediction
accuracy is quite low (compared to the results I got for the same data from
R), so I tried to debug by first verifying that the labeled data is being
read correctly.
I find that some of the labels are not read correctly. For example, the
first 40 points of the training data have a class of 1, whereas the training
data read by loadLabeledData has label 0 for point 12 and point 14. I would
like to know if this is because of the distributed algorithm that MLLib uses
or if there is something wrong with the format I have above.
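One thing worth ruling out is the input layout: the old MLUtils.loadLabeledData expects the label, then a comma, then space-separated features rather than a fully comma-separated row (worth double-checking against your Spark version). Parsing the CSV by hand avoids the ambiguity entirely (a sketch; the path is a placeholder):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val data = sc.textFile("data.csv").map { line =>
  val cols = line.split(",").map(_.toDouble)
  LabeledPoint(cols(0), Vectors.dense(cols.drop(1)))   // label first, remaining columns are features
}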

thanks  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/incorrect-labels-being-read-by-MLUtils-loadLabeledData-tp9356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


scopt.OptionParser

2014-06-27 Thread SK
Hi,

I tried to develop some code to use Logistic Regression, following the code
in BinaryClassification.scala in examples/mllib. My code compiles, but at
runtime complains that scopt/OptionParser class cannot be found. I have the
following import statement in my code:

import scopt.OptionParser


My sbt file contains the following dependencies:

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Is there anything else I need to do to include the OptionParser?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/scopt-OptionParser-tp8436.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Error when running unit tests

2014-06-23 Thread SK

I am using Spark 1.0.0. I am able to successfully run "sbt package".
However, when I run "sbt test" or "sbt test-only <class>",
I get the following error:

[error] error while loading root, zip file is empty
scala.reflect.internal.MissingRequirementError: object scala.runtime in
compiler mirror not found.

The stacktrace seems to indicate that the error is originating from the
scala runtime and not my program. I tried sbt-0.13.5 as well as sbt-0.13.2. 

I would like to know how to fix this error. Thanks for your help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-unit-tests-tp8149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Unit test failure: Address already in use

2014-06-17 Thread SK
Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using "sbt test-only <test>",
all 3 pass. But when I run them all using "sbt test", they
fail with the warning below. I am wondering if the binding exception results
in a failure to run the job, thereby causing the test failure. If so, what can I do
to address this binding exception? I am running these tests locally on a
standalone machine (i.e. SparkContext("local", "test")).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)
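The usual culprit is the three suites each starting their own SparkContext (and therefore its Jetty-backed web UI on the same port) at the same time, because sbt runs test classes in parallel by default. Turning that off in the build is a common workaround (a sketch):

// build.sbt
parallelExecution in Test := false

Alternatively, sharing a single SparkContext across the suites avoids the port clash as well.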


thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Set comparison

2014-06-16 Thread SK
Hi,

I have a Spark method that returns RDD[String], which I am converting to a
set and then comparing it to the expected output as shown in the following
code. 

1. val expected_res = Set("ID1", "ID2", "ID3")   // expected output

2. val result: RDD[String] = getData(input)      // method returns RDD[String]
3. val set_val = result.collect().toSet          // convert to set
4. println(set_val)                              // prints: Set("ID1", "ID2", "ID3")
5. println(expected_res)                         // prints: Set(ID1, ID2, ID3)

// verify output
6. if (set_val == expected_res)
7.   println("true")                             // this does not get printed

The value returned by the method looks almost the same as the expected output, but the
verification is failing. I am not sure why the expected_res in Line 5 does
not print the quotes even though Line 1 has them. Could that be the reason
the comparison is failing? What is the right way to do the above comparison?

thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-comparison-tp7696.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Set comparison

2014-06-16 Thread SK
In Line 1, I have expected_res as a set of strings with quotes. So I thought
it would include the quotes during comparison.

Anyway I modified expected_res = Set("\"ID1\"", "\"ID2\"", "\"ID3\"") and
that seems to work.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-comparison-tp7696p7699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: specifying fields for join()

2014-06-13 Thread SK
I used groupBy to create the keys for both RDDs. Then I did the join.

I think, though, it would be useful if in the future Spark could allow us to
specify the fields on which to join, even when the keys are different.
Scalding allows this feature.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7591.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


convert List to RDD

2014-06-13 Thread SK
Hi,

I have a List[(String, Int, Int)] that I would like to convert to an RDD.
I tried to use sc.parallelize and sc.makeRDD, but in each case the original
order of items in the List gets modified. Is there a simple way to convert a
List to an RDD without using SparkContext?
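An RDD always needs a SparkContext, so there is no way around sc here; sc.parallelize does keep element order when the RDD is collected back, but if later transformations may reshuffle the data, carrying an explicit index is a simple safeguard (a sketch with illustrative data):

val list = List(("a", 1, 2), ("b", 3, 4), ("c", 5, 6))   // illustrative data
val rdd = sc.parallelize(list.zipWithIndex.map(_.swap))  // (position, (String, Int, Int))
val inOrder = rdd.sortByKey().map(_._2).collect()        // restores the original order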

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/convert-List-to-RDD-tp7606.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

