Rank for SQL and ORDER BY?

2015-01-09 Thread Kevin Burton
I’m trying to do a simple graph sort in Spark, which I mostly have working.

The one problem I have now is that I need to order them and then assign a
rank position.

So the top item should have rank 0, the next one should have rank 1, etc.

Hive and Pig support this with the RANK operator.

I *think* this is how I would do it with Hive.

SELECT target, COUNT(source) AS indegree, rank() OVER (ORDER BY indegree
DESC) AS rank FROM mygraph GROUP BY target ORDER BY indegree DESC

But that doesn’t seem to work.  What’s the easiest way to accomplish this
in Spark?

Any advice?
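One common approach (a sketch, not the only way): count in-degrees, sort descending, then attach a 0-based rank with `zipWithIndex`. The logic below is plain local Python mirroring the RDD calls (noted in comments); the edge data is made up.

```python
from collections import Counter

# Hypothetical edge list standing in for the mygraph table: (source, target).
edges = [("a", "b"), ("c", "b"), ("a", "d"), ("b", "d"), ("c", "d")]

# Count in-degree per target --
# in Spark: edges.map(lambda e: (e[1], 1)).reduceByKey(add)
indegree = Counter(t for _, t in edges)

# Sort by in-degree descending, then attach a 0-based rank --
# in Spark: counts.sortBy(lambda kv: -kv[1]).zipWithIndex()
ranked = [(target, deg, rank)
          for rank, (target, deg) in enumerate(
              sorted(indegree.items(), key=lambda kv: -kv[1]))]

print(ranked)
```

With real data the same `sortBy(...).zipWithIndex()` pair runs distributed, so no window function is needed.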

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com


Re: Discrepancy in PCA values

2015-01-09 Thread Upul Bandara
Hi Xiangrui,

Thanks for the reply.

Julia code is also using the covariance matrix:
(1/n)*X'*X ;

Thanks,
Upul

On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote:

 The Julia code is computing the SVD of the Gram matrix. PCA should be
 applied to the covariance matrix. -Xiangrui

 On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com
 wrote:
  Hi All,
 
  I tried to do PCA for the Iris dataset
  [https://archive.ics.uci.edu/ml/datasets/Iris] using MLLib
  [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html
 ].
  Also, PCA was calculated in Julia using the following method:
 
  Sigma = (1/numRow(X))*X'*X ;
  [U, S, V] = svd(Sigma);
  Ureduced = U(:, 1:k);
  Z = X*Ureduced;
 
  However, I'm seeing a little difference between values given by MLLib and
  the method shown above.
 
  Does anyone have any idea about this difference?
 
  Additionally, I have attached two visualizations, related to two
 approaches.
 
  Thanks,
  Upul
 
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org



Re: Web Service + Spark

2015-01-09 Thread Corey Nolet
Cui Lin,

The solution largely depends on how you want your services deployed (Java
web container, Spray framework, etc...) and if you are using a cluster
manager like Yarn or Mesos vs. just firing up your own executors and master.

I recently worked on an example for deploying Spark services inside of
Jetty using Yarn as the cluster manager. It forced me to learn how Spark
wires up the dependencies/classpaths. If it helps, the example that
resulted from my tinkering is located at [1].


[1] https://github.com/calrissian/spark-jetty-server

On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote:

  Hello, All,

  What’s the best practice on deploying/publishing spark-based scientific
 applications into a web service? Similar to Shiny on R.
  Thanks!

  Best regards,

  Cui Lin



RE: Implement customized Join for SparkSQL

2015-01-09 Thread Dai, Kevin
Hi,  Rishi

You are right. But there may be tens of thousands of ids, and B is a database with
an index on id, which means querying by id is very fast.

In fact we load A and B as separate schemaRDDs as you suggested. But we hope we 
can extend the join implementation to achieve it in the parsing stage.

Best Regards,
Kevin

From: Rishi Yadav [mailto:ri...@infoobjects.com]
Sent: 2015年1月9日 6:52
To: Dai, Kevin
Cc: user@spark.apache.org
Subject: Re: Implement customized Join for SparkSQL

Hi Kevin,

Say A has 10 ids, so you are pulling data from B's data source only for these 
10 ids?

What if you load A and B as separate schemaRDDs and then do the join? Spark will
optimize the path anyway when an action is fired.

On Mon, Jan 5, 2015 at 2:28 AM, Dai, Kevin 
yun...@ebay.commailto:yun...@ebay.com wrote:
Hi, All

Suppose I want to join two tables A and B as follows:

Select * from A join B on A.id = B.id

A is a file, while B is a database indexed by id, which I wrapped with the Data
Source API.
The desired join flow is:

1.   Generate A’s RDD[Row]

2.   Generate B’s RDD[Row] from A by using A’s id and B’s data source api 
to get row from the database

3.   Merge these two RDDs to the final RDD[Row]

However, it seems the existing join strategies don’t support this.

Any way to achieve it?

Best Regards,
Kevin.
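The desired flow (steps 1–3 above) can be sketched without touching Spark's join strategies by running the per-id lookup inside a `mapPartitions` over A. Below is a plain-Python stand-in where a dict plays the indexed database B; in real code the function body would open one database connection per partition. All names and rows are made up.

```python
# Dict standing in for table B, which is indexed by id (made-up rows).
B = {1: "alice", 2: "bob", 3: "carol"}

def lookup_partition(ids):
    # In Spark this function would be passed to A.mapPartitions(...);
    # open one database connection per partition here, then query by id.
    for i in ids:
        if i in B:                      # fast indexed lookup by id
            yield (i, B[i])             # merged row: (A.id, B's columns)

a_partition = [2, 3, 9]                 # one partition of A's ids; 9 has no match
joined = list(lookup_partition(a_partition))
print(joined)
```

This gives the semantics of the join while only ever querying B for the ids that actually occur in A.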



/tmp directory fills up

2015-01-09 Thread Alessandro Baretta
Gents,

I'm building Spark from the current master branch and deploying it to
Google Compute Engine on top of Hadoop 2.4/YARN via bdutil, Google's Hadoop
cluster provisioning tool. bdutil configures Spark with

spark.local.dir=/hadoop/spark/tmp,

but this option is ignored in combination with YARN. bdutil also
configures YARN with:

  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/mnt/pd1/hadoop/yarn/nm-local-dir</value>
    <description>
      Directories on the local machine in which to store application temp files.
    </description>
  </property>

This is the right directory for Spark to store temporary data in. Still,
Spark is creating directories such as:

/tmp/spark-51388ee6-9de6-411d-b9b9-ab6f9502d01e

and filling them with gigabytes' worth of output files, exhausting the
very small root filesystem.

How can I diagnose why my Spark installation is not picking up the
yarn.nodemanager.local-dirs from yarn?

Alex


Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
Not if broadcast can only be used between stages. To enable this you have
to at least make broadcast asynchronous & non-blocking.

On 9 January 2015 at 18:02, Krishna Sankar ksanka...@gmail.com wrote:

 I am also looking at this domain. We could potentially use the broadcast
 capability in Spark to distribute the parameters. Haven't thought thru yet.
 Cheers
 k/

 On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote:

 Does it make sense to use Spark's actor system (e.g. via
 SparkContext.env.actorSystem) to create parameter server?

 On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote:

 You are not the first :) probably not the fifth to have the question.
 parameter server is not included in spark framework and I've seen all
 kinds of hacking to improvise it: REST api, HDFS, tachyon, etc.
 Not sure if an 'official' benchmark & implementation will be released
 soon.

 On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:


 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 DeepLearning algorithms are popular and achieve state-of-the-art
 performance in several real-world machine learning problems. Currently
 there is no DL implementation in Spark and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an
 additional software stack.

 Deeplearning4j seems to implement a distributed version of many
 popular DL algorithms. Porting DL4J to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper
 http://research.google.com/archive/large_deep_networks_nips2012.html,
 based on model parallelism and data parallelism.

 So, I'm trying to imagine what a good design for a DL algorithm
 in Spark should be. Spark already has RDDs (for data parallelism). Can GraphX be
 used for the model parallelism (as DNNs are generally designed as DAGs)? And
 what about using GPUs for local parallelism (a mechanism to push partitions
 into GPU memory)?


 What do you think about this ?


 Cheers,

 Jao







OOM exception during row deserialization

2015-01-09 Thread Pala M Muthaia
Hi,

I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during a
join step.

Basically, I have an RDD of rows that I am joining with another RDD of
tuples.

Some of the tasks succeed, but a fair number fail with an OOM exception with
the stack below. The stack belongs to the 'reducer' that is reading shuffle
output from the 'mapper'.

My question is: what is the object being deserialized here - just a portion
of an RDD, or the whole RDD partition assigned to the current reducer? The rows
in the RDD could be large, but definitely not something that would run to
100s of MBs in size, and thus run out of memory.

Also, is there a way to determine size of the object being deserialized
that results in the error (either by looking at some staging hdfs dir or
logs)?

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead
limit exceeded)
java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
java.lang.StringBuilder.append(StringBuilder.java:204)
java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
java.util.ArrayList.readObject(ArrayList.java:771)
sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)



Thanks,
pala


Web Service + Spark

2015-01-09 Thread Cui Lin
Hello, All,

What’s the best practice on deploying/publishing spark-based scientific 
applications into a web service? Similar to Shiny on R.
 Thanks!

Best regards,

Cui Lin


Re: OptionalDataException during Naive Bayes Training

2015-01-09 Thread Xiangrui Meng
How big is your data? Did you see other error messages from executors?
It seems to me like a shuffle communication error. This thread may be
relevant:

http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccalrnvjuvtgae_ag1rqey_cod1nmrlfpesxgsb7g8r21h0bm...@mail.gmail.com%3E


-Xiangrui

On Fri, Jan 9, 2015 at 3:19 AM, jatinpreet jatinpr...@gmail.com wrote:
 Hi,

 I am using Spark version 1.1 in standalone mode on the cluster. Sometimes,
 during Naive Bayes training, I get an OptionalDataException at line,

 map at NaiveBayes.scala:109

 I am getting following exception on the console,

 java.io.OptionalDataException:
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1371)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 java.util.HashMap.readObject(HashMap.java:1394)
 sun.reflect.GeneratedMethodAccessor626.invoke(Unknown Source)

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:483)

 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

 What could be the reason behind this?

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/OptionalDataException-during-Naive-Bayes-Training-tp21059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to use BigInteger for userId and productId in collaborative Filtering?

2015-01-09 Thread nishanthps
Hi,

The userIds and productIds in my data are BigInts. What is the best way to
run collaborative filtering on this data? Should I modify MLlib's
implementation to support more types, or is there an easy way?

Thanks!,
Nishanth



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-BigInteger-for-userId-and-productId-in-collaborative-Filtering-tp21072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Issue writing to Cassandra from Spark

2015-01-09 Thread Ankur Srivastava
Hi,

We are currently using Spark to join data in Cassandra and then write the
results back into Cassandra. While reads happen without any error, during
the writes we see many exceptions like the one below. Our environment details are:

- Spark v 1.1.0
- spark-cassandra-connector-java_2.10 v 1.1.0

We are using the settings below for the writer:

spark.cassandra.output.batch.size.rows=1

spark.cassandra.output.concurrent.writes=1

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: [] - use getErrors() for details)

at
com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)

at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Thanks

Ankur


Re: How to use BigInteger for userId and productId in collaborative Filtering?

2015-01-09 Thread Xiangrui Meng
Do you have more than 2 billion users/products? If not, you can pair
each user/product id with an integer (check RDD.zipWithUniqueId), use
them in ALS, and then join the original bigInt IDs back after
training. -Xiangrui
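The round-trip Xiangrui describes, sketched in plain local Python: a dict built with `enumerate` plays the role of `RDD.zipWithUniqueId`, and the ids and prediction scores are made up.

```python
# Arbitrary BigInt ids that ALS cannot consume directly.
user_ids = [10**12 + 7, 10**15 + 3, 42]

# Pair each id with a small unique integer --
# in Spark: ids.zipWithUniqueId() yields (bigint_id, long) pairs.
id_to_int = {uid: i for i, uid in enumerate(user_ids)}
int_to_id = {i: uid for uid, i in id_to_int.items()}

# ... train ALS on the integer ids ...

# Join the original BigInt ids back onto the (made-up) ALS output.
predictions = [(0, 0.9), (2, 0.1)]                 # (int_user_id, score)
restored = [(int_to_id[i], s) for i, s in predictions]
print(restored)
```

In a real job both mappings live as RDDs and the final step is a `join`, not a local dict lookup.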

On Fri, Jan 9, 2015 at 5:12 PM, nishanthps nishant...@gmail.com wrote:
 Hi,

 The userId's and productId's in my data are bigInts, what is the best way to
 run collaborative filtering on this data. Should I modify MLlib's
 implementation to support more types? or is there an easy way.

 Thanks!,
 Nishanth



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-BigInteger-for-userId-and-productId-in-collaborative-Filtering-tp21072.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Web Service + Spark

2015-01-09 Thread gtinside
You can also look at Spark Job Server 
https://github.com/spark-jobserver/spark-jobserver

- Gaurav

 On Jan 9, 2015, at 10:25 PM, Corey Nolet cjno...@gmail.com wrote:
 
 Cui Lin,
 
 The solution largely depends on how you want your services deployed (Java web 
 container, Spray framework, etc...) and if you are using a cluster manager 
 like Yarn or Mesos vs. just firing up your own executors and master.
 
 I recently worked on an example for deploying Spark services inside of Jetty 
 using Yarn as the cluster manager. It forced me to learn how Spark wires up 
 the dependencies/classpaths. If it helps, the example that resulted from my 
 tinkering is located at [1].
 
 
 [1] https://github.com/calrissian/spark-jetty-server
 
 On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin cui@hds.com wrote:
 Hello, All,
 
 What’s the best practice on deploying/publishing spark-based scientific 
 applications into a web service? Similar to Shiny on R.
  Thanks!
 
 Best regards,
 
 Cui Lin
 


Re: Discrepancy in PCA values

2015-01-09 Thread Xiangrui Meng
You need to subtract mean values to obtain the covariance matrix
(http://en.wikipedia.org/wiki/Covariance_matrix).
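The centering step, sketched in plain Python with made-up numbers: the Gram matrix (1/n)·X'X equals the covariance matrix only when each column already has zero mean, which explains the discrepancy.

```python
# Tiny 3-sample, 2-feature data set (made-up values).
X = [[1.0, 2.0], [3.0, 4.0], [5.0, 9.0]]
n, d = len(X), len(X[0])

# Subtract the column means first ...
means = [sum(row[j] for row in X) / n for j in range(d)]
Xc = [[row[j] - means[j] for j in range(d)] for row in X]

# ... then (1/n) * Xc' * Xc is the covariance matrix.
cov = [[sum(Xc[i][a] * Xc[i][b] for i in range(n)) / n for b in range(d)]
       for a in range(d)]
```

Running SVD on `cov` rather than on the uncentered (1/n)·X'X should bring the Julia result in line with MLlib's PCA.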

On Fri, Jan 9, 2015 at 6:41 PM, Upul Bandara upulband...@gmail.com wrote:
 Hi Xiangrui,

 Thanks for the reply.

 Julia code is also using the covariance matrix:
 (1/n)*X'*X ;

 Thanks,
 Upul

 On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng men...@gmail.com wrote:

 The Julia code is computing the SVD of the Gram matrix. PCA should be
 applied to the covariance matrix. -Xiangrui

 On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara upulband...@gmail.com
 wrote:
  Hi All,
 
  I tried to do PCA for the Iris dataset
  [https://archive.ics.uci.edu/ml/datasets/Iris] using MLLib
 
  [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html].
  Also, PCA  was calculated in Julia using following method:
 
  Sigma = (1/numRow(X))*X'*X ;
  [U, S, V] = svd(Sigma);
  Ureduced = U(:, 1:k);
  Z = X*Ureduced;
 
  However, I'm seeing a little difference between values given by MLLib
  and
  the method shown above .
 
  Does anyone have any idea about this difference?
 
  Additionally, I have attached two visualizations, related to two
  approaches.
 
  Thanks,
  Upul
 
 
 



Re: Accidental kill in UI

2015-01-09 Thread Nicholas Chammas
As Sean said, this definitely sounds like something worth a JIRA issue (and
PR).

On Fri Jan 09 2015 at 8:17:34 AM Sean Owen so...@cloudera.com wrote:

 (FWIW yes I think this should certainly be a POST. The link can become
 a miniature form to achieve this and then the endpoint just needs to
 accept POST only. You should propose a pull request.)

 On Fri, Jan 9, 2015 at 12:51 PM, Joe Wass jw...@crossref.org wrote:
  So I had a Spark job with various failures, and I decided to kill it and
  start again. I clicked the 'kill' link in the web console, restarted the
 job
  on the command line and headed back to the web console and refreshed to
 see
  how my job was doing... the URL at the time was:
 
  /stages/stage/kill?id=1terminate=true
 
  Which of course terminated the stage again. No loss, but if I'd waited a
 few
  hours before doing that, I would have lost data.
 
  I know to be careful next time, but isn't 'don't modify state as a
 result of
  a GET request' the first rule of HTTP? It could lead to an expensive
  mistake. Making this a POST would be a simple fix.
 
  Does anyone else think this is worth creating an issue for?
 
 





Re: calculating the mean of SparseVector RDD

2015-01-09 Thread Xiangrui Meng
colStats() computes the mean values along with several other summary
statistics, which makes it slower. How is the performance if you don't
use kryo? -Xiangrui
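If only the mean is needed, a single `aggregate` over column sums avoids the extra summaries colStats computes. Sketched in plain local Python, with sparse vectors as made-up (indices, values) pairs and the Spark call noted in a comment:

```python
vec_len = 4
# Sparse vectors as (indices, values) pairs -- made-up data.
vecs = [([0, 2], [1.0, 3.0]), ([2], [2.0]), ([1, 3], [4.0, 5.0])]

def seq_op(acc, vec):
    # Fold one sparse vector into a partition's running column sums.
    idx, vals = vec
    for i, v in zip(idx, vals):
        acc[i] += v
    return acc

def comb_op(a, b):
    # Merge the partial sums of two partitions.
    return [x + y for x, y in zip(a, b)]

# In Spark: sums = vals.aggregate([0.0] * vec_len, seq_op, comb_op)
part1 = seq_op([0.0] * vec_len, vecs[0])
part2 = seq_op(seq_op([0.0] * vec_len, vecs[1]), vecs[2])
sums = comb_op(part1, part2)
means = [s / len(vecs) for s in sums]
```

Note the combiner simply adds partial sums; no equality check between partial results is needed, unlike the aggregate functions quoted later in this thread.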

On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar rokros...@gmail.com wrote:
 thanks for the suggestion -- however, looks like this is even slower. With
 the small data set I'm using, my aggregate function takes ~ 9 seconds and
 the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
 the Kryo serializer -- I get the error:

 com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
 required: 8

 is there an easy/obvious fix?


 On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:

 There is some serialization overhead. You can try

 https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
 . -Xiangrui

 On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
  I have an RDD of SparseVectors and I'd like to calculate the means
  returning
  a dense vector. I've tried doing this with the following (using pyspark,
  spark v1.2.0):
 
  def aggregate_partition_values(vec1, vec2) :
  vec1[vec2.indices] += vec2.values
  return vec1
 
  def aggregate_combined_vectors(vec1, vec2) :
  if all(vec1 == vec2) :
  # then the vector came from only one partition
  return vec1
  else:
  return vec1 + vec2
 
  means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
  aggregate_combined_vectors)
  means = means / nvals
 
  This turns out to be really slow -- and doesn't seem to depend on how
  many
  vectors there are so there seems to be some overhead somewhere that I'm
  not
  understanding. Is there a better way of doing this?
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Submitting SparkContext and seeing driverPropsFetcher exception

2015-01-09 Thread Corey Nolet
I'm seeing this exception when creating a new SparkContext in YARN:

[ERROR] AssociationError [akka.tcp://sparkdri...@coreys-mbp.home:58243] -
[akka.tcp://driverpropsfetc...@coreys-mbp.home:58453]: Error [Shut down
address: akka.tcp://driverpropsfetc...@coreys-mbp.home:58453] [

akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverpropsfetc...@coreys-mbp.home:58453

Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.


Any ideas on what this could be?


Re: Zipping RDDs of equal size not possible

2015-01-09 Thread Xiangrui Meng
sample 2 * n tuples, split them into two parts, balance the sizes of
these parts by filtering some tuples out

How do you guarantee that the two RDDs have the same size?

-Xiangrui

On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke
1wil...@informatik.uni-hamburg.de wrote:
 Hi Spark community,

 I have a problem with zipping two RDDs of the same size and same number of
 partitions.
 The error message says that zipping is only allowed on RDDs which are
 partitioned into chunks of exactly the same sizes.
 How can I ensure this? My workaround at the moment is to repartition both
 RDDs into a single partition, but that obviously
 does not scale.

 This problem originates from my need to draw n random tuple pairs (Tuple,
 Tuple) from an RDD[Tuple].
 What I do is to sample 2 * n tuples, split them into two parts, balance the
 sizes of these parts
 by filtering some tuples out and zipping them together.

 I would appreciate to read better approaches for both problems.

 Thanks in advance,
 Niklas
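One scalable alternative to repartitioning into a single partition: index both RDDs and join on the index, which does not require equal partition sizes. Sketched in plain Python, where dicts built with `enumerate` stand in for `zipWithIndex` (sample values are made up):

```python
a = ["t1", "t2", "t3"]                 # made-up tuples from the first sample
b = ["u1", "u2", "u3", "u4"]           # second sample, possibly a different size

# In Spark: a.zipWithIndex().map(lambda kv: (kv[1], kv[0])), likewise for b,
# then .join() on the index and .values().
a_idx = dict(enumerate(a))
b_idx = dict(enumerate(b))
pairs = [(a_idx[i], b_idx[i]) for i in sorted(a_idx) if i in b_idx]
print(pairs)
```

Because the join only keeps indices present on both sides, it also naturally truncates to the shorter sample, which sidesteps the size-balancing step described above.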




Re:Re: EventBatch and SparkFlumeProtocol not found in spark codebase?

2015-01-09 Thread Todd
Thanks, Sean.
I followed the guide and imported the codebase into IntelliJ IDEA as a Maven project,
with the profiles hadoop2.4 and yarn.
In the Maven project view, I ran Maven Install against the module Spark
Project Parent POM (root). After a pretty long time, all the modules were built
successfully.
But when I run the LocalPi example, compile errors emerge:
1. EventBatch and SparkFlumeProtocol don't exist.
2. There are a bunch of errors complaining that q is not a member of StringContext in
CodeGenerator.scala.
Then I tried clicking Generate Sources and Update Folders For All
Projects and repeating the Maven install... the build still succeeds, with the compile
errors still there.

Sean, any guidance on this? Thanks.

At 2015-01-09 18:08:11, Sean Owen so...@cloudera.com wrote:
What's up with the IJ questions all of a sudden?

This PR from yesterday contains a summary of the answer to your question:
https://github.com/apache/spark/pull/3952 :

Rebuild Project can fail the first time the project is compiled,
because generate source files are not automatically generated. Try
clicking the Generate Sources and Update Folders For All Projects
button in the Maven Projects tool window to manually generate these
sources.

On Fri, Jan 9, 2015 at 10:03 AM, bit1...@163.com bit1...@163.com wrote:
 Hi,
 When I fetch the Spark code base and import into Intellj Idea as SBT
 project, then I build it with SBT, but there is compiling errors in the
 examples module,complaining that the EventBatch and SparkFlumeProtocol,looks
 they should be in
 org.apache.spark.streaming.flume.sink package.

 Not sure what happens.

 Thanks.




 





Re: Questions about Spark and HDFS co-location

2015-01-09 Thread Sean Owen
Spark uses MapReduce InputFormat implementations to read data from
disk, so in that sense it has access to, and uses, the same locality
info that things like MR do. Yes, tasks go to the data, and you want
to run Spark on top of the HDFS DataNodes. (Locality isn't always the
only priority that determines where tasks are scheduled, but it
certainly matters.) I'm not qualified enough to explain it in more
detail, compared to others here.

On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote:
 I am running Spark 1.1.1 built against CDH4 and have a few questions
 regarding Spark performance related to co-location with HDFS nodes.

 I want to know whether (and how efficiently) Spark takes advantage of being
 co-located with a HDFS node?

 What I mean by this is: if a file is being read by a Spark executor and that
 file (or most of its blocks) is located in a HDFS DataNode on the same
 machine as a Spark worker, will it read directly off of disk, or does that
 data have to travel through the network in some way? Is there a distinct
 advantage to putting HDFS and Spark on the same box if it is possible or,
 due to the way blocks are distributed about a cluster, are we so likely to
 be moving files over the network that co-location doesn’t really make that
 much of a difference?

 Also, do you know of any papers/books/other resources (other than trying to dig
 through the Spark code) which do a good job of explaining the Spark/HDFS
 data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)?

 Thanks!
 Zach




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Questions about Spark and HDFS co-location

2015-01-09 Thread Ted Yu
I was looking for related information and found:
http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pptx

See also http://hbase.apache.org/book.html#perf.hdfs.configs.localread for
how short circuit read is enabled.

Cheers

On Fri, Jan 9, 2015 at 3:50 PM, Sean Owen so...@cloudera.com wrote:

 Spark uses MapReduce InputFormat implementations to read data from
 disk, so in that sense it has access to, and uses, the same locality
 info that things like MR do. Yes, tasks go to the data, and you want
 to run Spark on top of the HDFS DataNodes. (Locality isn't always the
 only priority that determines where tasks are scheduled, but it
 certainly matters.) I'm not qualified enough to explain it in more
 detail, compared to others here.

 On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote:
  I am running Spark 1.1.1 built against CDH4 and have a few questions
  regarding Spark performance related to co-location with HDFS nodes.
 
  I want to know whether (and how efficiently) Spark takes advantage of
 being
  co-located with a HDFS node?
 
  What I mean by this is: if a file is being read by a Spark executor and
 that
  file (or most of its blocks) is located in a HDFS DataNode on the same
  machine as a Spark worker, will it read directly off of disk, or does
 that
  data have to travel through the network in some way? Is there a distinct
  advantage to putting HDFS and Spark on the same box if it is possible or,
  due to the way blocks are distributed about a cluster, are we so likely
 to
  be moving files over the network that co-location doesn’t really make
 that
  much of a difference?
 
  Also, do you know of any papers/books/other resources (other than trying to
 dig
  through the Spark code) which do a good job of explaining the Spark/HDFS
  data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)?
 
  Thanks!
  Zach
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 




Re: Questions about Spark and HDFS co-location

2015-01-09 Thread Andrew Ash
Note also for short circuit reads that early versions are actually
net-negative in performance.  Only after a second hadoop release of the
feature did it turn towards being a positive change.  See earlier threads
on this mailing list where short circuit reads are discussed.

On Fri, Jan 9, 2015 at 3:57 PM, Ted Yu yuzhih...@gmail.com wrote:

 I was looking for related information and found:
 http://spark-summit.org/wp-content/uploads/2013/10/Spark-Ops-Final.pptx

 See also http://hbase.apache.org/book.html#perf.hdfs.configs.localread
 for how short circuit read is enabled.

 Cheers

 On Fri, Jan 9, 2015 at 3:50 PM, Sean Owen so...@cloudera.com wrote:

 Spark uses MapReduce InputFormat implementations to read data from
 disk, so in that sense it has access to, and uses, the same locality
 info that things like MR do. Yes, tasks go to the data, and you want
 to run Spark on top of the HDFS DataNodes. (Locality isn't always the
 only priority that determines where tasks are scheduled, but it
 certainly matters.) I'm not qualified enough to explain it in more
 detail, compared to others here.

 On Fri, Jan 9, 2015 at 10:13 PM, zfry z...@palantir.com wrote:
  I am running Spark 1.1.1 built against CDH4 and have a few questions
  regarding Spark performance related to co-location with HDFS nodes.
 
  I want to know whether (and how efficiently) Spark takes advantage of
 being
  co-located with a HDFS node?
 
  What I mean by this is: if a file is being read by a Spark executor and
 that
  file (or most of its blocks) is located in a HDFS DataNode on the same
  machine as a Spark worker, will it read directly off of disk, or does
 that
  data have to travel through the network in some way? Is there a distinct
  advantage to putting HDFS and Spark on the same box if it is possible
 or,
  due to the way blocks are distributed about a cluster, are we so likely
 to
  be moving files over the network that co-location doesn’t really make
 that
  much of a difference?
 
  Also, do you know of any papers/books/other resources (other than trying
  to dig through the Spark code) which do a good job of explaining the
  Spark/HDFS data workflow (i.e. how data moves from disk -> HDFS -> Spark
  -> HDFS)?
 
  Thanks!
  Zach
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





DeepLearning and Spark ?

2015-01-09 Thread Jaonary Rabarisoa
Hi all,

DeepLearning algorithms are popular and achieve state-of-the-art
performance on several real-world machine learning problems. Currently
there is no DL implementation in Spark, and I wonder if there is any
ongoing work on this topic.

We can do DL in Spark with Sparkling Water and H2O, but this adds an
additional software stack.

Deeplearning4j seems to implement distributed versions of many popular DL
algorithms. Porting DL4j to Spark could be interesting.

Google describes an implementation of large-scale DL in this paper,
http://research.google.com/archive/large_deep_networks_nips2012.html, based
on model parallelism and data parallelism.

So, I'm trying to imagine what a good design for DL algorithms in Spark
should be. Spark already has RDDs (for data parallelism). Can GraphX be
used for the model parallelism (as DNNs are generally designed as DAGs)?
And what about using GPUs for local parallelism (a mechanism to push
partitions into GPU memory)?


What do you think about this ?


Cheers,

Jao


Re: DeepLearning and Spark ?

2015-01-09 Thread Marco Shaw
Pretty vague on details:

http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 
 Hi all,
 
 DeepLearning algorithms are popular and achieve many state of the art 
 performance in several real world machine learning problems. Currently there 
 are no DL implementation in spark and I wonder if there is an ongoing work on 
 this topics.
 
 We can do DL in spark Sparkling water and H2O but this adds an additional 
 software stack.
 
 Deeplearning4j seems to implements a distributed version of many popural DL 
 algorithm. Porting DL4j in Spark can be interesting.
 
 Google describes an implementation of a large scale DL in this paper 
 http://research.google.com/archive/large_deep_networks_nips2012.html. Based 
 on model parallelism and data parallelism.
 
 So, I'm trying to imaging what should be a good design for DL algorithm in 
 Spark ? Spark already have RDD (for data parallelism). Can GraphX be used for 
 the model parallelism (as DNN are generally designed as DAG) ? And what about 
 using GPUs to do local parallelism (mecanism to push partition into GPU 
 memory ) ? 
 
 
 What do you think about this ?
 
 
 Cheers,
 
 Jao
 


How to access OpenHashSet in my standalone program?

2015-01-09 Thread Tae-Hyuk Ahn
Hi,

I would like to use OpenHashSet
(org.apache.spark.util.collection.OpenHashSet) in my standalone program. I
can import it without error as:

import org.apache.spark.util.collection.OpenHashSet

However, when I try to access it, I am getting an error as:

object OpenHashSet in package collection cannot be accessed in package
org.apache.spark.util.collection

I suspect this error is caused by the object being package-private. I am
wondering how I can use this object in my standalone program.

Thanks,

Ted



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Cleaning up spark.local.dir automatically

2015-01-09 Thread michael.england
Thanks, I imagine this will kill any cached RDDs if their files are beyond the 
ttl?

Thanks


From: Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com]
Sent: 09 January 2015 15:29
To: England, Michael (IT/UK); user@spark.apache.org
Subject: Re: Cleaning up spark.local.dir automatically

You may like to look at spark.cleaner.ttl configuration which is infinite by 
default. Spark has that configuration to delete temp files from time to time.
On Fri Jan 09 2015 at 8:34:10 PM 
michael.engl...@nomura.commailto:michael.engl...@nomura.com wrote:
Hi,

Is there a way of automatically cleaning up the spark.local.dir after a job has 
been run? I have noticed a large number of temporary files have been stored 
here and are not cleaned up. The only solution I can think of is to run some 
sort of cron job to delete files older than a few days. I am currently using a 
mixture of standalone and YARN spark builds.

Thanks,
Michael


This e-mail (including any attachments) is private and confidential, may 
contain proprietary or privileged information and is intended for the named 
recipient(s) only. Unintended recipients are strictly prohibited from taking 
action on the basis of information in this e-mail and must contact the sender 
immediately, delete this e-mail (and all attachments) and destroy any hard 
copies. Nomura will not accept responsibility or liability for the accuracy or 
completeness of, or the presence of any virus or disabling code in, this 
e-mail. If verification is sought please request a hard copy. Any reference to 
the terms of executed transactions should be treated as preliminary only and 
subject to formal written confirmation by Nomura. Nomura reserves the right to 
retain, monitor and intercept e-mail communications through its networks 
(subject to and in accordance with applicable laws). No confidentiality or 
privilege is waived or lost by Nomura by any mistransmission of this e-mail. 
Any reference to Nomura is a reference to any entity in the Nomura Holdings, 
Inc. group. Please read our Electronic Communications Legal Notice which forms 
part of this e-mail: http://www.Nomura.com/email_disclaimer.htm





Re: Cleaning up spark.local.dir automatically

2015-01-09 Thread Raghavendra Pandey
You may like to look at spark.cleaner.ttl configuration which is infinite
by default. Spark has that configuration to delete temp files from time to time.
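As a sketch, this setting can go in conf/spark-defaults.conf (the value is in
seconds; 3600 here is illustrative, not a recommendation):

```
spark.cleaner.ttl   3600
```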

On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote:

  Hi,



 Is there a way of automatically cleaning up the spark.local.dir after a
 job has been run? I have noticed a large number of temporary files have
 been stored here and are not cleaned up. The only solution I can think of
 is to run some sort of cron job to delete files older than a few days. I am
 currently using a mixture of standalone and YARN spark builds.



 Thanks,

 Michael






Re: Set EXTRA_JAR environment variable for spark-jobserver

2015-01-09 Thread Sasi
Boris,

Yes, as you mentioned, we are creating a new SparkContext for our job. The
reason is to define the Apache Cassandra connection using SparkConf. We
hope this should also work.

For uploading JAR, we followed 
(1) Package JAR using *sbt package* command 
(2) Use *curl --data-binary
@target/scala-2.10/spark-jobserver-examples_2.10-1.0.0.jar
localhost:8090/jars/sparking* command to upload
as mentioned in https://github.com/fedragon/spark-jobserver-examples link.

We built some samples earlier for connecting Apache Cassandra to Spark using
Scala. Initially, we faced the same *java.lang.NoClassDefFoundError*
exception at run time, and we overcame that using the
*--jars {required JAR paths}* option during *spark-submit*. Finally, we were
able to run them as regular Spark apps successfully. So, we are sure of what
has been written for this spark-jobserver.

To give you an update, we prepared an uber JAR (an integrated JAR with all
dependencies) as Pankaj mentioned, and are now facing *SparkException: Job
aborted due to stage failure*, for which we will raise another post.

Thank you once again for your suggestions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p21054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark History Server can't read event logs

2015-01-09 Thread michael.england
Hi Marcelo,

On MapR, the mapr user can read the files using the NFS mount; however, using 
the normal hadoop fs -cat /... command, I get permission denied. As the history 
server is pointing to a location on maprfs, not the NFS mount, I'd imagine the 
Spark history server is trying to read the files using the hadoop API and 
therefore the permissions cause issues here.

Thanks,
Michael


-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: 08 January 2015 19:23
To: England, Michael (IT/UK)
Cc: user@spark.apache.org
Subject: Re: Spark History Server can't read event logs

Sorry for the noise; but I just remembered you're actually using MapR (and not 
HDFS), so maybe the 3777 trick could work...

On Thu, Jan 8, 2015 at 10:32 AM, Marcelo Vanzin van...@cloudera.com wrote:
 Nevermind my last e-mail. HDFS complains about not understanding 3777...

 On Thu, Jan 8, 2015 at 9:46 AM, Marcelo Vanzin van...@cloudera.com wrote:
 Hmm. Can you set the permissions of /apps/spark/historyserver/logs
 to 3777? I'm not sure HDFS respects the group id bit, but it's worth 
 a try. (BTW that would only affect newly created log directories.)

 On Thu, Jan 8, 2015 at 1:22 AM,  michael.engl...@nomura.com wrote:
 Hi Vanzin,

 I am using the MapR distribution of Hadoop. The history server logs are 
 created by a job with the permissions:

 drwxrwx---   - myusername mygroup   2 2015-01-08 09:14 
 /apps/spark/historyserver/logs/spark-1420708455212

 However, the permissions of the higher directories are mapr:mapr and the 
 user that runs Spark in our case is a unix ID called mapr (in the mapr 
 group). Therefore, this can't read my job event logs as shown above.


 Thanks,
 Michael


 -Original Message-
 From: Marcelo Vanzin [mailto:van...@cloudera.com]
 Sent: 07 January 2015 18:10
 To: England, Michael (IT/UK)
 Cc: user@spark.apache.org
 Subject: Re: Spark History Server can't read event logs

 The Spark code generates the log directory with 770 permissions. On top 
 of that you need to make sure of two things:

 - all directories up to /apps/spark/historyserver/logs/ are readable 
 by the user running the history server
 - the user running the history server belongs to the group that owns 
 /apps/spark/historyserver/logs/

 I think the code could be more explicit about setting the group of the 
 generated log directories and files, but if you follow the two rules above 
 things should work. Also, I recommend setting 
 /apps/spark/historyserver/logs/ itself to 1777 so that any user can 
 generate logs, but only the owner (or a superuser) can delete them.
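A quick local sketch of the permission scheme described above — mode 1777 is
world-writable plus the sticky bit, demonstrated here on a throwaway local
directory; on the cluster the same bits would be applied to the log root with
`hadoop fs -chmod 1777 /apps/spark/historyserver/logs` (not run here):

```shell
# Create a throwaway directory standing in for the history-server log root
demo=$(mktemp -d)/logs
mkdir -p "$demo"

# 1777 = rwxrwxrwx plus the sticky bit: anyone can create entries,
# but only the owner (or a superuser) can delete them
chmod 1777 "$demo"

stat -c '%a' "$demo"   # prints 1777
```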



 On Wed, Jan 7, 2015 at 7:45 AM,  michael.engl...@nomura.com wrote:
 Hi,



 When I run jobs and save the event logs, they are saved with the 
 permissions of the unix user and group that ran the spark job. The 
 history server is run as a service account and therefore can’t read the 
 files:



 Extract from the History server logs:



 2015-01-07 15:37:24,3021 ERROR Client
 fs/client/fileclient/cc/client.cc:1009
 Thread: 1183 User does not have access to open file
 /apps/spark/historyserver/logs/spark-1420644521194

 15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing 
 Spark event log
 /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1

 org.apache.hadoop.security.AccessControlException: Open failed for file:
 /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error:
 Permission denied (13)



 Is there a setting which I can change that allows the files to be 
 world readable or at least by the account running the history server?
 Currently, the job appears in the History Sever UI but only states ‘Not 
 Started’.



 Thanks,

 Michael





 --
 Marcelo



Re: Did anyone tried overcommit of CPU cores?

2015-01-09 Thread Xuelin Cao
Thanks, but how do I increase the tasks per core?

For example, if the application claims 10 cores, is it possible to launch
100 tasks concurrently?
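For context, the overcommit discussed in this thread is typically done — as a
sketch, not a recommendation — by advertising more cores than the machine
physically has, in conf/spark-env.sh on each standalone worker; the scheduler
then schedules that many concurrent task slots:

```shell
# conf/spark-env.sh on each worker (illustrative value):
# a 24-core box advertised as 48 "cores", so up to 48 tasks can run at once
export SPARK_WORKER_CORES=48
```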



On Fri, Jan 9, 2015 at 2:57 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hallo,

 Based on experiences with other software in virtualized environments I
 cannot really recommend this. However, I am not sure how Spark reacts. You
 may face unpredictable task failures depending on utilization, tasks
 connecting to external systems (databases etc.) may fail unexpectedly and
 this might be a problem for them (transactions not finishing etc.).

 Why not increase the tasks per core?

 Best regards
  On Jan 9, 2015 at 06:46, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

   I'm wondering whether it is a good idea to overcommit CPU cores on
 the spark cluster.

   For example, in our testing cluster, each worker machine has 24
 physical CPU cores. However, we are allowed to set the CPU core number to
 48 or more in the spark configuration file. As a result, we are allowed to
 launch more tasks than the number of physical CPU cores.

    The motivation for overcommitting CPU cores is that, often, a task
  cannot consume 100% of a single CPU core's resources (due to I/O, shuffle,
  etc.).

    So, overcommitting the CPU cores allows more tasks to run at the same
  time, and makes the resources be used more economically.

    But is there any reason we should not do this? Has anyone tried it?





Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException

2015-01-09 Thread Mukesh Jha
I am using the pre-built *spark-1.2.0-bin-hadoop2.4* from *[1]* to submit spark
applications to yarn; I cannot find a pre-built spark for *CDH-5.x*
versions. So, in my case the org.apache.hadoop.yarn.util.ConverterUtils class
is coming from the spark-assembly-1.1.0-hadoop2.4.0.jar which is part of
the pre-built spark and hence causing this issue.

How / where can I get spark 1.2.0 built for CDH-5.3.0? I checked in the maven
repo etc. with no luck.

*[1]* https://spark.apache.org/downloads.html

On Fri, Jan 9, 2015 at 1:12 AM, Marcelo Vanzin van...@cloudera.com wrote:

 Just to add to Sandy's comment, check your client configuration
 (generally in /etc/spark/conf). If you're using CM, you may need to
 run the Deploy Client Configuration command on the cluster to update
 the configs to match the new version of CDH.

 On Thu, Jan 8, 2015 at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:
  Hi Mukesh,
 
  Those line numbers in ConverterUtils in the stack trace don't appear to
 line
  up with CDH 5.3:
 
 https://github.com/cloudera/hadoop-common/blob/cdh5-2.5.0_5.3.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
 
  Is it possible you're still including the old jars on the classpath in
 some
  way?
 
  -Sandy
 
  On Thu, Jan 8, 2015 at 3:38 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
 
  Hi Experts,
 
  I am running spark inside YARN job.
 
  The spark-streaming job is running fine in CDH-5.0.0 but after the
 upgrade
  to 5.3.0 it cannot fetch containers with the below errors. Looks like
 the
  container id is incorrect and a string is present in a place where it's
  expecting a number.
 
 
 
  java.lang.IllegalArgumentException: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01
 
  Caused by: java.lang.NumberFormatException: For input string: e01
 
 
 
  Is this a bug?? Did you face something similar and any ideas how to fix
  this?
 
 
 
  15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal
 handlers
  for [TERM, HUP, INT]
 
  15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:
 
  java.lang.IllegalArgumentException: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
 
  at
 
 org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
 
  at java.security.AccessController.doPrivileged(Native Method)
 
  at javax.security.auth.Subject.doAs(Subject.java:415)
 
  at
 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
 
  Caused by: java.lang.NumberFormatException: For input string: e01
 
  at
 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 
  at java.lang.Long.parseLong(Long.java:441)
 
  at java.lang.Long.parseLong(Long.java:483)
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
 
  ... 11 more
 
  15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
  exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01)
 
 
  --
  Thanks & Regards,
 
  Mukesh Jha
 
 



 --
 Marcelo




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


EventBatch and SparkFlumeProtocol not found in spark codebase?

2015-01-09 Thread bit1...@163.com
Hi, 
When I fetch the Spark code base and import it into IntelliJ IDEA as an SBT 
project, then build it with SBT, there are compile errors in the examples 
module complaining that EventBatch and SparkFlumeProtocol cannot be found; it 
looks like they should be in the 
org.apache.spark.streaming.flume.sink package.

Not sure what is happening.

Thanks.

 







Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-09 Thread Jun Yang
 Guys,

I have a question regarding to Spark 1.1 broadcast implementation.

In our pipeline, we have a large multi-class LR model, which is about 1 GiB
in size.
To get the benefit of Spark parallelism, a natural approach is to
broadcast this model file to the worker nodes.

However, it looks like broadcast performance is not very good.

During the process of broadcasting the model file, I monitored the
network card throughput of the worker nodes; their
recv/write throughput is just around 30~40 MiB/s (our server box is equipped
with a 100 MiB/s ethernet card).

Is this a real limitation of the Spark 1.1 broadcast implementation? Or are
there some configurations or tricks
that can help make Spark broadcast perform better?

Thanks



-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: skipping header from each file

2015-01-09 Thread Sean Owen
I think this was already answered on stackoverflow:
http://stackoverflow.com/questions/27854919/skipping-header-file-from-each-csv-file-in-spark
where the one additional idea would be:


If there were just one header line, in the first record, then the most
efficient way to filter it out is:

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1)
else iter }

This doesn't help, of course, if there are many files with many header
lines inside. You can make 3 RDDs this way and union them.
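The per-partition logic above can be exercised without a cluster; here is a
pure-Scala sketch of the same function (the partition index and iterator are
exactly what `mapPartitionsWithIndex` hands you):

```scala
// Drop the header only in partition 0, where the first line of the file lives.
def dropHeader(partitionIndex: Int, lines: Iterator[String]): Iterator[String] =
  if (partitionIndex == 0) lines.drop(1) else lines

// Simulate a file split into two partitions, header in the first:
val parts = Seq(Iterator("header", "row1", "row2"), Iterator("row3", "row4"))
val cleaned = parts.zipWithIndex.flatMap { case (it, idx) => dropHeader(idx, it) }
// cleaned: Seq("row1", "row2", "row3", "row4")
```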


On Fri, Jan 9, 2015 at 6:18 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote:
 Suppose I give three files paths to spark context to read and each file has
 schema in first row. how can we skip schema lines from headers


 val rdd = sc.textFile("file1,file2,file3")



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/skipping-header-from-each-file-tp21051.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: skipping header from each file

2015-01-09 Thread Somnath Pandeya
Maybe you can use the wholeTextFiles method, which returns the filename and 
content of each file as a PairRDD, and then you can remove the first line from 
each file.
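As a sketch of that approach (no cluster needed to check the string logic):
`wholeTextFiles` yields (path, content) pairs, so dropping the header becomes a
plain string operation you would apply with `mapValues`:

```scala
// Strip the first line of a file's full content;
// returns "" for a file that contains only a header line.
def stripHeader(content: String): String =
  content.split("\n", 2) match {
    case Array(_, rest) => rest
    case _              => ""
  }

// With Spark this would be: sc.wholeTextFiles(path).mapValues(stripHeader)
val example = stripHeader("col1,col2\n1,2\n3,4")
// example == "1,2\n3,4"
```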



-Original Message-
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com]
Sent: Friday, January 09, 2015 11:48 AM
To: user@spark.apache.org
Subject: skipping header from each file

Suppose I give three files paths to spark context to read and each file has 
schema in first row. how can we skip schema lines from headers


val rdd = sc.textFile("file1,file2,file3")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/skipping-header-from-each-file-tp21051.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


 CAUTION - Disclaimer *
This e-mail contains PRIVILEGED AND CONFIDENTIAL INFORMATION intended solely
for the use of the addressee(s). If you are not the intended recipient, please
notify the sender by e-mail and delete the original message. Further, you are 
not
to copy, disclose, or distribute this e-mail or its contents to any other 
person and
any such actions are unlawful. This e-mail may contain viruses. Infosys has 
taken
every reasonable precaution to minimize this risk, but is not liable for any 
damage
you may sustain as a result of any virus in this e-mail. You should carry out 
your
own virus checks before opening the e-mail or attachment. Infosys reserves the
right to monitor and review the content of all messages sent to or from this 
e-mail
address. Messages sent to or from this e-mail address may be stored on the
Infosys e-mail system.
***INFOSYS End of Disclaimer INFOSYS***

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.IOException: Mkdirs failed to create file:/some/path/myapp.csv while using rdd.saveAsTextFile(fileAddress) Spark

2015-01-09 Thread firemonk9
I am facing same exception in saveAsObjectFile. Have you found any solution ?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-Mkdirs-failed-to-create-file-some-path-myapp-csv-while-using-rdd-saveAsTextFile-k-tp20994p21066.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: TF-IDF from spark-1.1.0 not working on cluster mode

2015-01-09 Thread Xiangrui Meng
This is worker log, not executor log. The executor log can be found in
folders like /newdisk2/rta/rtauser/workerdir/app-20150109182514-0001/0/
. -Xiangrui

On Fri, Jan 9, 2015 at 5:03 AM, Priya Ch learnings.chitt...@gmail.com wrote:
 Please find the attached worker log.
  I could see a "stream closed" exception.

 On Wed, Jan 7, 2015 at 10:51 AM, Xiangrui Meng men...@gmail.com wrote:

 Could you attach the executor log? That may help identify the root
 cause. -Xiangrui

 On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com
 wrote:
  Hi All,
 
  Word2Vec and TF-IDF algorithms in spark mllib-1.1.0 are working only in
  local mode and not on distributed mode. Null pointer exception has been
  thrown. Is this a bug in spark-1.1.0 ?
 
  Following is the code:
   def main(args: Array[String])
   {
     val conf = new SparkConf
     val sc = new SparkContext(conf)
     val documents =
       sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext").map(_.split(" ").toSeq)
     val hashingTF = new HashingTF()
     val tf = hashingTF.transform(documents)
     tf.cache()
     val idf = new IDF().fit(tf)
     val tfidf = idf.transform(tf)
     val rdd = tfidf.map { vec =>
       println("vector is " + vec)
       (10)
     }
     rdd.saveAsTextFile("/home/padma/usecase")
   }
 
 
 
 
  Exception thrown:
 
  15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
  with
  2 tasks
  15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered
  executor:
 
  Actor[akka.tcp://sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167]
  with ID 0
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in
  stage
  0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in
  stage
  0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering
  block
  manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM
  15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection
  from
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
  15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection
  to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130]
  15/01/06 12:36:12 INFO network.SendingConnection: Connected to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
  broadcast_1_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1
  GB)
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
  broadcast_0_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1
  GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory
  on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory
  on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB)
  15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
  0.0
  (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException:
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  org.apache.spark.scheduler.Task.run(Task.scala:54)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
 
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 
 
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  java.lang.Thread.run(Thread.java:722)
 
 
  Thanks,
  Padma Ch



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-09 Thread Davies Liu
In the current implementation of TorrentBroadcast, the blocks are
fetched one by one in a single thread, so it cannot fully utilize the
network bandwidth.

Davies

On Fri, Jan 9, 2015 at 2:11 AM, Jun Yang yangjun...@gmail.com wrote:
 Guys,

 I have a question regarding the Spark 1.1 broadcast implementation.

 In our pipeline, we have a large multi-class LR model, which is about 1GiB
 in size.
 To get the benefit of Spark parallelism, a natural approach is to
 broadcast this model file to the worker nodes.

 However, it looks like broadcast performance is not very good.

 While broadcasting the model file, I monitored the network card
 throughput of the worker nodes; their
 recv/write throughput is only around 30~40 MiB (our server boxes are
 equipped with 100MiB ethernet cards).

 Is this a real limitation of the Spark 1.1 broadcast implementation? Or are
 there configuration options or tricks
 that can help make Spark broadcast perform better?

 Thanks



 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro




Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
You are not the first :) and probably not even the fifth to have this question.
A parameter server is not included in the Spark framework, and I've seen all
kinds of hacks to improvise one: REST APIs, HDFS, Tachyon, etc.
Not sure if an 'official' benchmark & implementation will be released soon

On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:

 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 Deep learning algorithms are popular and achieve state-of-the-art
 performance on several real-world machine learning problems. Currently
 there is no DL implementation in Spark, and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an additional
 software stack.

 Deeplearning4j seems to implement distributed versions of many popular
 DL algorithms. Porting DL4j to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper:
 http://research.google.com/archive/large_deep_networks_nips2012.html.
 It is based on model parallelism and data parallelism.

 So, I'm trying to imagine what a good design for a DL algorithm in
 Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be used
 for the model parallelism (as DNNs are generally designed as DAGs)? And what
 about using GPUs for local parallelism (a mechanism to push partitions into
 GPU memory)?


 What do you think about this ?


 Cheers,

 Jao




RE: RowMatrix.multiply() ?

2015-01-09 Thread Adrian Mocanu
I’m resurrecting this thread because I’m interested in doing a transpose on a 
RowMatrix.
There is this other thread too: 
http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-multiplication-in-spark-td12562.html
which presents https://issues.apache.org/jira/browse/SPARK-3434, which is still 
in progress at this time.
Is this the correct JIRA issue for the transpose operation? ETA?

Thanks a lot!
-A

From: Reza Zadeh [mailto:r...@databricks.com]
Sent: October-15-14 1:48 PM
To: ll
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix.multiply() ?

Hi,
We are currently working on distributed matrix operations. Two RowMatrices 
cannot currently be multiplied together, nor can they be added. This 
functionality will be added soon.

You can of course achieve this yourself by using IndexedRowMatrix and doing one 
join per operation you requested.

Best,
Reza

On Wed, Oct 15, 2014 at 8:50 AM, ll duy.huynh@gmail.com wrote:
hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter
and returns the result as a distributed RowMatrix.

how do you perform this series of multiplications if A, B, C, and D are all
RowMatrix?

((A x B) x C) x D

thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Parquet compression codecs not applied

2015-01-09 Thread Michael Armbrust
This is a little confusing, but that code path is actually going through
Hive, so the Spark SQL configuration does not help.

Perhaps, try:
set parquet.compression=GZIP;
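A hedged sketch of what that looks like in a Hive session (property name as in parquet-mr; exact behavior depends on the Hive/Parquet versions in use):

```sql
-- When the INSERT runs through the Hive code path, set the Hive-level
-- Parquet property; the Spark SQL spark.sql.parquet.compression.codec
-- setting is not consulted on this path.
SET parquet.compression=GZIP;
-- then run the INSERT in the same session
```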

On Fri, Jan 9, 2015 at 2:41 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

 Hello,

 I tried to save a table created via the Hive context as a parquet file, but
 whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
 setConf, like:

 setConf("spark.sql.parquet.compression.codec", "gzip")

 the size of the generated files is always the same, so it seems like the
 Spark context ignores the compression codec that I set.

 Here is a code sample, applied via the spark shell:

 import org.apache.spark.sql.hive.HiveContext
 val hiveContext = new HiveContext(sc)

 hiveContext.sql("SET hive.exec.dynamic.partition = true")
 hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
 hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required
 to make data compatible with impala
 hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

 hiveContext.sql("create external table if not exists foo (bar STRING, ts
 INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET
 Location 'hdfs://path/data/foo'")

 hiveContext.sql("insert into table foo partition(year, month, day) select *,
 year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month,
 day(from_unixtime(ts)) as day from raw_foo")

 I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13,
 and I also tried it with Impala on the same cluster, which applied
 the compression codecs correctly.

 Does anyone know what could be the problem ?

 Thanks,
 Ayoub.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark SQL: Storing AVRO Schema in Parquet

2015-01-09 Thread Jerry Lam
Hi Raghavendra,

This makes a lot of sense. Thank you.
The problem is that I'm using Spark SQL right now to generate the parquet
file.

What I think I need to do is to use Spark directly and transform all rows
from the SchemaRDD into avro objects, then supply them to saveAsNewAPIHadoopFile
(from PairRDD). From there, I can supply the avro schema to parquet via
AvroParquetOutputFormat.

It is not difficult, just not as simple as I would like, because a SchemaRDD
can write to a Parquet file using its schema, and if I could supply the avro
schema to parquet, it would save me the transformation step to avro objects.

I'm thinking of overriding the saveAsParquetFile method to allow me to
persist the avro schema inside parquet. Is this possible at all?

Best Regards,

Jerry


On Fri, Jan 9, 2015 at 2:05 AM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 I came across this:
 http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. You can take
 a look.


 On Fri Jan 09 2015 at 12:08:49 PM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I have a similar kind of requirement, where I want to push avro data
 into parquet. But it seems you have to do it on your own. There
 is the parquet-mr project that uses Hadoop to do so. I am trying to write a
 Spark job to do a similar kind of thing.

 On Fri, Jan 9, 2015 at 3:20 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users,

 I'm using Spark SQL to create parquet files on HDFS. I would like to
 store the avro schema in the parquet metadata so that non-Spark-SQL
 applications can marshall the data without the avro schema, using the avro
 parquet reader. Currently, schemaRDD.saveAsParquetFile does not allow
 that. Is there another API that allows me to do this?

 Best Regards,

 Jerry





Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?

2015-01-09 Thread Michael Armbrust
The other thing to note here is that Spark SQL defensively copies rows when
we switch into user code.  This probably explains the difference between 1
& 2.

The difference between 1 & 3 is likely the cost of decompressing the column
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Nathan,

 Thanks for sharing, this is a very interesting post :) My comments are
 inlined below.

 Cheng

 On 1/7/15 11:53 AM, Nathan McCarthy wrote:

 Hi,

  I’m trying to use a combination of SparkSQL and 'normal' Spark/Scala via
 rdd.mapPartitions(…). Using the latest release, 1.2.0.

  Simple example; load up some sample data from parquet on HDFS (about
 380m rows, 10 columns) on a 7 node cluster.

   val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
   t.registerTempTable("test1")
   sqlC.cacheTable("test1")

  Now let's do some operations on it; I want the total sales & quantities
 sold for each hour in the day, so I choose 3 out of the 10 possible
 columns...

   sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
 Hour").collect().foreach(println)

  After the table has been 100% cached in memory, this takes around 11
 seconds.

  Let's do the same thing, but via a mapPartitions call (this isn't
 production-ready code but gets the job done).

   val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
   rddPC.mapPartitions { case hrs =>
     val qtySum = new Array[Double](24)
     val salesSum = new Array[Double](24)

     for (r <- hrs) {
       val hr = r.getInt(0)
       qtySum(hr) += r.getDouble(1)
       salesSum(hr) += r.getDouble(2)
     }
     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
   }.reduceByKey((a, b) => (a._1 + b._1, a._2 +
 b._2)).collect().foreach(println)

 I believe the evil thing that makes this snippet much slower is the
 for-loop. According to an early benchmark I did with Scala 2.9, a for-loop can
 be orders of magnitude slower than a simple while-loop, especially when the
 body of the loop only does something as trivial as in this case. The reason is
 that a Scala for-loop is translated into corresponding
 foreach/map/flatMap/withFilter function calls. And that's exactly why Spark
 SQL tries to avoid for-loops or any other functional-style code in critical
 paths (where every row is touched); we also use reusable mutable row
 objects instead of the immutable version to improve performance. You may
 check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
 reference. Also, the `sum` function calls in your SQL code are translated
 into `o.a.s.s.execution.Aggregate` operators, which also use an imperative
 while-loop and reusable mutable rows.

 Another thing to notice is that the `hrs` iterator physically points to
 underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ...
 }` loop actually decompresses and extracts values from the required byte
 buffers (this is the unwrapping process you mentioned below).
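 For illustration, a hedged, Spark-free sketch of that while-loop idiom in
 plain Scala (the `(hour, qty, sales)` tuples and the `aggregate` name are
 stand-ins, not code from the thread):

```scala
// Aggregate (hour, qty, sales) records into per-hour sums with an
// imperative while-loop over the iterator -- the style Spark SQL
// itself uses in hot paths where every row is touched.
def aggregate(rows: Iterator[(Int, Double, Double)]): (Array[Double], Array[Double]) = {
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)
  while (rows.hasNext) {
    val r = rows.next()
    qtySum(r._1) += r._2    // quantity sold in this hour
    salesSum(r._1) += r._3  // sales total for this hour
  }
  (qtySum, salesSum)
}

val (qty, sales) = aggregate(Iterator((0, 1.0, 10.0), (0, 2.0, 20.0), (23, 5.0, 50.0)))
println(qty(0))    // 3.0
println(sales(23)) // 50.0
```

 The same loop written as a for-comprehension compiles to a `foreach` call
 with a closure invoked per element, which is where the extra overhead
 comes from.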


  Now this takes around ~49 seconds… Even though test1 table is 100%
 cached. The number of partitions remains the same…

  Now if I create a simple RDD of a case class HourSum(hour: Int, qty:
 Double, sales: Double)

  Convert the SchemaRDD;
  val rdd = sqlC.sql("select * from test1").map { r => HourSum(r.getInt(1),
 r.getDouble(7), r.getDouble(8)) }.cache()
  //cache all the data
 rdd.count()

  Then run basically the same MapPartitions query;

  rdd.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.hour
      qtySum(hr) += r.qty
      salesSum(hr) += r.sales
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 +
 b._2)).collect().foreach(println)

  This takes around 1.5 seconds! Albeit the memory footprint is much
 larger.

 I guess this 1.5 seconds doesn't include the time spent on caching the
 simple RDD? As I've explained above, in the first `mapPartitions` style
 snippet, columnar byte buffer unwrapping happens within the `mapPartitions`
 call. However, in this version, the unwrapping process happens when the
 `rdd.count()` action is performed. At that point, all values of all columns
 are extracted from underlying byte buffers, and the portion of data you
 need are then manually selected and transformed into the simple case class
 RDD via the `map` call.

 If you include time spent on caching the simple case class RDD, it should
 be even slower than the first `mapPartitions` version.


  My thinking is that because SparkSQL does store things in a columnar
 format, there is some unwrapping to be done out of the column array buffers
 which takes time and for some reason this just takes longer when I switch
 out to map partitions (maybe its unwrapping the entire row, even though I’m
 using just a subset of columns, or maybe there is some object
 creation/autoboxing going on when calling getInt or getDouble)…


Re: Failed to save RDD as text file to local file system

2015-01-09 Thread firemonk9
Have you found any resolution for this issue ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21067.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Failed to save RDD as text file to local file system

2015-01-09 Thread NingjunWang
No, do you have any idea?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: firemonk9 [via Apache Spark User List] 
[mailto:ml-node+s1001560n21067...@n3.nabble.com]
Sent: Friday, January 09, 2015 2:56 PM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue ?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Parquet predicate pushdown troubles

2015-01-09 Thread Yana Kadiyska
I am running the following (connecting to an external Hive Metastore):

 /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
spark.sql.parquet.filterPushdown=true

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

sqlContext.sql("select count(*) from table where partition='blah'")
and
sqlContext.sql("select count(*) from table where partition='blah'
and epoch=1415561604")

According to the Input tab in the UI both scan about 140G of data which is
the size of my whole partition. So I have two questions --

1. is there a way to tell from the plan if a predicate pushdown is supposed
to happen?
I see this for the second query

res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#49L]
   OutputFaker []
Project []
 ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

2. am I doing something obviously wrong that makes this not work? (I'm
guessing it's not working because the input size for the second query shows
unchanged and the execution time is almost 2x as long)

thanks in advance for any insights


updateStateByKey: cleaning up state for keys not in current window

2015-01-09 Thread Simone Franzini
I know that in order to clean up the state for a key I have to return None
when I call updateStateByKey. However, as far as I understand,
updateStateByKey only gets called for new keys (i.e. keys in current
batch), not for all keys in the DStream.
So, how can I clear the state for those keys in this case? Or, in other
words, how can I clear the state for a key when Seq[V] is empty?
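
For what it's worth, the update function itself is plain Scala and can be
sketched outside Spark (a minimal sketch -- the Int-count state and names
are illustrative):

```scala
// An update function of the shape updateStateByKey expects:
// (Seq[V], Option[S]) => Option[S]. Spark keeps the key's state when
// Some(_) is returned and removes it when None is returned.
def update(newValues: Seq[Int], state: Option[Int]): Option[Int] =
  if (newValues.isEmpty) None                    // no new data for this key: clear its state
  else Some(state.getOrElse(0) + newValues.sum)  // otherwise accumulate into the state

println(update(Seq(1, 2), Some(10))) // Some(13)
println(update(Seq.empty, Some(10))) // None -- state for this key is dropped
```

This only shows the clearing mechanics; whether the function is actually
invoked with an empty Seq for keys that received no data in the current
batch is exactly the behavior in question here.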


Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini


Re: RowMatrix.multiply() ?

2015-01-09 Thread Reza Zadeh
Yes that is the correct JIRA. It should make it to 1.3.
Best,
Reza

On Fri, Jan 9, 2015 at 11:13 AM, Adrian Mocanu amoc...@verticalscope.com
wrote:

  I’m resurrecting this thread because I’m interested in doing transpose
 on a RowMatrix.

 There is this other thread too:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-multiplication-in-spark-td12562.html

  Which presents https://issues.apache.org/jira/browse/SPARK-3434, which is
 still in progress at this time.

 Is this the correct Jira issue for the transpose operation? ETA?



 Thanks a lot!

 -A



 *From:* Reza Zadeh [mailto:r...@databricks.com]
 *Sent:* October-15-14 1:48 PM
 *To:* ll
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: RowMatrix.multiply() ?



 Hi,

  We are currently working on distributed matrix operations. Two RowMatrices
 cannot currently be multiplied together, nor can they be added. This
 functionality will be added soon.



 You can of course achieve this yourself by using IndexedRowMatrix and
 doing one join per operation you requested.



 Best,

 Reza



 On Wed, Oct 15, 2014 at 8:50 AM, ll duy.huynh@gmail.com wrote:

 hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter
 and returns the result as a distributed RowMatrix.

 how do you perform this series of multiplications if A, B, C, and D are all
 RowMatrix?

  ((A x B) x C) x D

 thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Questions about Spark and HDFS co-location

2015-01-09 Thread zfry
I am running Spark 1.1.1 built against CDH4 and have a few questions
regarding Spark performance related to co-location with HDFS nodes. 

I want to know whether (and how efficiently) Spark takes advantage of being
co-located with a HDFS node? 
  
What I mean by this is: if a file is being read by a Spark executor and that
file (or most of its blocks) is located in a HDFS DataNode on the same
machine as a Spark worker, will it read directly off of disk, or does that
data have to travel through the network in some way? Is there a distinct
advantage to putting HDFS and Spark on the same box if it is possible or,
due to the way blocks are distributed about a cluster, are we so likely to
be moving files over the network that co-location doesn’t really make that
much of a difference? 
  
Also, do you know of any papers/books/other resources (other than trying to dig
through the Spark code) which do a good job of explaining the Spark/HDFS
data workflow (i.e. how data moves from disk -> HDFS -> Spark -> HDFS)? 

Thanks! 
Zach




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Spark-and-HDFS-co-location-tp21070.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Reading HBase data - Exception

2015-01-09 Thread lmsiva
This problem got resolved. In spark-submit I was using only the
--driver-class-path option; once I added the --jars option so that the workers
are aware of the dependent jar files, the problem went away. Need to check
if there are any worker logs that give better information than the
exception I was getting.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-HBase-data-Exception-tp21009p21071.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Cleaning up spark.local.dir automatically

2015-01-09 Thread Andrew Ash
That's a worker setting which cleans up the files left behind by executors,
so spark.cleaner.ttl isn't at the RDD level.  After
https://issues.apache.org/jira/browse/SPARK-1860 the cleaner won't clean up
directories left by running executors.


On Fri, Jan 9, 2015 at 7:38 AM, michael.engl...@nomura.com wrote:

  Thanks, I imagine this will kill any cached RDDs if their files are
 beyond the ttl?



 Thanks





 *From:* Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com]
 *Sent:* 09 January 2015 15:29
 *To:* England, Michael (IT/UK); user@spark.apache.org
 *Subject:* Re: Cleaning up spark.local.dir automatically



 You may like to look at spark.cleaner.ttl configuration which is infinite
 by default. Spark has that configuration to delete temp files time to time.

 On Fri Jan 09 2015 at 8:34:10 PM michael.engl...@nomura.com wrote:

 Hi,



 Is there a way of automatically cleaning up the spark.local.dir after a
 job has been run? I have noticed a large number of temporary files have
 been stored here and are not cleaned up. The only solution I can think of
 is to run some sort of cron job to delete files older than a few days. I am
 currently using a mixture of standalone and YARN spark builds.



 Thanks,

 Michael





 This e-mail (including any attachments) is private and confidential, may
 contain proprietary or privileged information and is intended for the named
 recipient(s) only. Unintended recipients are strictly prohibited from
 taking action on the basis of information in this e-mail and must contact
 the sender immediately, delete this e-mail (and all attachments) and
 destroy any hard copies. Nomura will not accept responsibility or liability
 for the accuracy or completeness of, or the presence of any virus or
 disabling code in, this e-mail. If verification is sought please request a
 hard copy. Any reference to the terms of executed transactions should be
 treated as preliminary only and subject to formal written confirmation by
 Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
 communications through its networks (subject to and in accordance with
 applicable laws). No confidentiality or privilege is waived or lost by
 Nomura by any mistransmission of this e-mail. Any reference to Nomura is
 a reference to any entity in the Nomura Holdings, Inc. group. Please read
 our Electronic Communications Legal Notice which forms part of this e-mail:
 http://www.Nomura.com/email_disclaimer.htm




Re: DeepLearning and Spark ?

2015-01-09 Thread Andrei
Does it make sense to use Spark's actor system (e.g. via
SparkContext.env.actorSystem) to create a parameter server?

On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote:

 You are not the first :) and probably not even the fifth to have this question.
 A parameter server is not included in the Spark framework, and I've seen all
 kinds of hacks to improvise one: REST APIs, HDFS, Tachyon, etc.
 Not sure if an 'official' benchmark & implementation will be released soon

 On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:


 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 Deep learning algorithms are popular and achieve state-of-the-art
 performance on several real-world machine learning problems. Currently
 there is no DL implementation in Spark, and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an additional
 software stack.

 Deeplearning4j seems to implement distributed versions of many popular
 DL algorithms. Porting DL4j to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper:
 http://research.google.com/archive/large_deep_networks_nips2012.html.
 It is based on model parallelism and data parallelism.

 So, I'm trying to imagine what a good design for a DL algorithm
 in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be
 used for the model parallelism (as DNNs are generally designed as DAGs)? And
 what about using GPUs for local parallelism (a mechanism to push partitions
 into GPU memory)?


 What do you think about this ?


 Cheers,

 Jao





Re: DeepLearning and Spark ?

2015-01-09 Thread Krishna Sankar
I am also looking at this domain. We could potentially use the broadcast
capability in Spark to distribute the parameters. Haven't thought thru yet.
Cheers
k/

On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote:

  Does it make sense to use Spark's actor system (e.g. via
 SparkContext.env.actorSystem) to create a parameter server?

 On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote:

  You are not the first :) and probably not even the fifth to have this question.
  A parameter server is not included in the Spark framework, and I've seen all
  kinds of hacks to improvise one: REST APIs, HDFS, Tachyon, etc.
  Not sure if an 'official' benchmark & implementation will be released soon

 On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:


 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 Deep learning algorithms are popular and achieve state-of-the-art
 performance on several real-world machine learning problems. Currently
 there is no DL implementation in Spark, and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an
 additional software stack.

 Deeplearning4j seems to implement distributed versions of many popular
 DL algorithms. Porting DL4j to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper:
 http://research.google.com/archive/large_deep_networks_nips2012.html.
 It is based on model parallelism and data parallelism.

 So, I'm trying to imagine what a good design for a DL algorithm
 in Spark would be. Spark already has RDDs (for data parallelism). Can GraphX be
 used for the model parallelism (as DNNs are generally designed as DAGs)? And
 what about using GPUs for local parallelism (a mechanism to push partitions
 into GPU memory)?


 What do you think about this ?


 Cheers,

 Jao






Re: RDD Moving Average

2015-01-09 Thread Mohit Jaggi
Read this:
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
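
The approach in that thread boils down to a sliding window over ordered
values; locally (ignoring the partition-boundary exchange the thread is
really about, and with illustrative names) it can be sketched as:

```scala
// Simple moving average over a window of consecutive values.
// A distributed RDD version additionally has to ship the first
// (window - 1) elements of each partition to the previous partition,
// which is what the linked thread discusses.
def movingAverage(values: Seq[Double], window: Int): Seq[Double] =
  values.sliding(window).map(w => w.sum / window).toSeq

println(movingAverage(Seq(1.0, 2.0, 3.0, 4.0), 2)) // averages 1.5, 2.5, 3.5
```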





Streaming Checkpointing

2015-01-09 Thread Asim Jalis
In Spark Streaming apps if I enable ssc.checkpoint(dir) does this
checkpoint all RDDs? Or is it just checkpointing windowing and state RDDs?

For example, if in a DStream I am using an iterative algorithm on a
non-state non-window RDD, do I have to checkpoint it explicitly myself, or
can I assume that ssc.checkpoint has taken care of checkpointing it?


Re: EventBatch and SparkFlumeProtocol not found in spark codebase?

2015-01-09 Thread Sean Owen
What's up with the IJ questions all of the sudden?

This PR from yesterday contains a summary of the answer to your question:
https://github.com/apache/spark/pull/3952 :

Rebuild Project can fail the first time the project is compiled,
because generated source files are not created automatically. Try
clicking the Generate Sources and Update Folders For All Projects
button in the Maven Projects tool window to manually generate these
sources.

On Fri, Jan 9, 2015 at 10:03 AM, bit1...@163.com bit1...@163.com wrote:
 Hi,
 When I fetch the Spark code base and import it into IntelliJ IDEA as an SBT
 project, then build it with SBT, there are compile errors in the
 examples module, complaining about EventBatch and SparkFlumeProtocol; it looks
 like they should be in the
 org.apache.spark.streaming.flume.sink package.

 Not sure what happens.

 Thanks.




 





Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-09 Thread Akhil Das
You can try the following:

- Increase spark.akka.frameSize (default is 10MB)
- Try using torrentBroadcast

Thanks
Best Regards

On Fri, Jan 9, 2015 at 3:41 PM, Jun Yang yangjun...@gmail.com wrote:

 Guys,

 I have a question regarding the Spark 1.1 broadcast implementation.

 In our pipeline, we have a large multi-class LR model, which is about 1GiB
 in size.
 To get the benefit of Spark parallelism, a natural approach is to
 broadcast this model file to the worker nodes.

 However, it looks like broadcast performance is not very good.

 While broadcasting the model file, I monitored the network card
 throughput of the worker nodes; their
 recv/write throughput is only around 30~40 MiB (our server boxes are
 equipped with 100MiB ethernet cards).

 Is this a real limitation of the Spark 1.1 broadcast implementation? Or are
 there configuration options or tricks
 that can help make Spark broadcast perform better?

 Thanks



 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro



KryoDeserialization getting java.io.EOFException

2015-01-09 Thread yuemeng1
Hi, when I run a query in Spark SQL, it gives me the following error. What
possible reasons could cause this problem?


java.io.EOFException
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:148)
at 
org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:100)
at 
org.apache.spark.sql.hbase.HBasePartitioner$$anonfun$readObject$1.apply(HBasePartitioner.scala:99)
at 
org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:126)
at 
org.apache.spark.sql.hbase.HBasePartitioner.readObject(HBasePartitioner.scala:99)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/01/10 01:50:25 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.EOFException (same stack trace as above)

OptionalDataException during Naive Bayes Training

2015-01-09 Thread jatinpreet
Hi,

I am using Spark version 1.1 in standalone mode on the cluster. Sometimes,
during Naive Bayes training, I get an OptionalDataException at the line,

map at NaiveBayes.scala:109

I am getting the following exception on the console:

java.io.OptionalDataException:
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1371)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
java.util.HashMap.readObject(HashMap.java:1394)
sun.reflect.GeneratedMethodAccessor626.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:483)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

What could be the reason behind this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OptionalDataException-during-Naive-Bayes-Training-tp21059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Queue independent jobs

2015-01-09 Thread Sean Owen
You can parallelize on the driver side. The way to do it is almost
exactly what you have here, where you're iterating over a local Scala
collection of dates and invoking a Spark operation for each. Simply
write dateList.par.map(...) to make the local map proceed in
parallel. It should invoke the Spark jobs simultaneously.
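A runnable driver-side sketch of that pattern; the Spark action is replaced by a cheap stand-in (`date.length`) so it runs without a cluster, and the `dates` list is a hypothetical example (note `.par` is built in on Scala 2.12 and earlier; 2.13 needs the parallel-collections module):

```scala
object ParMapDemo {
  def main(args: Array[String]): Unit = {
    val dates = List("2015-01-07", "2015-01-08", "2015-01-09")
    // .par converts the local list into a parallel collection, so each
    // closure runs on its own thread; with Spark, each closure would
    // submit its own job, e.g. sc.textFile(date)...saveAsHadoopFile(date)
    val results = dates.par.map(date => date.length).toList
    println(results) // prints List(10, 10, 10)
  }
}
```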

On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote:
 Hey,

 Let's say we have multiple independent jobs that each transform some data
 and store it in distinct HDFS locations. Is there a nice way to run them in
 parallel? See the following pseudo-code snippet:

 dateList.map(date =>
 sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))

 It's unfortunate if they run in sequence, since all the executors are not
 used efficiently. What's the best way to parallelize execution of these
 jobs?

 Thanks,
 Anders




Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException

2015-01-09 Thread Sean Owen
Again this is probably not the place for CDH-specific questions, and
this one is already answered at
http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/CDH-5-3-0-container-cannot-be-fetched-because-of/m-p/23497#M478

On Fri, Jan 9, 2015 at 9:23 AM, Mukesh Jha me.mukesh@gmail.com wrote:
 I am using pre built spark-1.2.0-bin-hadoop2.4 from [1] to submit spark
 applications to yarn, I cannot find the pre built spark for CDH-5.x
 versions. So, In my case the org.apache.hadoop.yarn.util.ConverterUtils
 class is coming from the spark-assembly-1.1.0-hadoop2.4.0.jar which is part
 of the pre built spark and hence causing this issue.

 How / where can I get Spark 1.2.0 built for CDH-5.3.0? I checked in the Maven
 repo etc. with no luck.

 [1] https://spark.apache.org/downloads.html




Re: Parallel execution on one node

2015-01-09 Thread Sean Owen
The repartitioning has some overhead. Do your results show that
perhaps this is what is taking the extra time? You can try reading the
source file with more partitions instead of partitioning it after the
fact. See the minPartitions argument to loadLibSVMFile.

But before that, I'd assess whether the core computation here is even
taking much time. If it's already a small fraction of the total
runtime, speeding it up 16x doesn't do much.
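For example, MLlib 1.x's `loadLibSVMFile` has an overload taking `numFeatures` and `minPartitions`; a sketch (the partition count is an assumption, and `sc` is the existing SparkContext):

```scala
import org.apache.spark.mllib.util.MLUtils

// sketch: read with 64 partitions up front instead of repartitioning later;
// a non-positive numFeatures lets MLlib infer the feature dimension
val data = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label",
  numFeatures = -1, minPartitions = 64)
```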

On Fri, Jan 9, 2015 at 6:57 AM, mikens msmel...@gmail.com wrote:
 Hello,
 I am new to Spark. I have adapted an example code to do binary
 classification using logistic regression. I tried it on rcv1_train.binary
 dataset using LBFGS.runLBFGS solver, and obtained correct loss.

 Now, I'd like to run the code in parallel across the 16 cores of my single CPU
 socket. If I understand correctly, parallelism in Spark is achieved by
 partitioning the dataset into some number of partitions, approximately 3-4 times
 the number of cores in the system. To partition the data, I am calling
 data.repartition(npart), where npart is the number of partitions (16*4=64 in my
 case).

 I run the code as follows:
 spark-submit --master local[16] --class logreg
 target/scala-2.10/logistic-regression_2.10-1.0.2.jar  72

 However, I do not observe any speedup compared to when I just use one
 partition. I would much appreciate your help understanding what I am doing
 wrong and why I am not seeing any speedup due to 16 cores. Please find my
 code below.

 Best,
 Mike

 *CODE*
 object logreg {
   def main(args: Array[String]) {

     val conf = new SparkConf().setAppName("logreg")
     val sc = new SparkContext(conf)
     val npart = args(0).toInt
     val data_ = MLUtils.loadLibSVMFile(sc, "rcv1_train.binary.0label").cache()
     val data = data_.repartition(npart) // partition dataset into npart partitions
     val lambda = 1.0 / data.count()
     val splits = data.randomSplit(Array(1.0, 0.0), seed = 11L)
     val training = splits(0).map(x => (x.label,
       MLUtils.appendBias(x.features))).cache()
     val numFeatures = data.take(1)(0).features.size
     val start = System.currentTimeMillis
     val initialWeightsWithIntercept = Vectors.dense(new
       Array[Double](numFeatures + 1))
     val (weightsWithIntercept, loss) = LBFGS.runLBFGS(training,
       new LogisticGradient(),
       new SquaredL2Updater(),
       10,
       1e-14,
       100,
       lambda,
       initialWeightsWithIntercept)
     val took = (System.currentTimeMillis - start) / 1000.0
     println("LBFGS.runLBFGS: " + took + "s")
     sc.stop()
   }
 }





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-on-one-node-tp21052.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: PermGen issues on AWS

2015-01-09 Thread Joe Wass
Thanks, I noticed this after posting. I'll try that.
I also think that perhaps Clojure might be creating more classes than the
equivalent Java would, so I'll nudge it a bit higher.

On 9 January 2015 at 11:45, Sean Owen so...@cloudera.com wrote:

 It's normal for PermGen to be a bit more of an issue with Spark than
 for other JVM-based applications. You should simply increase the
 PermGen size, which I don't see in your command. -XX:MaxPermSize=256m
 allows it to grow to 256m for example. The right size depends on your
 total heap size and app.

 Also, Java 8 no longer has a permanent generation, so this particular
 type of problem and tuning is not needed. You might consider running
 on Java 8.
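 A sketch of the relevant settings (the 256m size is an assumption to tune, and
 these flags only apply on Java 7 and earlier):

```scala
import org.apache.spark.SparkConf

// sketch: give executor and driver JVMs a larger PermGen (Java <= 7 only)
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m")
  .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=256m")
```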

 On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass jw...@crossref.org wrote:
  I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM).
 FWIW
  I'm using the Flambo Clojure wrapper which uses the Java API but I don't
  think that should make any difference. I'm running with the following
  command:
 
  spark/bin/spark-submit --class mything.core --name "My Thing" --conf
  spark.yarn.executor.memoryOverhead=4096 --conf
  "spark.executor.extraJavaOptions=-XX:+CMSClassUnloadingEnabled
  -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar
 
  For one of the stages I'm getting errors:
 
   - ExecutorLostFailure (executor lost)
   - Resubmitted (resubmitted due to lost executor)
 
  And I think they're caused by slave executor JVMs dying up with this
 error:
 
  java.lang.OutOfMemoryError: PermGen space
  java.lang.Class.getDeclaredConstructors0(Native Method)
  java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
  java.lang.Class.getConstructor0(Class.java:2885)
  java.lang.Class.newInstance(Class.java:350)
 
 
 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
 
 
 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
  java.security.AccessController.doPrivileged(Native Method)
 
 
 sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
 
 
 sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
 
 
 sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
 
 
 java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
  java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
  java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
  java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
  java.security.AccessController.doPrivileged(Native Method)
  java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
  java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 
  java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 
  java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 
  java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
  1 stage out of 14 (so far) is failing. My failing stage is 1768
 succeeded /
  1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted
  (resubmitted due to lost executor).
 
  Now my Aggregated Metrics by Executor shows that 10 out of 16 executors
  show CANNOT FIND ADDRESS which I imagine means the JVM blew up and
 hasn't
  been restarted. Now the 'Executors' tab shows only 7 executors.
 
   - Is this normal?
   - Any ideas why this is happening?
   - Any other measures I can take to prevent this?
   - Is the rest of my app going to run on a reduced number of executors?
   - Can I re-start the executors mid-application? This is a long-running
 job,
  so I'd like to do what I can whilst it's running, if possible.
   - Am I correct in thinking that the --conf arguments are supplied to the
  JVMs of the slave executors, so they will be receiving the
 extraJavaOptions
  and memoryOverhead?
 
  Thanks very much!
 
  Joe



Re: Mapping directory structure to columns in SparkSQL

2015-01-09 Thread Michael Davies
Hi Michael, 

I have got the directory-based column support working, at least in a trial. I 
have put the trial code here: DirIndexParquet.scala 
https://github.com/MickDavies/spark-parquet-dirindex/blob/master/src/main/scala/org/apache/spark/sql/parquet/DirIndexParquet.scala
It involved copying quite a lot of newParquet.

There are some tests here 
https://github.com/MickDavies/spark-parquet-dirindex/tree/master/src/test/scala/org/apache/spark/sql/parquet
that illustrate its use.

I’d be keen to help in any way with the datasources API changes that you 
mention; would you like to discuss?

Thanks

Mick



 On 30 Dec 2014, at 17:40, Michael Davies michael.belldav...@gmail.com wrote:
 
 Hi Michael, 
 
 I’ve looked through the example and the test cases and I think I understand 
 what we need to do - so I’ll give it a go. 
 
 I think what I’d like to try to do is allow files to be added at any time, so 
 perhaps I can cache partition info. What may also be useful for us would 
 be to derive the schema from the set of all files; hopefully this is achievable 
 as well.
 
 Thanks
 
 Mick
 
 
 On 30 Dec 2014, at 04:49, Michael Armbrust mich...@databricks.com wrote:
 
 You can't do this now without writing a bunch of custom logic (see here for 
 an example: 
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala)
 
 I would like to make this easier as part of improvements to the datasources 
 api that we are planning for Spark 1.3
 
 On Mon, Dec 29, 2014 at 2:19 AM, Mickalas michael.belldav...@gmail.com wrote:
 I see that there is already a request to add wildcard support to the
 SQLContext.parquetFile function
 https://issues.apache.org/jira/browse/SPARK-3928.
 
 What seems like a useful thing for our use case is to associate the
 directory structure with certain columns in the table, but it does not seem
 like this is supported.
 
 For example we want to create parquet files on a daily basis associated with
 geographic regions and so will create a set of files under directories such
 as:
 
 * 2014-12-29/Americas
 * 2014-12-29/Asia
 * 2014-12-30/Americas
 * ...
 
 Where queries have predicates that match the column values determinable from
 directory structure it would be good to only extract data from matching
 files.
 
 Does anyone know if something like this is supported, or whether this is a
 reasonable thing to request?
 
 Mick
 
 
 
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-directory-structure-to-columns-in-SparkSQL-tp20880.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-directory-structure-to-columns-in-SparkSQL-tp20880.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 http://nabble.com/.
 
 
 
 



Play Scala Spark Example

2015-01-09 Thread Eduardo Cusa
Hi guys, I am running the following example:
https://github.com/knoldus/Play-Spark-Scala on the same machine as the
Spark master, and the Spark cluster was launched with the ec2 script.


I'm stuck with these errors; any idea how to fix them?

Regards
Eduardo


Calling the Play app prints the following exception:


[*error*] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] -
[akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575]:
Error [Shut down address:
akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575
Caused by: akka.remote.transport.Transport$InvalidAssociationException:
The remote system terminated the association because it is shutting
down.




The master receives the Spark application and generates the following stderr
log page:


15/01/09 13:31:23 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
15/01/09 13:31:23 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
15/01/09 13:31:23 INFO util.Utils: Successfully started service
'sparkExecutor' on port 37856.
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to MapOutputTracker:
akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/MapOutputTracker
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to
BlockManagerMaster:
akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/BlockManagerMaster
15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local
directory at /mnt/spark/spark-local-20150109133123-3805
15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local
directory at /mnt2/spark/spark-local-20150109133123-b05e
15/01/09 13:31:23 INFO util.Utils: Successfully started service
'Connection manager for block manager' on port 36936.
15/01/09 13:31:23 INFO network.ConnectionManager: Bound socket to port
36936 with id =
ConnectionManagerId(ip-10-158-18-250.ec2.internal,36936)
15/01/09 13:31:23 INFO storage.MemoryStore: MemoryStore started with
capacity 265.4 MB
15/01/09 13:31:23 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/01/09 13:31:23 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to
HeartbeatReceiver:
akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/HeartbeatReceiver
15/01/09 13:31:54 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:57671]
- [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481]
disassociated! Shutting down.


Newbie Question on How Tasks are Executed

2015-01-09 Thread mixtou
I am writing my first project in Spark: an implementation of the
Space-Saving counting algorithm.
I am trying to understand how tasks are executed across partitions.
As you can see from my code, the algorithm keeps only a small number of
words in memory, for example 100: the top-k ones, not all of them. For each
new word that arrives, it replaces the least frequent one and updates the
counts and errors. If the word already exists, it increments its count. My
question is where and how this code will be executed.
For example, I set the RDD's parallelism to 4, so the data will be split
into 4 partitions, right?
When I collect the words, shouldn't I get back 4*k words, since I have 4
partitions with k words kept in each one (k words from each partition)? If
so, why am I getting only k words as the result when I collect them? Where
does the merging happen?
Is there a way to know which partition returns what? Is there a partition
id?
How can I monitor each partition's execution? Is it possible to know, for
example, which words get processed and where (I mean in which partition)?

My code:
===
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by mixtou on 30/12/14.
 */

object SpaceSaving {

  var frequent_words_counters = scala.collection.immutable.Map[String,
    Array[Int]]();
  var guaranteed_words = scala.collection.immutable.Map[String,
    Array[Int]]();
  val top_k: Int = 100;
  var words_no: Int = 0;
  var tStart: Long = 0;
  var tStop: Long = 0;
  var fi: Double = 0.01;

  def main (args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Space Saving Project").setMaster("local");
    val ctx = new SparkContext(sparkConf);

    val lines =
      ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt",
        4)
      .map(line => line.toLowerCase());
    val nonEmptyLines = lines.filter(line => line.nonEmpty);
    val regex = "[,.:;'\\\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r;
    val cleanLines = nonEmptyLines.map(line => regex.replaceAllIn(line, " "));
    val dirtyWords = cleanLines.flatMap(line => line.split("\\s+"));
    val words = dirtyWords.filter(word => word.length > 3);

    words.foreach(word => space_saving_algorithm(word));

    if (frequent_words_counters.size > 0) {
      frequent_words_counters.foreach(line => println("Top Frequent Word: "
        + line._1 + " with count: " + line._2(0) + " and error: " + line._2(1)));
    }

    System.out.println("=== Throughput: " + 1000 * (words_no / (tStop -
      tStart)) + " words per second.");
    estimateGuaranteedFrequentWords();

    ctx.stop();
  }

  def space_saving_algorithm(word: String) = {

    if (frequent_words_counters.contains(word)) {
      val count = frequent_words_counters.get(word).get(0);
      val error = frequent_words_counters.get(word).get(1);
      frequent_words_counters += word -> Array[Int](count + 1, error);
    }
    else {

      if (frequent_words_counters.size < top_k) {
        frequent_words_counters += word -> Array[Int](1, 0);
      }
      else {
        replaceLeastEntry(word);
      }
    }

    if (words_no > 0) {
      tStop = java.lang.System.currentTimeMillis();
    }
    else {
      tStart = java.lang.System.currentTimeMillis();
    }
    words_no += 1;
  }

  def replaceLeastEntry(word: String): Unit = {

    var temp_list = frequent_words_counters.toList.sortWith( (x, y) =>
      x._2(0) > y._2(0) );
    val word_count = temp_list.last._2(0);
    temp_list = temp_list.take(temp_list.length - 1);

    frequent_words_counters = temp_list.toMap[String, Array[Int]];
    frequent_words_counters += word -> Array[Int](word_count + 1, word_count);
  }

  def estimateGuaranteedFrequentWords(): Unit = {
    frequent_words_counters.foreach{ tuple =>
      if (tuple._2(0) - tuple._2(1) < words_no * fi) {
        guaranteed_words -= tuple._1;
      }
      else {
        System.out.println("Guaranteed Word: " + tuple._1 + " with count: "
          + tuple._2(0) + " and error: " + tuple._2(1));

      }
    }
  }
}
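As a side note on the partition-id question: the RDD API exposes partition indices through `mapPartitionsWithIndex`. A hedged sketch building on the `words` RDD above (requires a running SparkContext, so shown as an illustration only):

```scala
// sketch: tag every word with the index of the partition that processes it
val tagged = words.mapPartitionsWithIndex { (partitionId, iter) =>
  iter.map(word => (partitionId, word))
}
tagged.collect().foreach { case (pid, word) =>
  println("partition " + pid + " handled " + word)
}
```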



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-Question-on-How-Tasks-are-Executed-tp21064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Accidental kill in UI

2015-01-09 Thread Joe Wass
So I had a Spark job with various failures, and I decided to kill it and
start again. I clicked the 'kill' link in the web console, restarted the
job on the command line and headed back to the web console and refreshed to
see how my job was doing... the URL at the time was:

/stages/stage/kill?id=1&terminate=true

Which of course terminated the stage again. No loss, but if I'd waited a
few hours before doing that, I would have lost data.

I know to be careful next time, but isn't 'don't modify state as a result
of a GET request' the first rule of HTTP? It could lead to an expensive
mistake. Making this a POST would be a simple fix.

Does anyone else think this is worth creating an issue for?


Re: Accidental kill in UI

2015-01-09 Thread Sean Owen
(FWIW yes I think this should certainly be a POST. The link can become
a miniature form to achieve this and then the endpoint just needs to
accept POST only. You should propose a pull request.)
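A sketch of what that miniature form could look like (a Scala XML literal, as the Spark UI pages use; `stageId` and the markup are illustrative assumptions, not the actual UI code):

```scala
// sketch: render the kill control as a POST form instead of a GET link
val killForm =
  <form action="/stages/stage/kill" method="POST" style="display:inline">
    <input type="hidden" name="id" value={stageId.toString}/>
    <input type="hidden" name="terminate" value="true"/>
    <a href="javascript:void(0)" onclick="this.parentNode.submit()">(kill)</a>
  </form>
```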

On Fri, Jan 9, 2015 at 12:51 PM, Joe Wass jw...@crossref.org wrote:
 So I had a Spark job with various failures, and I decided to kill it and
 start again. I clicked the 'kill' link in the web console, restarted the job
 on the command line and headed back to the web console and refreshed to see
 how my job was doing... the URL at the time was:

 /stages/stage/kill?id=1&terminate=true

 Which of course terminated the stage again. No loss, but if I'd waited a few
 hours before doing that, I would have lost data.

 I know to be careful next time, but isn't 'don't modify state as a result of
 a GET request' the first rule of HTTP? It could lead to an expensive
 mistake. Making this a POST would be a simple fix.

 Does anyone else think this is worth creating an issue for?






Re: TF-IDF from spark-1.1.0 not working on cluster mode

2015-01-09 Thread Priya Ch
Please find the attached worker log.
I could see a stream closed exception in it.

On Wed, Jan 7, 2015 at 10:51 AM, Xiangrui Meng men...@gmail.com wrote:

 Could you attach the executor log? That may help identify the root
 cause. -Xiangrui

 On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com
 wrote:
  Hi All,
 
  Word2Vec and TF-IDF algorithms in spark mllib-1.1.0 are working only in
  local mode and not in distributed mode. A null pointer exception is
  thrown. Is this a bug in spark-1.1.0?
 
  Following is the code:
    def main(args: Array[String])
    {
      val conf = new SparkConf
      val sc = new SparkContext(conf)
      val documents =
        sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext")
          .map(_.split(" ").toSeq)
      val hashingTF = new HashingTF()
      val tf = hashingTF.transform(documents)
      tf.cache()
      val idf = new IDF().fit(tf)
      val tfidf = idf.transform(tf)
      val rdd = tfidf.map { vec => println("vector is " + vec)
        (10)
      }
      rdd.saveAsTextFile("/home/padma/usecase")
    }
 
 
 
 
  Exception thrown:
 
  15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
 with
  2 tasks
  15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered
  executor:
  Actor[akka.tcp://
 sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167
 ]
  with ID 0
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in
 stage
  0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in
 stage
  0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes)
  15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block
  manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM
  15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection
 from
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888]
  15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection
 to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130]
  15/01/06 12:36:12 INFO network.SendingConnection: Connected to
  [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
 broadcast_1_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1
 GB)
  15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added
 broadcast_0_piece0 in
  memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1
 GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory
 on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB)
  15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory
 on
  IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB)
  15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
 0.0
  (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException:
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
  org.apache.spark.scheduler.Task.run(Task.scala:54)
 
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  java.lang.Thread.run(Thread.java:722)
 
 
  Thanks,
  Padma Ch



spark-rtauser-org.apache.spark.deploy.worker.Worker-1-IMPETUS-DSRV02.out
Description: Binary data


Parquet compression codecs not applied

2015-01-09 Thread Ayoub
Hello, 

I tried to save a table created via the Hive context as a Parquet file, but
whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
setConf, like:

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like the
Spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell: 

import org.apache.spark.sql.hive.HiveContext 
val hiveContext = new HiveContext(sc) 

hiveContext.sql("SET hive.exec.dynamic.partition = true") 
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict") 
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required
// to make data compatible with impala 
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip") 

hiveContext.sql("create external table if not exists foo (bar STRING, ts " +
  "INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET " +
  "Location 'hdfs://path/data/foo'") 

hiveContext.sql("insert into table foo partition(year, month, day) select *, " +
  "year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month, " +
  "day(from_unixtime(ts)) as day from raw_foo") 

I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13,
and I also tried it with Impala on the same cluster, which correctly
applied the compression codecs.
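One way to check whether any codec actually reached the files is to read the Parquet footers directly. A hedged sketch: parquet-tools is a separate utility that must be available on the cluster, and the part-file path below is hypothetical.

```shell
# Print the footer metadata of one generated part file; the per-column
# "compression" field shows the codec actually written (e.g. GZIP, SNAPPY).
hadoop jar parquet-tools.jar meta \
  hdfs://path/data/foo/year=2015/month=1/day=1/part-00000.parquet
```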

Does anyone know what could be the problem ? 

Thanks, 
Ayoub.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Set EXTRA_JAR environment variable for spark-jobserver

2015-01-09 Thread Sasi
We were able to resolve *SparkException: Job aborted due to stage failure: All
masters are unresponsive! Giving up* as well. Spark-jobserver is working fine
now and we need to experiment more.

Thank you guys.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p21060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Did anyone tried overcommit of CPU cores?

2015-01-09 Thread gen tang
Hi,

As you said, --executor-cores defines the maximum number of tasks that
an executor can run simultaneously. So, if you claim 10 cores, it is not
possible to launch more than 10 tasks in an executor at the same time.
In my experience, setting more cores than there are physical CPU cores
will overload the CPU at some point during execution of a Spark
application, especially when you are using algorithms from the mllib
package. In addition, executor-cores affects the default level of
parallelism in Spark. Therefore, I recommend setting cores = physical
cores by default.
Moreover, I don't think overcommitting CPU will increase CPU utilization;
in my opinion, it just lengthens the CPU's run queue.
If you observe that CPU load is very low (through Ganglia, for example)
and there is too much IO, increasing the level of parallelism or
serializing your objects may be a good choice.
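The recommendation above in sketch form (24 physical cores per worker and the parallelism value are illustrative assumptions, not tuned numbers):

```shell
# Keep executor cores equal to physical cores; absorb I/O waits by raising
# the number of tasks rather than by overcommitting CPU.
spark-submit \
  --executor-cores 24 \
  --conf spark.default.parallelism=480 \
  yourapp.jar
```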

Hoping this helps

Cheers
Gen


On Fri, Jan 9, 2015 at 10:12 AM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Thanks, but, how to increase the tasks per core?

 For example, if the application claims 10 cores, is it possible to launch
 100 tasks concurrently?



 On Fri, Jan 9, 2015 at 2:57 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hallo,

 Based on experiences with other software in virtualized environments I
 cannot really recommend this. However, I am not sure how Spark reacts. You
 may face unpredictable task failures depending on utilization, tasks
 connecting to external systems (databases etc.) may fail unexpectedly and
 this might be a problem for them (transactions not finishing etc.).

 Why not increase the tasks per core?

 Best regards
 On 9 Jan 2015 at 06:46, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

   I'm wondering whether it is a good idea to overcommit CPU cores on
 the spark cluster.

   For example, in our testing cluster, each worker machine has 24
 physical CPU cores. However, we are allowed to set the CPU core number to
 48 or more in the spark configuration file. As a result, we are allowed to
 launch more tasks than the number of physical CPU cores.

   The motivation for overcommitting CPU cores is that, much of the time,
  a task cannot consume 100% of a single CPU core's resources (due to I/O,
  shuffle, etc.).

   So, overcommitting the CPU cores allows more tasks to run at the same
  time, and makes the resources be used more economically.

   But is there any reason we should not do this? Has anyone tried it?







Zipping RDDs of equal size not possible

2015-01-09 Thread Niklas Wilcke
Hi Spark community,

I have a problem with zipping two RDDs of the same size and same number
of partitions.
The error message says that zipping is only allowed on RDDs which are
partitioned into chunks of exactly the same sizes.
How can I assure this? My workaround at the moment is to repartition
both RDDs to only one partition but that obviously
does not scale.

This problem originates from my problem to draw n random tuple pairs
(Tuple, Tuple) from an RDD[Tuple].
What I do is to sample 2 * n tuples, split them into two parts, balance
the sizes of these parts
by filtering some tuples out and zipping them together.

I would appreciate to read better approaches for both problems.
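One approach that sidesteps the equal-chunk requirement is to key both sides by a global index and join on the key; on RDDs the analogous calls would be `zipWithIndex`, `map(_.swap)` and `join`, which should be verified against your Spark version. A plain-Scala sketch of the idea:

```scala
// Pair elements of two equally sized sequences by explicit index instead of
// relying on matching partition sizes (as RDD.zip does).
def pairByIndex[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] = {
  val rightByIdx = right.zipWithIndex.map(_.swap).toMap // index -> value
  left.zipWithIndex.flatMap { case (a, i) => rightByIdx.get(i).map(b => (a, b)) }
}
```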

Thanks in advance,
Niklas


Re: Queue independent jobs

2015-01-09 Thread Anders Arpteg
Awesome, it actually seems to work. Amazing how simple it can be
sometimes...

Thanks Sean!

On Fri, Jan 9, 2015 at 12:42 PM, Sean Owen so...@cloudera.com wrote:

 You can parallelize on the driver side. The way to do it is almost
 exactly what you have here, where you're iterating over a local Scala
 collection of dates and invoking a Spark operation for each. Simply
 write dateList.par.map(...) to make the local map proceed in
 parallel. It should invoke the Spark jobs simultaneously.
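Sketched with standard-library `Future`s, an alternative to `.par` with the same effect; nothing Spark-specific here, and `job` stands in for the per-date Spark action:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Launch one job per date from the driver; the Futures run concurrently on
// the driver side, so the corresponding Spark jobs are submitted in parallel.
def runAll[A](dates: Seq[String])(job: String => A): Seq[A] = {
  val futures = dates.map(d => Future(job(d)))
  Await.result(Future.sequence(futures), 1.hour)
}
```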

 On Fri, Jan 9, 2015 at 10:46 AM, Anders Arpteg arp...@spotify.com wrote:
  Hey,
 
  Lets say we have multiple independent jobs that each transform some data
 and
  store in distinct hdfs locations, is there a nice way to run them in
  parallel? See the following pseudo code snippet:
 
  dateList.map(date =
  sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))
 
  It's unfortunate if they run in sequence, since all the executors are not
  used efficiently. What's the best way to parallelize execution of these
  jobs?
 
  Thanks,
  Anders



Re: PermGen issues on AWS

2015-01-09 Thread Sean Owen
It's normal for PermGen to be a bit more of an issue with Spark than
for other JVM-based applications. You should simply increase the
PermGen size, which I don't see in your command. -XX:MaxPermSize=256m
allows it to grow to 256m for example. The right size depends on your
total heap size and app.

Also, Java 8 no longer has a permanent generation, so this particular
type of problem and tuning is not needed. You might consider running
on Java 8.
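Applied to the submit command quoted below, the change is one extra executor JVM flag; 256m is only an example, size it to your heap and app, and consider `spark.driver.extraJavaOptions` if the driver also runs low:

```shell
spark/bin/spark-submit --class mything.core --name "My Thing" \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=256m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
  /root/spark/code/myjar.jar
```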

On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass jw...@crossref.org wrote:
 I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW
 I'm using the Flambo Clojure wrapper which uses the Java API but I don't
 think that should make any difference. I'm running with the following
 command:

 spark/bin/spark-submit --class mything.core --name My Thing --conf
 spark.yarn.executor.memoryOverhead=4096 --conf
 spark.executor.extraJavaOptions=-XX:+CMSClassUnloadingEnabled
 -XX:+CMSPermGenSweepingEnabled /root/spark/code/myjar.jar

 For one of the stages I'm getting errors:

  - ExecutorLostFailure (executor lost)
  - Resubmitted (resubmitted due to lost executor)

 And I think they're caused by slave executor JVMs dying with this error:

 java.lang.OutOfMemoryError: PermGen space
 java.lang.Class.getDeclaredConstructors0(Native Method)
 java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
 java.lang.Class.getConstructor0(Class.java:2885)
 java.lang.Class.newInstance(Class.java:350)

 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)

 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
 java.security.AccessController.doPrivileged(Native Method)

 sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)

 sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)

 sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)

 java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
 java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
 java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
 java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 java.security.AccessController.doPrivileged(Native Method)
 java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
 java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)

 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)


 1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded /
 1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted
 (resubmitted due to lost executor).

 Now my Aggregated Metrics by Executor shows that 10 out of 16 executors
 show CANNOT FIND ADDRESS which I imagine means the JVM blew up and hasn't
 been restarted. Now the 'Executors' tab shows only 7 executors.

  - Is this normal?
  - Any ideas why this is happening?
  - Any other measures I can take to prevent this?
  - Is the rest of my app going to run on a reduced number of executors?
  - Can I re-start the executors mid-application? This is a long-running job,
 so I'd like to do what I can whilst it's running, if possible.
  - Am I correct in thinking that the --conf arguments are supplied to the
 JVMs of the slave executors, so they will be receiving the extraJavaOptions
 and memoryOverhead?

 Thanks very much!

 Joe




Re: calculating the mean of SparseVector RDD

2015-01-09 Thread Rok Roskar
Thanks for the suggestion -- however, it looks like this is even slower. With
the small data set I'm using, my aggregate function takes ~ 9 seconds and
the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
the Kryo serializer -- I get the error:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

is there an easy/obvious fix?
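For the Kryo buffer-overflow error itself, the usual fix in Spark 1.x is to raise the serializer buffer limits; the property names below are the Spark 1.2/1.3-era ones and the values are illustrative, not tuned:

```shell
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer.mb=64 \
  --conf spark.kryoserializer.buffer.max.mb=256 \
  your_script.py
```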


On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng men...@gmail.com wrote:

 There is some serialization overhead. You can try

 https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
 . -Xiangrui

 On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
  I have an RDD of SparseVectors and I'd like to calculate the means
 returning
  a dense vector. I've tried doing this with the following (using pyspark,
  spark v1.2.0):
 
  def aggregate_partition_values(vec1, vec2) :
  vec1[vec2.indices] += vec2.values
  return vec1
 
  def aggregate_combined_vectors(vec1, vec2) :
  if all(vec1 == vec2) :
  # then the vector came from only one partition
  return vec1
  else:
  return vec1 + vec2
 
  means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
  aggregate_combined_vectors)
  means = means / nvals
 
  This turns out to be really slow -- and doesn't seem to depend on how
 many
  vectors there are so there seems to be some overhead somewhere that I'm
 not
  understanding. Is there a better way of doing this?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



PermGen issues on AWS

2015-01-09 Thread Joe Wass
I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW
I'm using the Flambo Clojure wrapper which uses the Java API but I don't
think that should make any difference. I'm running with the following
command:

spark/bin/spark-submit --class mything.core --name My Thing --conf
spark.yarn.executor.memoryOverhead=4096 --conf
spark.executor.extraJavaOptions=-XX:+CMSClassUnloadingEnabled
-XX:+CMSPermGenSweepingEnabled /root/spark/code/myjar.jar

For one of the stages I'm getting errors:

 - ExecutorLostFailure (executor lost)
 - Resubmitted (resubmitted due to lost executor)

And I think they're caused by slave executor JVMs dying with this error:

java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)

sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)

sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)

sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)

java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)

java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)


1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded /
1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted
(resubmitted due to lost executor).

Now my Aggregated Metrics by Executor shows that 10 out of 16 executors
show CANNOT FIND ADDRESS which I imagine means the JVM blew up and hasn't
been restarted. Now the 'Executors' tab shows only 7 executors.

 - Is this normal?
 - Any ideas why this is happening?
 - Any other measures I can take to prevent this?
 - Is the rest of my app going to run on a reduced number of executors?
 - Can I re-start the executors mid-application? This is a long-running
job, so I'd like to do what I can whilst it's running, if possible.
 - Am I correct in thinking that the --conf arguments are supplied to the
JVMs of the slave executors, so they will be receiving the extraJavaOptions
and memoryOverhead?

Thanks very much!

Joe


Queue independent jobs

2015-01-09 Thread Anders Arpteg
Hey,

Lets say we have multiple independent jobs that each transform some data
and store in distinct hdfs locations, is there a nice way to run them in
parallel? See the following pseudo code snippet:

dateList.map(date =
sc.hdfsFile(date).map(transform).saveAsHadoopFile(date))

It's unfortunate if they run in sequence, since all the executors are not
used efficiently. What's the best way to parallelize execution of these
jobs?

Thanks,
Anders


Re: Data locality running Spark on Mesos

2015-01-09 Thread Michael V Le

Hi Tim,

Thanks for your response.

The benchmark I used just reads data in from HDFS and builds the Linear
Regression model using methods from the MLlib.
Unfortunately, for various reasons, I can't open the source code for the
benchmark at this time.
I will try to replicate the problem using some sample benchmarks provided
by the vanilla Spark distribution.
It is very possible that I have something very screwy in my workload or
setup.

The parameters I used for the Spark on Mesos are the following:
driver memory = 1G
total-executor-cores = 60
spark.executor.memory 6g
spark.storage.memoryFraction 0.9
spark.mesos.coarse = true

The rest are default values, so spark.locality.wait should just be 3000ms.
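For reference, those settings as a single spark-submit invocation (the master URL, class and jar names are placeholders):

```shell
spark-submit \
  --master mesos://zk://zk-host:2181/mesos \
  --driver-memory 1G \
  --total-executor-cores 60 \
  --conf spark.executor.memory=6g \
  --conf spark.storage.memoryFraction=0.9 \
  --conf spark.mesos.coarse=true \
  --class com.example.LinearRegressionBenchmark benchmark.jar
```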

I launched the Spark job on a separate node from the 10-node cluster using
spark-submit.

With regards to Mesos in fine-grained mode, do you have a feel for the
overhead of launching executors for every task? Of course, any perceived
slowdown will probably be very dependent on the workload; I just want a
feel for the possible overhead (e.g., a factor of 2 or 3 slowdown?).
If it's not a data locality issue, perhaps this overhead is a factor in
the slowdown I observed, at least in the fine-grained case.

BTW: i'm using Spark ver 1.1.0 and Mesos ver 0.20.0

Thanks,
Mike




From:   Tim Chen t...@mesosphere.io
To: Michael V Le/Watson/IBM@IBMUS
Cc: user user@spark.apache.org
Date:   01/08/2015 03:04 PM
Subject:Re: Data locality running Spark on Mesos



How did you run this benchmark, and is there an open version I can try it
with?

And what is your configurations, like spark.locality.wait, etc?

Tim

On Thu, Jan 8, 2015 at 11:44 AM, mvle m...@us.ibm.com wrote:
  Hi,

  I've noticed running Spark apps on Mesos is significantly slower compared
  to
  stand-alone or Spark on YARN.
  I don't think it should be the case, so I am posting the problem here in
  case someone has some explanation
  or can point me to some configuration options i've missed.

  I'm running the LinearRegression benchmark with a dataset of 48.8GB.
  On a 10-node stand-alone Spark cluster (each node 4-core, 8GB of RAM),
  I can finish the workload in about 5min (I don't remember exactly).
  The data is loaded into HDFS spanning the same 10-node cluster.
  There are 6 worker instances per node.

  However, when running the same workload on the same cluster but now with
  Spark on Mesos (coarse-grained mode), the execution time is somewhere
  around 15min. Actually, I tried fine-grained mode, giving each Mesos
  node 6 VCPUs (to hopefully get 6 executors like the stand-alone test),
  and I still get roughly 15min.

  I've noticed that when Spark is running on Mesos, almost all tasks
  execute
  with locality NODE_LOCAL (even in Mesos in coarse-grained mode). On
  stand-alone, the locality is mostly PROCESS_LOCAL.

  I think this locality issue might be the reason for the slow down but I
  can't figure out why, especially for coarse-grained mode as the executors
  supposedly do not go away until job completion.

  Any ideas?

  Thanks,
  Mike



  --
  View this message in context:
  
http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-running-Spark-on-Mesos-tp21041.html

  Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?

2015-01-09 Thread Cheng Lian

Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are 
inlined below.


Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:

Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala 
via rdd.mapPartitions(…). Using the latest release 1.2.0.


Simple example; load up some sample data from parquet on HDFS (about 
380m rows, 10 columns) on a 7 node cluster.


val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
t.registerTempTable("test1")
sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities
sold for each hour in the day, so I choose 3 out of the 10 possible
columns...


sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
Hour").collect().foreach(println)


After the table has been 100% cached in memory, this takes around 11 
seconds.


Let's do the same thing but via a mapPartitions call (this isn’t
production-ready code but gets the job done).


val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
try2.mapPartitions { hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.getInt(0)
    qtySum(hr) += r.getDouble(1)
    salesSum(hr) += r.getDouble(2)
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a, b) => (a._1 + b._1, a._2 +
b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the
for-loop. According to an early benchmark I did with Scala 2.9, a for-loop
can be orders of magnitude slower than a simple while-loop, especially
when the body of the loop only does something as trivial as in this case.
The reason is that a Scala for-loop is translated into the corresponding
foreach/map/flatMap/withFilter function calls. And that's exactly why
Spark SQL tries to avoid for-loops and other functional-style code in
critical paths (where every row is touched); we also use reusable
mutable row objects instead of the immutable version to improve
performance. You may check HiveTableScan, ParquetTableScan,
InMemoryColumnarTableScan etc. for reference. Also, the `sum` function
calls in your SQL code are translated into `o.a.s.s.execution.Aggregate`
operators, which also use an imperative while-loop and reusable mutable rows.
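A sketch of the same per-partition aggregation written with an explicit while-loop over the iterator; this is plain Scala, independent of Spark, with the row modeled as a simple tuple:

```scala
// Accumulate quantity and sales per hour with a while-loop instead of a
// for-comprehension; no per-element closure call is involved.
def sumByHour(rows: Iterator[(Int, Double, Double)]): (Array[Double], Array[Double]) = {
  val qtySum   = new Array[Double](24)
  val salesSum = new Array[Double](24)
  while (rows.hasNext) {
    val (hr, qty, sales) = rows.next()
    qtySum(hr)   += qty
    salesSum(hr) += sales
  }
  (qtySum, salesSum)
}
```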


Another thing to notice is that the `hrs` iterator physically points to
the underlying in-memory columnar byte buffers, and the `for (r <- hrs) {
... }` loop actually decompresses and extracts values from the required
byte buffers (this is the unwrapping process you mentioned below).


Now this takes around ~49 seconds… Even though test1 table is 100% 
cached. The number of partitions remains the same…


Now if I create a simple RDD of a case class HourSum(hour: Int, qty: 
Double, sales: Double)


Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map { r =>
HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()

//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a, b) => (a._1 + b._1, a._2 +
b._2)).collect().foreach(println)


This takes around 1.5 seconds! Albeit the memory footprint is much larger.
I guess this 1.5 seconds doesn't include the time spent on caching the 
simple RDD? As I've explained above, in the first `mapPartitions` style 
snippet, columnar byte buffer unwrapping happens within the 
`mapPartitions` call. However, in this version, the unwrapping process 
happens when the `rdd.count()` action is performed. At that point, all 
values of all columns are extracted from underlying byte buffers, and 
the portion of data you need are then manually selected and transformed 
into the simple case class RDD via the `map` call.


If you include time spent on caching the simple case class RDD, it 
should be even slower than the first `mapPartitions` version.


My thinking is that because SparkSQL does store things in a columnar 
format, there is some unwrapping to be done out of the column array 
buffers, which takes time, and for some reason this just takes longer 
when I switch to mapPartitions (maybe it's unwrapping the entire 
row, even though I’m using just a subset of columns, or maybe there is 
some object creation/autoboxing going on when calling getInt or 
getDouble)…


I’ve tried simpler cases too, like just summing sales. Running sum via 
SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD 
is even faster (2.6 seconds). But MapPartitions on the SchemaRDD;


sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
  Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum


 takes a long time (33 seconds). In all these examples everything is 
fully cached in memory. And yes for these kinds 

Cleaning up spark.local.dir automatically

2015-01-09 Thread michael.england
Hi,

Is there a way of automatically cleaning up the spark.local.dir after a job has
been run? I have noticed that a large number of temporary files are stored
there and are not cleaned up. The only solution I can think of is to run some
sort of cron job to delete files older than a few days. I am currently using a
mixture of standalone and YARN Spark builds.
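A sketch of the cron workaround (the local-dir path and the 3-day retention are assumptions; for standalone workers the built-in `spark.worker.cleanup.enabled` / `spark.worker.cleanup.appDataTtl` settings are also worth checking, though they target the worker's application work directory rather than spark.local.dir):

```shell
# crontab entry: at 03:00 daily, delete entries under the configured
# spark.local.dir not modified for 3+ days. Verify no running job uses them.
0 3 * * * find /tmp/spark-local -mindepth 1 -mtime +3 -delete
```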

Thanks,
Michael


