Spark as standalone or with Hadoop stack.

2015-09-22 Thread Shiv Kandavelu

Hi All,

We currently have a Hadoop cluster with YARN as the resource manager.
We are planning to use HBase as the data store due to the C-P aspects of the 
CAP Theorem.
We now want to do extensive data processing on both data stored in HBase and 
stream processing from our online website / API.
We want to use both Spark and MapReduce on the existing Hadoop cluster.

One of the recommendations we got was to use a standalone Spark cluster with 
Mesos as the resource manager on top of it to monitor and scale. The reason 
given for this recommendation is that standalone Spark with Mesos is 100x faster 
than the Spark/YARN/Hadoop combination. It was also mentioned that building on 
Spark/Mesos can help automatically add Spark nodes on the fly to scale 
processing. Also, it is easy to switch the underlying data store from HBase to 
Cassandra or something else if we use Spark.

We are in the process of evaluating which stack will work best, and with the 
knowledge we have, it is getting tough to pick one versus the other because of 
our inexperience with these platforms.

Can you help us understand the pros and cons of having Spark as a Standalone 
cluster Vs running on top of Hadoop stack?


pyspark question: create RDD from csr_matrix

2015-09-22 Thread jeff saremi
I've tried desperately to create an RDD from a matrix I have. Every combination 

I have a sparse matrix returned from a call to 
dv = DictVectorizer()
sv_tf = dv.fit_transform(tf)

which is supposed to be a matrix of document terms and their frequencies.
I need to convert this to an RDD so I can feed it to pyspark functions such as 

I tried applying a Vectors.sparse(??, sv_tf) but I didn't know what the 
dimension should be.
I tried doing a sc.parallelize(sv_tf), which didn't work either.
I tried both of the above methods with sv_tf.toarray(). Again no luck.
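
A sketch, not tested against PySpark here: the size argument of Vectors.sparse(size, indices, values) is the number of columns of sv_tf, i.e. the vocabulary size (sv_tf.shape[1], or len(dv.vocabulary_)). One way to get there is to split the CSR matrix into one (size, indices, values) triple per row using its data, indices and indptr arrays, which are real scipy.sparse.csr_matrix attributes. The helper name csr_to_sparse_rows is hypothetical and written in plain Python so the row-splitting is visible; indices are sorted per row because, as far as I know, newer PySpark versions reject SparseVectors whose indices are not strictly increasing.

```python
def csr_to_sparse_rows(data, indices, indptr, n_cols):
    """Split CSR arrays into one (size, indices, values) triple per row.

    data, indices, indptr: the three arrays of a CSR matrix (for scipy,
    sv_tf.data, sv_tf.indices, sv_tf.indptr); n_cols: number of columns.
    Row i's non-zeros live in the slice indptr[i]:indptr[i + 1].
    """
    rows = []
    for i in range(len(indptr) - 1):
        start, end = indptr[i], indptr[i + 1]
        # Sort by column index so each triple has strictly increasing indices.
        pairs = sorted(zip(indices[start:end], data[start:end]))
        rows.append((int(n_cols),
                     [int(j) for j, _ in pairs],
                     [float(v) for _, v in pairs]))
    return rows
```

With the real objects this would be used roughly as rows = csr_to_sparse_rows(sv_tf.data, sv_tf.indices, sv_tf.indptr, sv_tf.shape[1]) followed by sc.parallelize([Vectors.sparse(*r) for r in rows]), with Vectors imported from pyspark.mllib.linalg.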


Re: Deploying spark-streaming application on production

2015-09-22 Thread Adrian Tanase
BTW, I re-read the docs and want to clarify that a reliable receiver + WAL gives 
you at-least-once, not exactly-once, semantics.
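
To make the at-least-once point concrete, here is a toy simulation in plain Python (ToyBroker and receive are hypothetical names, not Spark or MQTT APIs) of a reliable receiver that writes each message to the write-ahead log before acknowledging it. A crash between the write and the ack loses nothing, because the broker redelivers the unacked message, but that message then appears in the log twice: at-least-once, not exactly-once.

```python
from collections import deque


class ToyBroker:
    """Hypothetical broker: a message is redelivered until it is acked (not a real MQTT client)."""

    def __init__(self, messages):
        self.pending = deque(messages)

    def peek(self):
        return self.pending[0] if self.pending else None

    def ack(self):
        self.pending.popleft()


def receive(broker, crash_after_writing=None):
    """Reliable-receiver loop: write to the WAL first, ack second.

    If the receiver 'crashes' after writing a message but before acking it,
    the broker hands out the same message again, so it lands in the WAL twice.
    """
    wal = []
    crashed = False
    while broker.peek() is not None:
        msg = broker.peek()
        wal.append(msg)  # 1. persist before acknowledging
        if msg == crash_after_writing and not crashed:
            crashed = True  # 2. simulated crash: written but never acked
            continue        #    -> the broker redelivers this message
        broker.ack()        # 3. ack only after the write succeeded
    return wal
```

Running receive(ToyBroker(["m1", "m2", "m3"]), crash_after_writing="m2") yields a log with "m2" twice; getting exactly-once on top of this needs deduplication or idempotent writes downstream.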


On 21 Sep 2015, at 21:50, Adrian Tanase wrote:

I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know, you can tune the MQTT server to wait for acks on messages (QoS 
level 2?).
With some support from the client library you could achieve exactly-once 
semantics on the read side, if you ack a message only after writing it to the WAL, 



On 21 Sep 2015, at 12:35, Petr Novak wrote:

In short, there is no direct support for it in Spark AFAIK. You will either 
manage it in MQTT or have to add another layer of indirection - either 
in-memory based (observable streams, in-mem db) or disk based (Kafka, HDFS 
files, db) - which will keep your unprocessed events.

Come to think of it, there is support for backpressure in v1.5.0, but I don't 
know whether it could be exploited, i.e. whether it is possible to decouple 
reading events into memory from the actual processing code in Spark, so that 
the processing code could be swapped on the fly. Probably not without some 
custom-built facility for it.


On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.


On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak wrote:
I think you would have to persist events somehow if you don't want to miss 
them. I don't see any other option there. Either in MQTT if it is supported 
there or routing them through Kafka.

There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream 
reading and processing into 2 separate jobs so that you could upgrade the 
processing one, assuming the reading one stays stable (without changes) across 
versions. But it is problematic because there is no easy way to share DStreams 
between jobs - you would have to develop your own facility for it.

Alternatively, the reading job could save MQTT events in their rawest form into 
files - to limit the need to change code - and then the processing job would 
work on top of them using file-based Spark Streaming. I think this is 
inefficient and can get quite complex if you want to make it reliable.

Basically, either MQTT supports persistence (which I don't know) or there is 
Kafka for this use case.

Another option, I think, would be to place observable streams between MQTT and 
Spark Streaming with backpressure, so that you could perform the upgrade as long 
as the buffers don't fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.


On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele wrote:
Hi All,

I have a Spark Streaming application with a 10 ms batch interval which reads the 
MQTT channel and dumps the data from MQTT to HDFS.

Suppose I have to deploy a new application jar (with changes in the Spark 
Streaming application): what is the best way to deploy it? Currently I am doing 
the following:

1. killing the running streaming app using yarn application -kill ID
2. starting the application again

The problem with the above approach is that, since we are not persisting the 
events in MQTT, we will miss the events for the duration of the deploy.

How should we handle this case?


How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Clément Frison

My team and I have a 32-core machine and we would like to use a huge object
- for example a large dictionary - in a map transformation and use all our
cores in parallel by sharing this object among some tasks.

We broadcast our large dictionary.

dico_br = sc.broadcast(dico)

We use it in a map: rdd.map(lambda x: (x[0], function(x[1], dico_br)))

where function does a lookup: dico_br.value[x]

Our issue is that our dictionary is loaded 32 times in memory, and it
doesn't fit. So what we are doing is limiting the number of executors. It
works fine but we only have 8 cpus working in parallel instead of 32.

We would like to take advantage of multicore processing and shared memory,
as the 32 cores are in the same machine. For example we would like to load
the dictionary in memory 8 times only and make 4 cores share it. How could
we achieve that with Spark?

What we have tried, without success:

1) One driver/worker with 32 cores: local[32]

2) Standalone with one master and 8 workers, each of them having 4 cores

Thanks a lot for your help, Clement

Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
Check out:

On Tue, Sep 22, 2015 at 12:49 PM, Deenar Toraskar wrote:

> Michael
> Thank you for your prompt answer. I will repost after I try this again on
> 1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would
> be very helpful. I am familiar with the Hive data types, but there is very
> little documentation on Spark SQL data types.
> Regards
> Deenar
> *Think Reactive Ltd*
> 07714140812
> On 22 September 2015 at 19:28, Michael Armbrust 
> wrote:
>> I think that you are hitting a bug (which should be fixed in Spark
>> 1.5.1).  I'm hoping we can cut an RC for that this week.  Until then you
>> could try building branch-1.5.
>> On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar wrote:
>>> Hi
>>> I am trying to write a UDAF, ArraySum, that does an element-wise sum of
>>> arrays of Doubles, returning an array of Double, following the sample in
>>> I am getting the following error. Any guidance on handling complex types in
>>> Spark SQL would be appreciated.
>>> Regards
>>> Deenar
>>> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>>> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.functions._
>>> class ArraySum extends UserDefinedAggregateFunction {
>>>def inputSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>   def bufferSchema: StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>   def dataType: DataType = ArrayType(DoubleType, false)
>>>   def deterministic: Boolean = true
>>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>>> buffer(0) = Nil
>>>   }
>>>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer.getSeq(0)
>>> val currentRow : Seq[Double] = input.getSeq(0)
>>> buffer(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => sum.zip(row).map { case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer1.getSeq(0)
>>> val currentRow : Seq[Double] = buffer2.getSeq(0)
>>> buffer1(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => sum.zip(row).map { case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>   def evaluate(buffer: Row): Any = {
>>> buffer.getSeq(0)
>>>   }
>>> }
>>> val arraySum = new ArraySum
>>> sqlContext.udf.register("ArraySum", arraySum)
>>> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
>>> '2015-05-22' limit 10*
>>> gives me the following error
>>> Error in SQL statement: SparkException: Job aborted due to stage
>>> failure: Task 0 in stage 219.0 failed 4 times, most recent failure: Lost
>>> task 0.3 in stage 219.0 (TID 11242,
>>> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
>>> cannot be cast to org.apache.spark.sql.types.ArrayData at
>>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
>>> Source) at
>>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
>>> at
>>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
>>> at
>>> at
>>> at scala.collection.Iterator$$anon$ at
>>> scala.collection.Iterator$$anon$ at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727) at

Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Ted Yu
bq. it's relatively harder to use it with HBase

I agree with Sean.
I work on HBase. To my knowledge, no one runs HBase on top of Mesos.

On Tue, Sep 22, 2015 at 12:31 PM, Sean Owen  wrote:

> Who told you Mesos would make Spark 100x faster? Does it make sense
> that just the resource manager could make that kind of difference?
> This sounds entirely wrong, or maybe a mishearing.
> I don't know if Mesos is somehow easier to use with Cassandra, but
> it's relatively harder to use it with HBase, HDFS, etc. You probably
> want to use the Hadoop resource manager, YARN, if using Hadoop-ish
> stack components.
> As for Spark, the YARN integration actually has some advantages at the
> moment, like dynamic allocation. I think the security story is more
> complete too (? not sure).
> On Tue, Sep 22, 2015 at 8:25 PM, Shiv Kandavelu wrote:
> > Hi All,
> >
> > We currently have a Hadoop cluster having Yarn as the resource manager.
> > We are planning to use HBase as the data store due to the C-P aspects of
> > the CAP Theorem.
> > We now want to do extensive data processing both stored data in HBase as
> > well as Steam processing from online website / API
> > We now want to use both Spark/Mapreduce on an existing Hadoop cluster.
> >
> > One of the recommendation we got was to use Spark Cluster as a standalone
> > with Mesos as a resource manager on top of it to Monitor and scale. The
> > reason for this recommendation is that Standalone Spark with Mesos is 100x
> > faster than the Spark/Yarn/Hadoop combination. It was also mentioned that
> > building on Spark/Mesos can help automatically add spark nodes on the fly
> > for processing to scale. Also, it is easy to switch the bottom data stack
> > HBASE to Cassandra or something else if we use Spark.
> >
> > We are in the process of evaluating which stack will work best and with
> > the knowledge we have, it is getting tough to pick one versus the other
> > b/c of our inexperience in these platforms.
> >
> > Can you help us understand the pros and cons of having Spark as a
> > Standalone cluster Vs running on top of Hadoop stack?
> >
> > Thanks!

Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
I think that you are hitting a bug (which should be fixed in Spark 1.5.1).
I'm hoping we can cut an RC for that this week.  Until then you could try
building branch-1.5.

On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar  wrote:

> Hi
> I am trying to write a UDAF, ArraySum, that does an element-wise sum of
> arrays of Doubles, returning an array of Double, following the sample in
> I am getting the following error. Any guidance on handling complex types in
> Spark SQL would be appreciated.
> Regards
> Deenar
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> class ArraySum extends UserDefinedAggregateFunction {
>def inputSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>   def bufferSchema: StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>   def dataType: DataType = ArrayType(DoubleType, false)
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = Nil
>   }
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
> val currentSum : Seq[Double] = buffer.getSeq(0)
> val currentRow : Seq[Double] = input.getSeq(0)
> buffer(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => sum.zip(row).map { case (a, b) => a + b }
>   // TODO handle different sizes arrays here
> }
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> val currentSum : Seq[Double] = buffer1.getSeq(0)
> val currentRow : Seq[Double] = buffer2.getSeq(0)
> buffer1(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => sum.zip(row).map { case (a, b) => a + b }
>   // TODO handle different sizes arrays here
> }
>   }
>   def evaluate(buffer: Row): Any = {
> buffer.getSeq(0)
>   }
> }
> val arraySum = new ArraySum
> sqlContext.udf.register("ArraySum", arraySum)
> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
> '2015-05-22' limit 10*
> gives me the following error
> Error in SQL statement: SparkException: Job aborted due to stage failure:
> Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 219.0 (TID 11242, java.lang.ClassCastException:
> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
> org.apache.spark.sql.types.ArrayData at
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
> at
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
> at
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
> Source) at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
> at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
> at
> at
> at scala.collection.Iterator$$anon$ at
> scala.collection.Iterator$$anon$ at
> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$
> at at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
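
On the TODO in update and merge (arrays of different sizes): one option, assuming zero-padding is the semantics you want, is to pad the shorter array with zeros instead of truncating as zip does. The plain-Python sketch below shows that logic (elementwise_sum is a hypothetical name); in the Scala UDAF the analogous expression would be sum.zipAll(row, 0.0, 0.0).map { case (a, b) => a + b }, which also makes the Nil cases unnecessary because an empty buffer acts as the identity. This does not address the ClassCastException itself, which Michael thinks is a Spark bug that should be fixed in 1.5.1.

```python
from itertools import zip_longest


def elementwise_sum(current_sum, row):
    """Element-wise sum of two sequences of floats.

    The shorter sequence is padded with 0.0 (rather than truncated), so an
    empty buffer, like the initial Nil in the UDAF, behaves as the identity.
    """
    return [a + b for a, b in zip_longest(current_sum, row, fillvalue=0.0)]
```

Because the padding makes the empty list a neutral element, the same function can serve as both the update step and the merge step.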

Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Utkarsh Sengar
If a broadcast variable doesn't fit in memory, I think it is not the right fit
for you.
You can think about fitting it into an RDD, as a tuple with the other data you
are working on.

Say you are working on an RDD (rdd in your case): run a map/reduce
to convert it to an RDD of tuples, so that you have the
relevant data from the dict available locally as part of your RDD.
It's much more efficient than finding workarounds to load it partially.
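
Utkarsh's suggestion amounts to a join: key the records by the value you would have looked up, treat the dictionary as (key, value) pairs, and join, so each record travels with only the entry it needs. In PySpark that would look roughly like rdd.map(lambda x: (x[1], x[0])).join(sc.parallelize(list(dico.items()))), which yields (key, (record_id, value)) elements; this assumes, as in Clément's map, that x[1] is the lookup key and that function does nothing but the lookup. The plain-Python function below (join_with_dictionary is a hypothetical name) only emulates the inner-join result on small lists so the shape of the output is clear; it is not how Spark executes the join.

```python
from collections import defaultdict


def join_with_dictionary(records, dictionary_pairs):
    """Inner-join (record_id, lookup_key) records with (key, value) pairs.

    Plain-Python stand-in for keying the RDD by lookup_key and calling join();
    returns (record_id, value) pairs sorted by record_id. Records whose key is
    missing from the dictionary are dropped, as in an inner join.
    """
    # Group record ids by the key they need, as the shuffle in a join would.
    records_by_key = defaultdict(list)
    for record_id, key in records:
        records_by_key[key].append(record_id)
    joined = []
    for key, value in dictionary_pairs:
        for record_id in records_by_key.get(key, ()):
            joined.append((record_id, value))
    return sorted(joined)
```

The trade-off is a shuffle of the dictionary instead of one full copy per task; whether that wins depends on how many distinct keys the records actually touch.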


On Tue, Sep 22, 2015 at 11:42 AM, Clément Frison 

> Hello,
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
> We broadcast our large dictionary.
> dico_br = sc.broadcast(dico)
> We use it in a map:
> rdd.map(lambda x: (x[0], function(x[1], dico_br)))
> where function does a lookup : dico_br.value[x]
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
> What we have tried - without success :
> 1) One driver/worker with 32 cores : local[32]
> 2) Standalone with one master and 8 workers - each of them having 4 cores
> Thanks a lot for your help, Clement


Re: Mesos Tasks only run on one node

2015-09-22 Thread Tim Chen
What configuration have you used, and what are the slaves configuration?

Possibly all the other nodes either don't have enough resources, or are using
another role that prevents the executor from being launched.


On Mon, Sep 21, 2015 at 1:58 PM, John Omernik  wrote:

> I have a happy healthy Mesos cluster (0.24) running in my lab.  I've
> compiled spark-1.5.0 and it seems to be working fine, except for one small
> issue: my tasks all seem to run on one node (I have 6 in the cluster).
> Basically, I have a directory of compressed text files. Compressed, these
> 25 files add up to 1.2 GB of data. In bin/pyspark I do:
> txtfiles = sc.textFile("/path/to/my/data/*")
> txtfiles.count()
> This goes through and gives me the correct count, but all my tasks (25 of
> them) run on one node, let's call it node4.
> Interesting.
> So I was running spark from node4, but I would have thought it would have
> hit up more nodes.
> So I ran it on node5.  In executors tab on the spark UI, there is only one
> registered, and it's node4, and once again all tasks ran on node4.
> I am running in fine-grained mode... Is there a setting somewhere to allow
> for more executors? This seems weird. I've been away from Spark since 1.2.x,
> but I don't remember this...

Re: Creating BlockMatrix with java API

2015-09-22 Thread Pulasthi Supun Wickramasinghe
Hi Yanbo,

Thanks for the reply. I thought I might be missing something. Anyway, I
moved to using Scala since it has the complete API.

Best Regards,

On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:

> This is because distributed matrices like
> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
> not provide Java-friendly constructors. I have filed SPARK-10757
>  to track this issue.
> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe:
>> Hi All,
>> I am new to Spark and i am trying to do some BlockMatrix operations with
>> the Mllib API's. But i can't seem to create a BlockMatrix with the java
>> API. I tried the following
>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>> List,Matrix>> list = new 
>> ArrayList, Matrix>>();
>> Tuple2 intTuple = new Tuple2(0,0);
>> Tuple2,Matrix> tuple2MatrixTuple2 = new 
>> Tuple2, Matrix>(intTuple,matrixa );
>> list.add(tuple2MatrixTuple2);
>> JavaRDD, Matrix>> rdd = sc.parallelize(list);
>> BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);
>> but since BlockMatrix only
>> takes 
>> "RDD,Matrix>>"
>> this code does not work. sc.parallelize() returns a JavaRDD so the two
>> are not compatible. I also couldn't find any code samples for this. Any
>> help on this would be highly appreciated.
>> Best Regards,
>> Pulasthi
>> --
>> Pulasthi S. Wickramasinghe
>> Graduate Student  | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> cell: 224-386-9035


Re: spark + parquet + schema name and metadata

2015-09-22 Thread Cheng Lian

I see, this makes sense. We should probably add this in Spark SQL.

However, there's one corner case to note about user-defined Parquet 
metadata. When committing a write job, ParquetOutputCommitter writes 
Parquet summary files (_metadata and _common_metadata), and the user-defined 
key-value metadata written in all Parquet part-files gets merged there. 
The problem is that, if a single key is associated with multiple values, 
Parquet doesn't know how to reconcile this situation, and simply gives 
up writing summary files. This can be particularly annoying for appending. 
In general, users should avoid storing "unstable" values like timestamps 
as Parquet metadata.
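
The summary-file corner case can be modelled in a few lines. This is a toy model in plain Python, not Parquet's actual merge code (merge_key_value_metadata is a hypothetical name): per-file footers are merged key by key, and any key that maps to different values makes the merge fail, which stands in for the committer giving up on the summary files. It also shows why a per-write timestamp is a bad metadata value while a stable tag is fine.

```python
def merge_key_value_metadata(footers):
    """Toy model of merging per-file key/value metadata into a summary file.

    footers: one dict of key/value metadata per Parquet part-file.
    Returns the merged dict, or None when some key maps to conflicting
    values (standing in for "no summary file gets written").
    """
    merged = {}
    for footer in footers:
        for key, value in footer.items():
            if key in merged and merged[key] != value:
                return None  # conflicting values for one key: give up
            merged[key] = value
    return merged
```

Appending makes conflicts likely: the new part-files carry the new timestamp while the existing ones keep the old one.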


On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

Thanks for the answer.

I need this in order to be able to track schema metadata.

Basically, when I create Parquet files from Spark, I want to be able to 
"tag" them in some way (giving the schema an appropriate name or 
attaching some key/values), so that it is then fairly easy to get basic 
metadata about the Parquet files when processing and discovering them 
later on.

On Mon, 21 Sep 2015 at 18:17 Cheng Lian wrote:

Currently Spark SQL doesn't support customizing the schema name and
metadata. May I know why these two matter in your use case? Some
Parquet data models, like parquet-avro, do support it, while some
don't (e.g. parquet-hive).


On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
> I am trying to figure out how to write parquet metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
> I could not find a way to change schema name (which seems to be
> hardcoded to root) and also how to add data to key/value metadata in
> parquet footer.
> org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
> org.apache.parquet.schema.Type#getName
> thanks

Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-22 Thread Luciano Resende
localDF is a pure R data frame, so as.vector works on it with no problems. As
for calling it on SparkR objects, try calling collect before you call
as.vector (or, in your case, the algorithms); that should solve your problem.

On Mon, Sep 21, 2015 at 8:48 AM, Ellen Kraffmiller wrote:

> Thank you for the link! I was using
>, and I didn't see
> replies there.
> Regarding your code example, I'm doing the same thing and successfully
> creating the rdd, but the problem is that when I call a clustering
> algorithm like amap::hcluster(), I get an error from as.vector() that the
> rdd cannot be coerced into a vector.
> On Fri, Sep 18, 2015 at 12:33 PM, Luciano Resende 
> wrote:
>> I see the thread with all the responses on the bottom at mail-archive :
>> On Fri, Sep 18, 2015 at 7:58 AM, Ellen Kraffmiller wrote:
>>> Thanks for your response.  Is there a reason why this thread isn't
>>> appearing on the mailing list?  So far, I only see my post, with no
>>> answers, although I have received 2 answers via email.  It would be nice if
>>> other people could see these answers as well.
>>> On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:
 The existing algorithms operating on R data.frame can't simply operate
 on SparkR DataFrame. They have to be re-implemented to be based on SparkR
 DataFrame API.

 -Original Message-
 From: ekraffmiller []
 Sent: Thursday, September 17, 2015 3:30 AM
 Subject: SparkR - calling as.vector() with rdd dataframe causes error

 I have a library of clustering algorithms that I'm trying to run in the
 SparkR interactive shell. (I am working on a proof of concept for a
 document classification tool.) Each algorithm takes a term document matrix
 in the form of a dataframe.  When I pass the method a local dataframe, the
 clustering algorithm works correctly, but when I pass it a spark rdd, it
 gives an error trying to coerce the data into a vector.  Here is the code,
 that I'm calling within SparkR:

 # get matrix from a file
 file <-


 #read it into variable
  raw_data <- read.csv(file,sep=',',header=FALSE)

 #convert to a local dataframe
 localDF = data.frame(raw_data)

 # create the rdd
 rdd  <- createDataFrame(sqlContext,localDF)

 # call the algorithm with the localDF - this works
 result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)

 # call with the rdd - this produces an error
 result <- galileo(rdd,

 Error in as.vector(data) :
   no method for coercing this S4 class to a vector

 I get the same error if I try to directly call as.vector(rdd) as well.

 Is there a reason why this works for localDF and not rdd?  Should I be
 doing something else to coerce the object into a vector?



>> --
>> Luciano Resende

Luciano Resende

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
The indices are definitely necessary. My first solution was just
reduceByKey { case (v, _) => v } and that didn't work. I needed to look at
both values and see which had the lower index.

On Tue, Sep 22, 2015 at 8:54 AM, Sean Owen  wrote:

> The point is that this only works if you already knew the file was
> presented in order within and across partitions, which was the
> original problem anyway. I don't think it is in general, but in
> practice, I do imagine it's already in the expected order from
> textFile. Maybe under the hood this ends up being ensured by
> TextInputFormat.
> So, adding the index and sorting on it doesn't add anything.
> On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase  wrote:
> > Just give zipWithIndex a shot, and use it early in the pipeline. I think it
> > provides exactly the info you need, as the index is the original line number
> > in the file, not the index in the partition.
> >
> > On 22 Sep 2015, at 17:50, Philip Weaver  wrote:
> >
> > Thanks. If textFile can be used in a way that preserves order, then both the
> > partition index and the index within each partition should be consistent,
> > right?
> >
> > I overcomplicated the question by asking about removing duplicates.
> > Fundamentally I think my question is, how does one sort lines in a file by
> > line number.
> >
> > On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase wrote:
> >>
> >> By looking through the docs and source code, I think you can get away with
> >> rdd.zipWithIndex to get the index of each line in the file, as long as you
> >> define the parallelism upfront:
> >> sc.textFile("", 4)
> >>
> >> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
> >> skimming through some tuples, hopefully this is clear enough.
> >>
> >> -adrian
> >>
> >> From: Philip Weaver
> >> Date: Tuesday, September 22, 2015 at 3:26 AM
> >> To: user
> >> Subject: Remove duplicate keys by always choosing first in file.
> >>
> >> I am processing a single file and want to remove duplicate rows by some
> >> key by always choosing the first row in the file for that key.
> >>
> >> The best solution I could come up with is to zip each row with the
> >> partition index and local index, like this:
> >>
> >> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >>   rows.zipWithIndex.map { case (row, localIndex) =>
> >>     (row.key, ((partitionIndex, localIndex), row)) }
> >> }
> >>
> >>
> >> And then using reduceByKey with a min ordering on the (partitionIndex,
> >> localIndex) pair.
> >>
> >> First, can I count on SparkContext.textFile to read the lines in such a way
> >> that the partition indexes are always increasing so that the above works?
> >>
> >> And, is there a better way to accomplish the same effect?
> >>
> >> Thanks!
> >>
> >> - Philip
> >>
> >
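
Philip's (partitionIndex, localIndex) approach can be sketched in plain Python to show why comparing tags works where reduceByKey { case (v, _) => v } did not: the merge keeps whichever candidate has the smaller tag, so the result does not depend on the order in which candidates are combined. Tuples compare lexicographically, so (partitionIndex, localIndex) orders rows the same way zipWithIndex's global index does. tag_rows and first_per_key are hypothetical names, and partitions are modelled as lists of lines in file order, which, as Sean points out, only reflects the file if textFile's partitions follow file order.

```python
def tag_rows(partitions):
    """Pair every row with (partition_index, local_index).

    Plain-Python stand-in for mapPartitionsWithIndex + zipWithIndex;
    `partitions` is a list of lists of rows in file order.
    """
    return [((p, i), row)
            for p, rows in enumerate(partitions)
            for i, row in enumerate(rows)]


def first_per_key(tagged, key):
    """reduceByKey-style merge keeping the candidate with the smallest tag.

    Because the merge is a min over tags, the result is the same whatever
    order the tagged rows arrive in. Returns the winners in file order.
    """
    best = {}
    for tag, row in tagged:
        k = key(row)
        if k not in best or tag < best[k][0]:
            best[k] = (tag, row)
    return [row for _, row in sorted(best.values())]
```

Usage: first_per_key(tag_rows(partitions), key=lambda line: line.split(",")[0]) keeps the first line per key, even if the tagged rows are processed in reverse.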

Re: Has anyone used the Twitter API for location filtering?

2015-09-22 Thread Jo Sunad
Thanks Akhil, but I can't seem to get any tweets that include location
data. For example, when I do stream.filter(status =>
status.getPlace().getName) and run the code for 20 minutes, I only get null
values. It seems like Twitter might purposely be removing the Place for free

On Tue, Sep 22, 2015 at 2:20 AM, Akhil Das 

> That's because sometimes getPlace returns null, and calling getLang on
> null throws either a NullPointerException or a NoSuchMethodError. You need
> to filter out the statuses that don't include location data.
> Thanks
> Best Regards
> On Fri, Sep 18, 2015 at 12:46 AM, Jo Sunad  wrote:
>> I've been trying to filter for GeoLocation, Place or even Time Zone and I
>> keep getting null values. I think I got one Place in 20 minutes of the app
>> running (without any filters on tweets).
>> Is this normal? Do I have to try querying rather than filtering?
>> my code is following TD's example...
>> val stream = TwitterUtils
>> val hashtags = (status => status.getPlace().getName(),
>> status.getText())
>> getText, getFollowers, etc all work fine, I just don't get anything
>> location based (and getLang() for some reason throws a noMethodError).
>> Thanks for the help!
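
Akhil's advice is to drop statuses without a place before touching getPlace().getName(). Since filter takes a Boolean predicate, the null check goes in the filter and the field access in a subsequent map; in Scala with twitter4j that would be roughly stream.filter(status => status.getPlace != null).map(status => (status.getPlace.getName, status.getText)). The plain-Python sketch below shows the same filter-then-map ordering on hypothetical dict-shaped statuses (the keys "place", "name" and "text" are assumptions, not the twitter4j API).

```python
def place_names_and_texts(statuses):
    """Keep only statuses that carry a place, then extract (place name, text).

    `statuses` is a list of dicts with hypothetical keys; "place" is None or
    absent when the tweet has no location data. The check runs before the
    nested lookup, so a missing place never gets dereferenced.
    """
    return [(s["place"]["name"], s["text"])
            for s in statuses
            if s.get("place") is not None]
```

If almost everything is filtered out, that matches Jo's observation that few tweets in the sample stream carry a Place at all.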

Re: How to speed up MLlib LDA?

2015-09-22 Thread Charles Earl
It seems that the Vowpal Wabbit version is most similar to what is in
Although the Intel package seems to implement the Hierarchical Dirichlet Process
(topics and subtopics), as opposed to the implementation in VW, which is
based on
As opposed to Monte Carlo methods, HDP/VW use iterative
optimization of model parameters with respect to predicted tokens (my best
shot at a one-sentence summary).
The VW code is *highly* optimized.
A fast inferencer for Spark LDA would be of great value.


- Charles

Re: How to speed up MLlib LDA?

2015-09-22 Thread Marko Asplund
How optimized are the Commons math3 methods that showed up in profiling?
Are there any higher performance alternatives to these?
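The profile (log and digamma dominating) is what one would expect if topicDistributions runs a per-document variational E-step of the kind used in online LDA (Hoffman, Blei and Bach); that MLlib follows exactly this structure is our assumption. A minimal pure-Python sketch of such an E-step, with a hand-rolled digamma (recurrence plus asymptotic series), shows where the calls come from: every inner iteration recomputes exp(E[log theta]), which costs k + 1 digamma calls (each containing a log) and k exp calls, for every document.

```python
import math
from typing import Dict, List


def digamma(x: float) -> float:
    """psi(x) for x > 0: shift with psi(x) = psi(x + 1) - 1/x, then an asymptotic series."""
    result = 0.0
    while x < 6.0:
        result -= 1.0 / x
        x += 1.0
    inv2 = 1.0 / (x * x)
    # psi(x) ~ ln x - 1/(2x) - 1/(12x^2) + 1/(120x^4) - 1/(252x^6)
    result += math.log(x) - 0.5 / x - inv2 * (1.0 / 12 - inv2 * (1.0 / 120 - inv2 / 252))
    return result


def exp_dirichlet_expectation(params: List[float]) -> List[float]:
    """exp(E[log p_k]) under Dirichlet(params): one digamma per entry plus one for the sum."""
    psi_total = digamma(sum(params))
    return [math.exp(digamma(p) - psi_total) for p in params]


def topic_distribution(counts: Dict[int, float], exp_elog_beta: List[List[float]],
                       alpha: float, max_iter: int = 100, tol: float = 1e-3) -> List[float]:
    """Variational E-step for one document; exp_elog_beta[k][w] = exp(E[log beta_kw])."""
    k = len(exp_elog_beta)
    ids = list(counts)
    gamma = [1.0] * k
    theta = exp_dirichlet_expectation(gamma)
    for _ in range(max_iter):
        # Per-term normaliser: sum over topics of theta_k * beta_kw.
        phinorm = [sum(theta[t] * exp_elog_beta[t][w] for t in range(k)) + 1e-100 for w in ids]
        new_gamma = [alpha + theta[t] * sum(counts[w] / p * exp_elog_beta[t][w]
                                            for w, p in zip(ids, phinorm))
                     for t in range(k)]
        change = sum(abs(a - b) for a, b in zip(new_gamma, gamma)) / k
        gamma = new_gamma
        theta = exp_dirichlet_expectation(gamma)  # k + 1 digamma calls every iteration
        if change < tol:
            break
    total = sum(gamma)
    return [g / total for g in gamma]


# Hypothetical 2-topic, 3-term model: topic 0 favours term 0, topic 1 favours term 2.
lam = [[5.0, 1.0, 1.0], [1.0, 1.0, 5.0]]
beta = [exp_dirichlet_expectation(row) for row in lam]
dist = topic_distribution({0: 3.0, 1: 1.0}, beta, alpha=0.5)
print(dist)  # topic 0 should dominate for a document made mostly of term 0
```

Because digamma and log sit in the innermost loop, the cost scales with documents times iterations times topics; in this sketch the obvious levers are a looser tol or a smaller max_iter, traded against accuracy.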


Re: How to speed up MLlib LDA?

2015-09-22 Thread Pedro Rodriguez
I helped some with the LDA and worked quite a bit on a Gibbs version. I
don't know if the Gibbs version might help, but since it is not (yet) in
MLlib, Intel Analytics kindly created a spark package with their adapted
version plus a couple other LDA algorithms:

It might be worth trying out. Do you know what LDA algorithm VW uses?


On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund 

> Hi,
> I did some profiling for my LDA prototype code that requests topic
> distributions from a model.
> According to Java Mission Control, more than 80% of the execution time during
> the sample interval is spent in the following methods:
> org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
> org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
> org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
> 6.98%
> java.lang.Double.valueOf(double); count: 31; 4.33%
> Is there any way of using the API more optimally?
> Are there any opportunities for optimising the "topicDistributions" code
> path in MLlib?
> My code looks like this:
> // executed once
> val model = LocalLDAModel.load(ctx, ModelFileName)
> // executed four times
> val samples = Transformers.toSparseVectors(vocabularySize,
> ctx.parallelize(Seq(input))) // fast
> model.topicDistributions( // <== this
> seems to take about 4 seconds to execute
> marko

Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni | 208-340-1703

Re: Count for select not matching count for group by

2015-09-22 Thread Michael Armbrust
This looks like something is wrong with predicate pushdown.  Can you
include the output of calling explain, and tell us what format the data is
stored in?

On Mon, Sep 21, 2015 at 8:06 AM, Michael Kelly 

> Hi,
> I'm seeing some strange behaviour with Spark 1.5. I have a DataFrame
> that I have built from loading and joining some Hive tables stored in
> S3.
> The DataFrame is cached in memory, using df.cache.
> What I'm seeing is that the counts I get when I do a group by on a
> column are different from what I get when I filter/select and count.
> outcome | count
> --------+------
> 'A'     |   100
> 'B'     |   200
> df.filter("outcome = 'A'").count
> # 50
> df.filter(df("outcome") === "A").count
> # 50
> I expect the count of rows that match 'A' in the groupBy to match
> the count when filtering. Any ideas what might be happening?
> Thanks,
> Michael

Spark 1.5 UDAF ArrayType

2015-09-22 Thread Deenar Toraskar

I am trying to write a UDAF, ArraySum, that does an element-wise sum of arrays
of Doubles and returns an array of Double, following the sample in
I am getting the following error. Any guidance on handling complex types in
Spark SQL would be appreciated.


import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

// Element-wise sum of array<double> values, returning array<double>.
class ArraySum extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  def bufferSchema: StructType =
    StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  def dataType: DataType = ArrayType(DoubleType, false)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq.empty[Double]
  }

  // Shared by update and merge: an empty side passes the other through,
  // otherwise the two arrays are added element by element.
  private def addArrays(sum: Seq[Double], row: Seq[Double]): Seq[Double] =
    (sum, row) match {
      case (Nil, Nil) => Nil
      case (Nil, r)   => r
      case (s, Nil)   => s
      // TODO handle different sizes arrays here (zip truncates to the shorter one)
      case (s, r)     => s.zip(r).map { case (a, b) => a + b }
    }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = addArrays(buffer.getSeq[Double](0), input.getSeq[Double](0))
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = addArrays(buffer1.getSeq[Double](0), buffer2.getSeq[Double](0))
  }

  def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
}

val arraySum = new ArraySum
sqlContext.udf.register("ArraySum", arraySum)

%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
'2015-05-22' limit 10

gives me the following error

Error in SQL statement: SparkException: Job aborted due to stage failure:
Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 219.0 (TID 11242, java.lang.ClassCastException:
scala.collection.mutable.WrappedArray$ofRef cannot be cast to
org.apache.spark.sql.types.ArrayData
  at Source)
  at scala.collection.Iterator$$anon$
  at scala.collection.Iterator$$anon$
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.executor.Executor$
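Independently of the ClassCastException, the aggregation contract itself can be checked outside Spark. Below is a minimal pure-Python sketch (not Spark's API) of the same initialize/update/merge/evaluate life cycle; it resolves the "different sizes" TODO by zero-padding the shorter array, which is an assumed policy rather than anything from the post.

```python
from itertools import zip_longest
from typing import List, Sequence


def add_arrays(acc: Sequence[float], row: Sequence[float]) -> List[float]:
    """Element-wise sum; the shorter array is zero-padded (assumed policy for the TODO)."""
    return [a + b for a, b in zip_longest(acc, row, fillvalue=0.0)]


class ArraySumSketch:
    """Mirrors the UDAF life cycle: initialize, update per row, merge partial buffers, evaluate."""

    def initialize(self) -> List[float]:
        return []

    def update(self, buffer: List[float], row: Sequence[float]) -> List[float]:
        return add_arrays(buffer, row)

    def merge(self, buffer1: List[float], buffer2: Sequence[float]) -> List[float]:
        return add_arrays(buffer1, buffer2)

    def evaluate(self, buffer: List[float]) -> List[float]:
        return buffer


# Two "partitions" aggregated separately, then merged, as Spark would do.
agg = ArraySumSketch()
part1 = agg.initialize()
for row in ([1.0, 2.0, 3.0], [1.0, 1.0, 1.0]):
    part1 = agg.update(part1, row)
part2 = agg.update(agg.initialize(), [0.5, 0.5])
print(agg.evaluate(agg.merge(part1, part2)))  # [2.5, 3.5, 4.0]
```

Starting from an empty buffer makes the (Nil, row) case fall out naturally, since padding an empty list just copies the row.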

Re: WAL on S3

2015-09-22 Thread Tathagata Das
You can keep the checkpoints in the Hadoop-compatible file system and the
WAL somewhere else using your custom WAL implementation. Yes, cleaning up
gets complicated, as it is not as easy as deleting the
checkpoint directory: you will have to clean up the checkpoint directory as
well as whatever other storage your custom WAL uses. However, if I
remember correctly, the WAL information is used only when the DStreams are
recovered correctly from checkpoints.

Note that there are further details here that require deeper
understanding. There are actually two uses of WALs in the system:

1. Data WAL for received data  - This is what is usually referred to as the
WAL everywhere. Each receiver writes to a different WAL. This deals with
bulk data.
2. Metadata WAL - This is used by the driver to save metadata information
like  block to data WAL segment mapping, etc. I usually skip mentioning
this. This WAL is automatically used when data WAL is enabled. And this
deals with small data.
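The split between the two logs can be pictured with a small in-memory model. The classes below are hypothetical and only mirror the roles described above, not Spark's actual WriteAheadLog classes: each receiver appends bulk blocks to its own data log, the driver records block-to-segment handles in a metadata log, and recovery needs both, which is why both would have to move off S3 together.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class DataWAL:
    """One per receiver: bulk received data appended as segments."""
    segments: List[bytes] = field(default_factory=list)

    def write(self, block: bytes) -> int:
        self.segments.append(block)
        return len(self.segments) - 1  # the segment index acts as the handle


@dataclass
class MetadataWAL:
    """Driver side: small records mapping block ids to (receiver, segment) handles."""
    records: List[Tuple[str, int, int]] = field(default_factory=list)

    def add_block(self, block_id: str, receiver: int, segment: int) -> None:
        self.records.append((block_id, receiver, segment))


def recover(meta: MetadataWAL, data_wals: Dict[int, DataWAL]) -> Dict[str, bytes]:
    """Replay the metadata log, then fetch each block from its receiver's data log."""
    return {bid: data_wals[r].segments[seg] for bid, r, seg in meta.records}


receivers = {0: DataWAL(), 1: DataWAL()}
meta = MetadataWAL()
for rid, block_id, payload in [(0, "b0", b"a"), (1, "b1", b"bb"), (0, "b2", b"ccc")]:
    seg = receivers[rid].write(payload)
    meta.add_block(block_id, rid, seg)
print(recover(meta, receivers))  # {'b0': b'a', 'b1': b'bb', 'b2': b'ccc'}
```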

If you have to get around S3's limitations, you will have to plug in both
WALs (see this

for SparkConfs, but note that we haven't made these confs public). While the
system supports plugging them in, we haven't made this information public
yet because of such complexities in working with it. And we have invested
time in making common sources like Kafka not require WALs (e.g. the Direct
Kafka approach). In the future, we do hope to have a better solution for
general receivers + WALs + S3 (personally, I really wish S3's semantics
improve and fix this issue).

Another alternative direction may be Amazon EFS. Since it is based on EBS, it
may give the necessary semantics. But I haven't given that a spin, so it's
uncharted territory :)


On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia  wrote:

> My understanding of pluggable WAL was that it eliminates the need for
> having a Hadoop-compatible file system [1].
> What is the use of pluggable WAL when it can be only used together with
> checkpointing which still requires a Hadoop-compatible file system?
> [1]:
> On 22 September 2015 at 19:57, Tathagata Das 
> wrote:
>> 1. Currently, the WAL can be used only with checkpointing turned on,
>> because it does not make sense to recover from the WAL if there is no
>> checkpoint information to recover from.
>> 2. Since the current implementation saves the WAL in the checkpoint
>> directory, they share the same fate: if the checkpoint directory is deleted,
>> then both checkpoint info and WAL info are deleted.
>> 3. Checkpointing is currently not pluggable. Why do you want that?
>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
>> wrote:
>>> I am trying to use pluggable WAL, but it can be used only with
>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible file
>>> system.
>>> Is there something like pluggable checkpointing?
>>> Or can WAL be used without checkpointing? What happens when WAL is
>>> available but the checkpoint directory is lost?
>>> Thanks!
>>> On 18 September 2015 at 05:47, Tathagata Das 
>>> wrote:
 I don't think it would work with multipart upload either. The file is
 not visible until the multipart upload is explicitly closed. So even if
 each write is a part upload, none of the parts are visible until the
 multipart upload is closed.


 On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran  wrote:

> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
> >
> > Actually, the current WAL implementation (as of Spark 1.5) does not
> work with S3 because S3 does not support flushing. Basically, the current
> implementation assumes that after write + flush, the data is immediately
> durable, and readable if the system crashes without closing the WAL file.
> This does not work with S3 as data is durable if and only if the S3 file
> output stream is cleanly closed.
> >
> more precisely, unless you turn multipart uploads on, the s3n/s3a
> clients Spark uses *don't even upload anything to s3*.
> It's not a filesystem, and you have to bear that in mind.
> Amazon's own s3 client used in EMR behaves differently; it may be
> usable as a destination (I haven't tested)
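The difference described in this thread, durability after write + flush versus visibility only after close, can be shown with two toy log classes. Both are hypothetical models, not Hadoop or S3 client code; a crash is simulated by reading the durable bytes without ever calling close().

```python
class FlushDurableLog:
    """Models a filesystem where write + flush makes data readable after a crash."""

    def __init__(self) -> None:
        self.durable = bytearray()
        self.pending = bytearray()

    def write(self, data: bytes) -> None:
        self.pending += data

    def flush(self) -> None:
        self.durable += self.pending
        self.pending.clear()

    def crash_and_read(self) -> bytes:
        return bytes(self.durable)


class CloseDurableLog(FlushDurableLog):
    """Models an object store where nothing becomes visible until close()."""

    def flush(self) -> None:
        pass  # flush is a no-op: data stays buffered on the client side

    def close(self) -> None:
        self.durable += self.pending
        self.pending.clear()


def write_then_crash(log: FlushDurableLog) -> bytes:
    log.write(b"record-1\n")
    log.flush()
    log.write(b"record-2\n")
    log.flush()
    return log.crash_and_read()  # the "crash" happens before any close()


print(write_then_crash(FlushDurableLog()))  # b'record-1\nrecord-2\n'
print(write_then_crash(CloseDurableLog()))  # b''
```

A WAL that relies on the first behaviour recovers both records; on the second store it recovers nothing, which is the failure mode described for S3.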


Spark standalone/Mesos on top of Ceph

2015-09-22 Thread
Hi guys,

Here is the info for Ceph:

We are investigating and using Ceph for distributed storage and monitoring,
and are specifically interested in using Ceph as the underlying file system
storage for Spark. However, we have no experience achieving that. Has anybody
seen such progress?


Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Do you have specific reasons to use Ceph? I used Ceph before, and I'm not too
in love with it, especially when I was using the Ceph Object Gateway S3 API.
There are some incompatibilities with the AWS S3 API. You really, really need to
try it before making the commitment. Did you manage to install it?


how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
 Dear Experts,

Our Spark job runs on the cluster via YARN. The job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster. I know that a Hadoop job can be
submitted this way from another machine with a Hadoop gateway installed, which
is used to connect to the cluster.
Does the same approach work for Spark as for Hadoop? And where is the
documentation for installing this gateway?
Thank you very much~~Zhiliang


Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and point HADOOP_CONF_DIR in Spark to the


Zhan Zhang

Yarn Shutting Down Spark Processing

2015-09-22 Thread Bryan Jeffrey

I have a Spark streaming job running on a cluster managed by Yarn.  The
spark streaming job starts and receives data from Kafka.  It is processing
well and then after several seconds I see the following error:

15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
initialize after waiting for 10 ms. Please check earlier log output for
errors. Failing the application.
15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 13, (reason: Timed out waiting for SparkContext.)

The spark process is then (obviously) shut down by Yarn.

What do I need to change to allow Yarn to initialize Spark streaming (vs.
batch) jobs?

Thank you,

Bryan Jeffrey

Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Hi Sun,

The issue with Ceph as the underlying file system for Spark is that you
lose data locality. Ceph is not designed to have Spark run directly on top
of the OSDs. I know that CephFS provides data location information via a
Hadoop-compatible API. The last time I researched this, the
integration was experimental (just google it and you will find a lot of
discussions, e.g.

However, this might not be the biggest issue as long as you have GREAT
network bandwidth, like InfiniBand or 10+ Gigabit Ethernet. My guess is that
the architecture and the performance will be similar to S3+Spark at best
(with 10GE instances) if you guys do the network stuff seriously.




Parallel collection in driver programs

2015-09-22 Thread Andy Huang
Hi All,

I would like to know if anyone has experience with parallel collections in the
driver program, and whether there is an actual advantage/disadvantage to doing so.

E.g. with a collection of JDBC connections and tables

We have adapted our non-Spark code, which utilizes parallel collections, to
Spark code, and it seems to work fine.

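import java.util.Properties

// (name, "query::partitionColumn::lowerBound::upperBound::numPartitions") pairs;
// the entries were not preserved in the archived post
val conf: List[(String, String)] = List(
)

val _JDBC_DEFAULT = "jdbc:sqlserver://;database=TestSource"
val _STORE_DEFAULT = "hdfs://"

val prop = new Properties()

// .par makes this a parallel collection: pairs are processed by several
// driver threads, and each thread submits its own Spark jobs
conf.par.foreach { pair =>
  val qry = pair._2.split("::")(0)
  val pCol = pair._2.split("::")(1)
  val lo = pair._2.split("::")(2).toInt
  val hi = pair._2.split("::")(3).toInt
  val part = pair._2.split("::")(4).toInt

  //create dataframe from jdbc table
  val jdbcDF = sqlContext.read.jdbc(
    _JDBC_DEFAULT,
    "("+qry+") a",
    pCol, //partition column
    lo, //lower bound
    hi, //upper bound
    part, //number of partitions
    prop //java.util.Properties - key value pair
  )

  //save to parquet (the original target path was not preserved;
  //this path is hypothetical)
  jdbcDF.write.parquet(_STORE_DEFAULT + pair._1)
}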
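For comparison, the same driver-side pattern can be sketched in Python with a bounded thread pool instead of Scala's .par; Spark's scheduler accepts jobs submitted concurrently from multiple threads of one driver, which is what makes either form work. In this sketch copy_table is a hypothetical placeholder for the JDBC read and Parquet write, and the table names and queries are invented.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, NamedTuple, Tuple


class TableSpec(NamedTuple):
    query: str
    partition_column: str
    lower: int
    upper: int
    num_partitions: int


def parse_spec(spec: str) -> TableSpec:
    """Parse the "query::column::lower::upper::partitions" strings used in the post."""
    qry, col, lo, hi, part = spec.split("::")
    return TableSpec(qry, col, int(lo), int(hi), int(part))


def copy_table(name: str, spec: TableSpec) -> Tuple[str, str]:
    # Hypothetical placeholder for the real per-table work (JDBC read + Parquet write);
    # in a Spark driver each call would submit its own jobs from this thread.
    return name, f"({spec.query}) a split on {spec.partition_column} into {spec.num_partitions}"


def copy_all(conf: Dict[str, str], max_workers: int = 4) -> Dict[str, str]:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(copy_table, name, parse_spec(s)) for name, s in conf.items()]
        return dict(f.result() for f in futures)


conf = {"orders": "select * from orders::id::1::1000::4",
        "customers": "select * from customers::cust_id::1::500::2"}
print(copy_all(conf)["customers"])  # (select * from customers) a split on cust_id into 2
```

An explicit max_workers keeps the number of concurrent JDBC extracts under control; Scala parallel collections instead size their default pool from the driver's available cores, which may be more (or less) than the source database and the cluster can absorb.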


Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979

Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread Tathagata Das
A lot of these imbalances were solved in Spark 1.5. Could you give that a
spin?
On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu  wrote:

> Hi spark users,
> In our Spark Streaming app via Kafka integration on Mesos, we initialized 3
> receivers to receive 3 Kafka partitions, and observed an imbalance in the
> record receiving rate: with spark.streaming.receiver.maxRate set to
> 120, one of them sometimes receives very close to the limit while the
> other two only get roughly fifty per second.
> This may be caused by a previous receiver failure, where one of the
> receivers' receiving rate dropped to 0. We restarted the Spark Streaming app,
> and the imbalance began. We suspect that the partition received by
> the failing receiver got jammed, and the other two receivers cannot take up
> its data.
> The 3-node cluster tends to run slowly: nearly all the tasks are
> registered at the node with the previous receiver failure (I used union to
> combine the 3 receivers' DStreams, so I expected the combined DStream to be
> well distributed across all nodes), one batch cannot be guaranteed to finish
> within a single batch time, stages get piled up, and the digested log shows
> the following:
> ...
> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 
> 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, 
> real=0.02 secs]
> ...
> 5/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED 
> for TID 77219 because its task set is gone (this is likely the result of
> receiving duplicate task finished status updates)
> ...
> The two types of log lines were printed during the execution of some (not all) stages.
> My configurations:
> # of cores on each node: 64
> # of nodes: 3
> batch time is set to 10 seconds
> spark.streaming.receiver.maxRate    120
> spark.streaming.blockInterval       160  // set to the value that divides
> 10 seconds approximately by the total cores, which is 64, to max out all the nodes:
> 10s * 1000 / 64
>  // this one doesn't seem to
> work, since the young gen / old gen ratio is nearly 0.3 instead of 0.1
> Anyone got an idea? I appreciate your patience.
> BR,
> Todd Leo
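The block-interval reasoning above can be checked with a little arithmetic. Per the Spark Streaming tuning guidance, each receiver produces roughly one block per block interval, and each block becomes one task of the batch, so the task count depends on the number of receivers, not just on cores. This sketch only uses the numbers from the post; it assumes maxRate is the only cap on ingestion.

```python
batch_interval_ms = 10 * 1000
block_interval_ms = 160          # spark.streaming.blockInterval from the post
receivers = 3
max_rate = 120                   # spark.streaming.receiver.maxRate, records/s per receiver
cores_per_node, nodes = 64, 3

blocks_per_receiver = batch_interval_ms // block_interval_ms   # ~62 blocks (tasks) per receiver
tasks_per_batch = blocks_per_receiver * receivers              # ~186 tasks after the union
max_records_per_batch = max_rate * receivers * batch_interval_ms // 1000
records_per_task = max_records_per_batch / tasks_per_batch
total_cores = cores_per_node * nodes

print(blocks_per_receiver, tasks_per_batch, total_cores)    # 62 186 192
print(max_records_per_batch, round(records_per_task, 1))    # 3600 19.4
```

Two things stand out under these assumptions: the formula in the post divides by 64 although the cluster has 192 cores, and with maxRate at 120 each task carries only about 19 records at most, so per-task scheduling overhead may dominate.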

Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread SLiZn Liu
Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks
for the tips, Tathagata!

On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das  wrote:

> A lot of these imbalances were solved in spark 1.5. Could you give that a
> spin?

Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread
Hi Jerry

Yeah, we already run Ceph in a few production environments, especially with
OpenStack.

The reason we want to use Ceph is that we are looking for some workarounds for
a unified storage layer, and the design concepts of Ceph are quite appealing.
I am just interested in work like the Hadoop CephFS plugin, and we are about
to run some benchmark tests comparing HDFS and CephFS.

So it would be beneficial if related work between Apache Spark and Ceph could
offer some thoughtful insights.

BTW, regarding the Ceph Object Gateway S3 REST API, I agree about the
inconvenience and some incompatibilities. However, we have not yet researched
and tested radosgw much, though we have some small requirements for using the
gateway in some use cases.

Hope for more considerations and talks.



Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-22 Thread tridib
By skewed, did you mean it's not distributed uniformly across partitions?
All of my columns are strings and of almost the same size, i.e.
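One quick way to check for the kind of skew being asked about is to compare the largest partition with the average partition. This Python sketch assumes hash partitioning (Spark's HashPartitioner takes the key's hashCode modulo the number of partitions; Python's hash stands in for it here) and uses integer keys because Python salts string hashes per process.

```python
from collections import Counter
from statistics import mean
from typing import Hashable, Iterable


def skew_ratio(keys: Iterable[Hashable], num_partitions: int) -> float:
    """Largest partition size divided by the mean, under hash(key) % num_partitions.

    1.0 means perfectly even; values far above 1.0 mean a few partitions hold most rows.
    """
    sizes = Counter(hash(k) % num_partitions for k in keys)
    counts = [sizes.get(p, 0) for p in range(num_partitions)]
    return max(counts) / mean(counts)


even = list(range(1000))
skewed = [0] * 900 + list(range(1, 101))  # one hot key plus a uniform tail
print(skew_ratio(even, 10), skew_ratio(skewed, 10))  # 1.0 9.1
```

Equal-sized string columns say nothing about this ratio: what matters is how many rows share a join or grouping key.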



Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Deenar Toraskar

In local mode all worker threads run in the driver VM. Your dictionary
should not be copied 32 times; in fact, it won't be broadcast at all. Have
you tried increasing spark.driver.memory to ensure that the driver uses all
the memory on the machine?


On 22 September 2015 at 19:42, Clément Frison 

> Hello,
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
> We broadcast our large dictionary.
> dico_br = sc.broadcast(dico)
> We use it in a map:
> x: (x[0], function(x[1], dico_br)))
> where function does a lookup : dico_br.value[x]
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
> What we have tried - without success :
> 1) One driver/worker with 32 cores : local[32]
> 2) Standalone with one master and 8 workers - each of them having 4 cores
> Thanks a lot for your help, Clement
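Because the map function here is Python, it runs in separate Python worker processes rather than inside the JVM, which, as far as we understand, is why the dictionary can still end up in memory once per concurrently running worker even in local mode. One common pattern, sketched below under that assumption, is a module-level lazy cache so that each worker process loads the object at most once and reuses it across the tasks it runs. Note that this bounds the copies by the number of worker processes; it does not get below one copy per process. load_dictionary is a hypothetical loader.

```python
from typing import Dict, Optional, Tuple

_DICO: Optional[Dict[str, str]] = None
LOAD_CALLS = 0


def load_dictionary() -> Dict[str, str]:
    """Hypothetical loader standing in for reading the large dictionary from disk."""
    global LOAD_CALLS
    LOAD_CALLS += 1
    return {"a": "alpha", "b": "beta"}


def get_dico() -> Dict[str, str]:
    """Load once per Python process, then reuse for every later task in that process."""
    global _DICO
    if _DICO is None:
        _DICO = load_dictionary()
    return _DICO


def lookup(pair: Tuple[int, str]) -> Tuple[int, Optional[str]]:
    key, value = pair
    return key, get_dico().get(value)


results = [lookup(p) for p in [(1, "a"), (2, "b"), (3, "c")]]
print(results, LOAD_CALLS)  # [(1, 'alpha'), (2, 'beta'), (3, None)] 1
```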

Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
Hi Zhiliang,

I cannot find a specific doc. But as far as I remember, you can log in to one of
your cluster machines, find the Hadoop configuration location, for example
/etc/hadoop/conf, and copy that directory to your local machine.
Typically it has hdfs-site.xml, yarn-site.xml, etc. In Spark, the former is used
to access HDFS, and the latter is used to launch applications on top of YARN.

Then in the, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
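The steps above can be sketched as a small pre-flight check on the client machine. This Python sketch assumes the copied directory should contain core-site.xml alongside the hdfs-site.xml and yarn-site.xml mentioned above, and it only builds the spark-submit argument list rather than running it; the jar and class names are hypothetical. spark-submit reads HADOOP_CONF_DIR (or YARN_CONF_DIR) from the environment when the master is YARN.

```python
import os
from pathlib import Path
from typing import List

# Client-side files usually copied from a cluster node (core-site.xml is an assumption
# added to the hdfs-site.xml / yarn-site.xml mentioned in the thread).
REQUIRED = ("core-site.xml", "hdfs-site.xml", "yarn-site.xml")


def missing_hadoop_files(conf_dir: str) -> List[str]:
    """Names from REQUIRED that are not present as files in conf_dir."""
    return [name for name in REQUIRED if not (Path(conf_dir) / name).is_file()]


def yarn_submit_command(app_jar: str, main_class: str) -> List[str]:
    """Build (not run) a spark-submit call targeting YARN in cluster mode."""
    return ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
            "--class", main_class, app_jar]


conf_dir = os.environ.get("HADOOP_CONF_DIR", "/etc/hadoop/conf")
missing = missing_hadoop_files(conf_dir)
if missing:
    print(f"copy these into {conf_dir} from a cluster node first: {missing}")
else:
    # Hypothetical jar and class names.
    print(" ".join(yarn_submit_command("my-app.jar", "com.example.Main")))
```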


Zhan Zhang

On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu 
> wrote:

Hi Zhan,

Yes, I get it now.
I have never deployed a Hadoop configuration locally, and I cannot find the
specific doc. Would you help provide the doc for doing that?

Thank you,

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang 
> wrote:

There is no difference between running the client in or out of the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the Hadoop configuration locally. Here is the doc for running on YARN.


Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu 
> wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also think it would be similar to submitting a Hadoop job; however, I was
not sure whether it works the same way for Spark.

Have you ever tried that for Spark?
Would you give me the deployment doc for the Hadoop and Spark gateway? This
is the first time for me to do that, and I cannot find a specific doc for it.

Best Regards,


RE: Why is 1 executor overworked and other sit idle?

2015-09-22 Thread Chirag Dewan
Thanks Ted and Rich.

So if I repartition my RDD programmatically and call coalesce on the RDD to 1 
partition would that generate 1 output file?

Ahh.. Is my coalesce operation causing 1 partition, hence 1 output file and 1 
executor working on all the data?

To summarize this is what I do :-

1)  Create a Cassandra RDD

2)  Cache this RDD

3)  Map it to CSV

4)  Coalesce(because I need a single output file)

5)  Write to file on local file system

This makes sense.



From: Richard Eggert []
Sent: Wednesday, September 23, 2015 5:39 AM
To: Ted Yu
Cc: User; Chirag Dewan
Subject: Re: Why is 1 executor overworked and other sit idle?

If there's only one partition, by definition it will only be handled by one 
executor. Repartition to divide the work up. Note that this will also result in 
multiple output files,  however. If you absolutely need them to be combined 
into a single file,  I suggest using the Unix/Linux 'cat' command to 
concatenate the files afterwards.
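Richard's cat suggestion can also be done programmatically once the job has written its part files locally. A minimal Python sketch, assuming the output directory contains Hadoop-style part-NNNNN files (zero-padded, so name order is partition order) plus a _SUCCESS marker that must be skipped:

```python
import shutil
import tempfile
from pathlib import Path


def merge_part_files(output_dir: str, merged_path: str) -> int:
    """Concatenate part-* files in name order (like `cat part-* > merged`); return how many."""
    parts = sorted(Path(output_dir).glob("part-*"))
    with open(merged_path, "wb") as out:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)
    return len(parts)


# Demo on a fake output directory shaped like saveAsTextFile's (part files plus _SUCCESS).
with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp) / "csv-out"
    out_dir.mkdir()
    (out_dir / "part-00001").write_text("row3\nrow4\n")
    (out_dir / "part-00000").write_text("row1\nrow2\n")
    (out_dir / "_SUCCESS").write_text("")
    merged = Path(tmp) / "all.csv"
    n_parts = merge_part_files(str(out_dir), str(merged))
    merged_text = merged.read_text()

print(n_parts, merged_text.splitlines())  # 2 ['row1', 'row2', 'row3', 'row4']
```

This keeps the write itself parallel (many partitions, many executors) and moves the single-file requirement to a cheap sequential step at the end.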

On Sep 22, 2015 9:20 AM, "Ted Yu" 
> wrote:
Have you tried using repartition to spread the load ?


On Sep 22, 2015, at 4:22 AM, Chirag Dewan 
> wrote:

I am using Spark to access around 300m rows in Cassandra.

My job is pretty simple as I am just mapping my row into a CSV format and 
saving it as a text file.

public String call(CassandraRow row) throws Exception {
    StringBuilder sb = new StringBuilder();
    // ... (rest of the method body was not preserved in the archived post)
}

My map method looks like this.

I have a 3-node cluster. I observe that the driver starts on Node A, and
executors are spawned on all 3 nodes. But a single executor, on Node B or C, is
doing all the tasks. It starts a saveAsTextFile job with 1 output partition,
stores the RDDs in memory, and also commits the file on the local file system.

This executor is using a lot of system memory and CPU while the others are
sitting idle.

Am I doing something wrong? Is my RDD correctly partitioned?

Thanks in advance.


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha
I must have gone mad :) Thanks for pointing it out. I downloaded the 1.5.0
assembly jar and added it to SPARK_CLASSPATH.

However, I am getting a new error now

>>> kvs =

  Spark Streaming's Kafka libraries not found in class path. Try one of the

  1. Include the Kafka library and its dependencies with in the
 spark-submit command as

 $ bin/spark-submit --packages

  2. Download the JAR of the artifact from Maven Central
 Group Id = org.apache.spark, Artifact Id =
Version = 1.5.0.
 Then, include the jar in the spark-submit command as

 $ bin/spark-submit --jars  ...

Traceback (most recent call last):
  File "", line 1, in 
\streaming\", line 130, in createDirectStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException:
at Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(
at py4j.Gateway.invoke(
at py4j.commands.CallCommand.execute(
at Source)

>>> os.environ['SPARK_CLASSPATH']

So I launched pyspark with --jars pointing to the assembly jar. Now it is working.

THANK YOU for the help.

Out of curiosity: why did adding it to SPARK_CLASSPATH not work?


On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao  wrote:

> I think you're using the wrong version of the Kafka assembly jar. The
> Python API for the direct Kafka stream is not supported in Spark 1.3.0, so
> you'd better change to version 1.5.0. It looks like you're using Spark 1.5.0,
> so why did you choose Kafka assembly 1.3.0?
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:
>> Hi
>> I have added spark assembly jar to SPARK CLASSPATH
>> >>> print os.environ['SPARK_CLASSPATH']
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>> Now  I am facing below issue with a test topic
>> >>> ssc = StreamingContext(sc, 2)
>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"":'localhost:9092'})
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\", line 126, in createDirectStream
>>     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>\py4j\", line 538, in __call__
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\", line 36, in deco
>>     return f(*a, **kw)
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>\py4j\", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
>>  Trace:
>> py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.
>>, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
>> at py4j.reflection.ReflectionEngine.getMethod(
>> at py4j.reflection.ReflectionEngine.getMethod(
>> at py4j.Gateway.invoke(
>> at py4j.commands.AbstractCommand.invokeMethod(
>> at py4j.commands.CallCommand.execute(
>> at
>> at Source)

Re: Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread Jonathan Coveney
It's quite possible to beat Spark in performance on tiny data sets like
this. That's not really what it has been optimized for.

On Tuesday, September 22, 2015, juljoin wrote:

> Hello,
> I am trying to figure Spark out and I still have some problems with its
> speed, I can't figure them out. In short, I wrote two programs that loop
> through a 3.8Gb file and filter each line depending of if a certain word is
> present.
> I wrote a one-thread python program doing the job and I obtain:
> - for the 3.8Gb file:
> / lines found: 82100
>  in: *10.54 seconds*/
>  - no filter, just looping through the file:
> / in: 01.65 seconds/
> The Spark app doing the same and executed on 8 threads gives:
>  - for the 3.8Gb file:
> / lines found: 82100
>  in: *18.27 seconds*/
>  - for a 38Mb file:
> /lines found: 821
> in: 2.53 seconds/
> I must do something wrong to obtain a result twice as slow on the 8 threads
> than on 1 thread.
> 1. First, I thought it might be because of the setting-up cost of Spark.
> But
> for smaller files it only takes 2 seconds which makes this option
> improbable.
> 2. Looping through the file takes up 1.65 seconds (thank you SSD ^_^ ),
> processing takes up the other 9seconds (for the python app).
> -> This is why I thought splitting it up on the different processes will
> definitely speed it up.
> Note: Increasing the number of threads in Spark improves the speed (from 57
> seconds with 1 thread to 18 seconds with 8 threads). But still, there is a
> big difference in performance between simple python and Spark, it must be
> my
> doing!
> Can someone point me out on what I am doing wrong? That would be greatly
> appreciated :) I am new with all this big data stuff.
> *Here is the code for the Spark app:*
> *And the python code:*
> Thank you for reading up to this point :)
> Have a nice day!
> - Julien
> --
> View this message in context:
> Sent from the Apache Spark User List mailing list archive at
> -
> To unsubscribe, e-mail: 
> For additional commands, e-mail: 

Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,
Yes, I get it now.
I have never deployed a Hadoop configuration locally and cannot find a
specific doc for it. Could you point me to the doc for doing that?
Thank you, Zhiliang

 On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang 

There is no difference between running the client in or out of the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the Hadoop configuration locally. Here is the doc for running on YARN.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Thanks very much for your helpful comment. I also think it would be similar to
a Hadoop job submission; however, I was not sure whether it works like that when it
comes to Spark.
Have you ever tried that for Spark? Could you give me the deployment doc for the
Hadoop and Spark gateway? Since this is the first time for me to do that, I cannot
find the specific doc for it.

Best Regards, Zhiliang

On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang  

It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and point HADOOP_CONF_DIR in Spark to the 
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

A Spark job is running on the cluster via YARN. The job can be submitted from
a machine in the cluster; however, I would like to submit the job from another
machine that does not belong to the cluster. I know that for this, a Hadoop job
can be submitted from another machine with a Hadoop gateway installed, which is
used to connect to the cluster.
What is the equivalent for Spark; is it the same as for Hadoop? And where is the
documentation for installing this gateway?
Thank you very much, Zhiliang


Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread satish chandra j
Hi all,
Is there any alternative solution in Scala to work around the limitation on
defining a case class with more than 22 arguments?

We are using Scala version 2.10.2. I need to define a case class
with 37 arguments but am getting the error "*error: Implementation
restriction: case classes cannot have more than 22 parameters.*"

Any input on this would be a great help.

Satish Chandra

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread Andy Huang
Alternatively, I would suggest looking at programmatically building the

refer to


On Wed, Sep 23, 2015 at 2:07 PM, Ted Yu  wrote:

> Can you switch to 2.11 ?
> The following has been fixed in 2.11:
> Otherwise consider packaging related values into a case class of their own.
> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
>> wrote:
>> Hi all,
>> Is there any alternative solution in Scala to work around the limitation on
>> defining a case class with more than 22 arguments?
>> We are using Scala version 2.10.2. I need to define a case
>> class with 37 arguments but am getting the error "*error: Implementation
>> restriction: case classes cannot have more than 22 parameters.*"
>> Any input on this would be a great help.
>> Regards,
>> Satish Chandra

Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979

Re: Invalid checkpoint url

2015-09-22 Thread srungarapu vamsi
No, I am getting it in cluster mode.
I think I understand why I am getting this error; please correct me if I am
wrong. The reason is:
checkpointing writes the RDD to disk, so checkpointing happens on all
workers. Whenever Spark has to read back the RDD, the checkpoint directory
should be reachable by all the workers and should be a common place that
workers can write to and read from. This calls for a commonly accessible file
system like NFS, HDFS or S3.
So if I give ssc.checkpoint("some local directory"), the workers are not
able to read the RDDs from the other workers' checkpoint directories, and I
get the above-mentioned error.
With this understanding, I am creating a t2.medium HDFS 2.7.1 node and
pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".

Please correct me if I am wrong.
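The requirement described above (every worker must be able to reach the checkpoint directory) can be turned into a small pre-flight check before calling ssc.checkpoint(...). This is a hypothetical helper, not part of Spark: the function name and the set of schemes treated as "shared" are assumptions for illustration, and a path with no scheme is reported as undecided because it resolves against the cluster's fs.defaultFS.

```python
from typing import Optional
from urllib.parse import urlparse

# Schemes this sketch assumes every executor can reach (an assumption, not a
# Spark rule; S3 also has the consistency caveats discussed elsewhere on the list).
SHARED_SCHEMES = {"hdfs", "s3a", "s3n", "s3"}


def checkpoint_dir_is_shared(uri: str) -> Optional[bool]:
    """Hypothetical pre-flight check for a streaming checkpoint directory.

    Returns True for schemes assumed to be shared across workers, False for
    local file: paths (and unknown schemes), and None for scheme-less paths,
    whose meaning depends on the cluster's fs.defaultFS setting.
    """
    scheme = urlparse(uri).scheme.lower()
    if not scheme:
        return None
    if scheme == "file":
        return False
    return scheme in SHARED_SCHEMES
```

For example, the "file:/home/ubuntu/checkpoint/..." path from the error below is flagged as not shared, while "hdfs://ip:port/path/to/directory" passes.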

On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das  wrote:

> Are you getting this error in local mode?
> On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi <
>> wrote:
>> Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I
>> don't use reduceByKeyAndWindow.
>> When I start using "reduceByKeyAndWindow" it complains with the error
>> "Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
>> 2-97be-e3862eb5c944/rdd-8"
>> The stack trace is as below:
>> Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
>> 2-97be-e3862eb5c944/rdd-8
>> at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
>> at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
>> at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
>> at

Re: Creating BlockMatrix with java API

2015-09-22 Thread Pulasthi Supun Wickramasinghe
Hi Sabarish

Thanks, that would indeed solve my problem

Best Regards,

On Wed, Sep 23, 2015 at 12:55 AM, Sabarish Sasidharan wrote:

> Hi Pulasthi
> You can always use JavaRDD.rdd() to get the scala rdd. So in your case,
> new BlockMatrix(rdd.rdd(), 2, 2)
> should work.
> Regards
> Sab
> On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe <
>> wrote:
>> Hi Yanbo,
>> Thanks for the reply. I thought I might be missing something. Anyway, I
>> moved to using Scala since it has the complete API.
>> Best Regards,
>> Pulasthi
>> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:
>>> This is because the distributed matrices like
>>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
>>> not provide Java-friendly constructors. I have filed SPARK-10757
>>> to track this issue.
>>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
 Hi All,

 I am new to Spark and I am trying to do some BlockMatrix operations
 with the MLlib APIs. But I can't seem to create a BlockMatrix with the
 Java API. I tried the following

 Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
 List,Matrix>> list = new 
 ArrayList, Matrix>>();
 Tuple2 intTuple = new Tuple2(0,0);
 Tuple2,Matrix> tuple2MatrixTuple2 = new 
 Tuple2, Matrix>(intTuple,matrixa );
 list.add(tuple2MatrixTuple2);
 JavaRDD, Matrix>> rdd = sc.parallelize(list);

 BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);

 but since BlockMatrix only takes "RDD,Matrix>>",
 this code does not work. sc.parallelize() returns a JavaRDD, so the two
 are not compatible. I also couldn't find any code samples for this. Any
 help on this would be highly appreciated.

 Best Regards,
 Pulasthi S. Wickramasinghe
 Graduate Student  | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington
 cell: 224-386-9035

>> --
>> Pulasthi S. Wickramasinghe
>> Graduate Student  | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> cell: 224-386-9035
> --
> Architect - Big Data
> Ph: +91 99805 99458
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++

Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035

How to make Group By/reduceByKey more efficient?

2015-09-22 Thread swetha

How to make Group By more efficient? Is it recommended to use a custom
partitioner and then do a Group By? And can we use a custom partitioner and
then use a  reduceByKey for optimization?
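One commonly cited reason reduceByKey is cheaper than groupByKey followed by a reduction is that reduceByKey combines values inside each partition before the shuffle. The sketch below is a pure-Python simulation of that idea, not Spark code: partitions are modeled as plain lists of (key, value) pairs, the function names are hypothetical, and the "shuffled" count is simply the number of records that would leave their partition.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Pair = Tuple[str, int]
Reducer = Callable[[int, int], int]


def group_then_reduce(partitions: List[List[Pair]], func: Reducer) -> Tuple[Dict[str, int], int]:
    """groupByKey-style: every record is shuffled, then values are reduced."""
    shuffled = [rec for part in partitions for rec in part]
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    result: Dict[str, int] = {}
    for key, values in groups.items():
        acc = values[0]
        for value in values[1:]:
            acc = func(acc, value)
        result[key] = acc
    return result, len(shuffled)


def reduce_by_key(partitions: List[List[Pair]], func: Reducer) -> Tuple[Dict[str, int], int]:
    """reduceByKey-style: combine within each partition first (map-side
    combine), so at most one record per key per partition is shuffled."""
    shuffled: List[Pair] = []
    for part in partitions:
        local: Dict[str, int] = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    result: Dict[str, int] = {}
    for key, value in shuffled:
        result[key] = func(result[key], value) if key in result else value
    return result, len(shuffled)
```

With two partitions of three records each and two distinct keys, both functions produce the same totals, but the reduceByKey-style version ships 4 records instead of 6. In PySpark, reduceByKey also accepts numPartitions and partitionFunc arguments, so a custom partitioning function can be combined with it; whether that helps depends on the key distribution.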



Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,
Thanks very much for your helpful comment. I also think it would be similar to
a Hadoop job submission; however, I was not sure whether it works like that when it
comes to Spark.
Have you ever tried that for Spark? Could you give me the deployment doc for the
Hadoop and Spark gateway? Since this is the first time for me to do that, I cannot
find the specific doc for it.

Best Regards, Zhiliang


 On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang 

 It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and point HADOOP_CONF_DIR in Spark to the 
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

A Spark job is running on the cluster via YARN. The job can be submitted from
a machine in the cluster; however, I would like to submit the job from another
machine that does not belong to the cluster. I know that for this, a Hadoop job
can be submitted from another machine with a Hadoop gateway installed, which is
used to connect to the cluster.
What is the equivalent for Spark; is it the same as for Hadoop? And where is the
documentation for installing this gateway?
Thank you very much, Zhiliang


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Saisai Shao
I think it is related to the class loader; the behavior differs between
SPARK_CLASSPATH and --jars. If you want to know the details, you'd
better dig into the source code.
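Two things in this thread are easy to check mechanically before launching: the first error came from pairing a 1.3.0 Kafka assembly jar with Spark 1.5.0, and the SPARK_CLASSPATH value echoed later ('D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0') has no .jar suffix, which may be worth ruling out as well (the thread does not confirm it as the cause). The helper below is hypothetical, and its file-name pattern is assumed from the jar names quoted here. The fix that actually worked in the thread was passing the jar with --jars.

```python
import re
from typing import List

# Pattern assumed from the jar names quoted in this thread, e.g.
# spark-streaming-kafka-assembly_2.10-1.5.0.jar (Scala version, then Spark version).
ASSEMBLY_RE = re.compile(
    r"spark-streaming-kafka-assembly_(?P<scala>\d+\.\d+)-(?P<version>\d+\.\d+\.\d+)\.jar$"
)


def check_kafka_assembly(path: str, spark_version: str) -> List[str]:
    """Hypothetical sanity check: return a list of problems (empty if none found)."""
    problems: List[str] = []
    name = re.split(r"[\\/]", path)[-1]  # file name, for Windows or POSIX paths
    if not name.endswith(".jar"):
        problems.append("path does not end in .jar: " + path)
        return problems
    match = ASSEMBLY_RE.search(name)
    if match is None:
        problems.append("unrecognised assembly jar name: " + name)
    elif match.group("version") != spark_version:
        problems.append(
            "assembly version %s does not match Spark %s"
            % (match.group("version"), spark_version)
        )
    return problems
```

For the paths quoted in this thread, the 1.3.0 jar is reported as a version mismatch with Spark 1.5.0, and the SPARK_CLASSPATH value without a .jar suffix is reported as such.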


On Tue, Sep 22, 2015 at 9:10 PM, ayan guha  wrote:

> I must have gone mad :) Thanks for pointing it out. I downloaded the
> 1.5.0 assembly jar and added it to SPARK_CLASSPATH.
> However, I am getting a new error now
> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"":'localhost:9092'})
>   Spark Streaming's Kafka libraries not found in class path. Try one of
> the following.
>   1. Include the Kafka library and its dependencies with in the
>  spark-submit command as
>  $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...
>   2. Download the JAR of the artifact from Maven Central,
>  Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly,
>  Version = 1.5.0.
>  Then, include the jar in the spark-submit command as
>  $ bin/spark-submit --jars  ...
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\", line 130, in createDirectStream
>     raise e
> py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
> : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
> at Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at py4j.reflection.MethodInvoker.invoke(
> at py4j.reflection.ReflectionEngine.invoke(
> at py4j.Gateway.invoke(
> at py4j.commands.AbstractCommand.invokeMethod(
> at py4j.commands.CallCommand.execute(
> at
> at Source)
> >>> os.environ['SPARK_CLASSPATH']
> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
> >>>
> So I launched pyspark with --jars and the assembly jar. Now it is
> working.
> THANK YOU for the help.
> Out of curiosity: why did adding it to SPARK_CLASSPATH not work?
> Best
> Ayan
> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao 
> wrote:
>> I think you're using the wrong version of the Kafka assembly jar. The Python
>> API for the direct Kafka stream is not supported in Spark 1.3.0, so you'd
>> better change to version 1.5.0. It looks like you're using Spark 1.5.0, so why
>> did you choose Kafka assembly 1.3.0?
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:
>>> Hi
>>> I have added spark assembly jar to SPARK CLASSPATH
>>> >>> print os.environ['SPARK_CLASSPATH']
>>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>> Now  I am facing below issue with a test topic
>>> >>> ssc = StreamingContext(sc, 2)
>>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"":'localhost:9092'})
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\", line 126, in createDirectStream
>>>     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>>\py4j\", line 538, in __call__
>>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\", line 36, in deco
>>>     return f(*a, **kw)
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>>\py4j\", line 304, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
>>>  Trace:
>>> py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.
>>>, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
>>> at

Re: Creating BlockMatrix with java API

2015-09-22 Thread Sabarish Sasidharan
Hi Pulasthi

You can always use JavaRDD.rdd() to get the scala rdd. So in your case,

new BlockMatrix(rdd.rdd(), 2, 2)

should work.


On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe wrote:

> Hi Yanbo,
> Thanks for the reply. I thought I might be missing something. Anyway, I
> moved to using Scala since it has the complete API.
> Best Regards,
> Pulasthi
> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:
>> This is because the distributed matrices like
>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
>> not provide Java-friendly constructors. I have filed SPARK-10757
>> to track this issue.
>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
>>> Hi All,
>>> I am new to Spark and I am trying to do some BlockMatrix operations with
>>> the MLlib APIs. But I can't seem to create a BlockMatrix with the Java
>>> API. I tried the following
>>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>>> List,Matrix>> list = new 
>>> ArrayList, Matrix>>();
>>> Tuple2 intTuple = new Tuple2(0,0);
>>> Tuple2,Matrix> tuple2MatrixTuple2 = new 
>>> Tuple2, Matrix>(intTuple,matrixa );
>>> list.add(tuple2MatrixTuple2);
>>> JavaRDD, Matrix>> rdd = sc.parallelize(list);
>>> BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);
>>> but since BlockMatrix only takes "RDD,Matrix>>",
>>> this code does not work. sc.parallelize() returns a JavaRDD, so the two
>>> are not compatible. I also couldn't find any code samples for this. Any
>>> help on this would be highly appreciated.
>>> Best Regards,
>>> Pulasthi
>>> --
>>> Pulasthi S. Wickramasinghe
>>> Graduate Student  | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> cell: 224-386-9035
> --
> Pulasthi S. Wickramasinghe
> Graduate Student  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035


Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*

Re: SparkR vs R

2015-09-22 Thread Yashwanth Kumar

1. The main difference between SparkR and R is that "SparkR" can handle

Yes, you can use other core libraries inside SparkR (not algos like

2. Yes, core R libraries will not be distributed. You can use functions from
these libraries that are applicable to mapper-style operations, i.e. functions
which can be applied to each line individually.

3. SparkR is a wrapper for underlying Scala code, whereas for R it is
R gives you complete flexibility to do any machine learning you want, while
SparkR is still in the development stage.


Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
There is no difference between running the client in or out of the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the Hadoop configuration locally. Here is the doc for running on YARN.


Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu 
> wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also think it would be similar to a Hadoop job submission; however, I was
not sure whether it works like that when it comes to Spark.

Have you ever tried that for Spark?
Could you give me the deployment doc for the Hadoop and Spark gateway? Since
this is the first time for me to do that, I cannot find the specific doc for it.

Best Regards,

On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang 
> wrote:

It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and point HADOOP_CONF_DIR in Spark to the 


Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu 
> wrote:

Dear Experts,

A Spark job is running on the cluster via YARN. The job can be submitted from
a machine in the cluster;
however, I would like to submit the job from another machine that does not
belong to the cluster.
I know that for this, a Hadoop job can be submitted from another machine with
a Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark; is it the same as for Hadoop? And where is the
documentation for installing this gateway?

Thank you very much.

Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?

Trying to answer this question, I looked into Checkpoint.getCheckpointFiles
[1]. It is doing findFirstIn, which would probably be calling the S3 LIST
operation. S3 LIST is subject to eventual consistency [2]. What would happen
if getCheckpointFiles retrieved an incomplete list of files?

The pluggable WAL interface allows me to work around the eventual
consistency of S3 by storing an index of filenames in DynamoDB. However it
seems that something similar is required for checkpoints as well.
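The index-of-filenames workaround described above can be sketched as follows. This covers only the bookkeeping idea under stated assumptions: plain dicts stand in for S3 (the objects) and DynamoDB (the index), the class name is hypothetical, and it does not implement Spark's WriteAheadLog interface or call any AWS API.

```python
from typing import Dict, List


class IndexedSegmentStore:
    """Hypothetical sketch: record every segment name in a consistent index
    when it is written, and consult the index (not a LIST call) on recovery.

    `blobs` stands in for an eventually consistent object store such as S3;
    `index` stands in for a strongly consistent table such as DynamoDB.
    """

    def __init__(self) -> None:
        self.blobs: Dict[str, bytes] = {}
        self.index: Dict[str, List[str]] = {}

    def write(self, log_dir: str, name: str, data: bytes) -> None:
        key = log_dir + "/" + name
        self.blobs[key] = data                          # 1. write the object first
        self.index.setdefault(log_dir, []).append(key)  # 2. then record it in the index

    def segments(self, log_dir: str) -> List[str]:
        # Recovery reads the index instead of listing the bucket.
        return list(self.index.get(log_dir, []))

    def delete_dir(self, log_dir: str) -> None:
        # Cleanup removes both the indexed objects and the index entry.
        for key in self.index.pop(log_dir, []):
            self.blobs.pop(key, None)
```

Writing the object before the index entry means a crash between the two steps leaves an unindexed (ignored) object rather than an index entry pointing at nothing; a real implementation would still need to garbage-collect such orphans.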

I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
there something I can borrow from DirectKafkaInputDStream? After a DStream
computes an RDD, is there a way for the DStream to tell when processing of
that RDD has finished, and only after that delete the SQS messages?

I was also considering Amazon EFS, but it is only available in a single
region as a preview. EBS could be an option, but it cannot be used across
multiple Availability Zones.


On 22 September 2015 at 21:09, Tathagata Das  wrote:

> You can keep the checkpoints in the Hadoop-compatible file system and the
> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
> gets complicated, as it is not as easy as deleting the checkpoint
> directory: you will have to clean up the checkpoint directory as
> well as whatever other storage your custom WAL uses. However, if I
> remember correctly, the WAL information is used only when the DStreams are
> recovered correctly from checkpoints.
> Note that there are further details here that require deeper
> understanding. There are actually two uses of WALs in the system:
> 1. Data WAL for received data: this is what is usually referred to as
> the WAL everywhere. Each receiver writes to a different WAL. This deals
> with bulk data.
> 2. Metadata WAL: this is used by the driver to save metadata information
> like the block-to-data-WAL-segment mapping, etc. I usually skip mentioning
> this. This WAL is automatically used when the data WAL is enabled, and it
> deals with small data.
> If you have to get around S3's limitations, you will have to plug in both
> WALs (see this
> for SparkConfs, but note that we haven't made these confs public). While the
> system supports plugging them in, we haven't made this information public
> yet because of such complexities in working with it. And we have invested
> time in making common sources like Kafka not require WALs (e.g. the Direct
> Kafka approach). In the future, we do hope to have a better solution for
> general receivers + WALs + S3 (personally, I really wish S3's semantics
> would improve and fix this issue).
> Another alternative direction may be Amazon EFS. Since it is based on EBS, it
> may give the necessary semantics. But I haven't given that a spin, so it's
> uncharted territory :)
> TD
> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia 
> wrote:
>> My understanding of pluggable WAL was that it eliminates the need for
>> having a Hadoop-compatible file system [1].
>> What is the use of pluggable WAL when it can be only used together with
>> checkpointing which still requires a Hadoop-compatible file system?
>> [1]:
>> On 22 September 2015 at 19:57, Tathagata Das > > wrote:
>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>> because it does not make sense to recover from the WAL if there is no
>>> checkpoint information to recover from.
>>> 2. Since the current implementation saves the WAL in the checkpoint
>>> directory, they share the same fate: if the checkpoint directory is deleted,
>>> then both the checkpoint info and the WAL info are deleted.
>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
>>> wrote:
 I am trying to use pluggable WAL, but it can be used only with
 checkpointing turned on. Thus I still need to have a Hadoop-compatible file
 system.

 Is there something like pluggable checkpointing?

 Or can the WAL be used without checkpointing? What happens when the WAL is
 available but the checkpoint directory is lost?


 On 18 September 2015 at 05:47, Tathagata Das 

> I don't think it would work with multipart upload either. The file is
> not visible until the multipart upload is explicitly closed. So even if
> each write a part 

Re: SparkR for accumulo

2015-09-22 Thread madhvi.gupta

Hi Rui,

Can't we use, in SparkR, the Accumulo data RDD created from Java in Spark?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:

I am afraid that there is no support for Accumulo in SparkR now, because:

1. It seems that there is no data source support for Accumulo, so we can't
create a SparkR DataFrame on Accumulo.
2. It is possible to create an RDD from Accumulo via AccumuloInputFormat in Scala.
But unfortunately, SparkR does not support creating RDDs from Hadoop files other
than text files.

-Original Message-
From: madhvi.gupta []
Sent: Tuesday, September 22, 2015 6:25 PM
To: user
Subject: SparkR for accumulo


I want to process Accumulo data in R through SparkR. Can anyone help me and let
me know how to get Accumulo data in Spark to be used in R?

Thanks and Regards
Madhvi Gupta


Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,
I really appreciate your help, I will do as that next.And on the local machine, 
no hadoop/spark needs to be installed, but only copied with the 
/etc/hadoop/conf... whether the information (for example IP, hostname etc) of 
local machine 
would be set in the conf files...

Moreover, do you have any exprience to submit hadoop/spark job by way of java 
program deployed on thegateway node, but not by way of hadoop/spark command...
Thank you very much~Best Regards,Zhiliang


 On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang 

 Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in to one of
your cluster machines, find the Hadoop configuration location, for example
/etc/hadoop/conf, and copy that directory to your local machine. Typically it has
hdfs-site.xml, yarn-site.xml, etc. In Spark, the former is used to access HDFS,
and the latter is used to launch applications on top of YARN.
Then in the, you add export HADOOP_CONF_DIR=/etc/hadoop/conf. 
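The steps above can be checked from a script before calling spark-submit. This sketch assumes the copied directory should contain hdfs-site.xml and yarn-site.xml (as described above); the function names are hypothetical, and the returned dict is meant to be passed as env= to subprocess.run when launching spark-submit, which is an alternative to exporting the variable in a shell profile.

```python
import os
from typing import Dict, List, Mapping, Optional

# Files this sketch expects in the copied configuration directory, following
# the description above: hdfs-site.xml (HDFS access) and yarn-site.xml
# (launching on YARN). Real directories usually contain more files.
REQUIRED_FILES = ("hdfs-site.xml", "yarn-site.xml")


def missing_conf_files(conf_dir: str) -> List[str]:
    """Return the required files that are not present in conf_dir."""
    return [name for name in REQUIRED_FILES
            if not os.path.isfile(os.path.join(conf_dir, name))]


def submit_env(conf_dir: str, base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of the environment with HADOOP_CONF_DIR set to conf_dir.

    Intended to be passed as env= to subprocess.run([...spark-submit...]).
    Raises FileNotFoundError if a required file is missing.
    """
    missing = missing_conf_files(conf_dir)
    if missing:
        raise FileNotFoundError("missing in %s: %s" % (conf_dir, ", ".join(missing)))
    env = dict(os.environ if base is None else base)
    env["HADOOP_CONF_DIR"] = conf_dir
    return env
```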
Zhan Zhang

On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Yes, I get it now. 
I have not ever deployed hadoop configuration locally, and do not find the 
specific doc, would you help provide the doc to do that...
Thank you,Zhiliang

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang  

There is no difference between running the client in or out of the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the Hadoop configuration locally. Here is the doc for running on YARN.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Thanks very much for your helpful comment. I also think it would be similar to
a Hadoop job submission; however, I was not sure whether it works like that when it
comes to Spark.
Have you ever tried that for Spark? Could you give me the deployment doc for the
Hadoop and Spark gateway? Since this is the first time for me to do that, I cannot
find the specific doc for it.

Best Regards, Zhiliang

On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang  

It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and point HADOOP_CONF_DIR in Spark to the
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

Spark jobs run on the cluster via YARN. A job can be submitted from a machine
in the cluster; however, I would like to submit the job from another machine
that does not belong to the cluster. I know that for Hadoop, a job can be
submitted from another machine that has a Hadoop gateway installed, which is
used to connect to the cluster.
What is the approach for Spark; is it the same as for Hadoop? And where is the
instruction doc for installing this gateway?
Thank you very much~~
Zhiliang


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread Ted Yu
Can you switch to 2.11?

The following has been fixed in 2.11:

Otherwise consider packaging related values into a case class of their own.

On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j 

> Hi All,
> Are there any alternative solutions in Scala to avoid the limitation on
> defining a case class with more than 22 arguments?
> We are using Scala version 2.10.2; currently I need to define a case class
> with 37 arguments, but I am getting the error "*error: Implementation
> restriction: case classes cannot have more than 22 parameters.*"
> Any input on this would be a great help.
> Regards,
> Satish Chandra

JdbcRDD Constructor

2015-09-22 Thread satish chandra j
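Hi All,

The JdbcRDD constructor has the following parameters:

JdbcRDD(SparkContext sc, scala.Function0<java.sql.Connection> getConnection,
    String sql, *long lowerBound, long upperBound, int numPartitions*,
    scala.Function1<java.sql.ResultSet,T> mapRow, scala.reflect.ClassTag<T> evidence$1)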

where *lowerBound* refers to the lower boundary of the entire data,
*upperBound* refers to the upper boundary of the entire data, and
*numPartitions* refers to the number of partitions.

The source table in the Oracle DB that JdbcRDD fetches data from has more than
500 records, but the results are confusing when I run it several times with
different values of the "numPartitions" parameter:

lowerBound, upperBound, numPartitions : output count
0,          100,        1             : 100
0,          100,        2             : 151
0,          100,        3             : 201

Please help me understand why the output count is 151 when numPartitions
is 2, and 201 when numPartitions is 3.


Satish Chandra
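Some background that may help: JdbcRDD runs the query once per partition. For each partition, it binds that partition's inclusive (start, end) pair to the query's two `?` placeholders. Its documentation gives the example query `select title, author from books where ? <= id and id <= ?`.

The sketch below reproduces the range-splitting arithmetic in plain Python, assuming it matches the Spark 1.x `JdbcRDD.getPartitions` implementation. The function name is hypothetical. The sub-ranges are disjoint and together cover [lowerBound, upperBound] exactly once. So a query that filters a unique key with both placeholders should return the same total for any numPartitions. Counts that grow with numPartitions, as in the table above, hint that the query does not restrict rows by the bound parameters as intended. Treat that as a hypothesis to check against the actual SQL, not a diagnosis.

```python
from typing import List, Tuple


def jdbc_partition_bounds(lower: int, upper: int, num_partitions: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into num_partitions inclusive ranges.

    Assumed to mirror the arithmetic of JdbcRDD.getPartitions in Spark 1.x.
    """
    length = 1 + upper - lower  # bounds are inclusive, hence the + 1
    bounds = []
    for i in range(num_partitions):
        start = lower + (i * length) // num_partitions
        end = lower + ((i + 1) * length) // num_partitions - 1
        bounds.append((start, end))
    return bounds


# Each (start, end) pair fills the two "?" placeholders of one partition's query.
for n in (1, 2, 3):
    print(n, jdbc_partition_bounds(0, 100, n))
# 1 [(0, 100)]
# 2 [(0, 49), (50, 100)]
# 3 [(0, 32), (33, 66), (67, 100)]
```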

Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Deenar Toraskar

Can you elaborate on why you are using a broadcast variable to concatenate
many Avro files into a single ORC file? Look at wholeTextFiles in Spark.

SparkContext.wholeTextFiles lets you read a directory containing multiple
small text files, and returns each of them as (filename, content) pairs.
This is in contrast with textFile, which would return one record per line
in each file.
You can then process this RDD in parallel over the cluster, convert it to a
DataFrame, and save it as an ORC file.
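To make the difference in record shape concrete, here is a local, single-machine analog in plain Python. It is not Spark code, and the function names are hypothetical.

- `whole_text_files` yields one (path, content) pair per file, as wholeTextFiles does.
- `text_file` yields one record per line, as textFile does.

One caveat worth checking: both Spark methods decode files as text, so binary formats such as Avro may need a different reader. SparkContext.binaryFiles is the byte-oriented counterpart.

```python
import os
import tempfile
from typing import List, Tuple


def whole_text_files(directory: str) -> List[Tuple[str, str]]:
    """Local analog of SparkContext.wholeTextFiles: one (path, content) pair per file."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path, encoding="utf-8") as f:
                pairs.append((path, f.read()))
    return pairs


def text_file(directory: str) -> List[str]:
    """Local analog of SparkContext.textFile on a directory: one record per line."""
    return [line for _, content in whole_text_files(directory)
            for line in content.splitlines()]


def demo() -> Tuple[int, int]:
    """Write two small files and count records under each view."""
    with tempfile.TemporaryDirectory() as d:
        for name, body in (("a.txt", "x\ny\n"), ("b.txt", "z\n")):
            with open(os.path.join(d, name), "w", encoding="utf-8") as f:
                f.write(body)
        return len(whole_text_files(d)), len(text_file(d))


print(demo())  # (2, 3): 2 file-level records vs 3 line-level records
```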


Re: Partitions on RDDs

2015-09-22 Thread Yashwanth Kumar
In the first RDD transformation (e.g. reading from a file with
sc.textFile("path", partitions)), the number of partitions you specify will be
applied to all further transformations and actions on this RDD.

In a few places, repartitioning your RDD will give an added advantage.
Repartitioning is usually done during the actions stage.

View this message in context:
Sent from the Apache Spark User List mailing list archive at

To unsubscribe, e-mail:
For additional commands, e-mail:

Partitions on RDDs

2015-09-22 Thread XIANDI
I'm always confused by the partitions. We may have many RDDs in the code. Do
we need to partition all of them? Do the RDDs get rearranged among all
the nodes whenever we do a partition? What is a wise way of doing
partitions?


Re: HDP 2.3 support for Spark 1.5.x

2015-09-22 Thread Zhan Zhang
Hi Krishna,

For the time being, you can download it from upstream, and it should run OK
on HDP 2.3. For HDP-specific problems, you can ask in the Hortonworks forum.


Zhan Zhang

On Sep 22, 2015, at 3:42 PM, Krishna Sankar 
> wrote:


  *   We have just installed HDP 2.3. It comes with Spark 1.3.x. The
current wisdom is that it will support the 1.4.x train (which is good; we need
DataFrame et al.).
  *   What is the plan to support Spark 1.5.x? Can we install 1.5.0 on HDP 2.3?
Or will Spark 1.5.x support be in HDP 2.3.x, and if so, roughly when?

Cheers & Thanks

Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
I am trying to use the pluggable WAL, but it can be used only with
checkpointing turned on. Thus I still need to have a Hadoop-compatible file system.

Is there something like pluggable checkpointing?

Or can WAL be used without checkpointing? What happens when WAL is
available but the checkpoint directory is lost?


On 18 September 2015 at 05:47, Tathagata Das  wrote:

> I don't think it would work with multipart upload either. The file is not
> visible until the multipart upload is explicitly closed. So even if each
> write is a part upload, none of the parts are visible until the multipart
> upload is closed.
> TD
> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
> wrote:
>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>> >
>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>> work with S3 because S3 does not support flushing. Basically, the current
>> implementation assumes that after write + flush, the data is immediately
>> durable, and readable if the system crashes without closing the WAL file.
>> This does not work with S3, as data is durable if and only if the S3 file
>> output stream is cleanly closed.
>> >
>> more precisely, unless you turn multipart uploads on, the s3n/s3a
>> clients Spark uses *don't even upload anything to S3*.
>> It's not a filesystem, and you have to bear that in mind.
>> Amazon's own s3 client used in EMR behaves differently; it may be usable
>> as a destination (I haven't tested)

Re: Why is 1 executor overworked and other sit idle?

2015-09-22 Thread Richard Eggert
If there's only one partition, by definition it will only be handled by one
executor. Repartition to divide the work up. Note that this will also
result in multiple output files, however. If you absolutely need them to
be combined into a single file, I suggest using the Unix/Linux 'cat'
command to concatenate the files afterwards.
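Here is why repartitioning spreads the load. repartition(n) performs a shuffle that deals each input partition's records out across n output partitions, roughly round-robin. Below is a simplified plain-Python model. It is not Spark's code, and the function name is hypothetical.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin_repartition(records: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Deal the records of one partition out to num_partitions buckets in turn.

    Simplified model of repartition: Spark also starts each input partition at
    a random bucket and moves the data through a shuffle, both omitted here.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    buckets: List[List[T]] = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        buckets[i % num_partitions].append(record)
    return buckets


# One partition of 10 rows becomes 3 partitions: three tasks that different
# executors can run in parallel, and three part-files from saveAsTextFile.
sizes = [len(p) for p in round_robin_repartition(list(range(10)), 3)]
print(sizes)  # [4, 3, 3]
```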

On Sep 22, 2015 9:20 AM, "Ted Yu"  wrote:

> Have you tried using repartition to spread the load ?
> Cheers
> On Sep 22, 2015, at 4:22 AM, Chirag Dewan 
> wrote:
> Hi,
> I am using Spark to access around 300m rows in Cassandra.
> My job is pretty simple as I am just mapping my row into a CSV format and
> saving it as a text file.
> public String call(CassandraRow row) throws Exception {
>     StringBuilder sb = new StringBuilder();
>     sb.append(row.getString(10));
>     sb.append(",");
>     sb.append(row.getString(11));
>     sb.append(",");
>     sb.append(row.getString(8));
>     sb.append(",");
>     sb.append(row.getString(7));
>     return sb.toString();
> }
> My map method looks like this.
> I have a 3-node cluster. I observe that the driver starts on Node A, and
> executors are spawned on all 3 nodes. But the executor on Node B or C is
> doing all the tasks. It starts a saveAsTextFile job with 1 output partition,
> stores the RDDs in memory, and also commits the file to the local file
> system.
> This executor is using a lot of system memory and CPU while the others are
> sitting idle.
> Am I doing something wrong? Is my RDD correctly partitioned?
> Thanks in advance.
> Chirag

Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
My understanding of the pluggable WAL was that it eliminates the need for
having a Hadoop-compatible file system [1].

What is the use of a pluggable WAL when it can only be used together with
checkpointing, which still requires a Hadoop-compatible file system?


On 22 September 2015 at 19:57, Tathagata Das 

> 1. Currently, the WAL can be used only with checkpointing turned on,
> because it does not make sense to recover from the WAL if there is no
> checkpoint information to recover from.
> 2. Since the current implementation saves the WAL in the checkpoint
> directory, they share the same fate: if the checkpoint directory is deleted,
> then both the checkpoint info and the WAL info are deleted.
> 3. Checkpointing is currently not pluggable. Why do you want that?


Re: Partitions on RDDs

2015-09-22 Thread Richard Eggert
In general, RDDs get partitioned automatically without programmer
intervention. You generally don't need to worry about them unless you need
to adjust the size/number of partitions or the partitioning scheme
according to the needs of your application. Partitions get redistributed
among nodes whenever a shuffle occurs. Repartitioning may cause a shuffle
to occur in some situations, but it is not guaranteed to occur in all
cases.

In general, smaller/more numerous partitions allow work to be distributed
among more workers, but larger/fewer partitions allow work to be done in
larger chunks, which may result in the work getting done more quickly as
long as all workers are kept busy, due to reduced overhead. Also, the
number of partitions determines how many files get generated by actions
that save RDDs to files.

The maximum size of any one partition is ultimately limited by the
available memory of any single executor.
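The hash-based redistribution that happens on a shuffle can be modelled in a few lines. Examples are partitionBy with a HashPartitioner, or the shuffle behind reduceByKey. Each record goes to the partition numbered hash(key) mod numPartitions, so all records with the same key land together. The number of partitions is also the number of part-files a later save would write. This is a minimal plain-Python sketch, not Spark's implementation, and the names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

Pair = Tuple[Hashable, object]


def hash_partition(pairs: Iterable[Pair], num_partitions: int) -> Dict[int, List[Pair]]:
    """Group (key, value) pairs by partition id = hash(key) mod num_partitions."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    partitions: Dict[int, List[Pair]] = defaultdict(list)
    for key, value in pairs:
        # Python's % is non-negative for a positive divisor, so every id
        # falls in [0, num_partitions).
        partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)


# Integer keys keep the example deterministic; Python randomizes str hashes
# per process.
parts = hash_partition([(k, None) for k in range(10)], 3)
print({pid: [k for k, _ in rows] for pid, rows in sorted(parts.items())})
# {0: [0, 3, 6, 9], 1: [1, 4, 7], 2: [2, 5, 8]}
```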

On Sep 22, 2015 6:42 PM, "XIANDI"  wrote:

> I'm always confused by the partitions. We may have many RDDs in the code.
> Do we need to partition on all of them? Do the rdds get rearranged among all
> the nodes whenever we do a partition? What is a wise way of doing
> partitions?

Re: WAL on S3

2015-09-22 Thread Tathagata Das
1. Currently, the WAL can be used only with checkpointing turned on,
because it does not make sense to recover from the WAL if there is no
checkpoint information to recover from.

2. Since the current implementation saves the WAL in the checkpoint
directory, they share the same fate: if the checkpoint directory is deleted,
then both the checkpoint info and the WAL info are deleted.

3. Checkpointing is currently not pluggable. Why do you want that?


SPARK_WORKER_INSTANCES was detected (set to '2')…This is deprecated in Spark 1.0+

2015-09-22 Thread Jacek Laskowski

This is for Spark 1.6.0-SNAPSHOT (SHA1

I've been toying with a Spark Standalone cluster and have the following
file in conf/

➜  spark git:(master) ✗ cat conf/

# multiple Spark worker processes on a machine

It's fine and the cluster works fine. It's also fine according to

So far so good.

Just today I saw the following when I executed `spark-submit`:

15/09/23 00:48:26 WARN SparkConf:
SPARK_WORKER_INSTANCES was detected (set to '2').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --num-executors to specify the number of executors
 - spark.executor.instances to configure the number of instances in
the spark config.

Why the deprecation? Is it not supported (or not recommended, given the
message) to have a Spark Standalone cluster and execute spark-submit
on the same machine?


Jacek Laskowski


Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Jacek Laskowski
On Tue, Sep 22, 2015 at 10:03 PM, Ted Yu  wrote:

> To my knowledge, no one runs HBase on top of Mesos.


That sentence caught my attention. Could you explain the reasons for
not running HBase on Mesos, i.e. what makes Mesos inappropriate for



Re: Invalid checkpoint url

2015-09-22 Thread Tathagata Das
Are you getting this error in local mode?

On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi 

> Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I
> don't use reduceByKeyAndWindow.
> When I start using reduceByKeyAndWindow, it fails with the error
> "Exception in thread "main" org.apache.spark.SparkException: Invalid
> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
> 2-97be-e3862eb5c944/rdd-8"
> The stack trace is as below:
> Exception in thread "main" org.apache.spark.SparkException: Invalid
> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
> 2-97be-e3862eb5c944/rdd-8
> at
> org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at
> org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
> at
> com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
> at
> com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at

Re: Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread Richard Eggert
Maybe it's just my phone, but I don't see any code.
On Sep 22, 2015 11:46 AM, "juljoin"  wrote:

> Hello,
> I am trying to figure Spark out and I still have some problems with its
> speed, I can't figure them out. In short, I wrote two programs that loop
> through a 3.8Gb file and filter each line depending of if a certain word is
> present.
> I wrote a one-thread Python program doing the job and I obtain:
> - for the 3.8 GB file:
>   lines found: 82100
>   in: *10.54 seconds*
> - no filter, just looping through the file:
>   in: 01.65 seconds
> The Spark app doing the same and executed on 8 threads gives:
> - for the 3.8 GB file:
>   lines found: 82100
>   in: *18.27 seconds*
> - for a 38 MB file:
>   lines found: 821
>   in: 2.53 seconds
> I must be doing something wrong to get a result nearly twice as slow on 8
> threads as on 1 thread.
> 1. First, I thought it might be because of Spark's setup cost. But
> for smaller files it only takes 2 seconds, which makes this option
> improbable.
> 2. Looping through the file takes 1.65 seconds (thank you SSD ^_^),
> and processing takes the other 9 seconds (for the Python app).
> -> This is why I thought splitting it up across different processes would
> definitely speed it up.
> Note: Increasing the number of threads in Spark improves the speed (from 57
> seconds with 1 thread to 18 seconds with 8 threads). But there is still a
> big difference in performance between plain Python and Spark; it must be
> my doing!
> Can someone point out what I am doing wrong? That would be greatly
> appreciated :) I am new to all this big data stuff.
> *Here is the code for the Spark app:*
> *And the python code:*
> Thank you for reading up to this point :)
> Have a nice day!
> - Julien

Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread Tathagata Das
Also, you could switch to the Direct Kafka API, which was first released as
experimental in 1.3. In 1.5 we graduated it from experimental, but it's
quite usable in Spark 1.3.1.


On Tue, Sep 22, 2015 at 7:45 PM, SLiZn Liu  wrote:

> Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks
> for the tips, Tathagata!
> On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das 
> wrote:
>> A lot of these imbalances were solved in Spark 1.5. Could you give that a
>> spin?
>> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu 
>> wrote:
>>> Hi spark users,
>>> In our Spark Streaming app using the Kafka integration on Mesos, we
>>> initialized 3 receivers to receive 3 Kafka partitions, but we observed an
>>> imbalance in the record receiving rate: with spark.streaming.receiver.maxRate
>>> set to 120, sometimes one receiver receives very close to the limit while
>>> the other two receive only roughly fifty records per second.
>>> This may be caused by an earlier receiver failure, when one of the
>>> receivers’ receiving rate dropped to 0. We restarted the Spark Streaming app,
>>> and the imbalance began. We suspect that the partition received by
>>> the failing receiver got jammed, and the other two receivers cannot take up
>>> its data.
>>> The 3-node cluster tends to run slowly: nearly all the tasks are
>>> registered at the node with the earlier receiver failure (I used union to
>>> combine the 3 receivers’ DStreams, so I expected the combined DStream to be
>>> well distributed across all nodes), a batch is not guaranteed to finish
>>> within a single batch time, stages pile up, and the digested log shows the
>>> following:
>>> ...
>>> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 
>>> 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, 
>>> real=0.02 secs]
>>> ...
>>> 5/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state 
>>> FINISHED for TID 77219 because its task set is gone (this is likely the 
>>> result of
>>> receiving duplicate task finished status updates)
>>> ...
>>> These two types of log lines were printed during the execution of some
>>> (not all) stages.
>>> My configurations:
>>> # of cores on each node: 64
>>> # of nodes: 3
>>> batch time is set to 10 seconds
>>> spark.streaming.receiver.maxRate   120
>>> spark.streaming.blockInterval      160  // set so that 10 seconds divided by
>>>   the block interval is approximately the total cores, which is 64, to max
>>>   out all the nodes: 10s * 1000 / 64
>>> // this one doesn't seem to work, since the young gen / old gen ratio is
>>> nearly 0.3 instead of 0.1
>>> Does anyone have an idea? Thanks for your patience.
>>> BR,
>>> Todd Leo
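The blockInterval reasoning above can be checked with a little arithmetic. The Spark Streaming tuning guide describes two relevant facts:

- The number of tasks per receiver per batch is roughly batchInterval / blockInterval.
- spark.streaming.receiver.maxRate caps the records each receiver accepts per second.

The sketch below plugs in the numbers from this message. The function is hypothetical and ignores partial blocks and finer rate-limiting details. Each receiver yields about 62 tasks per batch, around 188 across the three receivers, and at most 1200 records per receiver per batch.

```python
from typing import Dict


def receiver_batch_stats(batch_interval_ms: int, block_interval_ms: int,
                         num_receivers: int, max_rate_per_receiver: int) -> Dict[str, float]:
    """Back-of-envelope numbers for receiver-based Spark Streaming.

    Assumes each block a receiver creates becomes one task in the batch, and that
    maxRate caps the records each receiver accepts per second.
    """
    blocks_per_receiver = batch_interval_ms / block_interval_ms
    return {
        "tasks_per_receiver": blocks_per_receiver,
        "tasks_per_batch": num_receivers * blocks_per_receiver,
        "max_records_per_receiver": max_rate_per_receiver * batch_interval_ms / 1000,
    }


# Values from the message: 10 s batches, 160 ms blocks, 3 receivers, maxRate 120.
stats = receiver_batch_stats(10_000, 160, 3, 120)
print(stats)
# {'tasks_per_receiver': 62.5, 'tasks_per_batch': 187.5, 'max_records_per_receiver': 1200.0}
```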

Re: Invalid checkpoint url

2015-09-22 Thread Tathagata Das
Bingo! That is the problem. The solution is now obvious I presume :)

On Tue, Sep 22, 2015 at 9:41 PM, srungarapu vamsi 

> @Das,
> No, I am getting it in cluster mode.
> I think I understood why I am getting this error; please correct me if I
> am wrong.
> Reason:
> checkpointing writes the RDD to disk, and this checkpointing happens on all
> workers. Whenever Spark has to read back the RDD, the checkpoint directory
> should be reachable by all the workers and should be a common place that
> workers can write to and read from. This calls for a commonly accessible
> file system like NFS, HDFS or S3.
> So, if I give ssc.checkpoint("some local directory"), since workers are
> not able to read the RDDs from the other workers' checkpoint directories, I
> am facing the above-mentioned error.
> With this understanding, I am creating a t2.medium HDFS 2.7.1 node and
> pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".
> Please correct me if I am wrong.
> On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das 
> wrote:
>> Are you getting this error in local mode?
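Here is a hypothetical guard (not a Spark API) that encodes the reasoning above. Before calling ssc.checkpoint(...), it rejects locations that may resolve to each worker's own disk. Only HDFS is treated as shared here, which is an assumption to adjust for your environment. The host name and port in the example are placeholders.

```python
from urllib.parse import urlparse

# Schemes treated as shared storage in this sketch. Only HDFS is listed; an NFS
# mount visible at the same path on every node would also work, but it looks
# like a plain local path and cannot be recognised from the string alone.
SHARED_SCHEMES = {"hdfs"}


def is_shared_checkpoint_dir(path: str) -> bool:
    """Return True if path names storage every worker can presumably reach.

    Scheme-less paths such as "checkpoint" resolve against the Hadoop default
    file system, which was the local file: system in the error above, so this
    sketch conservatively treats them, and file: URIs, as not shared.
    """
    return urlparse(path).scheme.lower() in SHARED_SCHEMES


for candidate in ("checkpoint", "file:/home/ubuntu/checkpoint",
                  "hdfs://namenode:8020/spark/checkpoints"):
    print(candidate, "->", is_shared_checkpoint_dir(candidate))
```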

Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Tathagata Das
SPARK_CLASSPATH is, I believe, deprecated now, so I am not surprised
that there is some difference in the code paths.

On Tue, Sep 22, 2015 at 9:45 PM, Saisai Shao  wrote:

> I think it is related to the class loader; the behavior is different
> for the classpath and --jars. If you want to know the details, I think you'd
> better dig into the source code.
> Thanks
> Jerry
> On Tue, Sep 22, 2015 at 9:10 PM, ayan guha  wrote:
>> I must have gone mad :) Thanks for pointing it out. I downloaded the
>> 1.5.0 assembly jar and added it to SPARK_CLASSPATH.
>> However, I am getting a new error now
>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"":'localhost:9092'})
>>   Spark Streaming's Kafka libraries not found in class path. Try one of
>> the following.
>>   1. Include the Kafka library and its dependencies with in the
>>  spark-submit command as
>>  $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...
>>   2. Download the JAR of the artifact from Maven Central
>> /,
>>  Group Id = org.apache.spark, Artifact Id =
>> spark-streaming-kafka-assembly,
>> Version = 1.5.0.
>>  Then, include the jar in the spark-submit command as
>>  $ bin/spark-submit --jars  ...
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>> \streaming\", line 130, in createDirectStream
>> raise e
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o30.loadClass.
>> : java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
>> at Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at py4j.reflection.MethodInvoker.invoke(
>> at
>> py4j.reflection.ReflectionEngine.invoke(
>> at py4j.Gateway.invoke(
>> at
>> py4j.commands.AbstractCommand.invokeMethod(
>> at py4j.commands.CallCommand.execute(
>> at
>> at Source)
>> >>> os.environ['SPARK_CLASSPATH']
>> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>> >>>
>> So I launched pyspark with --jars and the assembly jar. Now it is
>> working.
>> THANK YOU for the help.
>> Out of curiosity: why did adding it to SPARK_CLASSPATH not work?
>> Best
>> Ayan
>> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao 
>> wrote:
>>> I think you're using the wrong version of the Kafka assembly jar. The
>>> Python API for the direct Kafka stream is not supported in Spark 1.3.0, so
>>> you'd better change to version 1.5.0. It looks like you're using Spark
>>> 1.5.0, so why did you choose Kafka assembly 1.3.0?
>>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:

>>>> I have added spark assembly jar to SPARK CLASSPATH
>>>>
>>>> >>> print os.environ['SPARK_CLASSPATH']
>>>>
>>>> Now I am facing below issue with a test topic
>>>>
>>>> >>> ssc = StreamingContext(sc, 2)
>>>> >>> kvs =
>>>> Traceback (most recent call last):
>>>>   File "", line 1, in 
>>>> \streaming\", line 126, in createDirectStream
>>>> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
>>>> set(topics), jfr
>>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4\py4j\", line 538, in __call__
>>>> \sql\", line 36, in deco
>>>> return f(*a, **kw)
>>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4\py4j\", line 304, in get_return_value

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Sean Owen
The point is that this only works if you already knew the file was
presented in order within and across partitions, which was the
original problem anyway. I don't think that is guaranteed in general, but in
practice I do imagine it's already in the expected order from
textFile. Maybe under the hood this ends up being ensured by

So, adding the index and sorting on it doesn't add anything.

On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase  wrote:
> Just give zipWithIndex a shot, and use it early in the pipeline. I think it
> provides exactly the info you need, as the index is the original line number
> in the file, not the index in the partition.
> Sent from my iPhone
> On 22 Sep 2015, at 17:50, Philip Weaver  wrote:
> Thanks. If textFile can be used in a way that preserves order, then both the
> partition index and the index within each partition should be consistent,
> right?
> I overcomplicated the question by asking about removing duplicates.
> Fundamentally, I think my question is: how does one sort lines in a file by
> line number?
> On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase  wrote:
>> By looking through the docs and source code, I think you can get away with
>> rdd.zipWithIndex to get the index of each line in the file, as long as you
>> define the parallelism upfront:
>> sc.textFile("", 4)
>> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> skimming through some tuples, hopefully this is clear enough.
>> -adrian
>> From: Philip Weaver
>> Date: Tuesday, September 22, 2015 at 3:26 AM
>> To: user
>> Subject: Remove duplicate keys by always choosing first in file.
>> I am processing a single file and want to remove duplicate rows by some
>> key by always choosing the first row in the file for that key.
>> The best solution I could come up with is to zip each row with the
>> partition index and local index, like this:
>> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>>     ((partitionIndex, localIndex), row)) }
>> }
>> And then using reduceByKey with a min ordering on the (partitionIndex,
>> localIndex) pair.
>> First, can I count on SparkContext.textFile to read the lines such that
>> the partition indexes are always increasing, so that the above works?
>> And, is there a better way to accomplish the same effect?
>> Thanks!
>> - Philip
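To make the min-ordering idea concrete without a cluster, here is a minimal pure-Python model of the approach in Philip's message. Partitions are plain lists (an assumption standing in for what textFile produces), and `first_row_per_key` is a hypothetical helper name. As Sean points out, it only returns the first row in the file if the partitions really follow file order.

```python
from typing import Any, Callable, Dict, Hashable, List, Tuple


def first_row_per_key(
    partitions: List[List[Any]], key: Callable[[Any], Hashable]
) -> Dict[Hashable, Any]:
    """Keep, for every key, the row with the smallest (partitionIndex, localIndex) tag."""
    best: Dict[Hashable, Tuple[Tuple[int, int], Any]] = {}
    for partition_index, rows in enumerate(partitions):  # ~ mapPartitionsWithIndex
        for local_index, row in enumerate(rows):         # ~ rows.zipWithIndex
            k = key(row)
            tag = (partition_index, local_index)
            # ~ reduceByKey with a min ordering on the tag; min is associative and
            # commutative, so the merge order Spark happens to use does not matter
            if k not in best or tag < best[k][0]:
                best[k] = (tag, row)
    return {k: row for k, (_tag, row) in best.items()}


partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4), ("b", 5)],
]
print(first_row_per_key(partitions, key=lambda row: row[0]))
# {'a': ('a', 1), 'b': ('b', 2), 'c': ('c', 4)}
```

The same tagging works with the single global index from `zipWithIndex` that Adrian suggests; either way, the result is the first row in the file only if the partitions are in file order.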


Re: Using Spark for portfolio manager app

2015-09-22 Thread Thúy Hằng Lê
That's a great answer, Adrian.
I found a lot of information here. I have a direction for the application now;
I will try your suggestion :)

On Tuesday, September 22, 2015, Adrian Tanase wrote:

> 1. Reading from Kafka has exactly-once guarantees; we are using it in
>    production today (with the direct receiver).
>    1. You will probably have 2 topics; loading both into Spark and
>       joining/unioning as needed is not an issue.
>    2. There are tons of optimizations you can do there, assuming everything
>       else works.
> 2. For ad-hoc queries I would say you absolutely need to look at
>    external storage.
>    1. Querying the DStream or Spark's RDDs directly should be done
>       mostly for aggregates/metrics, not by users.
>    2. If you look at HBase or Cassandra for storage, then 50k
>       writes/sec are not a problem at all, especially combined with a smart
>       client that does batch puts (like async hbase).
>    3. You could also consider writing the updates to another Kafka
>       topic and having a different component update the DB, if you think
>       of other optimisations there.
> 3. By stats I assume you mean metrics (operational or business).
>    1. There are multiple ways to do this; however, I would not encourage
>       you to query Spark directly, especially if you need an archive/history
>       of your datapoints.
>    2. We are using OpenTSDB (we already have an HBase cluster) +
>       Grafana for dashboarding.
>    3. Collecting the metrics is a bit hairy in a streaming app; we
>       have experimented with both accumulators and RDDs specific to metrics,
>       and chose the RDDs that write to OpenTSDB using foreachRDD.
> -adrian
> --
> *From:* Thúy Hằng Lê
> *Sent:* Sunday, September 20, 2015 7:26 AM
> *To:* Jörn Franke
> *Cc:*
> *Subject:* Re: Using Spark for portfolio manager app
> Thanks Adrian and Jörn for the answers.
> Yes, you're right, there are a lot of things I need to consider if I want to
> use Spark for my app.
> I still have a few concerns/questions about your information:
> 1/ I need to combine the trading stream with the tick stream; I am planning
> to use Kafka for that.
> If I use approach #2 (Direct Approach) in the Spark Streaming + Kafka
> Integration Guide (Spark 1.4.1), will I get exactly-once semantics? Or do I
> have to add some logic in my code to achieve that?
> Since you suggested using delta updates, exactly-once semantics is required
> for this application.
> 2/ For ad-hoc queries, I must output Spark's results to external storage and
> query that, right?
> Is there any way to do ad-hoc queries on Spark? My application could have
> 50k updates per second at peak time.
> Persisting to external storage leads to high latency in my app.
> 3/ How do I get real-time statistics from Spark?
> In most of the Spark Streaming examples, the statistics are echoed to
> stdout.
> However, I want to display those statistics in a GUI; is there any way to
> retrieve data from Spark directly without using external storage?
> 2015-09-19 16:23 GMT+07:00 Jörn Franke  >:
>> If you want to be able to let your users query their portfolio, then you
>> may want to think about storing the current state of the portfolios in
>> hbase/phoenix, or alternatively a cluster of relational databases can make
>> sense. For the rest you may use Spark.
>> On Sat, Sep 19, 2015 at 4:43, Thúy Hằng Lê wrote:
>>> Hi all,
>>> I am going to build a financial application, a Portfolio Manager, where
>>> each portfolio contains a list of stocks, the number of shares purchased,
>>> and the purchase price.
>>> Another source of information is stock prices from market data. The
>>> application needs to calculate the real-time gain or loss of each stock in
>>> each portfolio (compared to the purchase price).
>>> I am new to Spark; I know that using Spark Streaming I can aggregate
>>> portfolio positions in real time, for example:
>>> user A contains:
>>>   - 100 IBM stock with transactionValue=$15000
>>>   - 500 AAPL stock with transactionValue=$11400
>>> Now given the stock prices 
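For user A, the per-stock figure the application needs is the market value at the latest tick minus the transaction value. A minimal sketch of that arithmetic, independent of Spark; the prices are hypothetical because the message is cut off before they are given, and `Position` and `portfolio_gains` are illustrative names.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Position:
    symbol: str
    shares: int
    transaction_value: float  # total amount paid for all the shares


def unrealized_gain(position: Position, price: float) -> float:
    """Market value at the latest price minus what was paid (negative means a loss)."""
    return position.shares * price - position.transaction_value


def portfolio_gains(positions: List[Position], prices: Dict[str, float]) -> Dict[str, float]:
    return {p.symbol: unrealized_gain(p, prices[p.symbol]) for p in positions}


user_a = [Position("IBM", 100, 15000.0), Position("AAPL", 500, 11400.0)]
latest_prices = {"IBM": 145.0, "AAPL": 24.0}  # hypothetical ticks, not real quotes
gains = portfolio_gains(user_a, latest_prices)
print(gains)                # {'IBM': -500.0, 'AAPL': 600.0}
print(sum(gains.values()))  # 100.0
```

In a streaming job, this function would presumably run per batch after the trade stream and the tick stream have been joined on the symbol.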

Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Saisai Shao
I think you're using the wrong version of the Kafka assembly jar. I think the
Python API for the direct Kafka stream is not supported in Spark 1.3.0; you'd
better change to version 1.5.0. It looks like you're using Spark 1.5.0, so why
did you choose Kafka assembly 1.3.0?


On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:

> Hi
> I have added the spark assembly jar to SPARK_CLASSPATH
> >>> print os.environ['SPARK_CLASSPATH']
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
> Now I am facing the issue below with a test topic:
> >>> ssc = StreamingContext(sc, 2)
> >>> kvs = KafkaUtils.createDirectStream(ssc, ['spark'], {"": 'localhost:9092'})
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> \streaming\", line 126, in createDirectStream
> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
> set(topics), jfromOffsets)
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>\py4j\", line 538, in __call__
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> \sql\", line 36, in deco
> return f(*a, **kw)
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>\py4j\", line 304, in get_return_value
> py4j.protocol.Py4JError: An error occurred while calling
> o22.createDirectStream.
>  Trace:
> py4j.Py4JException: Method createDirectStream([class
> org.apache.spark.streaming.
>, class java.util.HashMap, class
> java.util.HashSet, class java.util.HashMap]) does not exist
> at
> py4j.reflection.ReflectionEngine.getMethod(
> at
> py4j.reflection.ReflectionEngine.getMethod(
> at py4j.Gateway.invoke(
> at
> py4j.commands.AbstractCommand.invokeMethod(
> at py4j.commands.CallCommand.execute(
> at
> at Source)
> >>>
> Am I doing something wrong?
> --
> Best Regards,
> Ayan Guha
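Since the fix here was matching the Kafka assembly version to the running Spark version, a quick sanity check can compare the version embedded in the jar file name with the running one. A minimal sketch, assuming the jar keeps the standard `spark-streaming-kafka-assembly_<scala>-<spark>.jar` naming seen in this thread; `assembly_versions` and `matches_spark` are hypothetical helper names.

```python
import re

# Matches names like spark-streaming-kafka-assembly_2.10-1.5.0.jar
_ASSEMBLY = re.compile(
    r"spark-streaming-kafka-assembly_(?P<scala>\d+\.\d+)-(?P<spark>\d+\.\d+\.\d+)\.jar$"
)


def assembly_versions(jar_path):
    """Return (scala_version, spark_version) parsed from the assembly jar file name."""
    match = _ASSEMBLY.search(jar_path)
    if match is None:
        raise ValueError("not a spark-streaming-kafka-assembly jar: %r" % jar_path)
    return match.group("scala"), match.group("spark")


def matches_spark(jar_path, spark_version):
    """True when the jar was built for the same Spark release that is running."""
    return assembly_versions(jar_path)[1] == spark_version
```

In a pyspark shell, `matches_spark(jar_path, sc.version)` would flag the 1.3.0 jar when running Spark 1.5.0.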

Re: passing SparkContext as parameter

2015-09-22 Thread Petr Novak
And probably the original source code

On Tue, Sep 22, 2015 at 10:37 AM, Petr Novak  wrote:

> To complete the design pattern:
> Petr
> On Mon, Sep 21, 2015 at 10:02 PM, Romi Kuntsman  wrote:
>> Cody, that's a great reference!
>> As shown there - the best way to connect to an external database from the
>> workers is to create a connection pool on (each) worker.
>> The driver may pass the connection string via broadcast, but not the
>> connection object itself, and not the SparkContext.
>> On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger 
>> wrote:
>>> That isn't accurate, I think you're confused about foreach.
>>> Look at
>>> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:
 foreach is something that runs on the driver, not the workers.

 if you want to perform some function on each record from cassandra, you
 need to do, which will run distributed on the spark

 *Romi Kuntsman*, *Big Data Engineer*

 On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch  wrote:

> Yes, but I need to read from the Cassandra DB within a Spark
> transformation, something like:
> dstream.foreachRDD {
> rdd=> rdd.foreach {
>  message =>
>  sc.cassandraTable()
>   .
>   .
>   .
> }
> }
> Since rdd.foreach gets executed on the workers, how can I make the
> SparkContext available on the workers?
> Regards,
> Padma Ch
> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>> You can use broadcast variable for passing connection information.
>> Cheers
>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>> wrote:
>> Can I use this SparkContext on the executors?
>> In my application, I have a scenario of reading from the DB for certain
>> records in the RDD. Hence I need the SparkContext to read from the DB
>> (Cassandra in our case).
>> If the SparkContext can't be sent to the executors, what is the
>> workaround for this?
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
>> wrote:
>>> add @transient?
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>> wrote:
 Hello All,

 How can I pass the SparkContext as a parameter to a method in an
 object? Passing the SparkContext gives me a TaskNotSerializable exception.

 How can I achieve this?

 Padma Ch
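Romi's summary (ship the connection string, not the connection or the SparkContext, and build the connection on the worker) can be modeled in plain Python. A minimal sketch: `FakeSession` is a hypothetical stand-in for a Cassandra session, and `get_session`/`enrich_partition` are illustrative names, not part of any connector API. The cache lives at module level, so each worker process opens at most one session per connection string, however many partitions it handles.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a database session; only counts how often one is opened."""

    opened = 0

    def __init__(self, contact_points):
        FakeSession.opened += 1
        self.contact_points = contact_points

    def lookup(self, key):
        return "value-for-%s" % key


_sessions = {}
_sessions_lock = threading.Lock()


def get_session(contact_points):
    """Open at most one session per process for a given connection string."""
    with _sessions_lock:
        if contact_points not in _sessions:
            _sessions[contact_points] = FakeSession(contact_points)
        return _sessions[contact_points]


def enrich_partition(rows, contact_points):
    """Runs once per partition on a worker; only the plain string was shipped to it."""
    session = get_session(contact_points)
    for row in rows:
        yield row, session.lookup(row)
```

Wired into PySpark, it might look like `info = sc.broadcast("cassandra-host")` followed by `rdd.mapPartitions(lambda rows: enrich_partition(rows, info.value))`, so only the string travels with the task. In Scala, the same connection-per-partition idea maps onto `mapPartitions` or `foreachPartition`.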



Re: Deploying spark-streaming application on production

2015-09-22 Thread Petr Novak
Or if there is an option on the MQTT server to block event ingestion towards
Spark while still receiving and buffering events in MQTT and waiting for the ACK,
then it would be possible to just gracefully shut down the Spark job so it
finishes what is in its buffers, and then restart it.


On Tue, Sep 22, 2015 at 10:53 AM, Petr Novak  wrote:

> Ahh, the problem is probably the async ingestion into the Spark receiver
> buffers, hence the WAL is required, I would say.
> Petr
> On Tue, Sep 22, 2015 at 10:53 AM, Petr Novak  wrote:
>> If MQTT can be configured with a long enough timeout for the ACK and can buffer
>> enough events while waiting for the Spark job restart, then I think one could do
>> even without the WAL, assuming that the Spark job shuts down gracefully, possibly
>> saving its own custom metadata somewhere, e.g. ZooKeeper, if required to
>> restart the Spark job.
>> On Tue, Sep 22, 2015 at 10:53 AM, Petr Novak 
>> wrote:
>>> Ahh, the problem is probably the async ingestion into the Spark receiver
>>> buffers, hence the WAL is required, I would say.
>>> On Tue, Sep 22, 2015 at 10:52 AM, Petr Novak 
>>> wrote:
 If MQTT can be configured with a long enough timeout for the ACK and can
 buffer enough events while waiting for the Spark job restart, then I think one
 could do even without the WAL, assuming that the Spark job shuts down gracefully,
 possibly saving its own custom metadata somewhere, e.g. ZooKeeper, if
 required to restart the Spark job.


 On Mon, Sep 21, 2015 at 8:49 PM, Adrian Tanase 

> I'm wondering, isn't this the canonical use case for WAL + a reliable
> receiver?
> As far as I know you can tune the MQTT server to wait for an ack on messages
> (QoS level 2?).
> With some support from the client library you could achieve exactly-once
> semantics on the read side, if you ack a message only after writing it
> to the WAL, correct?
> -adrian
> Sent from my iPhone
> On 21 Sep 2015, at 12:35, Petr Novak  wrote:
> In short, there is no direct support for it in Spark AFAIK. You will
> either manage it in MQTT or have to add another layer of indirection,
> either in-memory (observable streams, in-mem db) or disk based
> (Kafka, hdfs files, db), which will keep your unprocessed events.
> Now that I think of it, there is support for backpressure in v1.5.0, but I don't
> know if it could be exploited, i.e. I don't know if it is possible to
> decouple reading events into memory from the actual processing code in Spark,
> which could then be swapped on the fly. Probably not without some custom-built
> facility for it.
> Petr
> On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak 
> wrote:
>> I should read my posts at least once to avoid so many typos.
>> Hopefully you are brave enough to read through.
>> Petr
>> On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 
>> wrote:
>>> I think you would have to persist events somehow if you don't want
>>> to miss them. I don't see any other option there: either in MQTT, if it
>>> is supported there, or by routing them through Kafka.
>>> There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream
>>> reading and processing into 2 separate jobs so that you could
>>> upgrade the processing one, assuming the reading one would be stable (without
>>> changes) across versions. But it is problematic because there is no easy
>>> way to share DStreams between jobs; you would have to develop your own
>>> facility for it.
>>> Alternatively, the reading job could save MQTT events in their
>>> rawest form into files (to limit the need to change code), and then the
>>> processing job would work on top of them using file-based Spark Streaming.
>>> I think this is inefficient and can get quite complex if you would
>>> like to make it reliable.
>>> Basically, either MQTT supports persistence (which I don't know) or
>>> there is Kafka for this use case.
>>> Another option, I think, would be to place observable streams with
>>> backpressure between MQTT and Spark Streaming, as long as you could
>>> perform the upgrade before the buffers fill up.
>>> I'm sorry that it is not thought out well from my side, it is just a
>>> brainstorm but it might lead you somewhere.
>>> Regards,
>>> Petr
>>> On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele <
>>>> wrote:
 Hi All,

 I have a Spark Streaming application with a batch interval of 10 ms, which is
 reading the MQTT channel and dumping the data from MQTT to HDFS.

 So suppose if I have to deploy new application 
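Adrian's idea of acknowledging an MQTT message only after it has been written to a durable log can be modeled in a few lines. This is a toy, not Spark's receiver API: `Broker`, `run_receiver`, and the in-memory `wal` list are hypothetical stand-ins (in Spark itself, the receiver WAL is switched on with `spark.streaming.receiver.writeAheadLog.enable`). Ack-after-write alone gives at-least-once delivery, because a crash between the write and the ack causes a redelivery; deduplicating on the message id during replay is what keeps the log free of duplicates.

```python
from collections import deque


class Broker:
    """Hypothetical broker: a message stays pending (and is redelivered) until acked."""

    def __init__(self, messages):
        self.pending = deque(messages)  # (msg_id, payload) pairs not yet acked

    def peek(self):
        return self.pending[0] if self.pending else None

    def ack(self, msg_id):
        if self.pending and self.pending[0][0] == msg_id:
            self.pending.popleft()


def run_receiver(broker, wal, crash_after_logging=None):
    """Append each message to the log before acking it; return False on a simulated crash."""
    seen = {msg_id for msg_id, _ in wal}  # on (re)start, recover already-logged ids
    while True:
        msg = broker.peek()
        if msg is None:
            return True
        msg_id = msg[0]
        if msg_id not in seen:            # a redelivered message is not logged twice
            wal.append(msg)
            seen.add(msg_id)
        if msg_id == crash_after_logging:
            return False                  # logged but never acked: broker keeps it
        broker.ack(msg_id)


broker = Broker([(1, "a"), (2, "b"), (3, "c")])
wal = []
print(run_receiver(broker, wal, crash_after_logging=2))  # False
print(run_receiver(broker, wal))                          # True
print(wal)  # [(1, 'a'), (2, 'b'), (3, 'c')]
```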

Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-22 Thread Huy Banh
The header should already be sent from the driver to the workers by Spark,
and that's why it works in spark-shell.

In Scala IDE, the code is inside an app class, so you need to check whether
the app class is serializable.

On Tue, Sep 22, 2015 at 9:13 AM Alexis Gillain <> wrote:

> As Igor said, the header must be available on each partition, so the solution
> is broadcasting it.
> The difference between the REPL and Scala IDE may come from the
> SparkContext setup, as the REPL defines one by default.
> 2015-09-22 8:41 GMT+08:00 Igor Berman :
>> Try to broadcast the header
>> On Sep 22, 2015 08:07, "Balaji Vijayan" 
>> wrote:
>>> Howdy,
>>> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior
>>> that I'm seeing in 2 of my local Spark/Scala environments (Scala for
>>> Jupyter and Scala IDE) but not the 3rd (Spark Shell). The following code
>>> throws the following stack trace error in the former 2 environments but
>>> executes successfully in the 3rd. I'm not sure how to go about
>>> troubleshooting my former 2 environments so any assistance is greatly
>>> appreciated.
>>> Code:
>>> //get file
>>> val logFile = "s3n://file"
>>> val logData  = sc.textFile(logFile)
>>> // header
>>> val header =  logData.first
>>> // filter out header
>>> val sample = logData.filter(!_.contains(header)).map { line =>
>>>   // strip quotes, then drop the last character of the *cleaned* string
>>>   line.replaceAll("['\"]", "").dropRight(1)
>>> }.takeSample(false, 100, 12L)
>>> Stack Trace:
>>> org.apache.spark.SparkException: Task not serializable
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>> org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>> org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>> org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>> org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>> cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>> cmd6$$user$$anonfun$3.apply(Main.scala:133)
>>> org.apache.spark.SparkConf
>>> Serialization stack:
>>> - object not serializable (class: org.apache.spark.SparkConf, value: 
>>> org.apache.spark.SparkConf@309ed441)
>>> - field (class: cmd2$$user, name: conf, type: class 
>>> org.apache.spark.SparkConf)
>>> - object (class cmd2$$user, cmd2$$user@75a88665)
>>> - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>>> - object (class cmd6, cmd6@5e9e8f0b)
>>> - field (class: cmd6$$user, name: $outer, type: class cmd6)
>>> - object (class cmd6$$user, cmd6$$user@692f81c)
>>> - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
>>> cmd6$$user)
>>> - object (class cmd6$$user$$anonfun$3, )
>>> - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
>>> type: class cmd6$$user$$anonfun$3)
>>> - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>> org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>> org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>> org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>> org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>> cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>> cmd6$$user$$anonfun$3.apply(Main.scala:133)
>>> Thanks,
>>> Balaji
> --
> Alexis GILLAIN
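The serialization stack above shows the `filter` closure dragging in a `SparkConf` through a chain of `$outer` references to the REPL wrapper objects, even though the closure itself only uses `header`. The same principle can be seen with Python's standard `pickle` (PySpark ships closures with a pickle-based serializer): serializing an object serializes everything reachable from it. A minimal sketch; `App` is a hypothetical class, and the `threading.Lock` merely stands in for a non-serializable field such as `SparkConf`.

```python
import pickle
import threading


class App:
    """Hypothetical application object mirroring the REPL wrapper in the trace."""

    def __init__(self, header):
        self.conf = threading.Lock()  # stand-in for a non-serializable field like SparkConf
        self.header = header


def can_pickle(obj):
    """Return True if obj (and everything reachable from it) can be pickled."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


app = App("id,name,value")
print(can_pickle(app))     # False: the Lock field comes along with the object
header = app.header        # copy only the needed value into a local variable
print(can_pickle(header))  # True: a plain string serializes fine
```

In the Scala snippet, the equivalent is making sure nothing the closure touches reaches an enclosing object that holds the `SparkConf`.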

Re: Spark Web UI + NGINX

2015-09-22 Thread Akhil Das
Can you not just tunnel it?

Like on Machine A:

ssh -L 8080: machineB

And on your local machine:

ssh -L 80: machineA

And then simply open http://localhost/ that will show up the spark ui
running on machineB.

The people at DigitalOcean have written a great article on how to set up a
reverse proxy with the Apache server.
You can have a look at that too.

Best Regards

On Thu, Sep 17, 2015 at 2:36 PM, Renato Perini 

> Hello!
> I'm trying to set up a reverse proxy (using nginx) for the Spark Web UI.
> I have 2 machines:
>1) Machine A, with a public IP. This machine will be used to access
> Spark Web UI on the Machine B through its private IP address.
>2) Machine B, where Spark is installed (standalone master cluster, 1
> worker node and the history server) not accessible from the outside.
> Basically I want to access the Spark Web UI through my Machine A using the
> URL:
> http://machine_A_ip_address/spark
> Is there a recommended setup for Spark Web UI + nginx?
> Thank you.

Re: Spark Lost executor && shuffle.FetchFailedException

2015-09-22 Thread Akhil Das
If you look a bit deeper into the executor logs, you might find the
root cause of this issue. Also make sure the ports (34869 here, it seems) are
accessible between all the machines.

Best Regards

On Mon, Sep 21, 2015 at 12:40 PM,  wrote:

> Hi All:
> When I write the data to a Hive dynamic-partition table, many errors and
> warnings like the following occur.
> Is the reason that the shuffle output is so large?
> =
> 15/09/21 14:53:09 ERROR cluster.YarnClusterScheduler: Lost executor 402 on 
> remote Rpc client disassociated
> =
> 15/09/21 14:53:27 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 7.0 
> (TID 27601, FetchFailed(BlockManagerId(513, 
>, 34869), shuffleId=1, mapId=90, reduceId=107, message=
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:216)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.executor.Executor$
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(
>   at 
> java.util.concurrent.ThreadPoolExecutor$
> Caused by: Failed to connect to 
> java.util.concurrent.Executors$
>   at java.util.concurrent.FutureTask$Sync.innerRun(
>   ... 3 more
> Caused by: Connection refused:
>   at Method)
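Akhil's advice to make sure the shuffle port is reachable between machines can be checked from each node with a plain TCP connect. A minimal sketch using only Python's standard `socket` module; `port_reachable` is a hypothetical helper, and the host to pass in is whatever executor host appears in your own logs (the address is stripped in the trace above).

```python
import socket


def port_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, DNS failure, ...
        return False
```

Running it from every node against port 34869 on the other nodes would show whether a firewall is in the way. A successful connect only proves the port was open at that moment; an executor that has already been lost will refuse the connection regardless.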

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-22 Thread Zhiliang Zhu
Dear Sujit,
Since you are experienced with Spark, would it be convenient for you to
comment on my dilemma in using Spark for an R-based application?
Thank you very much! Zhiliang

 On Tuesday, September 22, 2015 1:45 AM, Zhiliang Zhu  

 Hi Romi,
I sincerely appreciate your kind and helpful response.
One more question: I am currently using Spark for financial data
analysis, so many operations on R data.frame/matrix and stat/regression are
always called. However, SparkR is currently not that strong; most of its
functions come from Spark SQL and MLlib. Spark SQL and DataFrame are not as
flexible and easy as R operating on a data.frame/matrix, and I have not yet
decided how much of MLlib could be used for R-specific stat/regression.
I have also thought of operating on the data only through Spark Java, but it is
quite hard to work with a data.frame/matrix the way R does. I am worried that
I have taken on too much risk with this approach.
Would you comment on my points?
Thank you very much! Zhiliang


 On Tuesday, September 22, 2015 1:21 AM, Sujit Pal  

 Hi Zhiliang,
I haven't used the Java API, but I found this Javadoc page; it may be helpful to you.

I think the equivalent Java code snippet might go something like this:
RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)
(the second parameter of fromRDD comes from this discussion)

There is also the SlidingRDD 

So maybe something like this:
new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu  wrote:

Hi Sujit,
Thank you very much for your kind help!
It seems to work. However, do you know the corresponding Spark Java API?
Is there a Java API equivalent to Scala's sliding? I could not find
Spark's Scala documentation for sliding either.
Thank you very much! Zhiliang

 On Monday, September 21, 2015 11:48 PM, Sujit Pal  

 Hi Zhiliang, 
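Would something like this work? (sliding comes from the MLlib RDDFunctions implicits.)
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))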

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  

Hi Romi,
Thanks very much for your kind comment!
In fact, there is a real background to the application. It is about R data
analysis:
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is
# one fund's daily series and each row is a daily date
# fund_return_daily is each fund's daily (log) value minus the previous day's
fund_return_daily <- diff(log(fund_nav_daily))
# the first row is all 0, since there is no previous row before the first row
fund_return_daily <- rbind(matrix(0, ncol = ncol(fund_return_daily)),
                           fund_return_daily) ...
I need to port this R program to Spark exactly, using RDD/DataFrame to
replace the R data.frame. However, I have found that it is VERY difficult to
make a Spark program flexibly describe and transform R-based applications.
I think I have seriously exposed myself to risk with this...
Would you give me some direction on the coding issue above, and on my risk
in practicing Spark/R applications?
I sincerely thank you for your kind help.

P.S. In SparkR in Spark 1.4.1, there are many bugs in the APIs
createDataFrame/except/unionAll, and it seems that Spark Java has more
functions than SparkR. Also, no specific R regression algorithm is included
in SparkR.
Best Regards, Zhiliang

 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  

 An RDD is a set of data rows (in your case numbers); there is no meaning to
the order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  

Dear , 

I have spent many days thinking about this issue, without any
success... I would appreciate any help.
There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row is
rdd2[i] = rdd1[i] - rdd1[i - 1]. What kind of API or function should I use?

Thanks very much! John
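Besides `sliding`, another common way to get `rdd2[i] = rdd1[i] - rdd1[i - 1]` is to key each element by its position with `zipWithIndex`, shift the keys by one, and join. The sketch below models that join in plain Python (dictionaries stand in for the keyed RDDs; `adjacent_diff` and `log_returns_with_zero_row` are hypothetical names), plus the R `diff(log(...))` with a leading zero row for a single fund. A PySpark version would presumably be along the lines of `idx = rdd1.zipWithIndex().map(lambda p: (p[1], p[0]))`, `prev = idx.map(lambda p: (p[0] + 1, p[1]))`, then `idx.join(prev).sortByKey().map(lambda p: p[1][0] - p[1][1])`, but that has not been checked against a cluster here.

```python
import math
from typing import List


def adjacent_diff(values: List[float]) -> List[float]:
    """out[j] = values[j + 1] - values[j], computed via an index join."""
    current = {i: v for i, v in enumerate(values)}       # ~ zipWithIndex, keyed by index
    previous = {i + 1: v for i, v in enumerate(values)}  # same values, keys shifted by +1
    # inner join on the index: index 0 has no predecessor and drops out
    return [current[i] - previous[i] for i in sorted(current) if i in previous]


def log_returns_with_zero_row(nav: List[float]) -> List[float]:
    """Single-fund version of rbind(0, diff(log(nav))) from the R snippet."""
    return [0.0] + adjacent_diff([math.log(x) for x in nav])


print(adjacent_diff([1, 4, 9, 16]))  # [3, 5, 7]
```

The join only reproduces file order if `zipWithIndex` sees the elements in that order, which is the same caveat raised in the duplicate-removal thread.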





Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-22 Thread Tim Chen
Hi Utkarsh,

Just to be sure: you originally set coarse to false but then to true? Or is
it the other way around?

Also, what's the exception/stack trace when the driver crashed?

Coarse-grained mode pre-starts all the Spark executor backends, so it has the
least overhead compared to fine-grained mode. There is no single answer for
which mode you should use, since it depends on your use case; otherwise we
would have removed one of those modes.

There are quite a few factors that could cause huge GC pauses, but I don't
think switching to standalone would make your GC pauses go away.


On Mon, Sep 21, 2015 at 5:18 PM, Utkarsh Sengar 

> I am running Spark 1.4.1 on mesos.
> The spark job does a "cartesian" of 4 RDDs (aRdd, bRdd, cRdd, dRdd) of
> size 100, 100, 7 and 1 respectively. Let's call it productRDD.
> Creation of "aRdd" needs data pull from multiple data sources, merging it
> and creating a tuple of JavaRdd, finally aRDD looks something like this:
> JavaRDD>
> bRdd, cRdd and dRdds are just List<> of values.
> Then I apply a transformation on productRDD and finally call "saveAsTextFile"
> to save the result of my transformation.
> Problem:
> By setting "spark.mesos.coarse=true", creation of "aRdd" works fine but
> driver crashes while doing the cartesian but when I do
> "spark.mesos.coarse=true", the job works like a charm. I am running spark
> on mesos.
> Comments:
> So I wanted to understand what role "spark.mesos.coarse=true" plays
> in terms of memory and compute performance. My findings look counter
> intuitive since:
>1. "spark.mesos.coarse=true" just runs on 1 mesos task, so there
>should be an overhead of spinning up mesos tasks which should impact the
>2. What config for "spark.mesos.coarse" recommended for running spark
>on mesos? Or is there no best answer and it depends on the use case?
>3. Also by setting "spark.mesos.coarse=true", I notice that I get huge
>GC pauses even with small dataset but a long running job (but this can be a
>separate discussion).
> Let me know if I am missing something obvious, we are learning spark
> tuning as we move forward :)
> --
> Thanks,
> -Utkarsh

Re: passing SparkContext as parameter

2015-09-22 Thread Petr Novak
To complete the design pattern:


On Mon, Sep 21, 2015 at 10:02 PM, Romi Kuntsman  wrote:

> Cody, that's a great reference!
> As shown there - the best way to connect to an external database from the
> workers is to create a connection pool on (each) worker.
> The driver may pass the connection string via broadcast, but not the
> connection object itself, and not the SparkContext.
> On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger  wrote:
>> That isn't accurate, I think you're confused about foreach.
>> Look at
>> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:
>>> foreach is something that runs on the driver, not the workers.
>>> if you want to perform some function on each record from cassandra, you
>>> need to do, which will run distributed on the spark
>>> workers
>>> *Romi Kuntsman*, *Big Data Engineer*
>>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
>>> wrote:
 Yes, but I need to read from the Cassandra DB within a Spark
 transformation, something like:


 rdd=> rdd.foreach {
  message =>

 Since rdd.foreach gets executed on the workers, how can I make the SparkContext
 available on the workers?

 Padma Ch

 On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:

> You can use broadcast variable for passing connection information.
> Cheers
> On Sep 21, 2015, at 4:27 AM, Priya Ch 
> wrote:
> Can I use this SparkContext on the executors?
> In my application, I have a scenario of reading from the DB for certain
> records in the RDD. Hence I need the SparkContext to read from the DB
> (Cassandra in our case).
> If the SparkContext can't be sent to the executors, what is the workaround
> for this?
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
> wrote:
>> add @transient?
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch
>>> wrote:
>>> Hello All,
>>> How can I pass the SparkContext as a parameter to a method in an
>>> object? Passing the SparkContext gives me a "Task not serializable"
>>> exception.
>>> How can I achieve this?
>>> Thanks,
>>> Padma Ch
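The pattern Romi and Cody describe (build the connection on the executor and ship only the connection details) can be sketched in plain Python. This is a minimal sketch, not the Cassandra connector's API: `FakeSession` is a hypothetical stand-in for a real driver session, and `make_partition_writer` is a hypothetical helper. The function it returns has the shape PySpark's `rdd.foreachPartition` expects (it receives an iterator over one partition), so it would run on the executors with one session per partition, while the SparkContext never leaves the driver.

```python
from typing import Callable, Iterable, List


class FakeSession:
    """Hypothetical stand-in for a real database session (e.g. a Cassandra driver session)."""

    opened = 0  # counts sessions created, to show there is one per partition

    def __init__(self, contact_point: str) -> None:
        FakeSession.opened += 1
        self.contact_point = contact_point
        self.statements: List[str] = []
        self.closed = False

    def execute(self, statement: str) -> None:
        self.statements.append(statement)

    def close(self) -> None:
        self.closed = True


def make_partition_writer(contact_point: str,
                          session_factory: Callable[[str], FakeSession]):
    """Build the function to hand to rdd.foreachPartition(...).

    Only the plain connection string is captured, so the closure serializes
    cleanly; the session itself is opened on the executor, once per partition.
    """
    def write_partition(records: Iterable[str]) -> FakeSession:
        session = session_factory(contact_point)
        try:
            for record in records:
                session.execute(f"process {record}")  # placeholder for the real read/update
        finally:
            session.close()
        # foreachPartition ignores the return value; it is returned only so
        # this sketch can be checked locally.
        return session

    return write_partition
```

In a PySpark job this would be passed as `rdd.foreachPartition(make_partition_writer(host, real_factory))`, where `host` and `real_factory` are placeholders. A small connection string can simply be captured in the closure; a broadcast variable (`sc.broadcast(...)`, read through `.value` inside the function) carries larger configuration the same way.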


Re: spark + parquet + schema name and metadata

2015-09-22 Thread Borisa Zivkovic
Thanks for the answer.

I need this in order to be able to track schema metadata.

Basically, when I create Parquet files from Spark, I want to be able to "tag"
them in some way (giving the schema an appropriate name or attaching some
key/values), so that it is fairly easy to get basic metadata about the
Parquet files when processing and discovering them later on.

On Mon, 21 Sep 2015 at 18:17 Cheng Lian  wrote:

> Currently Spark SQL doesn't support customizing the schema name and
> metadata. May I know why these two matter in your use case? Some
> Parquet data models, like parquet-avro, do support it, while some others
> don't (e.g. parquet-hive).
> Cheng
> On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> > Hi,
> >
> > I am trying to figure out how to write Parquet metadata when
> > persisting DataFrames to Parquet using Spark (1.4.1).
> >
> > I could not find a way to change the schema name (which seems to be
> > hard-coded to "root"), nor a way to add entries to the key/value metadata
> > in the Parquet footer.
> >
> > org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
> >
> > org.apache.parquet.schema.Type#getName
> >
> > thanks
> >
> >
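For the later "discovery" step, the key/value pairs Borisa mentions live in the Parquet footer. As a minimal sketch based on the Parquet file-format layout (a `PAR1` magic number at both ends, with the Thrift-encoded `FileMetaData` followed by its 4-byte little-endian length just before the trailing magic), the footer bytes can be located with the standard library alone. Decoding the Thrift structure, whose `key_value_metadata` field holds the pairs, is left to a Parquet library; `parquet_footer` is a hypothetical helper name.

```python
import struct

MAGIC = b"PAR1"


def parquet_footer(data: bytes) -> bytes:
    """Return the raw Thrift-encoded FileMetaData block of a Parquet file.

    Layout assumed, per the Parquet format:
        PAR1 | column chunks ... | FileMetaData | footer length (4 bytes, LE) | PAR1
    """
    if len(data) < 12 or data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("not a Parquet file (missing PAR1 magic)")
    (footer_len,) = struct.unpack("<I", data[-8:-4])  # little-endian uint32
    start = len(data) - 8 - footer_len
    if start < 4:
        raise ValueError("footer length points outside the file")
    return data[start:-8]
```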

Uneven distribution of tasks among workers in Spark/GraphX 1.5.0

2015-09-22 Thread dmytro
I have a large list of edges as a 5000-partition RDD. Now, I'm doing a simple
shuffle-heavy operation:

val g = Graph.fromEdges(edges, ...).partitionBy(...)
val subs = Graph(g.collectEdges(...), g.edges).collectNeighbors()

The job gets divided into 9 stages. My cluster has 3 workers in the same
local network.
Even though Spark 1.5.0 works much faster and the first several stages run at
full load, starting from one of the stages a single machine suddenly takes 99%
of the tasks, while the others take only as many tasks as they have cores and
then wait until that one machine finishes everything. Interestingly, on Spark
1.3.1 all stages get their tasks distributed evenly among the cluster machines.
I suspect this could be a bug in 1.5.0.

View this message in context:
Sent from the Apache Spark User List mailing list archive at

To unsubscribe, e-mail:
For additional commands, e-mail:

Re: Cache after filter Vs Writing back to HDFS

2015-09-22 Thread Akhil Das
Instead of .map you can try doing a .mapPartitions and see the performance.

Best Regards

On Fri, Sep 18, 2015 at 2:47 AM, Gavin Yue  wrote:

> For a large dataset, I want to filter out something and then do the
> compute-intensive work.
> What I am doing now:
> val filtered = Data.filter(somerules).cache()
> filtered.count()
> But this sometimes takes an unusually long time due to cache misses and
> recalculation.
> So I changed it to this:
> Data.filter(somerules).saveAsTextFile(...)
> sc.textFile(...).map(timeintensivecompute)
> The second one is even faster.
> How could I tune the job to reach maximum performance?
> Thank you.
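Akhil's `.mapPartitions` suggestion mainly pays off when the per-record function carries setup cost. A minimal plain-Python sketch, assuming a hypothetical `ExpensiveResource` that stands in for whatever `timeintensivecompute` needs (a parser, a model, a client): `per_record` is shaped like a `.map` function and rebuilds the resource for every record, while `per_partition` is shaped like a `.mapPartitions` function (iterator in, iterator out) and builds it once per partition.

```python
from typing import Iterable, Iterator


class ExpensiveResource:
    """Hypothetical stand-in for something costly to construct."""

    instances = 0  # counts constructions so the difference is visible

    def __init__(self) -> None:
        ExpensiveResource.instances += 1

    def compute(self, record: int) -> int:
        return record * 2  # placeholder for the time-intensive work


def per_record(record: int) -> int:
    # Shape of a .map(...) function that builds its own resource:
    # the setup cost is paid once per record.
    return ExpensiveResource().compute(record)


def per_partition(records: Iterable[int]) -> Iterator[int]:
    # Shape of a .mapPartitions(...) function: build once, reuse for every
    # record of the partition, and yield results lazily.
    resource = ExpensiveResource()
    for record in records:
        yield resource.compute(record)
```

In PySpark the second form would be used as `rdd.mapPartitions(per_partition)`. If the per-record work has no such setup, the difference between the two forms is likely to be small.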

RE: Support of other languages?

2015-09-22 Thread Sun, Rui
Although the data is an RDD[Array[Byte]] whose content is not meaningful to Spark
Core, it has to be on-heap, as Spark Core manipulates RDD transformations on

SPARK-10399 is not relevant here: it aims to manipulate off-heap data using a C++ library
via JNI. This is done in-process.

From: Rahul Palamuttam
Sent: Thursday, September 17, 2015 3:09 PM
To: Sun, Rui
Subject: Re: Support of other languages?


Thank you for both responses.
Sun, you pointed out the exact issue I was referring to: copying,
serializing, and deserializing the byte array between the JVM heap and the
worker memory.
It also doesn't make sense to me why the byte array should be kept on-heap, since the
data of the parent partition is just a byte array that only makes sense to a
Python environment.
Shouldn't we be writing the byte array off-heap and providing supporting
interfaces for outside processes to read and interact with the data?
I'm probably oversimplifying what is really required to do this.

There is a recent JIRA which I thought was interesting with respect to our 
discussion. JIRA

There's also a suggestion, at the bottom of the JIRA, that considers exposing 
on-heap memory which is pretty interesting.

- Rahul Palamuttam

On Wed, Sep 9, 2015 at 4:52 AM, Sun, Rui 
> wrote:
Hi, Rahul,

Supporting a new language other than Java/Scala in Spark works differently
for the RDD API and the DataFrame API.


An RDD is a distributed collection of language-specific data types whose
representation is unknown to the JVM. Also, the transformation functions for the RDD
are written in that language and can't be executed on the JVM. That's why worker
processes of the language runtime are needed in this case. Generally, to
support the RDD API in the language, a subclass of the Scala RDD is needed on the JVM
side (for example, PythonRDD for Python, RRDD for R) whose compute() is
overridden to send the serialized parent partition data (yes, this is where the
data copy you mention happens) and the serialized transformation function via a socket
to the worker process. The worker process deserializes the partition data and
the transformation function, then applies the function to the data. The result
is serialized and sent back to the JVM via the socket as a byte array. From the JVM's
viewpoint, the resulting RDD is a collection of byte arrays.
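The round trip described above can be modeled in a few lines of standard-library Python. This is a toy sketch, not PySpark's actual protocol: a thread stands in for the worker process, `socket.socketpair()` for the JVM-to-worker socket, and `marshal` of the function's code object stands in for the function serializer (PySpark uses cloudpickle; `marshal` here only works for self-contained functions that use no globals). What it shows is the point in the text: both the data and the function are serialized and copied across the socket, and the result comes back as an opaque byte array.

```python
import marshal
import pickle
import socket
import struct
import threading
import types
from typing import Any, Callable, List


def send_frame(sock: socket.socket, payload: bytes) -> None:
    # Length-prefixed framing: 4-byte big-endian length, then the payload.
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the socket early")
        buf += chunk
    return buf


def recv_frame(sock: socket.socket) -> bytes:
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, length)


def worker(sock: socket.socket) -> None:
    """Plays the language worker process."""
    try:
        code = marshal.loads(recv_frame(sock))      # 1. deserialize the function's code
        func = types.FunctionType(code, {})         #    and rebuild a callable from it
        partition = pickle.loads(recv_frame(sock))  # 2. deserialize the partition data
        result = [func(x) for x in partition]       # 3. apply it in this runtime
        send_frame(sock, pickle.dumps(result))      # 4. ship the result back as bytes
    finally:
        sock.close()


def compute(partition: List[Any], func: Callable[[Any], Any]) -> bytes:
    """Plays the JVM-side compute(): ship function and data, get bytes back."""
    jvm_end, worker_end = socket.socketpair()
    t = threading.Thread(target=worker, args=(worker_end,))
    t.start()
    try:
        send_frame(jvm_end, marshal.dumps(func.__code__))
        send_frame(jvm_end, pickle.dumps(partition))
        return recv_frame(jvm_end)  # to the "JVM" this is just a byte array
    finally:
        jvm_end.close()
        t.join()


def square(x: int) -> int:
    return x * x
```

For example, `pickle.loads(compute([1, 2, 3], square))` decodes the returned bytes back into a Python list. Only the Python side can do that decoding, which is why the JVM side sees nothing but bytes.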

Performance is a concern in this case, as there are overheads such as launching
the worker processes, serialization/deserialization of partition data, and the
cost of bi-directional communication of the data.
Besides, as the JVM can't know the real representation of the data in the RDD, it
is difficult and complex to support shuffle and aggregation operations. Spark
Core's built-in aggregator and shuffle can't be used directly. There has to be a
language-specific implementation to support these operations, which causes
additional overhead.

Additional memory occupied by the worker processes is also a concern.

For DataFrame API:

Things are much simpler than with the RDD API. For a DataFrame, data is read through
the Data Source API and is represented as native objects within the JVM, and there are
no language-specific transformation functions. Basically, the DataFrame API in the
language is just a set of method wrappers around the corresponding ones in the Scala DataFrame.

Performance is not a concern. The computation is done on native objects in the JVM,
with virtually no performance loss.

The only exception is UDFs in DataFrames. UDFs have to rely on language
worker processes, similar to the RDD API.

-Original Message-
From: Rahul Palamuttam 
Sent: Tuesday, September 8, 2015 10:54 AM
Subject: Support of other languages?

I wanted to know more about how Spark supports R and Python, with respect to 
what gets copied into the language environments.

To clarify:

I know that PySpark utilizes Py4J sockets to pass pickled Python functions
between the JVM and the Python daemons. However, I wanted to know how it passes
the data from the JVM into the daemon environment. I assume it has to copy the
data over into the new environment, since Python can't exactly operate in JVM
heap space (or can it?).

I had the same question with respect to SparkR, though I'm not completely
familiar with how they pass around native R code through the worker JVMs.

The primary question I wanted to ask is: does Spark make a second copy of the data
so that language-specific daemons can operate on it? What are some of the
other limitations encountered when we try to offer multi-language support,
whether in performance or in general software architecture?
With Python in particular, the result of a collect operation must first be written
to disk and then read back by the Python driver process.

Would appreciate any 

Re: Lost tasks in Spark SQL join jobs

2015-09-22 Thread Akhil Das
If you look further into the error logs, you may see other issues, such as
GC overhead, that cause the next set of tasks to fail.

Best Regards

On Thu, Sep 17, 2015 at 9:26 AM, Gang Bai  wrote:

> Hi all,
> I'm joining two tables on a specific attribute. The job is like
> `sqlContext.sql("SELECT * FROM tableA LEFT JOIN tableB ON
> tableA.uuid = tableB.uuid")`, where tableA and tableB are two temp tables,
> each around 100 GB in size and not skewed on 'uuid'.
> As I run the application, I constantly see two kinds of errors in the logs.
> One is like:
> 15/09/17 11:06:50 WARN TaskSetManager: Lost task 2946.0 in stage 1.0 (TID 1228,
> /data2/hadoop/local/usercache/megatron/appcache/application_1435099124107_3613186/blockmgr-4761cb8d-0dbd-4832-98ef-e64a787e09d4/2f/
> (No such file or directory)
> at Method)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
> at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.executor.Executor$
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> at java.util.concurrent.ThreadPoolExecutor$
> and the other is like:
> 15/09/17 11:06:50 ERROR YarnScheduler: Lost executor 925 on
> remote Akka client disassociated
> 15/09/17 11:06:50 INFO TaskSetManager: Re-queueing tasks for 925 from
> TaskSet 1.0
> 15/09/17 11:06:50 WARN ReliableDeliverySupervisor: Association with remote
> system [akka.tcp://sparkExecutor@] has failed, address is
> now gated for [5000] ms. Reason is: [Disassociated].
> 15/09/17 11:06:50 WARN TaskSetManager: Lost task 1321.0 in stage 1.0 (TID
> 1142, ExecutorLostFailure (executor 925 lost)
> 15/09/17 11:06:50 INFO DAGScheduler: Executor lost: 925 (epoch 1659)
> 15/09/17 11:06:50 INFO BlockManagerMasterActor: Trying to remove executor
> 925 from BlockManagerMaster.
> 15/09/17 11:06:50 INFO BlockManagerMasterActor: Removing block manager
> BlockManagerId(925,, 51494)
> 15/09/17 11:06:50 INFO BlockManagerMaster: Removed 925 successfully in
> removeExecutor
> Increasing the number of executors and the executor memory didn't help. This
> seems to be a very basic SQL use case, so my question is: how can I solve this
> issue?
> Thanks,
> Gang

Re: Has anyone used the Twitter API for location filtering?

2015-09-22 Thread Akhil Das
That's because getPlace sometimes returns null, and calling getLang on
null throws either a NullPointerException or a NoSuchMethodError. You need
to filter out the statuses that don't include location data.

Best Regards

On Fri, Sep 18, 2015 at 12:46 AM, Jo Sunad  wrote:

> I've been trying to filter for GeoLocation, Place or even Time Zone and I
> keep getting null values. I think I got one Place in 20 minutes of the app
> running (without any filters on tweets).
> Is this normal? Do I have to try querying rather than filtering?
> My code follows TD's example:
> val stream = TwitterUtils
> val hashtags = (status => status.getPlace().getName(),
> status.getText())
> getText, getFollowers, etc. all work fine; I just don't get anything
> location-based (and getLang() for some reason throws a NoSuchMethodError).
> Thanks for the help!
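Akhil's fix amounts to filtering before mapping. In the Scala job that would be something like `stream.filter(_.getPlace != null)` ahead of the `map`. The same idea as a minimal plain-Python sketch, where `Place` and `Status` are hypothetical stand-ins for the twitter4j types, with `place` set to `None` when a tweet carries no location:

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class Place:
    """Hypothetical stand-in for twitter4j's Place."""
    name: str


@dataclass
class Status:
    """Hypothetical stand-in for twitter4j's Status."""
    text: str
    place: Optional[Place] = None  # None when the tweet has no location


def place_and_text(statuses: Iterable[Status]) -> List[Tuple[str, str]]:
    # Filter first, then map: the place is only dereferenced for statuses
    # that actually carry one, so a missing place never raises.
    return [(s.place.name, s.text) for s in statuses if s.place is not None]
```

Given the reported rate (about one Place in 20 minutes of unfiltered tweets), most statuses will be dropped by such a filter; that is expected rather than a sign of a bug.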

Re: Stopping criteria for gradient descent

2015-09-22 Thread Yanbo Liang
Hi Nishanth,

The convergence tolerance is the condition that decides when iteration stops.
In LogisticRegression with SGD optimization, it depends on the difference
between successive weight vectors.
In GBT, however, it depends on the validation error on the held-out set.
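A weight-difference stopping rule of the kind described above can be sketched as follows. It assumes the relative form used by MLlib's `GradientDescent` in recent versions (stop when ||w_prev - w_curr|| < tol * max(||w_curr||, 1)); check the source for your Spark version before relying on the exact formula. `weights_converged` is a hypothetical name, and this is plain Python rather than MLlib code.

```python
import math
from typing import Sequence


def l2_norm(v: Sequence[float]) -> float:
    return math.sqrt(sum(x * x for x in v))


def weights_converged(prev: Sequence[float], curr: Sequence[float], tol: float) -> bool:
    """Relative weight-change test: ||prev - curr|| < tol * max(||curr||, 1).

    The max(..., 1) keeps the test from becoming absurdly strict when the
    weights themselves are close to zero.
    """
    if len(prev) != len(curr):
        raise ValueError("weight vectors must have the same length")
    diff = l2_norm([p - c for p, c in zip(prev, curr)])
    return diff < tol * max(l2_norm(curr), 1.0)
```

With noisy stochastic steps, the weight vector can keep moving by more than this threshold from one iteration to the next, so running until `numIterations` is not necessarily a sign that something is broken.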

2015-09-18 4:09 GMT+08:00 nishanthps :

> Hi,
> I am running LogisticRegressionWithSGD in Spark 1.4.1 and it always takes
> 100 iterations to train (which is the default). It never meets the
> convergence criteria. Shouldn't the convergence criteria for SGD be based
> on the difference in log loss, or the difference in accuracy on a held-out
> test set, instead of the difference in weight vectors?
> Code for convergence criteria:
> Thanks,
> Nishanth

Re: SparkContext declared as object variable

2015-09-22 Thread Akhil Das
It's a "value", not a variable. And what are you parallelizing here?

Best Regards

On Fri, Sep 18, 2015 at 11:21 PM, Priya Ch wrote:

> Hello All,
> Instead of declaring the SparkContext in main, I declared it as an object variable:
> import org.apache.spark.{SparkConf, SparkContext}
>
> object sparkDemo {
>   val conf = new SparkConf
>   val sc = new SparkContext(conf)
>
>   def main(args: Array[String]): Unit = {
>     val baseRdd = sc.parallelize()
>   }
> }
> But this piece of code throws:
> org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
> at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> at org.apache.spark.executor.Executor$
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> at java.util.concurrent.ThreadPoolExecutor$
> Caused by: org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
> Why shouldn't we declare sc as an object variable?
> Regards,
> Padma Ch

Re: How to speed up MLlib LDA?

2015-09-22 Thread Marko Asplund

I did some profiling for my LDA prototype code that requests topic
distributions from a model.
According to Java Mission Control, more than 80% of the execution time during the
sampling interval is spent in the following methods:

org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
java.lang.Double.valueOf(double); count: 31; 4.33%

Is there any way of using the API more optimally?
Are there any opportunities for optimising the "topicDistributions" code
path in MLlib?

My code looks like this:

// executed once
val model = LocalLDAModel.load(ctx, ModelFileName)

// executed four times
val samples = Transformers.toSparseVectors(vocabularySize,
  ctx.parallelize(Seq(input))) // fast
model.topicDistributions( // <== this seems to take about 4 seconds to execute


Re: Deploying spark-streaming application on production

2015-09-22 Thread Petr Novak
If MQTT can be configured with a long enough ACK timeout and can buffer
enough events while waiting for the Spark job to restart, then I think one
could even do without the WAL, assuming the Spark job shuts down gracefully,
possibly saving its own custom metadata somewhere (e.g. ZooKeeper) if that is
required to restart the Spark job.

On Tue, Sep 22, 2015 at 10:53 AM, Petr Novak  wrote:

> Ahh, the problem is probably the asynchronous ingestion into the Spark
> receiver buffers, hence the WAL is required, I would say.
> On Tue, Sep 22, 2015 at 10:52 AM, Petr Novak  wrote:
>> If MQTT can be configured with a long enough ACK timeout and can buffer
>> enough events while waiting for the Spark job to restart, then I think one
>> could even do without the WAL, assuming the Spark job shuts down gracefully,
>> possibly saving its own custom metadata somewhere (e.g. ZooKeeper) if that is
>> required to restart the Spark job.
>> Petr
>> On Mon, Sep 21, 2015 at 8:49 PM, Adrian Tanase  wrote:
>>> I'm wondering, isn't this the canonical use case for WAL + reliable
>>> receiver?
>>> As far as I know, you can tune the MQTT server to wait for an ack on messages
>>> (QoS level 2?).
>>> With some support from the client library you could achieve exactly-once
>>> semantics on the read side, if you ack a message only after writing it to
>>> the WAL, correct?
>>> -adrian
>>> Sent from my iPhone
>>> On 21 Sep 2015, at 12:35, Petr Novak  wrote:
>>> In short, there is no direct support for it in Spark AFAIK. You will
>>> either have to manage it in MQTT or add another layer of indirection,
>>> either in-memory (observable streams, in-memory DB) or disk-based
>>> (Kafka, HDFS files, DB), which will keep your unprocessed events.
>>> Now I realize there is support for backpressure in v1.5.0, but I don't
>>> know if it could be exploited, i.e. I don't know if it is possible to
>>> decouple reading events into memory from the actual processing code in
>>> Spark, which could then be swapped on the fly. Probably not without some
>>> custom-built facility for it.
>>> Petr
>>> On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak 
>>> wrote:
 I should read my posts at least once to avoid so many typos. Hopefully
 you are brave enough to read through.


 On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 

> I think you would have to persist events somehow if you don't want to
> miss them. I don't see any other option there: either in MQTT, if it is
> supported there, or by routing them through Kafka.
> There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream
> reading and processing into two separate jobs so that you could upgrade
> the processing one, assuming the reading one would be stable (without
> changes) across versions. But that is problematic because there is no easy
> way to share DStreams between jobs; you would have to develop your own
> facility for it.
> Alternatively, the reading job could save MQTT events in their rawest
> form into files (to limit the need to change code), and then the
> processing job would work on top of them using file-based Spark Streaming.
> I think this is inefficient and can get quite complex if you would like to
> make it reliable.
> Basically, either MQTT supports persistence (which I don't know) or
> there is Kafka for this use case.
> Another option, I think, would be to place observable streams with
> backpressure between MQTT and Spark Streaming, so that you could perform
> the upgrade before the buffers fill up.
> I'm sorry that it is not well thought out on my side; it is just a
> brainstorm, but it might lead you somewhere.
> Regards,
> Petr
> On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele
>> wrote:
>> Hi All,
>> I have a Spark Streaming application with a batch interval of 10 ms, which is
>> reading the MQTT channel and dumping the data from MQTT to HDFS.
>> Suppose I have to deploy a new application jar (with changes in the
>> Spark Streaming application): what is the best way to deploy it? Currently I
>> am doing the following:
>> 1. Killing the running streaming app using yarn application -kill ID
>> 2. Starting the application again
>> The problem with this approach is that, since we are not persisting the events
>> in MQTT, we will miss the events during the deployment.
>> How should we handle this case?
>> regards
>> jeeetndra
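Adrian's "ack only after writing to the WAL" idea can be sketched as a tiny receiver that persists, fsyncs, then acknowledges, and replays the log after a restart. This is a minimal plain-Python sketch, not Spark's receiver API (in Spark the receiver-side log is switched on with `spark.streaming.receiver.writeAheadLog.enable`). `WalReceiver`, the `ack` callback, and the JSON-lines log format are all hypothetical. Duplicates caused by a message arriving again are dropped by message id, which assumes ids are unique per message; real MQTT packet identifiers are reused, so a real system would need an application-level id.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List, Set, Tuple


class WalReceiver:
    """Toy reliable receiver: log durably first, acknowledge second."""

    def __init__(self, wal_path: str, ack: Callable[[int], None]) -> None:
        self.wal_path = wal_path
        self.ack = ack  # hypothetical: would send the MQTT acknowledgement
        # Rebuild the set of already-logged ids so a restart does not re-log them.
        self.seen: Set[int] = {int(rec["id"]) for rec in self.replay()}

    def on_message(self, msg_id: int, payload: str) -> None:
        if msg_id not in self.seen:  # a repeated message is only re-acked
            with open(self.wal_path, "a", encoding="utf-8") as wal:
                wal.write(json.dumps({"id": msg_id, "payload": payload}) + "\n")
                wal.flush()
                os.fsync(wal.fileno())  # the record is on disk...
            self.seen.add(msg_id)
        self.ack(msg_id)  # ...before the broker is told it may drop its copy

    def replay(self) -> List[Dict[str, object]]:
        """Return everything logged so far, e.g. after a restart."""
        if not os.path.exists(self.wal_path):
            return []
        with open(self.wal_path, encoding="utf-8") as wal:
            return [json.loads(line) for line in wal if line.strip()]


def demo() -> Tuple[List[int], List[Dict[str, object]]]:
    acks: List[int] = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "receiver.wal")
        receiver = WalReceiver(path, acks.append)
        receiver.on_message(1, "temp=21")
        receiver.on_message(2, "temp=22")
        receiver.on_message(2, "temp=22")  # same message again, e.g. the ack was lost
        # Simulate a restart: a fresh receiver replays what was logged.
        replayed = WalReceiver(path, acks.append).replay()
    return acks, replayed
```

The ordering is the whole point: if the process dies after the fsync but before the ack, the broker redelivers and the id check absorbs the duplicate; if it dies before the fsync, nothing was acked, so the broker still holds the message.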


Re: Deploying spark-streaming application on production

2015-09-22 Thread Petr Novak
Ahh, the problem is probably the asynchronous ingestion into the Spark
receiver buffers, hence the WAL is required, I would say.


Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-22 Thread dmytro
Could it be that your data is skewed? Do you have variable-length columns?


Re: passing SparkContext as parameter

2015-09-22 Thread Priya Ch
I have a scenario like this:

I read a DStream of messages from Kafka. Now if my RDD contains 10 messages,
for each message I need to query the Cassandra DB, do some modification, and
update the records in the DB. If there is no option of passing the SparkContext
to the workers to read/write into the DB, the only option is to use
CassandraConnector.withSessionDo. If so, for writing to a table, should I
construct the entire INSERT statement for thousands of fields in the DB?
Is this way of writing code optimized?
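On the question of writing an INSERT for thousands of fields, one option is to generate the statement text once from the column list, prepare it once per session (the DataStax drivers expose `session.prepare`), and only bind values per record. A minimal sketch of the generation step, assuming hypothetical keyspace, table, and column names; it builds CQL with `?` bind markers and does not talk to Cassandra itself.

```python
from typing import Dict, Sequence, Tuple


def build_insert(keyspace: str, table: str, columns: Sequence[str]) -> str:
    """Generate a CQL INSERT with one bind marker per column."""
    if not columns:
        raise ValueError("need at least one column")
    cols = ", ".join(columns)
    markers = ", ".join("?" for _ in columns)
    return f"INSERT INTO {keyspace}.{table} ({cols}) VALUES ({markers})"


def bind_values(row: Dict[str, object], columns: Sequence[str]) -> Tuple[object, ...]:
    """Order one record's values to match the generated statement."""
    return tuple(row[c] for c in columns)
```

The column list itself could come from wherever the schema is already known; the statement string is built once and reused for every message, so only `bind_values` runs per record.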



Streaming Receiver Imbalance Problem

2015-09-22 Thread SLiZn Liu
Hi spark users,

In our Spark Streaming app using the Kafka integration on Mesos, we started 3
receivers to receive 3 Kafka partitions, and we have observed an imbalance in
the record receiving rates: with spark.streaming.receiver.maxRate set to 120,
sometimes one of the receivers gets very close to the limit while the other two
only receive roughly fifty records per second.

This may have been caused by a previous receiver failure, where one of the
receivers' receiving rate dropped to 0. We restarted the Spark Streaming app,
and the imbalance began. We suspect that the partition received by the failing
receiver got jammed, and the other two receivers cannot take over
its data.

The 3-node cluster tends to run slowly: nearly all the tasks are scheduled
on the node with the previous receiver failure (I used union to combine the 3
receivers' DStreams, so I expected the combined DStream to be well distributed
across all nodes), a batch cannot be guaranteed to finish within one batch
interval, stages pile up, and the digested log shows the following:

5728.399: [GC (Allocation Failure) [PSYoungGen:
6954678K->17088K(6961152K)] 7074614K->138108K(20942336K), 0.0203877
secs] [Times: user=0.20 sys=0.00, real=0.02 secs]

15/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 77219 because its task set is gone (this is likely
the result of
receiving duplicate task finished status updates)


These two types of log lines were printed during some (not all) stages.

My configuration:
# of cores on each node: 64
# of nodes: 3
batch time is set to 10 seconds

spark.streaming.blockInterval   160  // set so that 10 seconds divided by
it is roughly the total number of cores, which is 64, to max out
all the nodes: 10s * 1000 / 64  // this one doesn't seem
to work, since the young gen / old gen ratio is nearly 0.3 instead of

Does anyone have an idea? Thanks for your patience.

Todd Leo
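The Spark Streaming tuning guide gives the rule of thumb behind the blockInterval setting: each receiver produces roughly batch interval / block interval blocks per batch, and each block becomes one task (its example: a 200 ms block interval gives 10 tasks per 2 s batch). A small sketch of that arithmetic, with `tasks_per_batch` as a hypothetical helper; floor division only approximates how many blocks actually get cut. With the settings above (10 s batches, 160 ms blocks, 3 receivers unioned), it gives about 62 tasks per receiver and about 186 for the union.

```python
def tasks_per_batch(batch_interval_ms: int, block_interval_ms: int, receivers: int = 1) -> int:
    """Approximate number of blocks (= partitions = tasks) per batch.

    Each receiver cuts a new block every block interval, so it yields about
    batch_interval / block_interval blocks per batch; a union of several
    receiver streams has the sum of their partitions.
    """
    if block_interval_ms <= 0:
        raise ValueError("block interval must be positive")
    return receivers * (batch_interval_ms // block_interval_ms)
```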

Re: Spark Ingestion into Relational DB

2015-09-22 Thread Jörn Franke
In these cases you may want to have a separate Oracle instance for the
batch process and another one for serving the data, to avoid SLA surprises.
Nevertheless, if data processing becomes more strategic across projects, you
may think about job management and HDFS, using Hadoop with Spark.

On Tue, Sep 22, 2015 at 8:02, Sri Eswari Devi Subbiah wrote:

> Hi,
> Thanks for the reply. Let me explain our scenario a little bit more.
> Currently we have multiple data feeds through files from different systems.
> We run batch jobs to extract the data from the files, normalize it,
> match it against the Oracle database, and finally consolidate the cleansed
> data in Oracle.
> I am evaluating whether, rather than running batch jobs, we can run Spark
> Streaming on the data files and finally write the cleansed data into the Oracle
> database. Once the data is consolidated in Oracle, it serves as the source
> of truth for external users.
> Regards,
> Sri Eswari.
> On Mon, Sep 21, 2015 at 10:55 PM, Jörn Franke 
> wrote:
>> You do not need Hadoop. However, you should think about using it. If you
>> use Spark to load data directly from Oracle, your database might see
>> unexpected load if a Spark node fails. Additionally, the
>> Oracle database, if it is not based on local disk, may have a storage
>> bottleneck. Furthermore, Spark standalone has no resource management
>> mechanism for supporting different SLAs; you may need YARN (Hadoop) for
>> that. Finally, using the Oracle database for storing all the data may be an
>> expensive exercise. What I have often seen is that Hadoop is used for
>> storing all the data and managing the resources. Spark can be used for
>> machine learning over this data, and the Oracle database (or any relational
>> datastore, NoSQL database, or in-memory DB) is used to serve the data to a lot
>> of users. This is also the basic idea behind the lambda architecture.
>> On Tue, 22 Sep 2015 at 7:13, Sri wrote:
>>> Hi,
>>> We have a use case where we get data from different systems, and
>>> finally the data will be consolidated into an Oracle database. Is Spark
>>> valid for this scenario? Currently we also don't have any big data
>>> component. If we go with Spark to ingest data, does it require
>>> Hadoop?
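The file-based flow Sri describes (extract, normalize, match against Oracle, consolidate) maps naturally onto a function applied to one partition at a time. The sketch below makes several assumptions. Python's built-in sqlite3 stands in for Oracle. The `id,amount` line format and the normalization rule are hypothetical. In Spark, a function like this would be called from something such as `rdd.foreachPartition`, opening its own database connection inside each partition.

```python
import sqlite3
from typing import Iterable, Tuple


def normalize(line: str) -> Tuple[str, float]:
    """Hypothetical normalization rule: 'id,amount' -> (trimmed upper-case id, float amount)."""
    cust_id, amount = line.split(",")
    return cust_id.strip().upper(), float(amount)


def consolidate_partition(lines: Iterable[str], conn: sqlite3.Connection) -> int:
    """Normalize one partition of raw lines, keep only rows whose id matches the
    reference table, and insert them in one batch. Returns the inserted count."""
    # Loading all reference ids is only reasonable for a small reference table.
    known_ids = {row[0] for row in conn.execute("SELECT cust_id FROM customers")}
    matched = [rec for rec in map(normalize, lines) if rec[0] in known_ids]
    conn.executemany("INSERT INTO facts (cust_id, amount) VALUES (?, ?)", matched)
    conn.commit()
    return len(matched)


# In-memory SQLite database standing in for the Oracle target (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (cust_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE facts (cust_id TEXT, amount REAL)")
conn.executemany("INSERT INTO customers VALUES (?)", [("A1",), ("B2",)])

inserted = consolidate_partition([" a1 ,10.5", "b2,3", "zz,1"], conn)
print(inserted)  # 2: the 'ZZ' row has no match in the reference table
```

Batching the inserts per partition keeps the number of round trips to the database proportional to the number of partitions, not the number of records.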

Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha

I have added the Spark assembly jar to SPARK_CLASSPATH:

>>> print os.environ['SPARK_CLASSPATH']

Now I am facing the issue below with a test topic:

>>> ssc = StreamingContext(sc, 2)
>>> kvs =
Traceback (most recent call last):
  File "", line 1, in 
\streaming\", line 126, in createDirectStream
jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
set(topics), jfr
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4\py4j\", line 538, in __call__
\sql\", line 36, in deco
return f(*a, **kw)
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4\py4j\", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
py4j.Py4JException: Method createDirectStream([class
org.apache.spark.streaming., class java.util.HashMap, class
 class java.util.HashMap]) does not exist


at py4j.Gateway.invoke(
at py4j.commands.CallCommand.execute(
at Source)


Am I doing something wrong?

Best Regards,
Ayan Guha

Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Daniel Haviv
I agree, but it's a constraint I have to deal with.
The idea is to load these files and merge them into ORC.
When using Hive on Tez, it takes less than a minute.


> On 22 Sep 2015, at 16:00, Jonathan Coveney wrote:
> Having a file per record is pretty inefficient on almost any file system.
> On Tuesday, 22 September 2015, Daniel Haviv
> wrote:
>> Hi,
>> We are trying to load around 10k Avro files (each file holds only one
>> record) using spark-avro, but it takes over 15 minutes to load.
>> It seems that most of the work is being done at the driver, where it creates
>> a broadcast variable for each file.
>> Any idea why it is behaving that way?
>> Thank you.
>> Daniel
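As a rough figure, over 15 minutes for 10,000 files works out to at least 900 s / 10,000 ≈ 90 ms of overhead per file, which is why a file per record hurts. A common mitigation is to compact the small files into larger groups before processing them. This is a general technique, not something spark-avro does for you. The function below is a minimal greedy packing sketch. The file names, sizes, and target size are hypothetical.

```python
from typing import List, Tuple


def pack_small_files(files: List[Tuple[str, int]], target_bytes: int) -> List[List[str]]:
    """Greedily group (name, size) pairs into batches whose total size stays at or
    below target_bytes; a file larger than the target ends up in a batch of its own."""
    batches: List[List[str]] = []
    current: List[str] = []
    current_total = 0
    for name, size in files:
        # Close the current batch if adding this file would overflow it.
        if current and current_total + size > target_bytes:
            batches.append(current)
            current, current_total = [], 0
        current.append(name)
        current_total += size
    if current:
        batches.append(current)
    return batches


# Ten single-record files of ~1 KB each, packed into ~4 KB groups.
files = [(f"part-{i}.avro", 1_000) for i in range(10)]
groups = pack_small_files(files, target_bytes=4_000)
print([len(g) for g in groups])  # [4, 4, 2]
```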

Re: Spark Streaming distributed job

2015-09-22 Thread Adrian Tanase
I think you need to dig into the custom receiver implementation. As long as the
source is distributed and partitioned, the downstream .map and .foreachXX operations
are all distributed as you would expect.

You could look at how the “classic” Kafka receiver is instantiated in the 
streaming guide and try to start from there:


On 9/22/15, 1:51 AM, ""  wrote:

>Could you please explain what exactly is distributed when I launch a Spark
>Streaming job on a YARN cluster?
>My code is something like:
>JavaDStream<JMSEvent> customReceiverStream =
>JavaDStream<String> incoming_msg = customReceiverStream.map(
>    new Function<JMSEvent, String>() {
>        public String call(JMSEvent jmsEvent) {
>            return jmsEvent.getText();
>        }
>    }
>);
>incoming_msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
>    public Void call(JavaRDD<String> rdd) throws Exception {
>        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
>            @Override
>            public void call(Iterator<String> msg) throws Exception {
>                while (msg.hasNext()) {
>                    // insert message in MongoDB
>So, in this code, at which step does the distribution over YARN happen?
>- Is my receiver distributed (and therefore everything after it)?
>- Is foreachRDD distributed (and therefore everything after it)?
>- Is foreachPartition distributed?
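Here is how the work splits between driver and executors. In Spark Streaming, the receiver runs as a long-running task on an executor. The function passed to foreachRDD runs on the driver once per batch. The function passed to `rdd.foreachPartition` is shipped to the executors and runs once per partition. The pure-Python sketch below only simulates that last step locally. `foreach_partition` is a hypothetical stand-in, not the Spark API. It shows why the MongoDB connection belongs inside foreachPartition: it is opened once per partition rather than once per message.

```python
from typing import Callable, Dict, Iterator, List

stats: Dict[str, int] = {"connections": 0, "inserted": 0}


def foreach_partition(partitions: List[List[str]], handler: Callable[[Iterator[str]], None]) -> None:
    """Local stand-in for RDD.foreachPartition: Spark would run `handler` on the
    executors, once per partition; here we simply loop over the partitions."""
    for part in partitions:
        handler(iter(part))


def insert_partition(msgs: Iterator[str]) -> None:
    # Hypothetical "open a MongoDB client" step: happens once per partition.
    stats["connections"] += 1
    for _msg in msgs:
        # Hypothetical "insert message" step: happens once per message.
        stats["inserted"] += 1


# One micro-batch whose RDD has 3 partitions (e.g. 3 receiver blocks).
batch = [["m1", "m2"], ["m3"], ["m4", "m5", "m6"]]
foreach_partition(batch, insert_partition)
print(stats)  # {'connections': 3, 'inserted': 6}
```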

RE: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-22 Thread java8964
Or at least tell us how many partitions you are using.

> Date: Tue, 22 Sep 2015 02:06:15 -0700
> Subject: Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables
> Could it be that your data is skewed? Do you have variable-length column
> types?
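One way to answer both questions (partition count and skew) is to collect per-partition row counts and compare the largest partition with the average. In PySpark, `rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()` does this without materializing whole partitions. The helper below is a plain-Python sketch of the comparison. What ratio counts as "skewed" is a judgment call, and the sample counts are hypothetical.

```python
from typing import List


def partition_skew(counts: List[int]) -> float:
    """Ratio of the largest partition to the mean partition size.

    1.0 means perfectly even partitions; large values mean a few partitions
    hold most of the rows, which concentrates memory pressure on the tasks
    that process them.
    """
    if not counts:
        raise ValueError("need at least one partition count")
    mean = sum(counts) / len(counts)
    return max(counts) / mean if mean else 0.0


# Hypothetical per-partition row counts: three even partitions and one hot one.
counts = [100, 100, 100, 700]
print(len(counts), partition_skew(counts))  # 4 2.8
```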


2015-09-22 Thread Akhil Das
No. You can either set the properties in your SparkContext's Hadoop configuration:

  // s3Key and s3Secret hold your AWS access key ID and secret access key
  val hadoopConf = sparkContext.hadoopConfiguration
  hadoopConf.set("fs.s3n.awsAccessKeyId", s3Key)
  hadoopConf.set("fs.s3n.awsSecretAccessKey", s3Secret)

 or you can set it in the environment as:


Best Regards

On Mon, Sep 21, 2015 at 9:04 PM, Michel Lemay  wrote:

> Hi,
> It looks like spark does read AWS credentials from environment variable
> AWS_CREDENTIAL_FILE like awscli does.
> Mike
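There is also a third route, which the thread does not mention. Spark copies any configuration property prefixed with `spark.hadoop.` into the Hadoop configuration, with the prefix stripped. So the same two fs.s3n keys can be passed through SparkConf or `spark-submit --conf`. The helper below is hypothetical and only builds those property names in Python. The credential values are placeholders.

```python
from typing import Dict


def s3n_spark_conf(access_key: str, secret_key: str) -> Dict[str, str]:
    """Build SparkConf entries that Spark forwards to the Hadoop configuration.

    Properties prefixed with 'spark.hadoop.' are copied (without the prefix)
    into the Hadoop Configuration, so these become fs.s3n.awsAccessKeyId and
    fs.s3n.awsSecretAccessKey, the same keys set in the Scala snippet above.
    """
    if not access_key or not secret_key:
        raise ValueError("both AWS credentials are required")
    return {
        "spark.hadoop.fs.s3n.awsAccessKeyId": access_key,
        "spark.hadoop.fs.s3n.awsSecretAccessKey": secret_key,
    }


conf_entries = s3n_spark_conf("MY_ACCESS_KEY", "MY_SECRET_KEY")  # placeholder values
# With PySpark installed, these could be applied via SparkConf().setAll(conf_entries.items()).
print(sorted(conf_entries))
```

Keeping secrets out of command lines and logs is still advisable. Command-line arguments are often visible to other users on the same machine.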

Re: Slow Performance with Apache Spark Gradient Boosted Tree training runs

2015-09-22 Thread Yashwanth Kumar
Hi vkutsenko,

Could you explicitly set the number of partitions for the input labeled RDD, like this:

  data = MLUtils.loadLibSVMFile(,

Here I used 5, since you have 5 cores.

Also, for further benchmarking and performance tuning:

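Here is why matching partitions to cores helps. Each partition becomes one task, and each task occupies one core. With fewer partitions than cores, some cores sit idle. With more partitions, the tasks run in several "waves". The function below is a hypothetical sketch of that arithmetic. As a starting point, Spark's tuning guide recommends 2-3 tasks per CPU core.

```python
import math
from typing import Tuple


def stage_utilization(num_partitions: int, cores: int) -> Tuple[int, int]:
    """Return (task waves, cores busy in the first wave) for one stage,
    assuming one task per partition and one core per task."""
    waves = math.ceil(num_partitions / cores)
    busy_first_wave = min(num_partitions, cores)
    return waves, busy_first_wave


print(stage_utilization(1, 5))   # (1, 1): a single partition leaves 4 of 5 cores idle
print(stage_utilization(5, 5))   # (1, 5): one wave, every core busy
print(stage_utilization(15, 5))  # (3, 5): 3 tasks per core
```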
