RE: Spark SQL Stackoverflow error

2015-03-01 Thread Jishnu Prathap
Hi
The issue was not fixed.
I removed the in-between SQL layer and created the features directly from the file.

Regards
Jishnu Prathap

From: lovelylavs [via Apache Spark User List] 
[mailto:ml-node+s1001560n21862...@n3.nabble.com]
Sent: Sunday, March 01, 2015 4:44 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Subject: Re: Spark SQL Stackoverflow error

Hi,

how was this issue fixed?






Re: Scalable JDBCRDD

2015-03-01 Thread Cody Koeninger
I'm a little confused by your comments regarding LIMIT.  There's nothing
about JdbcRDD that depends on limit.  You just need to be able to partition
your data in some way such that it has numeric upper and lower bounds.
Primary key range scans, not limit, would ordinarily be the best way to do
that.  If you can't partition your data that way for some reason, how do
you propose to partition it at all?
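
For illustration, a minimal sketch of that key-range approach, assuming a numeric primary key column id, a hypothetical JDBC URL, and a hypothetical payload column:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

// Each partition receives a contiguous [lower, upper] slice of the id range,
// so every partition runs a plain range scan rather than a LIMIT query.
def buildRdd(sc: SparkContext) = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:postgresql://host/db"),  // hypothetical URL
  "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",      // the two ?'s are the partition bounds
  1L,        // lowerBound: smallest id
  1000000L,  // upperBound: largest id
  10,        // numPartitions
  (rs: ResultSet) => rs.getString("payload")                       // hypothetical mapRow
)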


On Sat, Feb 28, 2015 at 11:13 AM, Michal Klos 
wrote:

> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
> couple of factors including custom functions used in the queries that only
> our database has.
>
> We started by looking at JDBC RDD, which utilizes a prepared statement
> with two parameters that are meant to be used to partition the result set
> to the workers... e.g.:
>
> select * from table limit ?,?
>
> turns into
>
> select * from table limit 1,100 on worker 1
> select * from table limit 101,200 on worker 2
>
> This will not work for us because our database cannot support multiple
> execution of these queries without being crippled. But, additionally, our
> database doesn't support the above LIMIT syntax and we don't have a generic
> way of partitioning the various queries.
>
> As a result -- we started by forking JDBCRDD and made a version that
> executes the SQL query once in getPartitions into a Vector and then hands
> each worker node an index and iterator. Here's a snippet of getPartitions
> and compute:
>
>   override def getPartitions: Array[Partition] = {
>     // Compute the DB query once here
>     val results = computeQuery
>
>     (0 until numPartitions).map(i => {
>       // TODO: would be better to do this partitioning when scrolling through
>       // the result set, if still loading into memory
>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>       new DBPartition(i, partitionItems)
>     }).toArray
>   }
>
>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>     val part = thePart.asInstanceOf[DBPartition[T]]
>
>     // Shift the result vector to our index number and then do a sliding iterator over it
>     val iterator = part.items.iterator
>
>     override def getNext: T = {
>       if (iterator.hasNext) {
>         iterator.next()
>       } else {
>         finished = true
>         null.asInstanceOf[T]
>       }
>     }
>
>     override def close: Unit = ()
>   }
>
> This is a little better since we can just execute the query once. However, 
> the result-set needs to fit in memory.
>
> We've been trying to brainstorm a way to
>
> A) have that result set distribute out to the worker RDD partitions as it's 
> streaming in from the cursor?
> B) have the result set spill to disk if it exceeds memory and do something 
> clever around the iterators?
> C) something else?
>
> We're not familiar enough yet with all of the workings of Spark to know how 
> to proceed on this.
>
> We also thought of the work-around of having the DB query dump to HDFS/S3
> and then pick it up from there, but it adds more moving parts and latency to
> our processing.
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal
>
>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-03-01 Thread Vijay Saraswat

GML is a fast, distributed, in-memory sparse (and dense) matrix library.

It does not use RDDs for resilience. Instead we have examples that use 
Resilient X10 (which provides recovery of distributed control structures 
in case of node failure) and Hazelcast for stable storage.


We are looking to benchmark with RDDs to compare overhead, and also 
looking to see how the same ideas could be realized on top of RDDs.



On 2/28/15 7:25 PM, Joseph Bradley wrote:

Hi Shahab,

There are actually a few distributed Matrix types which support sparse 
representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix.  
The documentation has a bit more info about the various uses: 
http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix 



The Spark 1.3 RC includes a new one: BlockMatrix.

But since these are distributed, they are represented using RDDs, so 
they of course will not be as fast as computations on smaller, locally 
stored matrices.
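
For reference, a minimal sketch of building these sparse types with the MLlib API (assuming a SparkContext named sc is in scope):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// A local sparse vector: length 5, non-zeros at indices 1 and 3
val sv = Vectors.sparse(5, Array(1, 3), Array(2.0, 7.0))

// A distributed sparse matrix built from (row, col, value) entries
val entries = sc.parallelize(Seq(MatrixEntry(0L, 1L, 2.0), MatrixEntry(3L, 4L, 7.0)))
val coordMat = new CoordinateMatrix(entries)

// Convert to another distributed representation when an operation needs it
val rowMat = coordMat.toRowMatrix()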


Joseph

On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh <riteshoneinamill...@gmail.com> wrote:


try using breeze (scala linear algebra library)

On Fri, Feb 27, 2015 at 5:56 PM, shahab <shahab.mok...@gmail.com> wrote:

Thanks a lot Vijay, let me see how it performs.

Best
Shahab


On Friday, February 27, 2015, Vijay Saraswat <vi...@saraswat.org> wrote:

Available in GML --


http://x10-lang.org/x10-community/applications/global-matrix-library.html

We are exploring how to make it available within Spark.
Any ideas would be much appreciated.

On 2/27/15 7:01 AM, shahab wrote:

Hi,

I just wonder if there is any Sparse Matrix
implementation available  in Spark, so it can be used
in spark application?

best,
/Shahab











Submitting jobs on Spark EC2 cluster: class not found, even if it's on CLASSPATH

2015-03-01 Thread olegshirokikh
Hi there,

I'm trying out Spark Job Server (REST) to submit jobs to a Spark cluster. I
believe that my problem is unrelated to this specific software and is otherwise
a generic issue of missing jars on classpaths. Every application implements the
SparkJob trait:

object LongPiJob extends SparkJob {
  ...
}

The SparkJob class is available through the jar file built by the Spark Job Server
Scala application. When I run all this with a local Spark cluster, everything
works fine after I add this export line to spark-env.sh:

export SPARK_CLASSPATH=$SPARK_HOME/job-server/spark-job-server.jar

However, when I do the same on a Spark cluster on EC2, I get this error:

java.lang.NoClassDefFoundError: spark/jobserver/SparkJob

I've added the path in spark-env.sh (on remote Spark master Amazon machine):

export MASTER=`cat /root/spark-ec2/cluster-url`
export SPARK_CLASSPATH=/root/spark/job-server/spark-job-server.jar
export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf"

Also, when I run ./bin/compute-classpath.sh, I can see the required jar, the one
defining the "missing" class, in first position on the classpath:

bin]$ ./compute-classpath.sh
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
/root/spark/job-server/spark-job-server.jar:/root/spark/job-server/spark-job-server.jar::/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.2.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-core-3.2.10.jar:/root/spark/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark/lib/datanucleus-rdbms-3.2.9.jar


What am I missing? I'd greatly appreciate your help.







Spark Streaming testing strategies

2015-03-01 Thread Marcin Kuthan
I have started using Spark and Spark Streaming and I'm wondering how you
test your applications, especially Spark Streaming applications with
window-based transformations.

After some digging I found the ManualClock class, which gives full control over
stream processing. Unfortunately the class is not available outside the
spark.streaming package. Are you going to expose the class to other
developers as well? For now I have to put my custom wrapper under the
spark.streaming package.
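
In the meantime, for logic that does not depend on the clock, deterministic input can be fed with queueStream; a minimal sketch (window-based transformations would still need the ManualClock approach described above):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-test")
val ssc = new StreamingContext(conf, Seconds(1))

// Deterministic input: push RDDs into the queue, one per batch
val queue = mutable.Queue[RDD[String]]()
val words = ssc.queueStream(queue).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Collect output on the driver for assertions
val results = mutable.ArrayBuffer[(String, Int)]()
words.foreachRDD(rdd => results ++= rdd.collect())

ssc.start()
queue += ssc.sparkContext.makeRDD(Seq("a b a"))
Thread.sleep(2000)  // crude; a real test would poll with a timeout
ssc.stop(stopSparkContext = true)

assert(results.contains(("a", 2)))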

My Spark and Spark Streaming unit testing strategies are documented here:
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/

Your feedback is more than appreciated.

Marcin


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-01 Thread Jaonary Rabarisoa
Hi Joseph,

Thank you for the tips. I understand what I should do when my data is
represented as an RDD. The thing that I can't figure out is how to do the
same thing when the data is viewed as a DataFrame and I need to add the
result of my pretrained model as a new column in the DataFrame. Precisely,
I want to implement the following transformer:

class DeepCNNFeature extends Transformer ... {

}

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
wrote:

> Hi Jao,
>
> You can use external tools and libraries if they can be called from your
> Spark program or script (with appropriate conversion of data types, etc.).
> The best way to apply a pre-trained model to a dataset would be to call the
> model from within a closure, e.g.:
>
> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>
> If your data is distributed in an RDD (myRDD), then the above call will
> distribute the computation of prediction using the pre-trained model.  It
> will require that all of your Spark workers be able to run the
> preTrainedModel; that may mean installing Caffe and dependencies on all
> nodes in the compute cluster.
>
> For the second question, I would modify the above call as follows:
>
> myRDD.mapPartitions { myDataOnPartition =>
>   val myModel = // instantiate neural network on this partition
>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
> }
>
> I hope this helps!
> Joseph
>
> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>>
>> We mainly do large scale computer vision task (image classification,
>> retrieval, ...). The pipeline is really great stuff for that. We're trying
>> to reproduce the tutorial given on that topic during the latest spark
>> summit (
>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>  )
>> using the master version of spark pipeline and dataframe. The tutorial
>> shows different examples of feature extraction stages before running
>> machine learning algorithms. Even the tutorial is straightforward to
>> reproduce with this new API, we still have some questions :
>>
>>- Can one use external tools (e.g via pipe) as a pipeline stage ? An
>>example of use case is to extract feature learned with convolutional 
>> neural
>>network. In our case, this corresponds to a pre-trained neural network 
>> with
>>Caffe library (http://caffe.berkeleyvision.org/) .
>>
>>
>>- The second question is about the performance of the pipeline.
>>Library such as Caffe processes the data in batch and instancing one Caffe
>>network can be time consuming when this network is very deep. So, we can
>>gain performance if we minimize the number of Caffe network creation and
>>give data in batch to the network. In the pipeline, this corresponds to 
>> run
>>transformers that work on a partition basis and give the whole partition 
>> to
>>a single caffe network. How can we create such a transformer ?
>>
>>
>>
>> Best,
>>
>> Jao
>>
>
>


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-01 Thread Jaonary Rabarisoa
class DeepCNNFeature extends Transformer ... {

    override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
        // How can I do a mapPartitions on the underlying RDD and
        // then add the column?
    }
}
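
One possible shape for that transform body, as a minimal sketch against the Spark 1.3 DataFrame API; loadPretrainedModel and featurize below are hypothetical stand-ins for the real Caffe calls:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical placeholders for the real model loading / feature extraction
def loadPretrainedModel(): AnyRef = new Object
def featurize(model: AnyRef, row: Row): Double = 0.0

def addFeatureColumn(data: DataFrame): DataFrame = {
  // Extend the existing schema with the new feature column
  val schema = StructType(data.schema.fields :+ StructField("cnnFeature", DoubleType, nullable = false))
  // mapPartitions lets the model be instantiated once per partition
  val rows = data.rdd.mapPartitions { iter =>
    val model = loadPretrainedModel()
    iter.map(row => Row.fromSeq(row.toSeq :+ featurize(model, row)))
  }
  data.sqlContext.createDataFrame(rows, schema)
}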

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
wrote:

> Hi Joseph,
>
> Thank you for the tips. I understand what I should do when my data is
> represented as an RDD. The thing that I can't figure out is how to do the
> same thing when the data is viewed as a DataFrame and I need to add the
> result of my pretrained model as a new column in the DataFrame. Precisely,
> I want to implement the following transformer:
>
> class DeepCNNFeature extends Transformer ... {
>
> }
>
> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
> wrote:
>
>> Hi Jao,
>>
>> You can use external tools and libraries if they can be called from your
>> Spark program or script (with appropriate conversion of data types, etc.).
>> The best way to apply a pre-trained model to a dataset would be to call the
>> model from within a closure, e.g.:
>>
>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>
>> If your data is distributed in an RDD (myRDD), then the above call will
>> distribute the computation of prediction using the pre-trained model.  It
>> will require that all of your Spark workers be able to run the
>> preTrainedModel; that may mean installing Caffe and dependencies on all
>> nodes in the compute cluster.
>>
>> For the second question, I would modify the above call as follows:
>>
>> myRDD.mapPartitions { myDataOnPartition =>
>>   val myModel = // instantiate neural network on this partition
>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>> }
>>
>> I hope this helps!
>> Joseph
>>
>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Dear all,
>>>
>>>
>>> We mainly do large scale computer vision task (image classification,
>>> retrieval, ...). The pipeline is really great stuff for that. We're trying
>>> to reproduce the tutorial given on that topic during the latest spark
>>> summit (
>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>  )
>>> using the master version of spark pipeline and dataframe. The tutorial
>>> shows different examples of feature extraction stages before running
>>> machine learning algorithms. Even the tutorial is straightforward to
>>> reproduce with this new API, we still have some questions :
>>>
>>>- Can one use external tools (e.g via pipe) as a pipeline stage ? An
>>>example of use case is to extract feature learned with convolutional 
>>> neural
>>>network. In our case, this corresponds to a pre-trained neural network 
>>> with
>>>Caffe library (http://caffe.berkeleyvision.org/) .
>>>
>>>
>>>- The second question is about the performance of the pipeline.
>>>Library such as Caffe processes the data in batch and instancing one 
>>> Caffe
>>>network can be time consuming when this network is very deep. So, we can
>>>gain performance if we minimize the number of Caffe network creation and
>>>give data in batch to the network. In the pipeline, this corresponds to 
>>> run
>>>transformers that work on a partition basis and give the whole partition 
>>> to
>>>a single caffe network. How can we create such a transformer ?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>
>>
>


Re: Number of cores per executor on Spark Standalone

2015-03-01 Thread Deborah Siegel
Hi,

Someone else will have a better answer. I think that for standalone mode,
executors will grab whatever cores they can based on either configurations
on the worker, or application specific configurations. Could be wrong, but
I believe mesos is similar to this- and that YARN is alone in the ability
to specify a specific number of cores given to each executor.

For Standalone Mode, configurations on the workers can limit the number of
cores available on themselves, and applications can limit the number of
cores they will grab across the entire cluster.

1) Environment variable on each worker: SPARK_WORKER_CORES, or pass --cores when you
manually start each worker. This affects how many cores are available on that
worker for all applications.
2) Property on the master: spark.deploy.defaultCores, which limits the number of
cores any single application can grab from the cluster in the case that the
application has not set spark.cores.max (or --total-executor-cores as a flag to
spark-submit). If the application has not set spark.cores.max, and the master does
not have spark.deploy.defaultCores set, the application can grab an unlimited number
of cores across the cluster. Could be an issue for a shared cluster. (A config
sketch follows below.)
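
A minimal sketch of the application-side setting; the worker-side cap (SPARK_WORKER_CORES) goes in spark-env.sh on each worker, and spark.deploy.defaultCores in the master's configuration. The master URL below is hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")      // hypothetical master URL
  .setAppName("core-limits-example")
  .set("spark.cores.max", "8")           // same effect as --total-executor-cores 8 on spark-submit

val sc = new SparkContext(conf)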

Sincerely,
Deb









On Fri, Feb 27, 2015 at 11:13 PM, bit1...@163.com  wrote:

> Hi ,
>
> I know that spark on yarn has a configuration parameter(executor-cores
> NUM) to  specify the number of cores per executor.
> How about spark standalone? I can specify the total cores, but how could I
> know how many cores each executor will take(presume one node one
> executor)?
>
>
> --
> bit1...@163.com
>


Re: Scalable JDBCRDD

2015-03-01 Thread Jörn Franke
What database are you using?
On Feb 28, 2015 at 18:15, "Michal Klos"  wrote:

> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
> couple of factors including custom functions used in the queries that only
> our database has.
>
> We started by looking at JDBC RDD, which utilizes a prepared statement
> with two parameters that are meant to be used to partition the result set
> to the workers... e.g.:
>
> select * from table limit ?,?
>
> turns into
>
> select * from table limit 1,100 on worker 1
> select * from table limit 101,200 on worker 2
>
> This will not work for us because our database cannot support multiple
> execution of these queries without being crippled. But, additionally, our
> database doesn't support the above LIMIT syntax and we don't have a generic
> way of partitioning the various queries.
>
> As a result -- we started by forking JDBCRDD and made a version that
> executes the SQL query once in getPartitions into a Vector and then hands
> each worker node an index and iterator. Here's a snippet of getPartitions
> and compute:
>
>   override def getPartitions: Array[Partition] = {
> //Compute the DB query once here
> val results = computeQuery
>
> (0 until numPartitions).map(i => {
>   // TODO: would be better to do this partitioning when scrolling through 
> result set if still loading into memory
>   val partitionItems = results.drop(i).sliding(1, 
> numPartitions).flatten.toVector
>   new DBPartition(i, partitionItems)
> }).toArray
>   }
>
>   override def compute(thePart: Partition, context: TaskContext) = new 
> NextIterator[T] {
> val part = thePart.asInstanceOf[DBPartition[T]]
>
> //Shift the result vector to our index number and then do a sliding 
> iterator over it
> val iterator = part.items.iterator
>
> override def getNext : T = {
>   if (iterator.hasNext) {
> iterator.next()
>   } else {
> finished = true
> null.asInstanceOf[T]
>   }
> }
>
> override def close: Unit = ()
>   }
>
> This is a little better since we can just execute the query once. However, 
> the result-set needs to fit in memory.
>
> We've been trying to brainstorm a way to
>
> A) have that result set distribute out to the worker RDD partitions as it's 
> streaming in from the cursor?
> B) have the result set spill to disk if it exceeds memory and do something 
> clever around the iterators?
> C) something else?
>
> We're not familiar enough yet with all of the workings of Spark to know how 
> to proceed on this.
>
> We also thought of the work-around of having the DB query dump to HDFS/S3
> and then pick it up from there, but it adds more moving parts and latency to
> our processing.
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal
>
>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-03-01 Thread shahab
Thanks Joseph for the comments, I think I need to do some benchmarking.

best,
/Shahab

On Sun, Mar 1, 2015 at 1:25 AM, Joseph Bradley 
wrote:

> Hi Shahab,
>
> There are actually a few distributed Matrix types which support sparse
> representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix.
> The documentation has a bit more info about the various uses:
> http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix
>
> The Spark 1.3 RC includes a new one: BlockMatrix.
>
> But since these are distributed, they are represented using RDDs, so they
> of course will not be as fast as computations on smaller, locally stored
> matrices.
>
> Joseph
>
> On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh <
> riteshoneinamill...@gmail.com> wrote:
>
>> try using breeze (scala linear algebra library)
>>
>> On Fri, Feb 27, 2015 at 5:56 PM, shahab  wrote:
>>
>>> Thanks a lot Vijay, let me see how it performs.
>>>
>>> Best
>>> Shahab
>>>
>>>
>>> On Friday, February 27, 2015, Vijay Saraswat  wrote:
>>>
 Available in GML --

 http://x10-lang.org/x10-community/applications/global-
 matrix-library.html

 We are exploring how to make it available within Spark. Any ideas would
 be much appreciated.

 On 2/27/15 7:01 AM, shahab wrote:

> Hi,
>
> I just wonder if there is any Sparse Matrix implementation available
> in Spark, so it can be used in spark application?
>
> best,
> /Shahab
>


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>
>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-03-01 Thread shahab
Thanks Vijay, but the setup requirement for GML was not straightforward for
me at all, so I put it aside for a while.

best,
/Shahab

On Sun, Mar 1, 2015 at 9:34 AM, Vijay Saraswat  wrote:

>  GML is a fast, distributed, in-memory sparse (and dense) matrix
> library.
>
> It does not use RDDs for resilience. Instead we have examples that use
> Resilient X10 (which provides recovery of distributed control structures in
> case of node failure) and Hazelcast for stable storage.
>
> We are looking to benchmark with RDDs to compare overhead, and also
> looking to see how the same ideas could be realized on top of RDDs.
>
>
>
> On 2/28/15 7:25 PM, Joseph Bradley wrote:
>
> Hi Shahab,
>
>  There are actually a few distributed Matrix types which support sparse
> representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix.
> The documentation has a bit more info about the various uses:
> http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix
>
>  The Spark 1.3 RC includes a new one: BlockMatrix.
>
>  But since these are distributed, they are represented using RDDs, so
> they of course will not be as fast as computations on smaller, locally
> stored matrices.
>
>  Joseph
>
> On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh <
> riteshoneinamill...@gmail.com> wrote:
>
>> try using breeze (scala linear algebra library)
>>
>> On Fri, Feb 27, 2015 at 5:56 PM, shahab  wrote:
>>
>>> Thanks a lot Vijay, let me see how it performs.
>>>
>>>  Best
>>> Shahab
>>>
>>>
>>> On Friday, February 27, 2015, Vijay Saraswat  wrote:
>>>
 Available in GML --


 http://x10-lang.org/x10-community/applications/global-matrix-library.html

 We are exploring how to make it available within Spark. Any ideas would
 be much appreciated.

 On 2/27/15 7:01 AM, shahab wrote:

> Hi,
>
> I just wonder if there is any Sparse Matrix implementation available
> in Spark, so it can be used in spark application?
>
> best,
> /Shahab
>


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>
>
>


Re: Columnar-Oriented RDDs

2015-03-01 Thread Night Wolf
Thanks for the comments guys.

Parquet is awesome. My question with using Parquet for on disk storage -
how should I load that into memory as a spark RDD and cache it and keep it
in a columnar format?

I know I can use Spark SQL with parquet which is awesome. But as soon as I
step out of SQL we have problems as it kinda gets converted back to a row
oriented format.
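
For what it's worth, the Spark SQL in-memory columnar cache can be driven explicitly from code; a minimal sketch, assuming the 1.2-era SchemaRDD API, a hypothetical Parquet path, and sc in scope:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val events = sqlContext.parquetFile("hdfs:///data/events.parquet")  // hypothetical path
events.registerTempTable("events")
sqlContext.cacheTable("events")  // cached in Spark SQL's in-memory columnar format

// SQL queries scan the columnar cache; dropping back to events.map(...)
// hands out Rows again, i.e. a row-oriented view of the same data.
val total = sqlContext.sql("SELECT COUNT(*) FROM events").collect()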

@Koert - that looks really exciting. Do you have any statistics on memory
and scan performance?

On Saturday, February 14, 2015, Koert Kuipers  wrote:

> i wrote a proof of concept to automatically store any RDD of tuples or
> case classes in columar format using arrays (and strongly typed, so you get
> the benefit of primitive arrays). see:
> https://github.com/tresata/spark-columnar
>
> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust  > wrote:
>
>> Shark's in-memory code was ported to Spark SQL and is used by default
>> when you run .cache on a SchemaRDD or CACHE TABLE.
>>
>> I'd also look at parquet which is more efficient and handles nested data
>> better.
>>
>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf > > wrote:
>>
>>> Hi all,
>>>
>>> I'd like to build/use column oriented RDDs in some of my Spark code. A
>>> normal Spark RDD is stored as row oriented object if I understand
>>> correctly.
>>>
>>> I'd like to leverage some of the advantages of a columnar memory format.
>>> Shark (used to) and SparkSQL uses a columnar storage format using primitive
>>> arrays for each column.
>>>
>>> I'd be interested to know more about this approach and how I could build
>>> my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>>
>>> Could anyone give me some pointers on where to look to do something like
>>> this, either from scratch or using whats there in the SparkSQL libs or
>>> elsewhere. I know Evan Chan in a presentation made mention of building a
>>> custom RDD of column-oriented blocks of data.
>>>
>>> Cheers,
>>> ~N
>>>
>>
>>
>


Re: Connection pool in workers

2015-03-01 Thread A.K.M. Ashrafuzzaman
Sorry guys, my bad.
Here is a high level code sample,

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach { tweet =>
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  }
})

Here I have to close the connection for interactionDAL, otherwise the JVM gives
me an error that the connection is open. I tried with a sticky connection as well,
with keep_alive true. So my guess was that at the point of
“unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and sent to
the workers, and the workers un-marshal it and execute the process, which is why
the connection is always opened for each RDD. I might be completely wrong. I would
love to know what is going on underneath.
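
For comparison, a common pattern is to open the connection once per partition on the worker instead of shipping a driver-side object; a minimal sketch, assuming an InteractionDAL that can be constructed and closed on the worker:

unionStreams.foreachRDD { rdd =>
  rdd.foreachPartition { tweets =>
    // Constructed on the worker, once per partition, so nothing is serialized from the driver
    val dal = new InteractionDAL()          // hypothetical worker-side constructor
    tweets.foreach { tweet =>
      val interaction = InteractionParser.parser(new String(tweet, "UTF-8"))
      dal.insert(interaction)
    }
    dal.close()                             // release the connection when the partition is done
  }
}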



Re: Scalable JDBCRDD

2015-03-01 Thread michal.klo...@gmail.com
Jorn: Vertica 

Cody: I posited the limit just as an example of how JdbcRDD could be used least
invasively. Let's say we partitioned on a time field -- we would still
need to have N executions of those queries. The queries we have are very
intense and concurrency is an issue even if the N partitioned queries are
smaller. Some queries require evaluating the whole data set first. If our use
case were a simple select * from table, then the partitions would be an easier sell
if it weren't for the concurrency problem :) Long story short -- we need only
one execution of the query and would like to just divvy out the result set.

M



> On Mar 1, 2015, at 5:18 AM, Jörn Franke  wrote:
> 
> What database are you using?
> 
> On Feb 28, 2015 at 18:15, "Michal Klos"  wrote:
>> Hi Spark community,
>> 
>> We have a use case where we need to pull huge amounts of data from a SQL 
>> query against a database into Spark. We need to execute the query against 
>> our huge database and not a substitute (SparkSQL, Hive, etc) because of a 
>> couple of factors including custom functions used in the queries that only 
>> our database has.
>> 
>> We started by looking at JDBC RDD, which utilizes a prepared statement with 
>> two parameters that are meant to be used to partition the result set to the 
>> workers... e.g.:
>> 
>> select * from table limit ?,?
>> 
>> turns into
>> 
>> select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>> 
>> This will not work for us because our database cannot support multiple 
>> execution of these queries without being crippled. But, additionally, our 
>> database doesn't support the above LIMIT syntax and we don't have a generic 
>> way of partitioning the various queries.
>> 
> As a result -- we started by forking JDBCRDD and made a version that executes
>> the SQL query once in getPartitions into a Vector and then hands each worker 
>> node an index and iterator. Here's a snippet of getPartitions and compute:
>> 
>>   override def getPartitions: Array[Partition] = {
>> //Compute the DB query once here
>> val results = computeQuery
>>  
>> (0 until numPartitions).map(i => {
>>   // TODO: would be better to do this partitioning when scrolling 
>> through result set if still loading into memory
>>   val partitionItems = results.drop(i).sliding(1, 
>> numPartitions).flatten.toVector
>>   new DBPartition(i, partitionItems)
>> }).toArray
>>   }
>> 
>>   override def compute(thePart: Partition, context: TaskContext) = new 
>> NextIterator[T] {
>> val part = thePart.asInstanceOf[DBPartition[T]]
>>  
>> //Shift the result vector to our index number and then do a sliding 
>> iterator over it
>> val iterator = part.items.iterator
>>  
>> override def getNext : T = {
>>   if (iterator.hasNext) {
>> iterator.next()
>>   } else {
>> finished = true
>> null.asInstanceOf[T]
>>   }
>> }
>>  
>> override def close: Unit = ()
>>   }
>> 
>> This is a little better since we can just execute the query once. However, 
>> the result-set needs to fit in memory. 
>> 
>> We've been trying to brainstorm a way to 
>> 
>> A) have that result set distribute out to the worker RDD partitions as it's 
>> streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory and do something 
>> clever around the iterators?
>> C) something else?
>> 
>> We're not familiar enough yet with all of the workings of Spark to know how 
>> to proceed on this.
>> 
>> We also thought of the work-around of having the DB query dump to HDFS/S3
>> and then pick it up from there, but it adds more moving parts and latency to
>> our processing. 
>> 
>> Does anyone have a clever suggestion? Are we missing something? 
>> 
>> thanks,
>> Michal


Store Spark data into hive table

2015-03-01 Thread tarek_abouzeid
I am trying to store my word count output into the Hive data warehouse. My
pipeline is:

Flume streaming => Spark does the word count => store the result in a Hive table
for visualization later

my code is :

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

object WordCount {
  def main(args: Array[String]) {
if (args.length < 2) {
  System.err.println(
"Usage: WordCount  ")
  System.exit(1)
}

val Array(host, port) = args

val batchInterval = Milliseconds(2000)

// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, batchInterval)

// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port.toInt)

// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received !!!:" + cnt + " flume events."
).print()
  
// it holds the string stream (converted event body array into string)
val body = stream.map(e => new String(e.event.getBody.array))


val counts = body.flatMap(line =>
line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
 .map(word => (word, 1))
 .reduceByKey(_ + _)

// TESTING storing variable counts into hive ::

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import hiveContext._ 
val good = createSchemaRDD(counts)
good.saveAsTable("meta_test")  

ssc.start()
ssc.awaitTermination()
  }
}

This gives me the error: value createSchemaRDD is not a member of
org.apache.spark.sql.SQLContext

So is there any way to fix this, or another method to store the data into the Hive
data warehouse?
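
One possible shape for this, as a minimal sketch: it assumes the Spark 1.2-era API (createSchemaRDD; Spark 1.3 renames it to createDataFrame) and that the Hive table meta_test already exists:

case class WordCount(word: String, count: Int)

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// counts is a DStream, so the conversion has to happen per batch inside foreachRDD
counts.foreachRDD { rdd =>
  val schemaRdd = createSchemaRDD(rdd.map { case (w, c) => WordCount(w, c) })
  schemaRdd.insertInto("meta_test")
}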







Re: Scalable JDBCRDD

2015-03-01 Thread eric
What you're saying is that, due to the intensity of the query, you need 
to run a single query and partition the results, versus running one 
query for each partition.


I assume it's not viable to throw the query results into another table 
in your database and then query that using the normal approach?


--eric

On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:

Jorn: Vertica

Cody: I posited the limit just as an example of how jdbcrdd could be 
used least invasively. Let's say we used a partition on a time field 
-- we would still need to have N executions of those queries. The 
queries we have are very intense and concurrency is an issue even if 
the N partitioned queries are smaller. Some queries require
evaluating the whole data set first. If our use case a simple select * 
from table.. Then the partitions would be an easier sell if it wasn't 
for the concurrency problem :) Long story short -- we need only one 
execution of the query and would like to just divy out the result set.


M



On Mar 1, 2015, at 5:18 AM, Jörn Franke > wrote:



What database are you using?

On Feb 28, 2015 at 18:15, "Michal Klos"  wrote:


Hi Spark community,

We have a use case where we need to pull huge amounts of data
from a SQL query against a database into Spark. We need to
execute the query against our huge database and not a substitute
(SparkSQL, Hive, etc) because of a couple of factors including
custom functions used in the queries that only our database has.

We started by looking at JDBC RDD, which utilizes a prepared
statement with two parameters that are meant to be used to
partition the result set to the workers... e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2

This will not work for us because our database cannot support
multiple execution of these queries without being crippled. But,
additionally, our database doesn't support the above LIMIT syntax
and we don't have a generic way of partitioning the various queries.

As a result -- we started by forking JDBCRDD and made a version
that executes the SQL query once in getPartitions into a Vector
and then hands each worker node an index and iterator. Here's a
snippet of getPartitions and compute:

override def getPartitions: Array[Partition] = {
  // Compute the DB query once here
  val results = computeQuery

  (0 until numPartitions).map(i => {
    // TODO: would be better to do this partitioning when scrolling
    // through the result set, if still loading into memory
    val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
    new DBPartition(i, partitionItems)
  }).toArray
}

override def compute(thePart: Partition, context: TaskContext) =
  new NextIterator[T] {
    val part = thePart.asInstanceOf[DBPartition[T]]

    // Shift the result vector to our index number and then do a
    // sliding iterator over it
    val iterator = part.items.iterator

    override def getNext: T = {
      if (iterator.hasNext) {
        iterator.next()
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }

    override def close: Unit = ()
  }
This is a little better since we can just execute the query once.
However, the result-set needs to fit in memory.
We've been trying to brainstorm a way to
A) have that result set distribute out to the worker RDD
partitions as it's streaming in from the cursor?
B) have the result set spill to disk if it exceeds memory and do
something clever around the iterators?
C) something else?
We're not familiar enough yet with all of the workings of Spark
to know how to proceed on this.
We also thought of the work-around of having the DB query dump
to HDFS/S3 and then pick it up from there, but it adds more moving
parts and latency to our processing.
Does anyone have a clever suggestion? Are we missing something?
thanks,
Michal





unsafe memory access in spark 1.2.1

2015-03-01 Thread Zalzberg, Idan (Agoda)
Hi,
I am using Spark 1.2.1, and sometimes I get these errors sporadically.
Any thoughts on what could be the cause?

Thanks

2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception 
in thread Thread[Executor task launch worker-25,5,main]
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at 
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





Re: unsafe memory access in spark 1.2.1

2015-03-01 Thread Ted Yu
What Java version are you using ?

Thanks

On Sun, Mar 1, 2015 at 7:03 AM, Zalzberg, Idan (Agoda) <
idan.zalzb...@agoda.com> wrote:

>  Hi,
>
> I am using spark 1.2.1, sometimes I get these errors sporadically:
>
> Any thought on what could be the cause?
>
> Thanks
>
>
>
> 2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught
> exception in thread Thread[Executor task launch worker-25,5,main]
>
> java.lang.InternalError: a fault occurred in a recent unsafe memory access
> operation in compiled Java code
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>
> at
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>
> at
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365)
>
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> --
>


Re: Scalable JDBCRDD

2015-03-01 Thread michal.klo...@gmail.com
Yes exactly.

The temp table is an approach but then we need to manage the deletion of it etc.

I'm sure we won't be the only people with this crazy use case. 

If there isn't a feasible way to do this "within the framework" then that's 
okay. But if there is a way we are happy to write the code and PR it back :)

M



> On Mar 1, 2015, at 10:02 AM, eric  wrote:
> 
> What you're saying is that, due to the intensity of the query, you need to 
> run a single query and partition the results, versus running one query for 
> each partition.
> 
> I assume it's not viable to throw the query results into another table in 
> your database and then query that using the normal approach?
> 
> --eric
> 
>> On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:
>> Jorn: Vertica 
>> 
>> Cody: I posited the limit just as an example of how jdbcrdd could be used 
>> least invasively. Let's say we used a partition on a time field -- we would 
>> still need to have N executions of those queries. The queries we have are 
>> very intense and concurrency is an issue even if the N partitioned
>> queries are smaller. Some queries require evaluating the whole data set 
>> first. If our use case a simple select * from table.. Then the partitions 
>> would be an easier sell if it wasn't for the concurrency problem :) Long 
>> story short -- we need only one execution of the query and would like to 
>> just divy out the result set.
>> 
>> M
>> 
>> 
>> 
>> On Mar 1, 2015, at 5:18 AM, Jörn Franke  wrote:
>> 
>>> What database are you using?
>>> 
>>> On Feb 28, 2015 at 18:15, "Michal Klos"  wrote:
 Hi Spark community,
 
 We have a use case where we need to pull huge amounts of data from a SQL 
 query against a database into Spark. We need to execute the query against 
 our huge database and not a substitute (SparkSQL, Hive, etc) because of a 
 couple of factors including custom functions used in the queries that only 
 our database has.
 
 We started by looking at JDBC RDD, which utilizes a prepared statement 
 with two parameters that are meant to be used to partition the result set 
 to the workers... e.g.:
 
 select * from table limit ?,?
 
 turns into
 
 select * from table limit 1,100 on worker 1
 select * from table limit 101,200 on worker 2
 
 This will not work for us because our database cannot support multiple 
 execution of these queries without being crippled. But, additionally, our 
 database doesn't support the above LIMIT syntax and we don't have a 
 generic way of partitioning the various queries.
 
 As a result -- we started by forking JDBCRDD and made a version that
 executes the SQL query once in getPartitions into a Vector and then hands 
 each worker node an index and iterator. Here's a snippet of getPartitions 
 and compute:
 
   override def getPartitions: Array[Partition] = {
 //Compute the DB query once here
 val results = computeQuery
  
 (0 until numPartitions).map(i => {
   // TODO: would be better to do this partitioning when scrolling 
 through result set if still loading into memory
   val partitionItems = results.drop(i).sliding(1, 
 numPartitions).flatten.toVector
   new DBPartition(i, partitionItems)
 }).toArray
   }
 
   override def compute(thePart: Partition, context: TaskContext) = new 
 NextIterator[T] {
 val part = thePart.asInstanceOf[DBPartition[T]]
  
 //Shift the result vector to our index number and then do a sliding 
 iterator over it
 val iterator = part.items.iterator
  
 override def getNext : T = {
   if (iterator.hasNext) {
 iterator.next()
   } else {
 finished = true
 null.asInstanceOf[T]
   }
 }
  
 override def close: Unit = ()
   }
 
 This is a little better since we can just execute the query once. However, 
 the result-set needs to fit in memory. 
 We've been trying to brainstorm a way to 
 A) have that result set distribute out to the worker RDD partitions as 
 it's streaming in from the cursor?
 B) have the result set spill to disk if it exceeds memory and do something 
 clever around the iterators?
 C) something else?
 We're not familiar enough yet with all of the workings of Spark to know 
 how to proceed on this.
 We also thought of the work-around of having the DB query dump to
 HDFS/S3 and then pick it up from there, but it adds more moving parts and
 latency to our processing. 
 Does anyone have a clever suggestion? Are we missing something? 
 thanks,
 Michal
> 


RE: unsafe memory access in spark 1.2.1

2015-03-01 Thread Zalzberg, Idan (Agoda)
My run time version is:

java version "1.7.0_75"
OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

Thanks

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, March 01, 2015 10:18 PM
To: Zalzberg, Idan (Agoda)
Cc: user@spark.apache.org
Subject: Re: unsafe memory access in spark 1.2.1

What Java version are you using ?

Thanks

On Sun, Mar 1, 2015 at 7:03 AM, Zalzberg, Idan (Agoda) <idan.zalzb...@agoda.com> wrote:
Hi,
I am using spark 1.2.1, sometimes I get these errors sporadically:
Any thought on what could be the cause?

Thanks

2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception 
in thread Thread[Executor task launch worker-25,5,main]
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at 
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)






Re: unsafe memory access in spark 1.2.1

2015-03-01 Thread Ted Yu
Google led me to:
https://bugs.openjdk.java.net/browse/JDK-8040802

Not sure if the last comment there applies to your deployment.

On Sun, Mar 1, 2015 at 8:32 AM, Zalzberg, Idan (Agoda) <
idan.zalzb...@agoda.com> wrote:

>  My run time version is:
>
>
>
> java version "1.7.0_75"
>
> OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13)
>
> OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
>
>
>
> Thanks
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Sunday, March 01, 2015 10:18 PM
> *To:* Zalzberg, Idan (Agoda)
> *Cc:* user@spark.apache.org
> *Subject:* Re: unsafe memory access in spark 1.2.1
>
>
>
> What Java version are you using ?
>
>
>
> Thanks
>
>
>
> On Sun, Mar 1, 2015 at 7:03 AM, Zalzberg, Idan (Agoda) <
> idan.zalzb...@agoda.com> wrote:
>
>  Hi,
>
> I am using spark 1.2.1, sometimes I get these errors sporadically:
>
> Any thought on what could be the cause?
>
> Thanks
>
>
>
> 2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught
> exception in thread Thread[Executor task launch worker-25,5,main]
>
> java.lang.InternalError: a fault occurred in a recent unsafe memory access
> operation in compiled Java code
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>
> at
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>
> at
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365)
>
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>  --
>
>
>


Re: Columnar-Oriented RDDs

2015-03-01 Thread Koert Kuipers
Hey,
I do not have any statistics. I just wanted to show it can be done, but left
it at that. The memory usage should be predictable: the benefit comes from
using arrays for primitive types. Accessing the data row-wise means
re-assembling the rows from the columnar data, which I have not tried to
profile or optimize at all yet, but for sure there should be some overhead
compared to a row-oriented RDD.
Also, the format relies on compile-time types (which is what allows the
usage of arrays for primitive types).
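
To make the idea concrete, here is a minimal sketch (not the spark-columnar code itself) that holds a partition of (Int, Double) records as primitive arrays and re-assembles rows on read:

import org.apache.spark.rdd.RDD

// One block per partition: a column of ids and a column of values, both primitive arrays
case class ColumnarBlock(ids: Array[Int], values: Array[Double]) {
  def rows: Iterator[(Int, Double)] =
    Iterator.range(0, ids.length).map(i => (ids(i), values(i)))  // row re-assembly
}

def toColumnar(rdd: RDD[(Int, Double)]): RDD[ColumnarBlock] =
  rdd.mapPartitions { it =>
    val rows = it.toArray
    Iterator.single(ColumnarBlock(rows.map(_._1), rows.map(_._2)))
  }

def toRows(columnar: RDD[ColumnarBlock]): RDD[(Int, Double)] =
  columnar.flatMap(_.rows)

// toColumnar(data).cache() keeps the primitive arrays in memory;
// toRows(...) pays the row re-assembly cost mentioned above.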

On Sun, Mar 1, 2015 at 6:33 AM, Night Wolf  wrote:

> Thanks for the comments guys.
>
> Parquet is awesome. My question with using Parquet for on disk storage -
> how should I load that into memory as a spark RDD and cache it and keep it
> in a columnar format?
>
> I know I can use Spark SQL with parquet which is awesome. But as soon as I
> step out of SQL we have problems as it kinda gets converted back to a row
> oriented format.
>
> @Koert - that looks really exciting. Do you have any statistics on memory
> and scan performance?
>
>
> On Saturday, February 14, 2015, Koert Kuipers  wrote:
>
>> i wrote a proof of concept to automatically store any RDD of tuples or
>> case classes in columar format using arrays (and strongly typed, so you get
>> the benefit of primitive arrays). see:
>> https://github.com/tresata/spark-columnar
>>
>> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust > > wrote:
>>
>>> Shark's in-memory code was ported to Spark SQL and is used by default
>>> when you run .cache on a SchemaRDD or CACHE TABLE.
>>>
>>> I'd also look at parquet which is more efficient and handles nested data
>>> better.
>>>
>>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf 
>>> wrote:
>>>
 Hi all,

 I'd like to build/use column-oriented RDDs in some of my Spark code. A
 normal Spark RDD is stored as a row-oriented object, if I understand
 correctly.

 I'd like to leverage some of the advantages of a columnar memory
 format. Shark (formerly) and Spark SQL use a columnar storage format with
 primitive arrays for each column.

 I'd be interested to know more about this approach and how I could
 build my own custom columnar-oriented RDD which I can use outside of Spark
 SQL.

 Could anyone give me some pointers on where to look to do something
 like this, either from scratch or using whats there in the SparkSQL libs or
 elsewhere. I know Evan Chan in a presentation made mention of building a
 custom RDD of column-oriented blocks of data.

 Cheers,
 ~N

>>>
>>>
>>


Re: Spark Streaming testing strategies

2015-03-01 Thread Holden Karau
There is also the Spark Testing Base package which is on spark-packages.org
and hides the ugly bits (it's based on the existing streaming test code but
I cleaned it up a bit to try and limit the number of internals it was
touching).
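
If it helps, here is a minimal sketch of a test that avoids touching Spark
internals altogether by driving the DStream with queueStream. Batch handling
here is time-based, so it is less deterministic than ManualClock; the names
and the 3-second sleep are arbitrary:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-test")
val ssc = new StreamingContext(conf, Seconds(1))

val input = mutable.Queue(ssc.sparkContext.makeRDD(Seq(1, 2, 3)))
val results = mutable.ArrayBuffer.empty[Int]

// transformation under test: double every element
ssc.queueStream(input).map(_ * 2).foreachRDD(rdd => results ++= rdd.collect())

ssc.start()
Thread.sleep(3000)           // crude; ManualClock would make this deterministic
ssc.stop(stopSparkContext = true)

assert(results.sorted == Seq(2, 4, 6))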

On Sunday, March 1, 2015, Marcin Kuthan  wrote:

> I have started using Spark and Spark Streaming and I'm wondering how do
> you test your applications? Especially Spark Streaming application with
> window based transformations.
>
> After some digging I found ManualClock class to take full control over
> stream processing. Unfortunately the class is not available outside
> spark.streaming package. Are you going to expose the class for other
> developers as well? Now I have to use my custom wrapper under
> spark.streaming package.
>
> My Spark and Spark Streaming unit tests strategies are documented here:
> http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/
>
> Your feedback is more than appreciated.
>
> Marcin
>
>

-- 
Cell : 425-233-8271


Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD;
essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id,
1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
real=0.02 secs]

So ~9GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the
data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of
executor-cores (to 2), based on suggestions at
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
. This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and fewer executors (most important
thing was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra  wrote:

> The Spark UI names the line number and name of the operation (repartition
> in this case) that it is performing. Only if this information is wrong
> (just a possibility), could it have started groupByKey already.
>
> I will try to analyze the amount of skew in the data by using reduceByKey
> (or simply countByKey) which is relatively inexpensive. For the purposes of
> this algorithm I can simply log and remove keys with huge counts, before
> doing groupByKey.
>
> On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson 
> wrote:
>
>> All stated symptoms are consistent with GC pressure (other nodes timeout
>> trying to connect because of a long stop-the-world), quite possibly due to
>> groupByKey. groupByKey is a very expensive operation as it may bring all
>> the data for a particular partition into memory (in particular, it cannot
>> spill values for a single key, so if you have a single very skewed key you
>> can get behavior like this).
>>
>> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc 
>> wrote:
>>
>>> But groupByKey will repartition according to the number of keys, as I
>>> understand how it works. How do you know that you haven't reached the
>>> groupByKey phase? Are you using a profiler or do you base that assumption
>>> only on logs?
>>>
>>> On Sat, 28 Feb 2015, 8:12 PM Arun Luthra 
>>> wrote:
>>>
>>> A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
 wrote:

> The job fails before getting to groupByKey.
>
> I see a lot of timeout errors in the yarn logs, like:
>
> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
> attempts
> akka.pattern.AskTimeoutException: Timed out
>
> and
>
> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
> attempts
> java.util.concurrent.TimeoutException: Futures timed out after [30
> seconds]
>
> and some of these are followed by:
>
> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
> Disassociated [akka.tcp://sparkExecutor@...] ->
> [akka.tcp://sparkDriver@...] disassociated! Shutting down.
> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
> in stage 1.0 (TID 336601)
> java.io.FileNotFoundException:
> /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
> (No such file or directory)
>
>
>
>
> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc 
> wrote:
>
>> I would first check whether  there is any possibility that after
>> doing groupbykey one of the groups does not fit in one of the executors'
>> memory.
>>
>> To back up my theory, instead of doing groupByKey + map, try
>> reduceByKey + mapValues.
>>
>> Let me know if that helps.

Pushing data from AWS Kinesis -> Spark Streaming -> AWS Redshift

2015-03-01 Thread Mike Trienis
Hi All,

I am looking at integrating a data stream from AWS Kinesis to AWS Redshift
and since I am already ingesting the data through Spark Streaming, it seems
convenient to also push that data to AWS Redshift at the same time.

I have taken a look at the AWS kinesis connector although I am not sure it
was designed to integrate with Apache Spark. It seems more like a
standalone approach:

   - https://github.com/awslabs/amazon-kinesis-connectors

There is also a Spark redshift integration library, however, it looks like
it was intended for pulling data rather than pushing data to AWS Redshift:

   - https://github.com/databricks/spark-redshift

I finally took a look at a general Scala library that integrates with AWS
Redshift:

   - http://scalikejdbc.org/

Does anyone have any experience pushing data from Spark Streaming to AWS
Redshift? Does it make sense conceptually, or does it make more sense to
push data from AWS Kinesis to AWS Redshift via another standalone approach
such as the AWS Kinesis connectors?

Thanks, Mike.


RE: unsafe memory access in spark 1.2.1

2015-03-01 Thread Zalzberg, Idan (Agoda)
Thanks,
We monitor disk space so I doubt that is it, but I will check again


From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, March 01, 2015 11:45 PM
To: Zalzberg, Idan (Agoda)
Cc: user@spark.apache.org
Subject: Re: unsafe memory access in spark 1.2.1

Google led me to:
https://bugs.openjdk.java.net/browse/JDK-8040802

Not sure if the last comment there applies to your deployment.

On Sun, Mar 1, 2015 at 8:32 AM, Zalzberg, Idan (Agoda) 
mailto:idan.zalzb...@agoda.com>> wrote:
My run time version is:

java version "1.7.0_75"
OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

Thanks

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, March 01, 2015 10:18 PM
To: Zalzberg, Idan (Agoda)
Cc: user@spark.apache.org
Subject: Re: unsafe memory access in spark 1.2.1

What Java version are you using ?

Thanks

On Sun, Mar 1, 2015 at 7:03 AM, Zalzberg, Idan (Agoda) 
mailto:idan.zalzb...@agoda.com>> wrote:
Hi,
I am using spark 1.2.1, sometimes I get these errors sporadically:
Any thought on what could be the cause?

Thanks

2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception 
in thread Thread[Executor task launch worker-25,5,main]
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at 
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)







Re: Connection pool in workers

2015-03-01 Thread Chris Fregly
hey AKM!

this is a very common problem.  the streaming programming guide addresses
this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) you want to use foreachPartition() to operate on a whole partition at a
time, versus a single record with foreach()
2) you want to get/release the ConnectionPool within each worker
3) make sure you initialize the ConnectionPool first - or do it lazily upon
getting the first connection.

here's the sample code referenced in the link above with some additional
comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of connections
    // that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
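
and here's a rough sketch of what that ConnectionPool singleton could look
like - the client type and factory are placeholders for whatever driver you
actually use (Mongo, JDBC, etc.), and a real pool would also cap its size and
handle broken connections:

import java.util.concurrent.ConcurrentLinkedQueue

object ConnectionPool {
  // placeholder client type + factory; swap in your real driver here
  type Client = AnyRef
  private def createClient(): Client = new AnyRef

  // created lazily, once per worker JVM, on first access
  private lazy val pool = new ConcurrentLinkedQueue[Client]()

  def getConnection(): Client =
    Option(pool.poll()).getOrElse(createClient())

  def returnConnection(c: Client): Unit = {
    pool.offer(c)
    ()
  }
}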

hope that helps!

-chris




On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <
ashrafuzzaman...@gmail.com> wrote:

> Sorry guys may bad,
> Here is a high level code sample,
>
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
>
> Here I have to close the connection for interactionDAL other wise the JVM
> gives me error that the connection is open. I tried with sticky connection
> as well with keep_alive true. So my guess was that at the point of
> “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and
> sent to workers, and the workers un-marshal and execute the process, which is
> why the connection is always opened for each RDD. I might be completely
> wrong. I would love to know what is going on underneath.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Pushing data from AWS Kinesis -> Spark Streaming -> AWS Redshift

2015-03-01 Thread Chris Fregly
Hey Mike-

Great to see you're using the AWS stack to its fullest!

I've already created the Kinesis-Spark Streaming connector with examples,
documentation, tests, and everything. You'll need to build Spark from
source with the -Pkinesis-asl profile; otherwise the Kinesis classes won't be
included in the build. This is due to the Amazon Software License (ASL).
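
If I remember correctly, the Maven build looks something like this
(double-check against the integration guide linked below):

mvn -Pkinesis-asl -DskipTests clean package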

Here's the link to the Kinesis-Spark Streaming integration guide:
http://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html.

Here's a link to the source:
https://github.com/apache/spark/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark

Here's the original jira as well as some follow-up jiras that I'm working
on now:

   - https://issues.apache.org/jira/browse/SPARK-1981
   - https://issues.apache.org/jira/browse/SPARK-5959
   - https://issues.apache.org/jira/browse/SPARK-5960

I met a lot of folks at the Strata San Jose conference a couple weeks ago
who are using this Kinesis connector with good results, so you should be OK
there.

Regarding Redshift, the current Redshift connector only pulls data from
Redshift - it does not write to Redshift.

And actually, it only reads files that have been UNLOADed from Redshift
into S3, so it's not pulling from Redshift directly.

This is confusing, I know.  Here's the jira for this work:
https://issues.apache.org/jira/browse/SPARK-3205

For now, you'd have to use the AWS Java SDK to write to Redshift directly
from within your Spark Streaming worker as batches of data come in from the
Kinesis Stream.

This is roughly how your Spark Streaming app would look:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records

    // RedshiftConnectionPool is a static, lazily initialized singleton pool of
    // connections that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = RedshiftConnectionPool.getConnection()

    // perform the application logic here - parse and write to Redshift using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return the connection to the pool for future reuse
    RedshiftConnectionPool.returnConnection(connection)
  }
}

Note that you would need to write the RedshiftConnectionPool singleton class
yourself using the AWS Java SDK as I mentioned.

There is a relatively new Spark SQL DataSources API that supports reading
and writing to these data sources.

An example Avro implementation is here:
http://spark-packages.org/package/databricks/spark-avro, although I think
that's a read-only impl, but you get the idea.

I've created 2 jiras for creating libraries for DynamoDB and Redshift that
implement this DataSources API.

Here are the jiras:

   - https://issues.apache.org/jira/browse/SPARK-6101 (DynamoDB)
   - https://issues.apache.org/jira/browse/SPARK-6102 (Redshift)

Perhaps you can take a stab at SPARK-6102?  I should be done with the
DynamoDB impl in the next few weeks - scheduled for Spark 1.4.0.

Hope that helps!  :)

-Chris


On Sun, Mar 1, 2015 at 11:06 AM, Mike Trienis 
wrote:

> Hi All,
>
> I am looking at integrating a data stream from AWS Kinesis to AWS Redshift
> and since I am already ingesting the data through Spark Streaming, it seems
> convenient to also push that data to AWS Redshift at the same time.
>
> I have taken a look at the AWS kinesis connector although I am not sure it
> was designed to integrate with Apache Spark. It seems more like a
> standalone approach:
>
>- https://github.com/awslabs/amazon-kinesis-connectors
>
> There is also a Spark redshift integration library, however, it looks like
> it was intended for pulling data rather than pushing data to AWS Redshift:
>
>- https://github.com/databricks/spark-redshift
>
> I finally took a look at a general Scala library that integrates with AWS
> Redshift:
>
>- http://scalikejdbc.org/
>
> Does anyone have any experience pushing data from Spark Streaming to AWS
> Redshift? Does it make sense conceptually, or does it make more sense to
> push data from AWS Kinesis to AWS Redshift VIA another standalone approach
> such as the AWS Kinesis connectors.
>
> Thanks, Mike.
>
>


Re: Streaming scheduling delay

2015-03-01 Thread Josh J
On Fri, Feb 13, 2015 at 2:21 AM, Gerard Maas  wrote:

> KafkaOutputServicePool


Could you please give example code of what KafkaOutputServicePool would
look like? When I tried object pooling I ended up with various
not-serializable exceptions.

Thanks!
Josh


Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Sabarish Sasidharan
I am trying to compute column similarities on a 30x1000 RowMatrix of
DenseVectors. The size of the input RDD is 3.1MB and it's all in one
partition. I am running on a single node of 15G, giving the driver 1G
and the executor 9G. This is on single-node Hadoop. In the first attempt
the BlockManager doesn't respond within the heartbeat interval. In the
second attempt I am seeing a GC overhead limit exceeded error, and it is
almost always in RowMatrix.columnSimilaritiesDIMSUM ->
mapPartitionsWithIndex (line 570)

java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$19$$anonfun$apply$2.apply(RowMatrix.scala:570)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$19$$anonfun$apply$2.apply(RowMatrix.scala:528)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)


It also really seems to be running out of memory. I am seeing the following
in the attempt log
Heap
 PSYoungGen  total 2752512K, used 2359296K
  eden space 2359296K, 100% used
  from space 393216K, 0% used
  to   space 393216K, 0% used
 ParOldGen   total 6291456K, used 6291376K [0x00058000,
0x0007, 0x0007)
  object space 6291456K, 99% used
 Metaspace   used 39225K, capacity 39558K, committed 39904K, reserved
1083392K
  class space  used 5736K, capacity 5794K, committed 5888K, reserved
1048576K

What could be going wrong?

Regards
Sab


Re: Tools to manage workflows on Spark

2015-03-01 Thread Felix C
We use Oozie as well, and it has worked well.
The catch is that each action in Oozie is separate, and one cannot retain the
SparkContext or an RDD, or leverage caching or temp tables, going into another
Oozie action. You could either save output to a file or put all Spark processing
into one Oozie action.

--- Original Message ---

From: "Mayur Rustagi" 
Sent: February 28, 2015 7:07 PM
To: "Qiang Cao" 
Cc: "Ted Yu" , "Ashish Nigam" , 
"user" 
Subject: Re: Tools to manage workflows on Spark

Sorry, not really. Spork is a way to migrate your existing Pig scripts to
Spark, or to write new Pig jobs that can then execute on Spark.
For orchestration you are better off using Oozie, especially if you are
using other execution engines/systems besides Spark.


Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com 
@mayur_rustagi 

On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao  wrote:

> Thanks Mayur! I'm looking for something that would allow me to easily
> describe and manage a workflow on Spark. A workflow in my context is a
> composition of Spark applications that may depend on one another based on
> hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
> app level.
>
>
>
> On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi 
> wrote:
>
>> We do maintain it but in apache repo itself. However Pig cannot do
>> orchestration for you. I am not sure what you are looking at from Pig in
>> this context.
>>
>> Regards,
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoid.com 
>> @mayur_rustagi 
>>
>> On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:
>>
>>> Here was latest modification in spork repo:
>>> Mon Dec 1 10:08:19 2014
>>>
>>> Not sure if it is being actively maintained.
>>>
>>> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao 
>>> wrote:
>>>
 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't
 sure if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
 wrote:

> You have to call spark-submit from oozie.
> I used this link to get the idea for my implementation -
>
>
> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>
>
>
> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>
> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
> accommodate some Hadoop jobs.
>
>
> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
> wrote:
>
>> Qiang,
>> Did you look at Oozie?
>> We use oozie to run spark jobs in production.
>>
>>
>> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>>
>> Hi Everyone,
>>
>> We need to deal with workflows on Spark. In our scenario, each
>> workflow consists of multiple processing steps. Among different steps,
>> there could be dependencies.  I'm wondering if there are tools
>> available that can help us schedule and manage workflows on Spark. I'm
>> looking for something like pig on Hadoop, but it should fully function on
>> Spark.
>>
>> Any suggestion?
>>
>> Thanks in advance!
>>
>> Qiang
>>
>>
>>
>
>

>>>
>>
>


RE: Is SPARK_CLASSPATH really deprecated?

2015-03-01 Thread Taeyun Kim
spark.executor.extraClassPath is especially useful when the output is
written to HBase, since the data nodes on the cluster have HBase library
jars.

-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Friday, February 27, 2015 5:22 PM
To: Kannan Rajah
Cc: Marcelo Vanzin; user@spark.apache.org
Subject: Re: Is SPARK_CLASSPATH really deprecated?

I think we need to just update the docs, it is a bit unclear right now. At
the time, we made it worded fairly sternly because we really wanted people
to use --jars when we deprecated SPARK_CLASSPATH. But there are other types
of deployments where there is a legitimate need to augment the classpath of
every executor.

I think it should probably say something more like

"Extra classpath entries to append to the classpath of executors. This is
sometimes used in deployment environments where dependencies of Spark are
present in a specific place on all nodes".
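
For example (paths are purely illustrative), that might look like this in
spark-defaults.conf:

spark.executor.extraClassPath  /opt/hbase/lib/*
spark.driver.extraClassPath    /opt/hbase/lib/*

or per job on the command line:

spark-submit --conf spark.executor.extraClassPath=/opt/hbase/lib/* ...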

Kannan - if you want to submit a patch I can help review it.

On Thu, Feb 26, 2015 at 8:24 PM, Kannan Rajah  wrote:
> Thanks Marcelo. Do you think it would be useful to make 
> spark.executor.extraClassPath be made to pick up some environment 
> variable that can be set from spark-env.sh? Here is a example.
>
> spark-env.sh
> --
> executor_extra_cp = get_hbase_jars_for_cp
> export executor_extra_cp
>
> spark-defaults.conf
> -
> spark.executor.extraClassPath = ${executor_extra_cp}
>
> This will let us add logic inside get_hbase_jars_for_cp function to 
> pick the right version hbase jars. There could be multiple versions 
> installed on the node.
>
>
>
> --
> Kannan
>
> On Thu, Feb 26, 2015 at 6:08 PM, Marcelo Vanzin 
wrote:
>>
>> On Thu, Feb 26, 2015 at 5:12 PM, Kannan Rajah 
wrote:
>> > Also, I would like to know if there is a localization overhead when 
>> > we use spark.executor.extraClassPath. Again, in the case of hbase, 
>> > these jars would be typically available on all nodes. So there is 
>> > no need to localize them from the node where job was submitted. I 
>> > am wondering if we use the SPARK_CLASSPATH approach, then it would 
>> > not do localization. That would be an added benefit.
>> > Please clarify.
>>
>> spark.executor.extraClassPath doesn't localize anything. It just 
>> prepends those classpath entries to the usual classpath used to 
>> launch the executor. There's no copying of files or anything, so 
>> they're expected to exist on the nodes.
>>
>> It's basically exactly the same as SPARK_CLASSPATH, but broken down 
>> to two options (one for the executors, and one for the driver).
>>
>> --
>> Marcelo
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Tools to manage workflows on Spark

2015-03-01 Thread Himanish Kushary
We are running our Spark jobs on Amazon AWS and are using AWS Datapipeline
for orchestration of the different Spark jobs. AWS Data Pipeline provides
automatic EMR cluster provisioning, retry on failure, SNS notifications, etc.
out of the box, and it works well for us.





On Sun, Mar 1, 2015 at 7:02 PM, Felix C  wrote:

>  We use Oozie as well, and it has worked well.
> The catch is each action in Oozie is separate and one cannot retain
> SparkContext or RDD, or leverage caching or temp table, going into another
> Oozie action. You could either save output to file or put all Spark
> processing into one Oozie action.
>
> --- Original Message ---
>
> From: "Mayur Rustagi" 
> Sent: February 28, 2015 7:07 PM
> To: "Qiang Cao" 
> Cc: "Ted Yu" , "Ashish Nigam" ,
> "user" 
> Subject: Re: Tools to manage workflows on Spark
>
>  Sorry not really. Spork is a way to migrate your existing pig scripts to
> Spark or write new pig jobs then can execute on spark.
> For orchestration you are better off using Oozie especially if you are
> using other execution engines/systems besides spark.
>
>
> Regards,
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoid.com 
> @mayur_rustagi 
>
> On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao  wrote:
>
> Thanks Mayur! I'm looking for something that would allow me to easily
> describe and manage a workflow on Spark. A workflow in my context is a
> composition of Spark applications that may depend on one another based on
> hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
> app level.
>
>
>
> On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi 
> wrote:
>
> We do maintain it but in apache repo itself. However Pig cannot do
> orchestration for you. I am not sure what you are looking at from Pig in
> this context.
>
> Regards,
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoid.com 
>  @mayur_rustagi 
>
> On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:
>
> Here was latest modification in spork repo:
> Mon Dec 1 10:08:19 2014
>
>  Not sure if it is being actively maintained.
>
> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao  wrote:
>
> Thanks for the pointer, Ashish! I was also looking at Spork
> https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure
> if that's the right direction.
>
> On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
> wrote:
>
> You have to call spark-submit from oozie.
> I used this link to get the idea for my implementation -
>
>
> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>
>
>
>  On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>
>  Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
> accommodate some Hadoop jobs.
>
>
> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
> wrote:
>
> Qiang,
> Did you look at Oozie?
> We use oozie to run spark jobs in production.
>
>
>  On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>
>  Hi Everyone,
>
>  We need to deal with workflows on Spark. In our scenario, each workflow
> consists of multiple processing steps. Among different steps, there could
> be dependencies.  I'm wondering if there are tools available that can
> help us schedule and manage workflows on Spark. I'm looking for something
> like pig on Hadoop, but it should fully function on Spark.
>
>  Any suggestion?
>
>  Thanks in advance!
>
>  Qiang
>
>
>
>
>
>
>
>
>
>


-- 
Thanks & Regards
Himanish


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Reza Zadeh
Hi Sabarish,

Works fine for me with less than those settings (30x1000 dense matrix, 1GB
driver, 1GB executor):

bin/spark-shell --driver-memory 1G --executor-memory 1G

Then running the following finished without trouble and in a few seconds.
Are you sure your driver is actually getting the RAM you think you gave it?

// imports needed for this snippet
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Create a 30x1000 matrix of random dense rows
val rows = sc.parallelize(1 to 30, 4).map { line =>
  val values = Array.tabulate(1000)(x => scala.math.random)
  Vectors.dense(values)
}.cache()
val mat = new RowMatrix(rows)

// Compute similar columns perfectly, with brute force.
val exact = mat.columnSimilarities().entries.map(x => x.value).sum()



On Sun, Mar 1, 2015 at 3:31 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I am trying to compute column similarities on a 30x1000 RowMatrix of
> DenseVectors. The size of the input RDD is 3.1MB and its all in one
> partition. I am running on a single node of 15G and giving the driver 1G
> and the executor 9G. This is on a single node hadoop. In the first attempt
> the BlockManager doesn't respond within the heart beat interval. In the
> second attempt I am seeing a GC overhead limit exceeded error. And it is
> almost always in the RowMatrix.columSimilaritiesDIMSUM ->
> mapPartitionsWithIndex (line 570)
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$19$$anonfun$apply$2.apply(RowMatrix.scala:570)
> at
> org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$19$$anonfun$apply$2.apply(RowMatrix.scala:528)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>
> It also really seems to be running out of memory. I am seeing the
> following in the attempt log
> Heap
>  PSYoungGen  total 2752512K, used 2359296K
>   eden space 2359296K, 100% used
>   from space 393216K, 0% used
>   to   space 393216K, 0% used
>  ParOldGen   total 6291456K, used 6291376K [0x00058000,
> 0x0007, 0x0007)
>   object space 6291456K, 99% used
>  Metaspace   used 39225K, capacity 39558K, committed 39904K, reserved
> 1083392K
>   class space  used 5736K, capacity 5794K, committed 5888K, reserved
> 1048576K
>
> What could be going wrong?
>
> Regards
> Sab
>


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Sabarish Sasidharan
Sorry, I actually meant a 30 x 10000 matrix (missed a 0)


Regards
Sab


Re: Tools to manage workflows on Spark

2015-03-01 Thread Qiang Cao
Thanks, Himanish and Felix!

On Sun, Mar 1, 2015 at 7:50 PM, Himanish Kushary  wrote:

> We are running our Spark jobs on Amazon AWS and are using AWS Datapipeline
> for orchestration of the different spark jobs. AWS datapipeline provides
> automatic EMR cluster provisioning, retry on failure,SNS notification etc.
> out of the box and works well for us.
>
>
>
>
>
> On Sun, Mar 1, 2015 at 7:02 PM, Felix C  wrote:
>
>>  We use Oozie as well, and it has worked well.
>> The catch is each action in Oozie is separate and one cannot retain
>> SparkContext or RDD, or leverage caching or temp table, going into another
>> Oozie action. You could either save output to file or put all Spark
>> processing into one Oozie action.
>>
>> --- Original Message ---
>>
>> From: "Mayur Rustagi" 
>> Sent: February 28, 2015 7:07 PM
>> To: "Qiang Cao" 
>> Cc: "Ted Yu" , "Ashish Nigam" <
>> ashnigamt...@gmail.com>, "user" 
>> Subject: Re: Tools to manage workflows on Spark
>>
>>  Sorry not really. Spork is a way to migrate your existing pig scripts
>> to Spark or write new pig jobs then can execute on spark.
>> For orchestration you are better off using Oozie especially if you are
>> using other execution engines/systems besides spark.
>>
>>
>> Regards,
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoid.com 
>> @mayur_rustagi 
>>
>> On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao  wrote:
>>
>> Thanks Mayur! I'm looking for something that would allow me to easily
>> describe and manage a workflow on Spark. A workflow in my context is a
>> composition of Spark applications that may depend on one another based on
>> hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
>> app level.
>>
>>
>>
>> On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi 
>> wrote:
>>
>> We do maintain it but in apache repo itself. However Pig cannot do
>> orchestration for you. I am not sure what you are looking at from Pig in
>> this context.
>>
>> Regards,
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoid.com 
>>  @mayur_rustagi 
>>
>> On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:
>>
>> Here was latest modification in spork repo:
>> Mon Dec 1 10:08:19 2014
>>
>>  Not sure if it is being actively maintained.
>>
>> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao  wrote:
>>
>> Thanks for the pointer, Ashish! I was also looking at Spork
>> https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure
>> if that's the right direction.
>>
>> On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
>> wrote:
>>
>> You have to call spark-submit from oozie.
>> I used this link to get the idea for my implementation -
>>
>>
>> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>>
>>
>>
>>  On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>>
>>  Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
>> accommodate some Hadoop jobs.
>>
>>
>> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
>> wrote:
>>
>> Qiang,
>> Did you look at Oozie?
>> We use oozie to run spark jobs in production.
>>
>>
>>  On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>>
>>  Hi Everyone,
>>
>>  We need to deal with workflows on Spark. In our scenario, each workflow
>> consists of multiple processing steps. Among different steps, there could
>> be dependencies.  I'm wondering if there are tools available that can
>> help us schedule and manage workflows on Spark. I'm looking for something
>> like pig on Hadoop, but it should fully function on Spark.
>>
>>  Any suggestion?
>>
>>  Thanks in advance!
>>
>>  Qiang
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>
> --
> Thanks & Regards
> Himanish
>


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Reza Zadeh
Hi Sab,
In this dense case, the output will contain 10,000 x 10,000 entries, i.e. 100
million doubles, which doesn't fit in 1GB with overheads.
For a dense matrix, columnSimilarities() scales quadratically in the number of
columns, so you need more memory across the cluster.
Reza


On Sun, Mar 1, 2015 at 7:06 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> Sorry, I actually meant 30 x 1 matrix (missed a 0)
>
>
> Regards
> Sab
>
>


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Sabarish Sasidharan
​Hi Reza
​​
I see that ((int, int), double) pairs are generated for any combination
that meets the criteria controlled by the threshold. But assuming a simple
1x10K matrix, that means I would need at least 12GB memory per executor for
the flat map just for these pairs, excluding any other overhead. Is that
correct? How can we make this scale for even larger n (when m stays small),
like 100 x 5 million? One option is to use higher thresholds. The other is that
I use a SparseVector to begin with. Are there any other optimizations I can
take advantage of?

Thanks
Sab


documentation - graphx-programming-guide error?

2015-03-01 Thread Deborah Siegel
Hello,

I am running through examples given on
http://spark.apache.org/docs/1.2.1/graphx-programming-guide.html

The section for Map Reduce Triplets Transition Guide (Legacy) indicates
that one can run the following .aggregateMessages code

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: Int, b: Int): Int = a + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

I created a graph of the indicated type, and get an error

scala> val result = graph.aggregateMessages[String](msgFun, reduceFun)
:23: error: type mismatch;
found   : Int
required: String
Error occurred in an application involving default arguments.
val result = graph.aggregateMessages[String](msgFun, reduceFun)

  ^
What is this example supposed to do? The following would work, although
I'll admit I am perplexed by the example's intent.

def msgFun(triplet: EdgeContext[Int, Float, (Int,String)]) {
  triplet.sendToDst(1, "Hi")
}
def reduceFun(a: (Int,String), b: (Int,String)): (Int,String) = ((a._1 +
b._1),a._2)
val result = graph.aggregateMessages[(Int,String)](msgFun, reduceFun)
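
Or, if the intent was to keep the message type as String, my guess is the
example was meant to read something like:

def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)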

Sincerely,
Deb


Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded

2015-03-01 Thread Debasish Das
Column-based similarities work well when the number of columns is modest (10K,
100K; we actually scaled it to 1.5M columns, but that really stress tests the
shuffle and requires tuning the shuffle parameters)... You can either use dimsum
sampling or come up with your own threshold based on your application that
you can apply in reduceByKey (you have to change the code to use
combineByKey and add your filters before shuffling the keys to the reducer)...

The other variant that you are mentioning is the row-based similarity flow,
which is tracked in the following JIRA, where I am interested in doing no
shuffle but using broadcast and mapPartitions. I will open up the PR soon, but
it is compute intensive and I am experimenting with BLAS optimizations...

https://issues.apache.org/jira/browse/SPARK-4823

Your case of 100 x 5 million (transpose of it), for example, is very common in
matrix factorization where you have user factors and product factors which
will typically be 5 million x 100 dense matrix and you want to compute
user->user and item->item similarities...

You are right that sparsity helps but you can't apply sparsity (for example
pick topK) before doing the dot products...so it is still a compute
intensive operation...
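
Just to make the dimsum sampling knob concrete, reusing the mat from Reza's
snippet earlier in this thread (the threshold and cutoff values here are
arbitrary):

// higher thresholds sample more aggressively and drop low-similarity pairs
val approx = mat.columnSimilarities(threshold = 0.5)
val strongPairs = approx.entries.filter(_.value > 0.8)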

On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> ​Hi Reza
> ​​
> I see that ((int, int), double) pairs are generated for any combination
> that meets the criteria controlled by the threshold. But assuming a simple
> 1x10K matrix that means I would need atleast 12GB memory per executor for
> the flat map just for these pairs excluding any other overhead. Is that
> correct? How can we make this scale for even larger n (when m stays small)
> like 100 x 5 million.​ One is by using higher thresholds. The other is that
> I use a SparseVector to begin with. Are there any other optimizations I can
> take advantage of?
>
> ​Thanks
> Sab
>
>