Connection pool in workers

2015-02-28 Thread A . K . M . Ashrafuzzaman
Hi guys,
I am new to Spark and we are running a small project that collects data from
Kinesis and inserts it into Mongo.
I would like to share a high-level view of how it is done and would love your
input on it.

I am fetching Kinesis data and, for each RDD:
  -> Parsing the String data
  -> Inserting it into Mongo storage

What I understand is that the per-RDD parsing closure is serialized and sent
to the workers, so when I want to write to Mongo, each worker creates a new
connection for the writes.

Is there any way I can use a connection pool? By the way, I am using Scala and
Spark Streaming.
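
For reference, the usual pattern is to write from foreachPartition so that a
connection (or pool) is created per partition / per executor JVM rather than
per record. A minimal sketch, with a hypothetical MongoWriter standing in for
whatever Mongo driver client you use (names are illustrative, not a real
driver API):

import org.apache.spark.streaming.dstream.DStream

// Hypothetical stand-in for a real Mongo client; swap in your driver's type.
class MongoWriter {
  def insert(doc: String): Unit = println(s"inserting: $doc")
}

// One lazily created writer per executor JVM (a simple "pool" of one).
object MongoWriterHolder {
  lazy val writer = new MongoWriter()
}

def writeToMongo(parsed: DStream[String]): Unit = {
  parsed.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Runs on the worker; the writer is created at most once per executor
      // JVM, so there is no connection setup cost per record.
      val writer = MongoWriterHolder.writer
      records.foreach(writer.insert)
    }
  }
}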


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433

Re: Reg. Difference in Performance

2015-02-28 Thread Deep Pradhan
You mean the size of the data that we take?

Thank You
Regards,
Deep

On Sun, Mar 1, 2015 at 6:04 AM, Joseph Bradley 
wrote:

> Hi Deep,
>
> Compute times may not be very meaningful for small examples like those.
> If you increase the sizes of the examples, then you may start to observe
> more meaningful trends and speedups.
>
> Joseph
>
> On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan 
> wrote:
>
>> Hi,
>> I am running Spark applications in GCE. I set up cluster with different
>> number of nodes varying from 1 to 7. The machines are single core machines.
>> I set the spark.default.parallelism to the number of nodes in the cluster
>> for each cluster. I ran the four applications available in Spark Examples,
>> SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations.
>> What I notice is the following:
>> In case of SparkTC and SparkALS, the time to complete the job increases
>> with the increase in number of nodes in cluster, where as in SparkLR and
>> SparkPi, the time to complete the job remains the same across all the
>> configurations.
>> Could anyone explain me this?
>>
>> Thank You
>> Regards,
>> Deep
>>
>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Mayur Rustagi
Sorry, not really. Spork is a way to migrate your existing Pig scripts to
Spark, or to write new Pig jobs that can then execute on Spark.
For orchestration you are better off using Oozie, especially if you are
using other execution engines/systems besides Spark.


Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com 
@mayur_rustagi 

On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao  wrote:

> Thanks Mayur! I'm looking for something that would allow me to easily
> describe and manage a workflow on Spark. A workflow in my context is a
> composition of Spark applications that may depend on one another based on
> hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
> app level.
>
>
>
> On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi 
> wrote:
>
>> We do maintain it but in apache repo itself. However Pig cannot do
>> orchestration for you. I am not sure what you are looking at from Pig in
>> this context.
>>
>> Regards,
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoid.com 
>> @mayur_rustagi 
>>
>> On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:
>>
>>> Here was latest modification in spork repo:
>>> Mon Dec 1 10:08:19 2014
>>>
>>> Not sure if it is being actively maintained.
>>>
>>> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao 
>>> wrote:
>>>
 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't
 sure if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
 wrote:

> You have to call spark-submit from oozie.
> I used this link to get the idea for my implementation -
>
>
> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>
>
>
> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>
> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
> accommodate some Hadoop jobs.
>
>
> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
> wrote:
>
>> Qiang,
>> Did you look at Oozie?
>> We use oozie to run spark jobs in production.
>>
>>
>> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>>
>> Hi Everyone,
>>
>> We need to deal with workflows on Spark. In our scenario, each
>> workflow consists of multiple processing steps. Among different steps,
>> there could be dependencies.  I'm wondering if there are tools
>> available that can help us schedule and manage workflows on Spark. I'm
>> looking for something like pig on Hadoop, but it should fully function on
>> Spark.
>>
>> Any suggestion?
>>
>> Thanks in advance!
>>
>> Qiang
>>
>>
>>
>
>

>>>
>>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks Mayur! I'm looking for something that would allow me to easily
describe and manage a workflow on Spark. A workflow in my context is a
composition of Spark applications that may depend on one another based on
HDFS inputs/outputs. Is Spork a good fit? The orchestration I want is at the
app level.


On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi 
wrote:

> We do maintain it but in apache repo itself. However Pig cannot do
> orchestration for you. I am not sure what you are looking at from Pig in
> this context.
>
> Regards,
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoid.com 
> @mayur_rustagi 
>
> On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:
>
>> Here was latest modification in spork repo:
>> Mon Dec 1 10:08:19 2014
>>
>> Not sure if it is being actively maintained.
>>
>> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao  wrote:
>>
>>> Thanks for the pointer, Ashish! I was also looking at Spork
>>> https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't
>>> sure if that's the right direction.
>>>
>>> On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
>>> wrote:
>>>
 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
 accommodate some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
 wrote:

> Qiang,
> Did you look at Oozie?
> We use oozie to run spark jobs in production.
>
>
> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>
> Hi Everyone,
>
> We need to deal with workflows on Spark. In our scenario, each
> workflow consists of multiple processing steps. Among different steps,
> there could be dependencies.  I'm wondering if there are tools
> available that can help us schedule and manage workflows on Spark. I'm
> looking for something like pig on Hadoop, but it should fully function on
> Spark.
>
> Any suggestion?
>
> Thanks in advance!
>
> Qiang
>
>
>


>>>
>>
>


Re: Accumulator in SparkUI for streaming

2015-02-28 Thread Tim Smith
So somehow Spark Streaming doesn't support display of named accumulators in
the WebUI?
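
For context, a minimal sketch of the pattern under discussion (Spark 1.2-era
API; the socket source and names are illustrative): a named accumulator
updated once per batch from foreachRDD.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// Named accumulator; with a plain SparkContext this shows on stage detail pages.
val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

// Stand-in for whatever DStream is actually consumed (Kafka, etc.).
val lines = ssc.socketTextStream("localhost", 9999)

lines.foreachRDD { rdd =>
  // count() is an action, so the accumulator is updated once per batch
  inRecords += rdd.count().toInt
  println(s"records so far: ${inRecords.value}")
}

ssc.start()
ssc.awaitTermination()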


On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic 
wrote:

>
> Interesting. Accumulators are shown on Web UI if you are using the
> ordinary SparkContext (Spark 1.2). It just has to be named (and that's what
> you did).
>
> scala> val acc = sc.accumulator(0, "test accumulator")
> acc: org.apache.spark.Accumulator[Int] = 0
> scala> val rdd = sc.parallelize(1 to 1000)
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at :12
> scala> rdd.foreach(x => acc += 1)
> scala> acc.value
> res1: Int = 1000
>
> The Stage details page shows:
>
>
>
>
> On 20.2.2015. 9:25, Tim Smith wrote:
>
>  On Spark 1.2:
>
>  I am trying to capture # records read from a kafka topic:
>
>  val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
>
>  ..
>
>  kInStreams.foreach( k =>
> {
>
>   k.foreachRDD ( rdd =>  inRecords += rdd.count().toInt  )
>   inRecords.value
>
>
>  Question is how do I get the accumulator to show up in the UI? I tried
> "inRecords.value" but that didn't help. Pretty sure it isn't showing up in
> Stage metrics.
>
>  What's the trick here? collect?
>
>  Thanks,
>
>  Tim
>
>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Mayur Rustagi
We do maintain it, but in the Apache repo itself. However, Pig cannot do
orchestration for you. I am not sure what you are looking for from Pig in
this context.

Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com 
@mayur_rustagi 

On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu  wrote:

> Here was latest modification in spork repo:
> Mon Dec 1 10:08:19 2014
>
> Not sure if it is being actively maintained.
>
> On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao  wrote:
>
>> Thanks for the pointer, Ashish! I was also looking at Spork
>> https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure
>> if that's the right direction.
>>
>> On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
>> wrote:
>>
>>> You have to call spark-submit from oozie.
>>> I used this link to get the idea for my implementation -
>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>>>
>>>
>>>
>>> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>>>
>>> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
>>> accommodate some Hadoop jobs.
>>>
>>>
>>> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
>>> wrote:
>>>
 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each workflow
 consists of multiple processing steps. Among different steps, there could
 be dependencies.  I'm wondering if there are tools available that can
 help us schedule and manage workflows on Spark. I'm looking for something
 like pig on Hadoop, but it should fully function on Spark.

 Any suggestion?

 Thanks in advance!

 Qiang



>>>
>>>
>>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Ted Yu
Here was the latest modification in the Spork repo:
Mon Dec 1 10:08:19 2014

Not sure if it is being actively maintained.

On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao  wrote:

> Thanks for the pointer, Ashish! I was also looking at Spork
> https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't sure
> if that's the right direction.
>
> On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
> wrote:
>
>> You have to call spark-submit from oozie.
>> I used this link to get the idea for my implementation -
>>
>>
>> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>>
>>
>>
>> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>>
>> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate
>> some Hadoop jobs.
>>
>>
>> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
>> wrote:
>>
>>> Qiang,
>>> Did you look at Oozie?
>>> We use oozie to run spark jobs in production.
>>>
>>>
>>> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>>>
>>> Hi Everyone,
>>>
>>> We need to deal with workflows on Spark. In our scenario, each workflow
>>> consists of multiple processing steps. Among different steps, there could
>>> be dependencies.  I'm wondering if there are tools available that can
>>> help us schedule and manage workflows on Spark. I'm looking for something
>>> like pig on Hadoop, but it should fully function on Spark.
>>>
>>> Any suggestion?
>>>
>>> Thanks in advance!
>>>
>>> Qiang
>>>
>>>
>>>
>>
>>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks for the pointer, Ashish! I was also looking at Spork
(https://github.com/sigmoidanalytics/spork, Pig-on-Spark), but wasn't sure if
that's the right direction.

On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam 
wrote:

> You have to call spark-submit from oozie.
> I used this link to get the idea for my implementation -
>
>
> http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E
>
>
>
> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
>
> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate
> some Hadoop jobs.
>
>
> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
> wrote:
>
>> Qiang,
>> Did you look at Oozie?
>> We use oozie to run spark jobs in production.
>>
>>
>> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>>
>> Hi Everyone,
>>
>> We need to deal with workflows on Spark. In our scenario, each workflow
>> consists of multiple processing steps. Among different steps, there could
>> be dependencies.  I'm wondering if there are tools available that can
>> help us schedule and manage workflows on Spark. I'm looking for something
>> like pig on Hadoop, but it should fully function on Spark.
>>
>> Any suggestion?
>>
>> Thanks in advance!
>>
>> Qiang
>>
>>
>>
>
>


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Michael Armbrust
I think it's possible that the problem is that the Scala compiler is not
being loaded by the primordial classloader (but instead by some child
classloader), and thus the Scala reflection mirror fails to initialize
when it can't find it. Unfortunately, the only solution that I know of is
to load all required jars when the JVM starts.

On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam 
wrote:

> Also, can scala version play any role here?
> I am using scala 2.11.5 but all spark packages have dependency to scala
> 2.11.2
> Just wanted to make sure that scala version is not an issue here.
>
> On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam 
> wrote:
>
>> Hi,
>> I wrote a very simple program in scala to convert an existing RDD to
>> SchemaRDD.
>> But createSchemaRDD function is throwing exception
>>
>> Exception in thread "main" scala.ScalaReflectionException: class
>> org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
>> classloader with boot classpath [.] not found
>>
>>
>> Here's more info on the versions I am using -
>>
>> 2.11
>> 1.2.1
>> 2.11.5
>>
>> Please let me know how can I resolve this problem.
>>
>> Thanks
>> Ashish
>>
>
>


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Also, can the Scala version play any role here?
I am using Scala 2.11.5, but all Spark packages have a dependency on Scala
2.11.2.
Just wanted to make sure that the Scala version is not an issue here.
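
For reference, a minimal build.sbt sketch that keeps the Scala and Spark
artifact versions aligned (the versions shown are the ones mentioned in this
thread and are illustrative):

// build.sbt -- pin one Scala version and pull Spark artifacts built for the
// matching binary version (the _2.11 suffix), so only one Scala runtime is
// on the classpath.
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1",
  "org.apache.spark" %% "spark-sql"  % "1.2.1"
)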

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam 
wrote:

> Hi,
> I wrote a very simple program in scala to convert an existing RDD to
> SchemaRDD.
> But createSchemaRDD function is throwing exception
>
> Exception in thread "main" scala.ScalaReflectionException: class
> org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
> classloader with boot classpath [.] not found
>
>
> Here's more info on the versions I am using -
>
> 2.11
> 1.2.1
> 2.11.5
>
> Please let me know how can I resolve this problem.
>
> Thanks
> Ashish
>


Re: Reg. Difference in Performance

2015-02-28 Thread Joseph Bradley
Hi Deep,

Compute times may not be very meaningful for small examples like those.  If
you increase the sizes of the examples, then you may start to observe more
meaningful trends and speedups.

Joseph

On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan 
wrote:

> Hi,
> I am running Spark applications in GCE. I set up cluster with different
> number of nodes varying from 1 to 7. The machines are single core machines.
> I set the spark.default.parallelism to the number of nodes in the cluster
> for each cluster. I ran the four applications available in Spark Examples,
> SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations.
> What I notice is the following:
> In case of SparkTC and SparkALS, the time to complete the job increases
> with the increase in number of nodes in cluster, where as in SparkLR and
> SparkPi, the time to complete the job remains the same across all the
> configurations.
> Could anyone explain me this?
>
> Thank You
> Regards,
> Deep
>


Re: Some questions after playing a little with the new ml.Pipeline.

2015-02-28 Thread Joseph Bradley
Hi Jao,

You can use external tools and libraries if they can be called from your
Spark program or script (with appropriate conversion of data types, etc.).
The best way to apply a pre-trained model to a dataset would be to call the
model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will
distribute the computation of prediction using the pre-trained model.  It
will require that all of your Spark workers be able to run the
preTrainedModel; that may mean installing Caffe and dependencies on all
nodes in the compute cluster.

For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}
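
Fleshing that out, a self-contained sketch of the mapPartitions pattern
(PreTrainedModel is a hypothetical placeholder for a Caffe wrapper, not a
real API):

import org.apache.spark.rdd.RDD

// Hypothetical stand-in for an expensive-to-construct model (e.g. a Caffe net).
class PreTrainedModel(path: String) {
  def predict(datum: Array[Float]): Float = datum.sum // dummy scoring logic
}

def scoreAll(data: RDD[Array[Float]], modelPath: String): RDD[Float] = {
  data.mapPartitions { partition =>
    // Instantiated once per partition and reused for every record in it,
    // which keeps the number of model constructions small.
    val model = new PreTrainedModel(modelPath)
    partition.map(model.predict)
  }
}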

I hope this helps!
Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
>
> We mainly do large scale computer vision task (image classification,
> retrieval, ...). The pipeline is really great stuff for that. We're trying
> to reproduce the tutorial given on that topic during the latest spark
> summit (
> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>  )
> using the master version of spark pipeline and dataframe. The tutorial
> shows different examples of feature extraction stages before running
> machine learning algorithms. Even the tutorial is straightforward to
> reproduce with this new API, we still have some questions :
>
>- Can one use external tools (e.g via pipe) as a pipeline stage ? An
>example of use case is to extract feature learned with convolutional neural
>network. In our case, this corresponds to a pre-trained neural network with
>Caffe library (http://caffe.berkeleyvision.org/) .
>
>
>- The second question is about the performance of the pipeline.
>Library such as Caffe processes the data in batch and instancing one Caffe
>network can be time consuming when this network is very deep. So, we can
>gain performance if we minimize the number of Caffe network creation and
>give data in batch to the network. In the pipeline, this corresponds to run
>transformers that work on a partition basis and give the whole partition to
>a single caffe network. How can we create such a transformer ?
>
>
>
> Best,
>
> Jao
>


Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-28 Thread Joseph Bradley
Hi Shahab,

There are actually a few distributed Matrix types which support sparse
representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix.
The documentation has a bit more info about the various uses:
http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix

The Spark 1.3 RC includes a new one: BlockMatrix.

But since these are distributed, they are represented using RDDs, so they
of course will not be as fast as computations on smaller, locally stored
matrices.

Joseph
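
To make the sparse, distributed representation concrete, a minimal sketch
using CoordinateMatrix, which stores only the explicitly listed entries
(values here are arbitrary):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

def sparseExample(sc: SparkContext): Unit = {
  // Only the non-zero entries are materialized, as (row, col, value) triples.
  val entries = sc.parallelize(Seq(
    MatrixEntry(0L, 1L, 2.5),
    MatrixEntry(3L, 0L, -1.0),
    MatrixEntry(9999L, 9999L, 4.2)
  ))

  val matrix = new CoordinateMatrix(entries)
  println(s"dims: ${matrix.numRows()} x ${matrix.numCols()}")

  // Convert to a row-oriented form when an algorithm needs it.
  val rowMatrix = matrix.toRowMatrix()
  println(s"rows: ${rowMatrix.numRows()}")
}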

On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh <
riteshoneinamill...@gmail.com> wrote:

> try using breeze (scala linear algebra library)
>
> On Fri, Feb 27, 2015 at 5:56 PM, shahab  wrote:
>
>> Thanks a lot Vijay, let me see how it performs.
>>
>> Best
>> Shahab
>>
>>
>> On Friday, February 27, 2015, Vijay Saraswat  wrote:
>>
>>> Available in GML --
>>>
>>> http://x10-lang.org/x10-community/applications/global-
>>> matrix-library.html
>>>
>>> We are exploring how to make it available within Spark. Any ideas would
>>> be much appreciated.
>>>
>>> On 2/27/15 7:01 AM, shahab wrote:
>>>
 Hi,

 I just wonder if there is any Sparse Matrix implementation available
 in Spark, so it can be used in spark application?

 best,
 /Shahab

>>>
>>>
>>>
>>>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Ashish Nigam
You have to call spark-submit from Oozie.
I used this link to get the idea for my implementation -

http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



> On Feb 28, 2015, at 3:25 PM, Qiang Cao  wrote:
> 
> Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate 
> some Hadoop jobs.
> 
> 
> On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam  > wrote:
> Qiang,
> Did you look at Oozie?
> We use oozie to run spark jobs in production.
> 
> 
>> On Feb 28, 2015, at 2:45 PM, Qiang Cao > > wrote:
>> 
>> Hi Everyone,
>> 
>> We need to deal with workflows on Spark. In our scenario, each workflow 
>> consists of multiple processing steps. Among different steps, there could be 
>> dependencies.  I'm wondering if there are tools available that can help us 
>> schedule and manage workflows on Spark. I'm looking for something like pig 
>> on Hadoop, but it should fully function on Spark.
>> 
>> Any suggestion?
>> 
>> Thanks in advance!
>> 
>> Qiang
> 
> 



Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks, Ashish! Is Oozie integrated with Spark? I know it can accommodate
some Hadoop jobs.


On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam 
wrote:

> Qiang,
> Did you look at Oozie?
> We use oozie to run spark jobs in production.
>
>
> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
>
> Hi Everyone,
>
> We need to deal with workflows on Spark. In our scenario, each workflow
> consists of multiple processing steps. Among different steps, there could
> be dependencies.  I'm wondering if there are tools available that can
> help us schedule and manage workflows on Spark. I'm looking for something
> like pig on Hadoop, but it should fully function on Spark.
>
> Any suggestion?
>
> Thanks in advance!
>
> Qiang
>
>
>


Re: Tools to manage workflows on Spark

2015-02-28 Thread Ashish Nigam
Qiang,
Did you look at Oozie?
We use Oozie to run Spark jobs in production.


> On Feb 28, 2015, at 2:45 PM, Qiang Cao  wrote:
> 
> Hi Everyone,
> 
> We need to deal with workflows on Spark. In our scenario, each workflow 
> consists of multiple processing steps. Among different steps, there could be 
> dependencies.  I'm wondering if there are tools available that can help us 
> schedule and manage workflows on Spark. I'm looking for something like pig on 
> Hadoop, but it should fully function on Spark.
> 
> Any suggestion?
> 
> Thanks in advance!
> 
> Qiang



Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Ted,
spark-catalyst_2.11-1.2.1.jar is present in the classpath. BTW, I am running
the code locally in an Eclipse workspace.

Here's the complete exception stack trace -

Exception in thread "main" scala.ScalaReflectionException: class 
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial 
classloader with boot classpath 
[/Applications/eclipse/plugins/org.scala-lang.scala-library_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-reflect_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-actors_2.11.5.v20150101-184742-3fafbc204f.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/classes]
 not found.
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
at 
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:341)
at scala.reflect.api.Universe.typeOf(Universe.scala:61)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)
——





> On Feb 28, 2015, at 9:31 AM, Ted Yu  wrote:
> 
> Have you verified that spark-catalyst_2.10 jar was in the classpath ?
> 
> Cheers
> 
> On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam  > wrote:
> Hi,
> I wrote a very simple program in scala to convert an existing RDD to 
> SchemaRDD.
> But createSchemaRDD function is throwing exception 
> 
> Exception in thread "main" scala.ScalaReflectionException: class 
> org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial 
> classloader with boot classpath [.] not found
> 
> 
> Here's more info on the versions I am using - 
> 
> 2.11
> 1.2.1
> 2.11.5
> 
> Please let me know how can I resolve this problem.
> 
> Thanks
> Ashish
> 



Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Hi Everyone,

We need to deal with workflows on Spark. In our scenario, each workflow
consists of multiple processing steps. Among different steps, there could
be dependencies. I'm wondering if there are tools available that can help
us schedule and manage workflows on Spark. I'm looking for something like
Pig on Hadoop, but it should fully function on Spark.

Any suggestion?

Thanks in advance!

Qiang


Re: Upgrade to Spark 1.2.1 using Guava

2015-02-28 Thread Erlend Hamnaberg
Yes. I ran into this problem with a Mahout snapshot and Spark 1.2.0. I didn't
really try to figure out why it was a problem, since there were already too
many moving parts in my app. Obviously there is a classpath issue somewhere.

/Erlend
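
For reference, a minimal sketch of how a registrator like the one in the
quoted message below is usually wired into the context (the package name and
jar path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("GuavaKryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "my.MyKryoRegistrator")
  .setJars(Seq("path/to/deps.jar")) // must contain Guava and the registrator

val sc = new SparkContext(conf)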
On 27 Feb 2015 22:30, "Pat Ferrel"  wrote:

> @Erlend hah, we were trying to merge your PR and ran into this—small
> world. You actually compile the JavaSerializer source in your project?
>
> @Marcelo do you mean by modifying spark.executor.extraClassPath on all
> workers, that didn’t seem to work?
>
> On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg 
> wrote:
>
> Hi.
>
> I have had a simliar issue. I had to pull the JavaSerializer source into
> my own project, just so I got the classloading of this class under control.
>
> This must be a class loader issue with spark.
>
> -E
>
> On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel  wrote:
>
>> I understand that I need to supply Guava to Spark. The HashBiMap is
>> created in the client and broadcast to the workers. So it is needed in
>> both. To achieve this there is a deps.jar with Guava (and Scopt but that is
>> only for the client). Scopt is found so I know the jar is fine for the
>> client.
>>
>> I pass in the deps.jar to the context creation code. I’ve checked the
>> content of the jar and have verified that it is used at context creation
>> time.
>>
>> I register the serializer as follows:
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>
>>   override def registerClasses(kryo: Kryo) = {
>>
>> val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
>> //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
>> log.info("\n\n\nRegister Serializer for " +
>> h.getClass.getCanonicalName + "\n\n\n") // just to be sure this does indeed
>> get logged
>> kryo.register(classOf[com.google.common.collect.HashBiMap[String,
>> Int]], new JavaSerializer())
>>   }
>> }
>>
>> The job proceeds up until the broadcast value, a HashBiMap, is
>> deserialized, which is where I get the following error.
>>
>> Have I missed a step for deserialization of broadcast values? Odd that
>> serialization happened but deserialization failed. I’m running on a
>> standalone localhost-only cluster.
>>
>>
>> 15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
>> 4.0 (TID 9, 192.168.0.2): java.io.IOException:
>> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at
>> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
>> at
>> my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at
>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
>> at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java
>> deserialization.
>> at
>> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>> at
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
>> ... 19 more
>>
>>  root eror ==
>> Caused by: java.lang.Cl

Re: SparkSQL production readiness

2015-02-28 Thread Michael Armbrust
We are planning to remove the alpha tag in 1.3.0.

On Sat, Feb 28, 2015 at 12:30 AM, Wang, Daoyuan 
wrote:

>  Hopefully  the alpha tag will be remove in 1.4.0, if the community can
> review code a little bit faster :P
>
>
>
> Thanks,
>
> Daoyuan
>
>
>
> *From:* Ashish Mukherjee [mailto:ashish.mukher...@gmail.com]
> *Sent:* Saturday, February 28, 2015 4:28 PM
> *To:* user@spark.apache.org
> *Subject:* SparkSQL production readiness
>
>
>
> Hi,
>
>
>
> I am exploring SparkSQL for my purposes of performing large relational
> operations across a cluster. However, it seems to be in alpha right now. Is
> there any indication when it would be considered production-level? I don't
> see any info on the site.
>
>
>
> Regards,
>
> Ashish
>


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
Thanks for taking this on Ted!

On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu  wrote:

> I have created SPARK-6085 with pull request:
> https://github.com/apache/spark/pull/4836
>
> Cheers
>
> On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet  wrote:
>
>> +1 to a better default as well.
>>
>> We were working find until we ran against a real dataset which was much
>> larger than the test dataset we were using locally. It took me a couple
>> days and digging through many logs to figure out this value was what was
>> causing the problem.
>>
>> On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu  wrote:
>>
>>> Having good out-of-box experience is desirable.
>>>
>>> +1 on increasing the default.
>>>
>>>
>>> On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen  wrote:
>>>
 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers 
 wrote:
 > hey,
 > running my first map-red like (meaning disk-to-disk, avoiding in
 memory
 > RDDs) computation in spark on yarn i immediately got bitten by a too
 low
 > spark.yarn.executor.memoryOverhead. however it took me about an hour
 to find
 > out this was the cause. at first i observed failing shuffles leading
 to
 > restarting of tasks, then i realized this was because executors could
 not be
 > reached, then i noticed in containers got shut down and reallocated in
 > resourcemanager logs (no mention of errors, it seemed the containers
 > finished their business and shut down successfully), and finally i
 found the
 > reason in nodemanager logs.
 >
 > i dont think this is a pleasent first experience. i realize
 > spark.yarn.executor.memoryOverhead needs to be set differently from
 > situation to situation. but shouldnt the default be a somewhat higher
 value
 > so that these errors are unlikely, and then the experts that are
 willing to
 > deal with these errors can tune it lower? so why not make the default
 10%
 > instead of 7%? that gives something that works in most situations out
 of the
 > box (at the cost of being a little wasteful). it worked for me.



>>>
>>
>


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Ted Yu
I have created SPARK-6085 with pull request:
https://github.com/apache/spark/pull/4836

Cheers

On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet  wrote:

> +1 to a better default as well.
>
> We were working find until we ran against a real dataset which was much
> larger than the test dataset we were using locally. It took me a couple
> days and digging through many logs to figure out this value was what was
> causing the problem.
>
> On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu  wrote:
>
>> Having good out-of-box experience is desirable.
>>
>> +1 on increasing the default.
>>
>>
>> On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen  wrote:
>>
>>> There was a recent discussion about whether to increase or indeed make
>>> configurable this kind of default fraction. I believe the suggestion
>>> there too was that 9-10% is a safer default.
>>>
>>> Advanced users can lower the resulting overhead value; it may still
>>> have to be increased in some cases, but a fatter default may make this
>>> kind of surprise less frequent.
>>>
>>> I'd support increasing the default; any other thoughts?
>>>
>>> On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers 
>>> wrote:
>>> > hey,
>>> > running my first map-red like (meaning disk-to-disk, avoiding in memory
>>> > RDDs) computation in spark on yarn i immediately got bitten by a too
>>> low
>>> > spark.yarn.executor.memoryOverhead. however it took me about an hour
>>> to find
>>> > out this was the cause. at first i observed failing shuffles leading to
>>> > restarting of tasks, then i realized this was because executors could
>>> not be
>>> > reached, then i noticed in containers got shut down and reallocated in
>>> > resourcemanager logs (no mention of errors, it seemed the containers
>>> > finished their business and shut down successfully), and finally i
>>> found the
>>> > reason in nodemanager logs.
>>> >
>>> > i dont think this is a pleasent first experience. i realize
>>> > spark.yarn.executor.memoryOverhead needs to be set differently from
>>> > situation to situation. but shouldnt the default be a somewhat higher
>>> value
>>> > so that these errors are unlikely, and then the experts that are
>>> willing to
>>> > deal with these errors can tune it lower? so why not make the default
>>> 10%
>>> > instead of 7%? that gives something that works in most situations out
>>> of the
>>> > box (at the cost of being a little wasteful). it worked for me.
>>>
>>>
>>>
>>
>


Re: Missing shuffle files

2015-02-28 Thread Corey Nolet
Just wanted to point out: raising the memory overhead (as I saw in the logs)
was the fix for this issue, and I have not seen dying executors since this
value was increased.

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg  wrote:

> If you thinking of the yarn memory overhead, then yes, I have increased
> that as well. However, I'm glad to say that my job finished successfully
> finally. Besides the timeout and memory settings, performing repartitioning
> (with shuffling) at the right time seems to be the key to make this large
> job succeed. With all the transformations in the job, the partition
> distribution was becoming increasingly skewed. Not easy to figure out when
> and to what number of partitions to set, and takes forever to tweak these
> settings since it's works perfectly for small datasets and you'll have to
> experiment with large time-consuming jobs. Imagine if there was an
> automatic partition reconfiguration function that automagically did that...
>
>
> On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet  wrote:
>
>> I *think* this may have been related to the default memory overhead
>> setting being too low. I raised the value to 1G it and tried my job again
>> but i had to leave the office before it finished. It did get further but
>> I'm not exactly sure if that's just because i raised the memory. I'll see
>> tomorrow- but i have a suspicion this may have been the cause of the
>> executors being killed by the application master.
>> On Feb 23, 2015 5:25 PM, "Corey Nolet"  wrote:
>>
>>> I've got the opposite problem with regards to partitioning. I've got
>>> over 6000 partitions for some of these RDDs which immediately blows the
>>> heap somehow- I'm still not exactly sure how. If I coalesce them down to
>>> about 600-800 partitions, I get the problems where the executors are dying
>>> without any other error messages (other than telling me the executor was
>>> lost in the UI). If I don't coalesce, I pretty immediately get Java heap
>>> space exceptions that kill the job altogether.
>>>
>>> Putting in the timeouts didn't seem to help the case where I am
>>> coalescing. Also, I don't see any dfferences between 'disk only' and
>>> 'memory and disk' storage levels- both of them are having the same
>>> problems. I notice large shuffle files (30-40gb) that only seem to spill a
>>> few hundred mb.
>>>
>>> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg 
>>> wrote:
>>>
 Sounds very similar to what I experienced Corey. Something that seems
 to at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing at late as possible and avoiding
 too few in the beginning, the problems seems to decrease. Also, increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seems to almost disappear. Don't
 wont to celebrate yet, still long way left before the job complete but it's
 looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet  wrote:

> I'm looking @ my yarn container logs for some of the executors which
> appear to be failing (with the missing shuffle files). I see exceptions
> that say "client.TransportClientFactor: Found inactive connection to
> host/ip:port, closing it."
>
> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
> connect to host/ip:port"
>
> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>
> Finally, following the sigterm, I see "FileNotFoundExcception:
> /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
> such file for directory)"
>
> I'm looking @ the nodemanager and application master logs and I see no
> indications whatsoever that there were any memory issues during this 
> period
> of time. The Spark UI is telling me none of the executors are really using
> too much memory when this happens. It is a big job that's catching several
> 100's of GB but each node manager on the cluster has 64gb of ram just for
> yarn containers (physical nodes have 128gb). On this cluster, we have 128
> nodes. I've also tried using DISK_ONLY storage level but to no avail.
>
> Any further ideas on how to track this down? Again, we're able to run
> this same job on about 1/5th of the data just fine.The only thing that's
> pointing me towards a memory issue is that it seems to be happening in the
> same stages each time and when I lower the memory that each executor has
> allocated it happens in earlier stages but I can't seem to find anything
> that says an executor (or container for that matter) has run low on 
> memory.
>
>
>
> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg 
> wrote:
>
>> No, unfor

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI names the line number and name of the operation (repartition
in this case) that it is performing. Only if this information is wrong
(just a possibility) could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey
(or simply countByKey), which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts before
doing groupByKey.
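
For illustration, a small sketch of that skew check (the String key type and
the threshold are placeholders for whatever the real pipeline produces):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Log and drop "hot" keys before an expensive groupByKey.
def dropSkewedKeys(pairs: RDD[(String, String)],
                   threshold: Long = 1000000L): RDD[(String, String)] = {
  // countByKey returns one Long per distinct key to the driver, which is
  // cheap relative to materializing the grouped values.
  val counts = pairs.countByKey()
  val hotKeys = counts.filter { case (_, c) => c > threshold }.keys.toSet
  hotKeys.foreach(k => println(s"dropping skewed key: $k (count=${counts(k)})"))

  val hot = pairs.sparkContext.broadcast(hotKeys)
  pairs.filter { case (k, _) => !hot.value.contains(k) }
}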

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson  wrote:

> All stated symptoms are consistent with GC pressure (other nodes timeout
> trying to connect because of a long stop-the-world), quite possibly due to
> groupByKey. groupByKey is a very expensive operation as it may bring all
> the data for a particular partition into memory (in particular, it cannot
> spill values for a single key, so if you have a single very skewed key you
> can get behavior like this).
>
> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc 
> wrote:
>
>> But groupbykey will repartition according to numer of keys as I
>> understand how it works. How do you know that you haven't reached the
>> groupbykey phase? Are you using a profiler or do yoi base that assumption
>> only on logs?
>>
>> On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra wrote:
>>
>> A correction to my first post:
>>>
>>> There is also a repartition right before groupByKey to help avoid
>>> too-many-open-files error:
>>>
>>>
>>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>
>>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
>>> wrote:
>>>
 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc 
 wrote:

> I would first check whether  there is any possibility that after doing
> groupbykey one of the groups does not fit in one of the executors' memory.
>
> To back up my theory, instead of doing groupbykey + map try
> reducebykey + mapvalues.
>
> Let me know if that helped.
>
> Pawel Szulc
> http://rabbitonweb.com
>
> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
> So, actually I am removing the persist for now, because there is
>> significant filtering that happens after calling textFile()... but I will
>> keep that option in mind.
>>
>> I just tried a few different combinations of number of executors,
>> executor memory, and more importantly, number of tasks... *all three
>> times it failed when approximately 75.1% of the tasks were completed (no
>> matter how many tasks resulted from repartitioning the data in
>> textfile(..., N))*. Surely this is a strong clue to something?
>>
>>
>>
>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz 
>> wrote:
>>
>>> Hi,
>>>
>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>> generates many small objects that lead to very long GC time, causing the
>>> executor losts, heartbeat not received, and GC overhead limit exceeded
>>> messages.
>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>> also try `OFF_HEAP` (and use Tachyon).
>>>
>>> Burak
>>>
>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra >> > wrote:
>>>
 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction","0.2") // default 0.6
   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
   .set("spark.shuffle.manager","SORT") // preferred setting for
 optimized joins
   .set("spark.shuffle.consolidateFiles","true") // helpful for
 "too many files open"
   .set("spark.mesos.coarse", "true") // helpful for
 MapOutputTracker errors?
   .set("spark.akka.frameSize","500") // helpful when using
 consildateFiles=true
   .set("spark.akka.askTimeout", "3

Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
+1 to a better default as well.

We were working fine until we ran against a real dataset which was much
larger than the test dataset we were using locally. It took me a couple of
days and digging through many logs to figure out this value was what was
causing the problem.
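
For anyone else hitting this before a default change lands, a minimal sketch
of raising the overhead explicitly (the 1024 MB value is illustrative, not a
recommendation):

import org.apache.spark.SparkConf

// Equivalent to passing --conf spark.yarn.executor.memoryOverhead=1024 to
// spark-submit.
val conf = new SparkConf()
  .setAppName("MyYarnJob")
  .set("spark.yarn.executor.memoryOverhead", "1024") // MB of headroom per executor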

On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu  wrote:

> Having good out-of-box experience is desirable.
>
> +1 on increasing the default.
>
>
> On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen  wrote:
>
>> There was a recent discussion about whether to increase or indeed make
>> configurable this kind of default fraction. I believe the suggestion
>> there too was that 9-10% is a safer default.
>>
>> Advanced users can lower the resulting overhead value; it may still
>> have to be increased in some cases, but a fatter default may make this
>> kind of surprise less frequent.
>>
>> I'd support increasing the default; any other thoughts?
>>
>> On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers  wrote:
>> > hey,
>> > running my first map-red like (meaning disk-to-disk, avoiding in memory
>> > RDDs) computation in spark on yarn i immediately got bitten by a too low
>> > spark.yarn.executor.memoryOverhead. however it took me about an hour to
>> find
>> > out this was the cause. at first i observed failing shuffles leading to
>> > restarting of tasks, then i realized this was because executors could
>> not be
>> > reached, then i noticed in containers got shut down and reallocated in
>> > resourcemanager logs (no mention of errors, it seemed the containers
>> > finished their business and shut down successfully), and finally i
>> found the
>> > reason in nodemanager logs.
>> >
>> > i dont think this is a pleasent first experience. i realize
>> > spark.yarn.executor.memoryOverhead needs to be set differently from
>> > situation to situation. but shouldnt the default be a somewhat higher
>> value
>> > so that these errors are unlikely, and then the experts that are
>> willing to
>> > deal with these errors can tune it lower? so why not make the default
>> 10%
>> > instead of 7%? that gives something that works in most situations out
>> of the
>> > box (at the cost of being a little wasteful). it worked for me.
>>
>>
>>
>


Re: Scheduler hang?

2015-02-28 Thread Victor Tso-Guillen
Moving user to bcc.

What I found was that the TaskSetManager for my task set that had 5 tasks
had preferred locations set for 4 of the 5. Three had localhost/
and had completed. The one that had nothing had also completed. The last
one was set by our code to be my IP address. Local mode can hang on this
because of https://issues.apache.org/jira/browse/SPARK-4939, addressed by
https://github.com/apache/spark/pull/4147, which is obviously not an
optimal solution, but since it's only local mode it's good enough. I'm
not going to wait for those seconds to tick by to complete the task, so
I'll fix the IP address reporting side for local mode in my code.

On Thu, Feb 26, 2015 at 8:32 PM, Victor Tso-Guillen  wrote:

> Of course, breakpointing on every status update and revive offers
> invocation kept the problem from happening. Where could the race be?
>
> On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen 
> wrote:
>
>> Love to hear some input on this. I did get a standalone cluster up on my
>> local machine and the problem didn't present itself. I'm pretty confident
>> that means the problem is in the LocalBackend or something near it.
>>
>> On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen 
>> wrote:
>>
>>> Okay I confirmed my suspicions of a hang. I made a request that stopped
>>> progressing, though the already-scheduled tasks had finished. I made a
>>> separate request that was small enough not to hang, and it kicked the hung
>>> job enough to finish. I think what's happening is that the scheduler or the
>>> local backend is not kicking the revive offers messaging at the right time,
>>> but I have to dig into the code some more to nail the culprit. Anyone on
>>> these list have experience in those code areas that could help?
>>>
>>> On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen 
>>> wrote:
>>>
 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty -> nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das 
 wrote:

> Not many that i know of, but i bumped into this one
> https://issues.apache.org/jira/browse/SPARK-4516
>
> Thanks
> Best Regards
>
> On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen 
> wrote:
>
>> Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
>> dependencies that produce no data?
>>
>> On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen 
>> wrote:
>>
>>> The data is small. The job is composed of many small stages.
>>>
>>> * I found that with fewer than 222 the problem exhibits. What will
>>> be gained by going higher?
>>> * Pushing up the parallelism only pushes up the boundary at which
>>> the system appears to hang. I'm worried about some sort of message loss 
>>> or
>>> inconsistency.
>>> * Yes, we are using Kryo.
>>> * I'll try that, but I'm again a little confused why you're
>>> recommending this. I'm stumped so might as well?
>>>
>>> On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das <
>>> ak...@sigmoidanalytics.com> wrote:
>>>
 What operation are you trying to do and how big is the data that
 you are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress","true") )


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen <
 v...@paxata.com> wrote:

> I'm getting this really reliably on Spark 1.2.1. Basically I'm in
> local mode with parallelism at 8. I have 222 tasks and I never seem 
> to get
> far past 40. Usually in the 20s to 30s it will just hang. The last 
> logging
> is below, and a screenshot of the UI.
>
> 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
> TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
> localhost (1/5)
> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
> worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
> bytes
> result sent to driver
> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
> worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 
> bytes
> result sent to driver
> 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
> TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
> localhost (2/5)
> 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
> TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 67

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values for a single key, so if you have a single very skewed key you
can get behavior like this).
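
A small sketch of the usual mitigation (replace groupByKey with a combining
aggregation so all values for one key never need to be in memory at once);
the word-count-style types are illustrative:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// groupByKey materializes every value for a key on one executor:
def sumPerKeyGrouped(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side first, so a skewed key never needs all of
// its values resident at the same time:
def sumPerKeyReduced(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.reduceByKey(_ + _)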

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc  wrote:

> But groupbykey will repartition according to numer of keys as I understand
> how it works. How do you know that you haven't reached the groupbykey
> phase? Are you using a profiler or do yoi base that assumption only on logs?
>
> On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra wrote:
>
> A correction to my first post:
>>
>> There is also a repartition right before groupByKey to help avoid
>> too-many-open-files error:
>>
>>
>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>
>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
>> wrote:
>>
>>> The job fails before getting to groupByKey.
>>>
>>> I see a lot of timeout errors in the yarn logs, like:
>>>
>>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
>>> attempts
>>> akka.pattern.AskTimeoutException: Timed out
>>>
>>> and
>>>
>>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
>>> attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>> seconds]
>>>
>>> and some of these are followed by:
>>>
>>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>>> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
>>> disassociated! Shutting down.
>>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
>>> stage 1.0 (TID 336601)
>>> java.io.FileNotFoundException:
>>> /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>>> (No such file or directory)
>>>
>>>
>>>
>>>
>>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc 
>>> wrote:
>>>
 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey
 + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

 So, actually I am removing the persist for now, because there is
> significant filtering that happens after calling textFile()... but I will
> keep that option in mind.
>
> I just tried a few different combinations of number of executors,
> executor memory, and more importantly, number of tasks... *all three
> times it failed when approximately 75.1% of the tasks were completed (no
> matter how many tasks resulted from repartitioning the data in
> textfile(..., N))*. Surely this is a strong clue to something?
>
>
>
> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:
>
>> Hi,
>>
>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>> generates many small objects that lead to very long GC time, causing the
>> executor losts, heartbeat not received, and GC overhead limit exceeded
>> messages.
>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>> also try `OFF_HEAP` (and use Tachyon).
>>
>> Burak
>>
>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
>> wrote:
>>
>>> My program in pseudocode looks like this:
>>>
>>> val conf = new SparkConf().setAppName("Test")
>>>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>   .set("spark.shuffle.manager","SORT") // preferred setting for
>>> optimized joins
>>>   .set("spark.shuffle.consolidateFiles","true") // helpful for
>>> "too many files open"
>>>   .set("spark.mesos.coarse", "true") // helpful for
>>> MapOutputTracker errors?
>>>   .set("spark.akka.frameSize","500") // helpful when using
>>> consildateFiles=true
>>>   .set("spark.akka.askTimeout", "30")
>>>   .set("spark.shuffle.compress","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.file.transferTo","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.core.connection.ack.wait.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.speculation","true")
>>>   .set("spark.worker.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupbykey will repartition according to the number of keys as I understand
how it works. How do you know that you haven't reached the groupbykey
phase? Are you using a profiler or do you base that assumption only on logs?

On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra wrote:

> A correction to my first post:
>
> There is also a repartition right before groupByKey to help avoid
> too-many-open-files error:
>
>
> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
> wrote:
>
>> The job fails before getting to groupByKey.
>>
>> I see a lot of timeout errors in the yarn logs, like:
>>
>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>> akka.pattern.AskTimeoutException: Timed out
>>
>> and
>>
>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>
>> and some of these are followed by:
>>
>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
>> disassociated! Shutting down.
>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
>> stage 1.0 (TID 336601)
>> java.io.FileNotFoundException:
>> /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>> (No such file or directory)
>>
>>
>>
>>
>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc 
>> wrote:
>>
>>> I would first check whether  there is any possibility that after doing
>>> groupbykey one of the groups does not fit in one of the executors' memory.
>>>
>>> To back up my theory, instead of doing groupbykey + map try reducebykey
>>> + mapvalues.
>>>
>>> Let me know if that helped.
>>>
>>> Pawel Szulc
>>> http://rabbitonweb.com
>>>
>>> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra wrote:
>>>
>>> So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:

> Hi,
>
> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
> generates many small objects that lead to very long GC time, causing the
> executor losts, heartbeat not received, and GC overhead limit exceeded
> messages.
> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
> also try `OFF_HEAP` (and use Tachyon).
>
> Burak
>
> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
> wrote:
>
>> My program in pseudocode looks like this:
>>
>> val conf = new SparkConf().setAppName("Test")
>>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>   .set("spark.shuffle.manager","SORT") // preferred setting for
>> optimized joins
>>   .set("spark.shuffle.consolidateFiles","true") // helpful for
>> "too many files open"
>>   .set("spark.mesos.coarse", "true") // helpful for
>> MapOutputTracker errors?
>>   .set("spark.akka.frameSize","500") // helpful when using
>> consildateFiles=true
>>   .set("spark.akka.askTimeout", "30")
>>   .set("spark.shuffle.compress","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.file.transferTo","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.core.connection.ack.wait.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.speculation","true")
>>   .set("spark.worker.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>   .set("spark.akka.timeout","300") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>>   .set("spark.driver.maxResultSize","2048") // in response to
>> error: Total size of serialized results of 39901 tasks (1024.0 MB) is
>> bigger than spark.driver.maxResultSize (1024.0 MB)
>>   .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")

Re: getting this error while running

2015-02-28 Thread shahid
Also, the data file is on HDFS.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/getting-this-error-while-runing-tp21860p21861.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post:

There is also a repartition right before groupByKey to help avoid
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra  wrote:

> The job fails before getting to groupByKey.
>
> I see a lot of timeout errors in the yarn logs, like:
>
> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
> akka.pattern.AskTimeoutException: Timed out
>
> and
>
> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>
> and some of these are followed by:
>
> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
> disassociated! Shutting down.
> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
> stage 1.0 (TID 336601)
> java.io.FileNotFoundException:
> /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
> (No such file or directory)
>
>
>
>
> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc  wrote:
>
>> I would first check whether  there is any possibility that after doing
>> groupbykey one of the groups does not fit in one of the executors' memory.
>>
>> To back up my theory, instead of doing groupbykey + map try reducebykey +
>> mapvalues.
>>
>> Let me know if that helped.
>>
>> Pawel Szulc
>> http://rabbitonweb.com
>>
>> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra wrote:
>>
>> So, actually I am removing the persist for now, because there is
>>> significant filtering that happens after calling textFile()... but I will
>>> keep that option in mind.
>>>
>>> I just tried a few different combinations of number of executors,
>>> executor memory, and more importantly, number of tasks... *all three
>>> times it failed when approximately 75.1% of the tasks were completed (no
>>> matter how many tasks resulted from repartitioning the data in
>>> textfile(..., N))*. Surely this is a strong clue to something?
>>>
>>>
>>>
>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:
>>>
 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
 wrote:

> My program in pseudocode looks like this:
>
> val conf = new SparkConf().setAppName("Test")
>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>   .set("spark.shuffle.manager","SORT") // preferred setting for
> optimized joins
>   .set("spark.shuffle.consolidateFiles","true") // helpful for
> "too many files open"
>   .set("spark.mesos.coarse", "true") // helpful for
> MapOutputTracker errors?
>   .set("spark.akka.frameSize","500") // helpful when using
> consildateFiles=true
>   .set("spark.akka.askTimeout", "30")
>   .set("spark.shuffle.compress","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.file.transferTo","false") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.core.connection.ack.wait.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>   .set("spark.speculation","true")
>   .set("spark.worker.timeout","600") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.akka.timeout","300") //
> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>   .set("spark.driver.maxResultSize","2048") // in response to
> error: Total size of serialized results of 39901 tasks (1024.0 MB) is
> bigger than spark.driver.maxResultSize (1024.0 MB)
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>   .set("spark.kryo.registrationRequired", "true")
>
> val rdd1 = sc.textFile(file1).persist(StorageLevel
> .MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)
>
> val rdd2 =
> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
> -1)...filter(...)
>
>
> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).sav

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
/hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)




On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc  wrote:

> I would first check whether  there is any possibility that after doing
> groupbykey one of the groups does not fit in one of the executors' memory.
>
> To back up my theory, instead of doing groupbykey + map try reducebykey +
> mapvalues.
>
> Let me know if that helped.
>
> Pawel Szulc
> http://rabbitonweb.com
>
> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra wrote:
>
> So, actually I am removing the persist for now, because there is
>> significant filtering that happens after calling textFile()... but I will
>> keep that option in mind.
>>
>> I just tried a few different combinations of number of executors,
>> executor memory, and more importantly, number of tasks... *all three
>> times it failed when approximately 75.1% of the tasks were completed (no
>> matter how many tasks resulted from repartitioning the data in
>> textfile(..., N))*. Surely this is a strong clue to something?
>>
>>
>>
>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:
>>
>>> Hi,
>>>
>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>> generates many small objects that lead to very long GC time, causing the
>>> executor losts, heartbeat not received, and GC overhead limit exceeded
>>> messages.
>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>> also try `OFF_HEAP` (and use Tachyon).
>>>
>>> Burak
>>>
>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
>>> wrote:
>>>
 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction","0.2") // default 0.6
   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
   .set("spark.shuffle.manager","SORT") // preferred setting for
 optimized joins
   .set("spark.shuffle.consolidateFiles","true") // helpful for "too
 many files open"
   .set("spark.mesos.coarse", "true") // helpful for
 MapOutputTracker errors?
   .set("spark.akka.frameSize","500") // helpful when using
 consildateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress","false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo","false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout","600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation","true")
   .set("spark.worker.timeout","600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout","300") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
   .set("spark.driver.maxResultSize","2048") // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")

 .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
   .set("spark.kryo.registrationRequired", "true")

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have tried

getting this error while running

2015-02-28 Thread shahid

conf = SparkConf().setAppName("spark_calc3merged").setMaster("spark://ec2-54-145-68-13.compute-1.amazonaws.com:7077")
sc = SparkContext(conf=conf, pyFiles=["/root/platinum.py", "/root/collections2.py"])
  
15/02/28 19:06:38 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0
(TID 38, ip-10-80-15-145.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Starting task 5.1 in stage
3.0 (TID 44, ip-10-80-15-145.ec2.internal, NODE_LOCAL, 1502 bytes)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 8.0 in stage
3.0 (TID 41) in 7040 ms on ip-10-80-98-118.ec2.internal (9/11)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 9.0 in stage
3.0 (TID 42) in 7847 ms on ip-10-80-15-145.ec2.internal (10/11)
15/02/28 19:06:50 WARN scheduler.TaskSetManager: Lost task 5.1 in stage 3.0
(TID 44, ip-10-80-15-145.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/02/28 19:06:50 INFO scheduler.TaskSetManager: Starting task 5.2 in stage
3.0 (TID 45, ip-10-80-98-118.ec2.internal, NODE_LOCAL, 1502 bytes)
15/02/28 19:07:01 WARN scheduler.TaskSetManager: Lost task 5.2 in stage 3.0
(TID 45, ip-10-80-98-118.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
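The "Buffer overflow. Available: 0, required: 73065" pattern usually means Kryo's
serialization buffer is too small for one of the task results. Not a confirmed
diagnosis, but a hedged sketch of raising it is below; the property names are the
Spark 1.2/1.3 ones (later releases renamed them to spark.kryoserializer.buffer and
spark.kryoserializer.buffer.max), the 256 MB figure is only illustrative, and the
same keys can be set on the PySpark SparkConf shown above:

  // sketch only: raise Kryo's max serialization buffer so large task results fit
  val conf = new org.apache.spark.SparkConf()
    .setAppName("spark_calc3merged")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max.mb", "256")  // value in MB; 256 is illustrative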
   

Re: Getting to proto buff classes in Spark Context

2015-02-28 Thread John Meehan
Maybe try including the jar with 

--driver-class-path 
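If the job is launched as a standalone app rather than through spark-submit, the
equivalent knobs are the extraClassPath settings; a minimal sketch, with the jar
path a placeholder rather than anything from this thread:

  // sketch only: both keys are standard Spark config; the path is a placeholder
  val conf = new org.apache.spark.SparkConf()
    .setAppName("protobuf-decode")
    .set("spark.driver.extraClassPath", "/path/to/protobuf-java.jar")
    .set("spark.executor.extraClassPath", "/path/to/protobuf-java.jar")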



> On Feb 26, 2015, at 12:16 PM, Akshat Aranya  wrote:
> 
> My guess would be that you are packaging too many things in your job, which 
> is causing problems with the classpath.  When your jar goes in first, you get 
> the correct version of protobuf, but some other version of something else.  
> When your jar goes in later, other things work, but protobuf breaks.  This is 
> just a guess though; take a look at what you're packaging in your jar and 
> look for things that Spark or Kafka could also be using.
> 
>> On Thu, Feb 26, 2015 at 10:06 AM, necro351 .  wrote:
>> Hello everyone,
>> 
>> We are trying to decode a message inside a Spark job that we receive from 
>> Kafka. The message is encoded using Proto Buff. The problem is when decoding 
>> we get class-not-found exceptions. We have tried remedies we found online in 
>> Stack Exchange and mail list archives but nothing seems to work.
>> 
>> (This question is a re-ask, but we really cannot figure this one out.)
>> 
>> We created a standalone repository with a very simple Spark job that 
>> exhibits the above issues. The spark job reads the messages from the FS, 
>> decodes them, and prints them. Its easy to checkout and try to see the 
>> exception yourself: just uncomment the code that prints the messages from 
>> within the RDD. The only sources are the generated Proto Buff java sources 
>> and a small Spark Job that decodes a message. I'd appreciate if anyone could 
>> take a look.
>> 
>> https://github.com/vibhav/spark-protobuf
>> 
>> We tried a couple remedies already.
>> 
>> Setting "spark.files.userClassPathFirst" didn't fix the problem for us. I am 
>> not very familiar with the Spark and Scala environment, so please correct 
>> any incorrect assumptions or statements I make.
>> 
>> However, I don't believe this to be a classpath visibility issue. I wrote a 
>> small helper method to print out the classpath from both the driver and 
>> worker, and the output is identical. (I'm printing out 
>> System.getProperty("java.class.path") -- is there a better way to do this or 
>> check the class path?). You can print out the class paths the same way we 
>> are from the example project above.
>> 
>> Furthermore, userClassPathFirst seems to have a detrimental effect on 
>> otherwise working code, which I cannot explain or do not understand. 
>> 
>> For example, I created a trivial RDD as such:
>> 
>> val l = List(1, 2, 3)
>> sc.makeRDD(l).foreach((x: Int) => {
>> println(x.toString)
>> })
>> 
>> With userClassPathFirst set, I encounter a java.lang.ClassCastException 
>> trying to execute that code. Is that to be expected? You can re-create this 
>> issue by commenting out the block of code that tries to print the above in 
>> the example project we linked to above.
>> 
>> We also tried dynamically adding the jar with .addJar to the Spark Context 
>> but this seemed to have no effect.
>> 
>> Thanks in advance for any help y'all can provide.
> 


Re: How to debug a Hung task

2015-02-28 Thread Michael Albert
For what it's worth, I was seeing mysterious hangs, but it went away when
upgrading from Spark 1.2 to 1.2.1. I don't know if this is your problem. Also,
I'm using AWS EMR images, which were also "upgraded".
Anyway, that's my experience.
-Mike

  From: Manas Kar 
 To: "user@spark.apache.org"  
 Sent: Friday, February 27, 2015 3:50 PM
 Subject: How to debug a Hung task
   
Hi,
I have a spark application that hangs on doing just one task (the rest of the
200-300 tasks get completed in reasonable time). I can see in the thread dump
which function gets stuck, however I don't have a clue as to what value is
causing that behaviour. Also, logging the inputs before the function is executed
does not help, as the actual message gets buried in logs.
How does one go about debugging such a case? Also, is there a way I can wrap my
function inside some sort of timer-based environment, so that if it took too long
I would throw a stack trace or some such?
Thanks
Manas

  

How to debug a hung spark application

2015-02-28 Thread manasdebashiskar
Hi,
 I have a spark application that hangs on doing just one task (the rest of the
200-300 tasks get completed in reasonable time).
I can see in the thread dump which function gets stuck, however I don't
have a clue as to what value is causing that behaviour.
Also, logging the inputs before the function is executed does not help, as
the actual message gets buried in logs.

How does one go about debugging such a case?
Also, is there a way I can wrap my function inside some sort of timer-based
environment, so that if it took too long I would throw a stack trace or some
such?
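That timer idea can be sketched with a plain Scala Future; a minimal, hedged
version is below (the 30-second budget and slowFunction are placeholders, and the
Future keeps running after the timeout, so this only surfaces which input is slow,
it does not kill the work):

  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  // run the body in a Future and fail loudly if it exceeds the budget,
  // so the offending input can be logged before the task hangs forever
  def withTimeout[T](budget: FiniteDuration)(body: => T): T =
    try Await.result(Future(body), budget)
    catch {
      case e: java.util.concurrent.TimeoutException =>
        println(s"function exceeded $budget -- log the input here to find the bad record")
        throw e
    }

  // usage inside a transformation, e.g.:
  // rdd.map(x => withTimeout(30.seconds) { slowFunction(x) })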

Thanks




-
Manas Kar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-debug-a-hung-spark-application-tp21859.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether  there is any possibility that after doing
groupbykey one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupbykey + map try reducebykey +
mapvalues.

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com
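A minimal sketch of that substitution, assuming an existing SparkContext `sc`; the
value type and the merge function are placeholders, since the real per-key logic
isn't shown in this thread:

  val pairs = sc.parallelize(Seq(("k1", 1), ("k1", 2), ("k2", 5)))

  // groupByKey + map: all values for a key are collected into one Iterable first
  val grouped = pairs.groupByKey().map { case (k, vs) => (k, s"total=${vs.sum}") }

  // reduceByKey + mapValues: values are merged incrementally (with map-side combine),
  // so no single key's values need to fit in memory at once
  val reduced = pairs.reduceByKey(_ + _).mapValues(total => s"total=$total")

Both produce the same (key, "total=N") pairs; only the second avoids materializing
each key's full value list.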

On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra wrote:

> So, actually I am removing the persist for now, because there is
> significant filtering that happens after calling textFile()... but I will
> keep that option in mind.
>
> I just tried a few different combinations of number of executors, executor
> memory, and more importantly, number of tasks... *all three times it
> failed when approximately 75.1% of the tasks were completed (no matter how
> many tasks resulted from repartitioning the data in textfile(..., N))*.
> Surely this is a strong clue to something?
>
>
>
> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:
>
>> Hi,
>>
>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>> generates many small objects that lead to very long GC time, causing the
>> executor losts, heartbeat not received, and GC overhead limit exceeded
>> messages.
>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
>> try `OFF_HEAP` (and use Tachyon).
>>
>> Burak
>>
>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
>> wrote:
>>
>>> My program in pseudocode looks like this:
>>>
>>> val conf = new SparkConf().setAppName("Test")
>>>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>   .set("spark.shuffle.manager","SORT") // preferred setting for
>>> optimized joins
>>>   .set("spark.shuffle.consolidateFiles","true") // helpful for "too
>>> many files open"
>>>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
>>> errors?
>>>   .set("spark.akka.frameSize","500") // helpful when using
>>> consildateFiles=true
>>>   .set("spark.akka.askTimeout", "30")
>>>   .set("spark.shuffle.compress","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.file.transferTo","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.core.connection.ack.wait.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>   .set("spark.speculation","true")
>>>   .set("spark.worker.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>   .set("spark.akka.timeout","300") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>>>   .set("spark.driver.maxResultSize","2048") // in response to error:
>>> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>   .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>>   .set("spark.kryo.registrationRequired", "true")
>>>
>>> val rdd1 = 
>>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>>> -1)...filter(...)
>>>
>>> val rdd2 =
>>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>>> -1)...filter(...)
>>>
>>>
>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>
>>>
>>> I run the code with:
>>>   --num-executors 500 \
>>>   --driver-memory 20g \
>>>   --executor-memory 20g \
>>>   --executor-cores 32 \
>>>
>>>
>>> I'm using kryo serialization on everything, including broadcast
>>> variables.
>>>
>>> Spark creates 145k tasks, and the first stage includes everything before
>>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>>> and tripling the number of partitions when calling textFile, with no
>>> success.
>>>
>>> Very similar code (trivial changes, to accomodate different input)
>>> worked on a smaller input (~8TB)... Not that it was easy to get that
>>> working.
>>>
>>>
>>>
>>> Errors vary, here is what I am getting right now:
>>>
>>> ERROR SendingConnection: Exception while reading SendingConnection
>>> ... java.nio.channels.ClosedChannelException
>>> (^ guessing that is symptom of something else)
>>>
>>> WARN BlockManagerMasterActor: Removing BlockManager
>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
>>> (^ guessing that is symptom of something else)
>>>
>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
>>> down ActorSystem [sparkDriver]
>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>
>>>
>>>
>>> Other times I will get messages about "executor lost..." about 1 message
>>> per second, after ~~50k tasks complete, unti

Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ted Yu
Have you verified that spark-catalyst_2.10 jar was in the classpath ?

Cheers
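One way to make sure the catalyst classes land on the classpath is to declare
spark-sql as a build dependency and let it pull spark-catalyst in transitively. A
minimal build.sbt sketch, with the versions taken as assumptions from the thread
below:

  // build.sbt sketch -- versions are assumptions, not a recommendation
  scalaVersion := "2.11.5"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
    "org.apache.spark" %% "spark-sql"  % "1.2.1"   // brings in spark-catalyst_2.11
  )

Note that mixing a cluster built for Scala 2.10 with application artifacts built
for 2.11 can also produce reflection errors like this one.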

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam 
wrote:

> Hi,
> I wrote a very simple program in scala to convert an existing RDD to
> SchemaRDD.
> But createSchemaRDD function is throwing exception
>
> Exception in thread "main" scala.ScalaReflectionException: class
> org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
> classloader with boot classpath [.] not found
>
>
> Here's more info on the versions I am using -
>
> Scala binary version: 2.11
> Spark: 1.2.1
> Scala: 2.11.5
>
> Please let me know how can I resolve this problem.
>
> Thanks
> Ashish
>


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.

I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textfile(..., N))*. Surely
this is a strong clue to something?



On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz  wrote:

> Hi,
>
> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
> many small objects that lead to very long GC time, causing the executor
> losts, heartbeat not received, and GC overhead limit exceeded messages.
> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
> try `OFF_HEAP` (and use Tachyon).
>
> Burak
>
> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra 
> wrote:
>
>> My program in pseudocode looks like this:
>>
>> val conf = new SparkConf().setAppName("Test")
>>   .set("spark.storage.memoryFraction","0.2") // default 0.6
>>   .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>   .set("spark.shuffle.manager","SORT") // preferred setting for
>> optimized joins
>>   .set("spark.shuffle.consolidateFiles","true") // helpful for "too
>> many files open"
>>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
>> errors?
>>   .set("spark.akka.frameSize","500") // helpful when using
>> consildateFiles=true
>>   .set("spark.akka.askTimeout", "30")
>>   .set("spark.shuffle.compress","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.file.transferTo","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.core.connection.ack.wait.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>   .set("spark.speculation","true")
>>   .set("spark.worker.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>   .set("spark.akka.timeout","300") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>   .set("spark.storage.blockManagerSlaveTimeoutMs","12")
>>   .set("spark.driver.maxResultSize","2048") // in response to error:
>> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
>> spark.driver.maxResultSize (1024.0 MB)
>>   .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>   .set("spark.kryo.registrationRequired", "true")
>>
>> val rdd1 = 
>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>> -1)...filter(...)
>>
>> val rdd2 =
>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>> -1)...filter(...)
>>
>>
>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>
>>
>> I run the code with:
>>   --num-executors 500 \
>>   --driver-memory 20g \
>>   --executor-memory 20g \
>>   --executor-cores 32 \
>>
>>
>> I'm using kryo serialization on everything, including broadcast
>> variables.
>>
>> Spark creates 145k tasks, and the first stage includes everything before
>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>> and tripling the number of partitions when calling textFile, with no
>> success.
>>
>> Very similar code (trivial changes, to accomodate different input) worked
>> on a smaller input (~8TB)... Not that it was easy to get that working.
>>
>>
>>
>> Errors vary, here is what I am getting right now:
>>
>> ERROR SendingConnection: Exception while reading SendingConnection
>> ... java.nio.channels.ClosedChannelException
>> (^ guessing that is symptom of something else)
>>
>> WARN BlockManagerMasterActor: Removing BlockManager
>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
>> (^ guessing that is symptom of something else)
>>
>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
>> down ActorSystem [sparkDriver]
>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>
>>
>>
>> Other times I will get messages about "executor lost..." about 1 message
>> per second, after ~~50k tasks complete, until there are almost no executors
>> left and progress slows to nothing.
>>
>> I ran with verbose GC info; I do see failing yarn containers that have
>> multiple (like 30) "Full GC" messages but I don't know how to interpret if
>> that is the problem. Typical Full GC time taken seems ok: [Times:
>> user=23.30 sys=0.06, real=1.94 secs]
>>
>>
>>
>> Suggestions, please?
>>
>> Huge thanks for useful suggestions,
>> Arun
>>
>
>


Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Hi,
I wrote a very simple program in scala to convert an existing RDD to
SchemaRDD.
But createSchemaRDD function is throwing exception

Exception in thread "main" scala.ScalaReflectionException: class
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
classloader with boot classpath [.] not found


Here's more info on the versions I am using -

Scala binary version: 2.11
Spark: 1.2.1
Scala: 2.11.5

Please let me know how can I resolve this problem.

Thanks
Ashish


Scalable JDBCRDD

2015-02-28 Thread Michal Klos
Hi Spark community,

We have a use case where we need to pull huge amounts of data from a SQL
query against a database into Spark. We need to execute the query against
our huge database and not a substitute (SparkSQL, Hive, etc) because of a
couple of factors including custom functions used in the queries that only
our database has.

We started by looking at JDBC RDD, which utilizes a prepared statement with
two parameters that are meant to be used to partition the result set to the
workers... e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2
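For reference, a minimal sketch of how the stock JdbcRDD is normally driven by
those two bound parameters; the connection URL, table, id column and row mapping
are placeholders, and the SQL must contain exactly two '?' placeholders that Spark
fills with the partition bounds (partitioning on an indexed numeric column is the
more common pattern than LIMIT):

  import java.sql.DriverManager
  import org.apache.spark.rdd.JdbcRDD

  val rdd = new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass"),
    "SELECT * FROM table WHERE id >= ? AND id <= ?",
    1L, 1000000L,      // lower/upper bound on the partitioning column
    20,                // number of partitions
    r => r.getString(1))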

This will not work for us because our database cannot support multiple
execution of these queries without being crippled. But, additionally, our
database doesn't support the above LIMIT syntax and we don't have a generic
way of partitioning the various queries.

As a result -- we started by forking JDBCRDD and made a version that
executes the SQL query once in getPartitions into a Vector and then hands
each worker node an index and iterator. Here's a snippet of getPartitions
and compute:

  override def getPartitions: Array[Partition] = {
    // Compute the DB query once here
    val results = computeQuery

    (0 until numPartitions).map(i => {
      // TODO: would be better to do this partitioning when scrolling
      // through the result set, if still loading into memory
      val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
      new DBPartition(i, partitionItems)
    }).toArray
  }

  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
    val part = thePart.asInstanceOf[DBPartition[T]]

    // Shift the result vector to our index number and then do a sliding iterator over it
    val iterator = part.items.iterator

    override def getNext: T = {
      if (iterator.hasNext) {
        iterator.next()
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }

    override def close: Unit = ()
  }

This is a little better since we can just execute the query once.
However, the result-set needs to fit in memory.

We've been trying to brainstorm a way to

A) have that result set distribute out to the worker RDD partitions as
it's streaming in from the cursor?
B) have the result set spill to disk if it exceeds memory and do
something clever around the iterators?
C) something else?

We're not familiar enough yet with all of the workings of Spark to
know how to proceed on this.

We also thought of the workaround of having the DB query dump to
HDFS/S3 and then pick it up from there, but it adds more moving parts
and latency to our processing.

Does anyone have a clever suggestion? Are we missing something?

thanks,
Michal


Re: Failed to parse Hive query

2015-02-28 Thread Anusha Shamanur
Hi,
I reconfigured everything. Still facing the same issue.
Can someone please help?
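Not a confirmed fix, but the "Conf non-local session path expected to be non-null"
NPE comes out of Hive's session/scratch directory handling, so one thing worth
trying is setting those properties explicitly on the context before the first
query. A hedged sketch (the paths are assumptions, not values from this thread):

  val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
  hiveCtx.setConf("hive.exec.scratchdir", "/tmp/hive")
  hiveCtx.setConf("hive.exec.local.scratchdir", "/tmp/hive-local")
  val listTables = hiveCtx.hql("show tables")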

On Friday, February 27, 2015, Anusha Shamanur  wrote:

> I do.
> What tags should I change in this?
> I changed the value of hive.exec.scratchdir to /tmp/hive.
> What else?
>
> On Fri, Feb 27, 2015 at 2:14 PM, Michael Armbrust  > wrote:
>
>> Do you have a hive-site.xml file or a core-site.xml file?  Perhaps
>> something is misconfigured there?
>>
>> On Fri, Feb 27, 2015 at 7:17 AM, Anusha Shamanur > > wrote:
>>
>>> Hi,
>>>
>>> I am trying to do this in spark-shell:
>>>
>>> val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
>>> val listTables = hiveCtx.hql("show tables")
>>>
>>> The second line fails to execute with this message:
>>>
>>> warning: there were 1 deprecation warning(s); re-run with -deprecation
>>> for details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to
>>> parse: show tables at
>>> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
>>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
>>> at
>>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
>>> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
>>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>>>
>>> ... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.NullPointerException: Conf non-local session path
>>> expected to be non-null at
>>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>> at
>>> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
>>> at org.apache.hadoop.hive.ql.Context.(Context.java:129) at
>>> org.apache.hadoop.hive.ql.Context.(Context.java:116) at
>>> org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
>>> org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more
>>>
>>>
>>> Any help would be appreciated.
>>>
>>>
>>>
>>> --
>>> Sent from Gmail mobile
>>>
>>
>>
>
>
> --
> Regards,
> Anusha
>


-- 
Sent from Gmail mobile


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Ted Yu
Having good out-of-box experience is desirable.

+1 on increasing the default.


On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen  wrote:

> There was a recent discussion about whether to increase or indeed make
> configurable this kind of default fraction. I believe the suggestion
> there too was that 9-10% is a safer default.
>
> Advanced users can lower the resulting overhead value; it may still
> have to be increased in some cases, but a fatter default may make this
> kind of surprise less frequent.
>
> I'd support increasing the default; any other thoughts?
>
> On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers  wrote:
> > hey,
> > running my first map-red like (meaning disk-to-disk, avoiding in memory
> > RDDs) computation in spark on yarn i immediately got bitten by a too low
> > spark.yarn.executor.memoryOverhead. however it took me about an hour to
> find
> > out this was the cause. at first i observed failing shuffles leading to
> > restarting of tasks, then i realized this was because executors could
> not be
> > reached, then i noticed in containers got shut down and reallocated in
> > resourcemanager logs (no mention of errors, it seemed the containers
> > finished their business and shut down successfully), and finally i found
> the
> > reason in nodemanager logs.
> >
> > i don't think this is a pleasant first experience. i realize
> > spark.yarn.executor.memoryOverhead needs to be set differently from
> > situation to situation. but shouldn't the default be a somewhat higher
> value
> > so that these errors are unlikely, and then the experts that are willing
> to
> > deal with these errors can tune it lower? so why not make the default 10%
> > instead of 7%? that gives something that works in most situations out of
> the
> > box (at the cost of being a little wasteful). it worked for me.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Sean Owen
There was a recent discussion about whether to increase or indeed make
configurable this kind of default fraction. I believe the suggestion
there too was that 9-10% is a safer default.

Advanced users can lower the resulting overhead value; it may still
have to be increased in some cases, but a fatter default may make this
kind of surprise less frequent.

I'd support increasing the default; any other thoughts?

On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers  wrote:
> hey,
> running my first map-red like (meaning disk-to-disk, avoiding in memory
> RDDs) computation in spark on yarn i immediately got bitten by a too low
> spark.yarn.executor.memoryOverhead. however it took me about an hour to find
> out this was the cause. at first i observed failing shuffles leading to
> restarting of tasks, then i realized this was because executors could not be
> reached, then i noticed in containers got shut down and reallocated in
> resourcemanager logs (no mention of errors, it seemed the containers
> finished their business and shut down successfully), and finally i found the
> reason in nodemanager logs.
>
> i don't think this is a pleasant first experience. i realize
> spark.yarn.executor.memoryOverhead needs to be set differently from
> situation to situation. but shouldn't the default be a somewhat higher value
> so that these errors are unlikely, and then the experts that are willing to
> deal with these errors can tune it lower? so why not make the default 10%
> instead of 7%? that gives something that works in most situations out of the
> box (at the cost of being a little wasteful). it worked for me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade to Spark 1.2.1 using Guava

2015-02-28 Thread Pat Ferrel
Maybe, but any time the workaround is to use "spark-submit --conf
spark.executor.extraClassPath=/guava.jar blah" that means that standalone apps
must have hard-coded paths that are honored on every worker. And as you know, a
lib is pretty much blocked from using this version of Spark, hence the blocker
severity.

I could easily be wrong but userClassPathFirst doesn’t seem to be the issue. 
There is no class conflict.

On Feb 27, 2015, at 7:13 PM, Sean Owen  wrote:

This seems like a job for userClassPathFirst. Or could be. It's
definitely an issue of visibility between where the serializer is and
where the user class is.

At the top you said Pat that you didn't try this, but why not?

On Fri, Feb 27, 2015 at 10:11 PM, Pat Ferrel  wrote:
> I’ll try to find a Jira for it. I hope a fix is in 1.3
> 
> 
> On Feb 27, 2015, at 1:59 PM, Pat Ferrel  wrote:
> 
> Thanks! that worked.
> 
> On Feb 27, 2015, at 1:50 PM, Pat Ferrel  wrote:
> 
> I don’t use spark-submit I have a standalone app.
> 
> So I guess you want me to add that key/value to the conf in my code and make 
> sure it exists on workers.
> 
> 
> On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin  wrote:
> 
> On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel  wrote:
>> I changed in the spark master conf, which is also the only worker. I added a 
>> path to the jar that has guava in it. Still can’t find the class.
> 
> Sorry, I'm still confused about what config you're changing. I'm
> suggesting using:
> 
> spark-submit --conf spark.executor.extraClassPath=/guava.jar blah
> 
> 
> --
> Marcelo
> 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Koert Kuipers
hey,
running my first map-red like (meaning disk-to-disk, avoiding in memory
RDDs) computation in spark on yarn i immediately got bitten by a too low
spark.yarn.executor.memoryOverhead. however it took me about an hour to
find out this was the cause. at first i observed failing shuffles leading
to restarting of tasks, then i realized this was because executors could
not be reached, then i noticed in containers got shut down and reallocated
in resourcemanager logs (no mention of errors, it seemed the containers
finished their business and shut down successfully), and finally i found
the reason in nodemanager logs.

i don't think this is a pleasant first experience. i realize
spark.yarn.executor.memoryOverhead needs to be set differently from
situation to situation. but shouldn't the default be a somewhat higher value
so that these errors are unlikely, and then the experts that are willing to
deal with these errors can tune it lower? so why not make the default 10%
instead of 7%? that gives something that works in most situations out of
the box (at the cost of being a little wasteful). it worked for me.
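Until any default changes, a hedged sketch of raising the overhead explicitly per
job is below; the key is in megabytes on Spark 1.2.x, and both figures are purely
illustrative rather than recommendations:

  // sketch only: give YARN containers extra headroom beyond the executor heap
  val conf = new org.apache.spark.SparkConf()
    .setAppName("mr-style-job")
    .set("spark.executor.memory", "16g")
    .set("spark.yarn.executor.memoryOverhead", "2048")  // MB, roughly 10-12% of the heap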


Reg. Difference in Performance

2015-02-28 Thread Deep Pradhan
Hi,
I am running Spark applications in GCE. I set up cluster with different
number of nodes varying from 1 to 7. The machines are single core machines.
I set the spark.default.parallelism to the number of nodes in the cluster
for each cluster. I ran the four applications available in Spark Examples,
SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations.
What I notice is the following:
In case of SparkTC and SparkALS, the time to complete the job increases
with the increase in number of nodes in cluster, where as in SparkLR and
SparkPi, the time to complete the job remains the same across all the
configurations.
Could anyone explain me this?

Thank You
Regards,
Deep


SORT BY and ORDER BY file size v/s RAM size

2015-02-28 Thread DEVAN M.S.
Hi devs,

Is there any connection between the input file size and RAM size for
sorting using SparkSQL?
I tried a 1 GB file with 8 GB RAM and 4 cores and got
java.lang.OutOfMemoryError: GC overhead limit exceeded.
Or could it be for any other reason? It's working for other SparkSQL
operations.
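Not a diagnosis, but one knob worth checking for SQL sorts and aggregations is
spark.sql.shuffle.partitions (default 200): more partitions mean each task sorts a
smaller slice, which can keep per-task memory under the GC-overhead threshold. A
minimal sketch, with the 400 figure only illustrative and the query taken from the
log below:

  val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
  hiveCtx.setConf("spark.sql.shuffle.partitions", "400")
  hiveCtx.sql("SELECT * FROM people SORT BY B DESC")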


15/02/28 16:33:03 INFO Utils: Successfully started service 'sparkDriver' on
port 41392.
15/02/28 16:33:03 INFO SparkEnv: Registering MapOutputTracker
15/02/28 16:33:03 INFO SparkEnv: Registering BlockManagerMaster
15/02/28 16:33:03 INFO DiskBlockManager: Created local directory at
/tmp/spark-ecf4d6f0-c526-48fa-bd8a-d74a8bf64820/spark-4865c193-05e6-4aa1-999b-ab8c426479ab
15/02/28 16:33:03 INFO MemoryStore: MemoryStore started with capacity 944.7
MB
15/02/28 16:33:03 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/02/28 16:33:03 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-af545c0b-15e6-4efa-a151-2c73faba8948/spark-987f58b4-5735-4965-91d1-38f238f4bb11
15/02/28 16:33:03 INFO HttpServer: Starting HTTP Server
15/02/28 16:33:03 INFO Utils: Successfully started service 'HTTP file
server' on port 44588.
15/02/28 16:33:08 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/02/28 16:33:08 INFO SparkUI: Started SparkUI at http://10.30.9.7:4040
15/02/28 16:33:08 INFO Executor: Starting executor ID  on host
localhost
15/02/28 16:33:08 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@10.30.9.7:41392/user/HeartbeatReceiver
15/02/28 16:33:08 INFO NettyBlockTransferService: Server created on 34475
15/02/28 16:33:08 INFO BlockManagerMaster: Trying to register BlockManager
15/02/28 16:33:08 INFO BlockManagerMasterActor: Registering block manager
localhost:34475 with 944.7 MB RAM, BlockManagerId(, localhost,
34475)
15/02/28 16:33:08 INFO BlockManagerMaster: Registered BlockManager
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(193213) called with
curMem=0, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 188.7 KB, free 944.5 MB)
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(25432) called with
curMem=193213, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 24.8 KB, free 944.5 MB)
15/02/28 16:33:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on localhost:34475 (size: 24.8 KB, free: 944.6 MB)
15/02/28 16:33:09 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/02/28 16:33:09 INFO SparkContext: Created broadcast 0 from textFile at
SortSQL.scala:20
15/02/28 16:33:10 INFO HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/28 16:33:10 INFO ObjectStore: ObjectStore, initialize called
15/02/28 16:33:10 INFO Persistence: Property datanucleus.cache.level2
unknown - will be ignored
15/02/28 16:33:10 INFO Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/28 16:33:12 INFO ObjectStore: Setting MetaStore object pin classes
with
hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
15/02/28 16:33:12 INFO MetaStoreDirectSql: MySQL check failed, assuming we
are not on mysql: Lexical error at line 1, column 5.  Encountered: "@"
(64), after : "".
15/02/28 16:33:13 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
"embedded-only" so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
"embedded-only" so does not have its own datastore table.
15/02/28 16:33:13 INFO Query: Reading in results for query
"org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is
closing
15/02/28 16:33:13 INFO ObjectStore: Initialized ObjectStore
15/02/28 16:33:14 INFO HiveMetaStore: Added admin role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: Added public role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: No user is added in admin role, since
config is empty
15/02/28 16:33:14 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/02/28 16:33:14 INFO ParseDriver: Parsing command: SELECT * FROM people
SORT BY B DESC
15/02/28 16:33:14 INFO ParseDriver: Parse Completed
15/02/28 16:33:14 INFO deprecation: mapred.tip.id is deprecated. Instead,
use mapreduce.task.id
15/02/28 16:33:14 INFO deprecation: mapred.task.id is deprecated. Instead,
use mapreduce.task.attempt.id
15/02/28 16:33:14 INFO dep

Re: Running in-memory SQL on streamed relational data

2015-02-28 Thread Akhil Das
I think you can do simple operations like foreachRDD or transform to get
access to the RDDs in the stream and then you can do SparkSQL over it.
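A minimal sketch of that pattern on the 1.2-era API, assuming an existing
SparkContext `sc`; the socket source, the Record case class and the query are
assumptions for illustration:

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  case class Record(key: String, value: Int)

  val ssc = new StreamingContext(sc, Seconds(10))
  val sqlCtx = new SQLContext(sc)
  import sqlCtx.createSchemaRDD   // implicit RDD[Record] -> SchemaRDD

  ssc.socketTextStream("localhost", 9999)
    .map(_.split(","))
    .map(a => Record(a(0), a(1).toInt))
    .foreachRDD { rdd =>
      // register each micro-batch as a temp table and run ad hoc SQL over it
      rdd.registerTempTable("records")
      sqlCtx.sql("SELECT key, SUM(value) FROM records GROUP BY key")
        .collect().foreach(println)
    }

  ssc.start()
  ssc.awaitTermination()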

Thanks
Best Regards

On Sat, Feb 28, 2015 at 3:27 PM, Ashish Mukherjee <
ashish.mukher...@gmail.com> wrote:

> Hi,
>
> I have been looking at Spark Streaming , which seems to be for the use
> case of live streams which are processed one line at a time generally in
> real-time.
>
> Since SparkSQL reads data from some filesystem, I was wondering if there
> is something which connects SparkSQL with Spark Streaming, so I can send
> live relational tuples in a stream (rather than read filesystem data) for
> SQL operations.
>
> Also, at present, doing it with Spark Streaming would have complexities of
> handling multiple Dstreams etc. since I may want to run multiple adhoc
> queries of this kind on adhoc data I stream through.
>
> Has anyone done this kind of thing with Spark before? i.e combination of
> SparkSQL with Streaming.
>
> Regards,
> Ashish
>


Running in-memory SQL on streamed relational data

2015-02-28 Thread Ashish Mukherjee
Hi,

I have been looking at Spark Streaming, which seems to be for the use case
of live streams that are processed one line at a time, generally in
real-time.

Since SparkSQL reads data from some filesystem, I was wondering if there is
something which connects SparkSQL with Spark Streaming, so I can send live
relational tuples in a stream (rather than read filesystem data) for SQL
operations.

Also, at present, doing it with Spark Streaming would have the complexity of
handling multiple DStreams, etc., since I may want to run multiple ad hoc
queries of this kind on ad hoc data I stream through.

Has anyone done this kind of thing with Spark before, i.e., a combination of
SparkSQL with Streaming?

Regards,
Ashish


Re: Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query

2015-02-28 Thread anamika gupta
The issue is now resolved.

One of the CSV files had a malformed record at the end. A sketch of filtering
such records out while loading the data is below.
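
For reference, a minimal sketch of that guard for one of the tables; the file
path, delimiter and timestamp format are assumptions, sc and sqlContext are
the usual contexts, and date_d is the case class from the quoted mail below:

import java.sql.Timestamp

// Load date_d from CSV, dropping malformed rows up front instead of hitting
// an ArrayIndexOutOfBoundsException later. Assumes 13 comma-separated fields
// per line and timestamps formatted as "yyyy-MM-dd HH:mm:ss".
val dateRows = sc.textFile("hdfs:///data/date_d.csv")
  .map(_.split(",", -1))      // -1 keeps trailing empty fields
  .filter(_.length == 13)     // discard truncated or over-long records
  .map(f => date_d(f(0).trim.toInt, Timestamp.valueOf(f(1).trim), f(2), f(3),
    f(4).trim.toInt, f(5).trim.toInt, f(6), f(7).trim.toInt, f(8),
    f(9).trim.toInt, f(10).trim.toInt, f(11).trim.toInt, f(12).trim.toInt))

import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD
dateRows.registerTempTable("date_d")

The same length check applies when loading interval_f and sdp_d.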

On Fri, Feb 27, 2015 at 4:24 PM, anamika gupta 
wrote:

> I have three tables with the following schema:
>
> case class* date_d*(WID: Int, CALENDAR_DATE: java.sql.Timestamp,
> DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR:
> Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String,
> MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)
>
>
>
> case class* interval_f*(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int,
> MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int,
> VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int,
> SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME:
> java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp,
> INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME:
> java.sql.Timestamp)
>
>
>
> class *sdp_d*( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID
> :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT
> :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String],
> GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK
> :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String],
> IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO
> :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String],
> ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE
> :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String],
> BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String],
> LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String],
> INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME
> :Option[java.sql.Timestamp]) extends Product{
>
> @throws(classOf[IndexOutOfBoundsException])
> override def productElement(n: Int) = n match
> {
> case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 =>
> ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID;
> case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 =>
> PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 =>
> IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 =>
> ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 =>
> TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case
> 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS;
> case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 =>
> LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString())
> }
>
> override def productArity: Int = 29; override def canEqual(that: Any):
> Boolean = that.isInstanceOf[sdp_d]
> }
>
>
>
> Non-join queries work fine:
>
> *val q1 = sqlContext.sql("""SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID),
> COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR,
> DAY_OF_YEAR""")*
>
> res4: Array[org.apache.spark.sql.Row] =
> Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1],
> [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1],
> [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1],
> [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1],
> [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1],
> [2014,315,20141111,20141111,1], [2014,316,20141112,20141112,1],
> [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1],
> [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1],
> [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1],
> [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1],
> [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1],
> [2014,327,20141123,20141123,1], [2014,328,20141...
>
>
>
> But the join queries throw this error:*
> java.lang.ArrayIndexOutOfBoundsException*
>
> *scala> val q = sqlContext.sql("""select * from date_d dd join interval_f
> intf on intf.DATE_WID = dd.WID Where intf.DATE_WID >= 20141101 AND
> intf.DATE_WID <= 20141110""")*
>
> q: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[38] at RDD at SchemaRDD.scala:103
> == Query Plan ==
> == Physical Plan ==
> Project
> [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
>  ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
>   Exchange (HashPartitioning [WID#0], 200)
>InMemoryColumnarTableScan
> [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...
>
>
> *scala> q.take(5).foreach(println)*
>
> 15/02/27 15:50:26 INFO SparkCont

RE: SparkSQL production readiness

2015-02-28 Thread Wang, Daoyuan
Hopefully the alpha tag will be removed in 1.4.0, if the community can review
code a little bit faster :P

Thanks,
Daoyuan

From: Ashish Mukherjee [mailto:ashish.mukher...@gmail.com]
Sent: Saturday, February 28, 2015 4:28 PM
To: user@spark.apache.org
Subject: SparkSQL production readiness

Hi,

I am exploring SparkSQL for performing large relational operations across a 
cluster. However, it seems to be in alpha right now. Is there any indication 
when it would be considered production-level? I don't see any info on the site.

Regards,
Ashish


SparkSQL production readiness

2015-02-28 Thread Ashish Mukherjee
Hi,

I am exploring SparkSQL for performing large relational operations across a
cluster. However, it seems to be in alpha right now. Is there any indication
when it would be considered production-level? I don't see any info on the
site.

Regards,
Ashish