Join two dataframe - Timeout after 5 minutes

2015-09-24 Thread Eyad Sibai
I am trying to join two tables using DataFrames with Python 3.4, and I am 
getting the following error.


I ran it on my localhost machine with 2 workers, spark 1.5


I always get timeout if the job takes more than 5 minutes.




at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
 at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at 
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
 at 
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
 ... 33 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 
seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at 
org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at 
org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
 at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
 ... 41 more


2015-09-23 15:44:09,536 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static/sql,null}
2015-09-23 15:44:09,537 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL1/execution/json,null}
2015-09-23 15:44:09,538 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL1/execution,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL1/json,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL1,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static/sql,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL/execution,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL/json,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/SQL,null}
2015-09-23 15:44:09,539 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/metrics/json,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/api,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/json,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment/json,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment,null}
2015-09-23 15:44:09,540 INFO ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
2015-09-23 15:44:09,541 INFO 

GroupBy Java objects in Java Spark

2015-09-24 Thread Ramkumar V
Hi,

I want to know whether grouping by Java class objects is possible in
Java Spark.

I have a Tuple2<JavaObject, JavaObject>. I want to groupByKey and then
do some operations on the values after grouping.


*Thanks*,



Re: JdbcRDD Constructor

2015-09-24 Thread satish chandra j
HI Deenar,

Please find the SQL query below:

var SQL_RDD = new JdbcRDD( sc, () =>
DriverManager.getConnection(url, user, pass), "select col1, col2,
col3 ... col37 from schema.Table LIMIT ? OFFSET ?", 100, 0, 1, (r:
ResultSet) => (r.getInt("col1"), r.getInt("col2") ... r.getInt("col37")))


When I have the above arguments 100, 0, 1 I am getting SQL_RDD.count as 100.
When set to 100, 0, 2 I am getting SQL_RDD.count as 151.
When set to 100, 0, 3 I am getting SQL_RDD.count as 201.

Whereas I expect the count to be 100 for every execution; please let me know if I
am missing anything here.

Regards,
Satish Chandra


On Thu, Sep 24, 2015 at 12:48 AM, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Satish
>
> Can you post the SQL query you are using?
>
> The SQL query must have two placeholders, and together they should define an
> inclusive range (<= and >=).
>
> e.g. select title, author from books where ? <= id and id <= ?
>
> Are you doing this?
>
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 23 September 2015 at 13:47, satish chandra j 
> wrote:
>
>> Hi,
>> Could anybody provide inputs if they have come across a similar issue?
>>
>> @Rishitesh
>> Could you provide any sample code for using JdbcRDDSuite?
>>
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> I am using Spark 1.5. I always get count = 100, irrespective of num
>>> partitions.
>>>
>>> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 Hi,
 Currently using Spark 1.2.2. Could you please let me know the correct
 output count which you got by using JdbcRDDSuite?

 Regards,
 Satish Chandra

 On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
 rishi80.mis...@gmail.com> wrote:

> Which version of Spark are you using? I can get correct results
> using JdbcRDD. In fact there is a test suite precisely for this
> (JdbcRDDSuite).
> I changed it according to your input and got correct results from this
> test suite.
>
> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI All,
>>
>> The JdbcRDD constructor has the following parameters:
>>
>> JdbcRDD(SparkContext sc,
>>         scala.Function0<java.sql.Connection> getConnection,
>>         String sql,
>>         long lowerBound,
>>         long upperBound,
>>         int numPartitions,
>>         scala.Function1<java.sql.ResultSet, T> mapRow,
>>         scala.reflect.ClassTag<T> evidence$1)
>>
>> where *lowerBound* refers to the lower boundary of the
>> entire data, *upperBound* refers to the upper boundary of the entire data,
>> and *numPartitions* refers to the number of partitions.
>>
>> The source table in the Oracle DB from which JdbcRDD fetches data has
>> more than 500 records, but the results are confusing when I try several
>> executions by
>> changing the "numPartitions" parameter:
>>
>> LowerBound,UpperBound,numPartitions: Output Count
>>
>> 0 ,100  ,1   : 100
>>
>> 0 ,100  ,2   : 151
>>
>> 0 ,100  ,3   : 201
>>
>>
>> Please help me understand why the output count is 151 if
>> numPartitions is 2 and 201 if numPartitions is 3.
>>
>> Regards,
>>
>> Satish Chandra
>>
>
>

>>>
>>
>
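
For reference, a minimal sketch of the inclusive-range form Deenar describes
above (same sc, url, user and pass as in the snippet at the top of this thread;
the id column and the 1..100 bounds are hypothetical and must match the real table):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Spark binds the two '?' placeholders to each partition's inclusive
// [lower, upper] sub-range of 1..100, so every row is read exactly once.
val rowsRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, user, pass),
  "select col1, col2 from schema.Table where ? <= id and id <= ?",
  1L,   // lowerBound
  100L, // upperBound
  3,    // numPartitions
  (r: ResultSet) => (r.getInt("col1"), r.getInt("col2")))

rowsRDD.count()  // expected: 100, regardless of numPartitions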


Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-24 Thread Adrian Tanase
RE: # because I already have a bunch of InputSplits, do I still need to specify 
the number of executors to get processing parallelized?

I would say it’s best practice to have as many executors as data nodes and as 
many cores as you can get from the cluster. If YARN has enough resources it 
will deploy the executors distributed across the cluster, and each of them 
will try to process the data locally (check the Spark UI for NODE_LOCAL), with 
as many splits in parallel as you defined in spark.executor.cores.

-adrian

From: Sandy Ryza
Date: Thursday, September 24, 2015 at 2:43 AM
To: Anfernee Xu
Cc: "user@spark.apache.org"
Subject: Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task 
and Yarn containers

Hi Anfernee,

That's correct: each InputSplit will map to exactly one Spark partition.

On YARN, each Spark executor maps to a single YARN container.  Each executor 
can run multiple tasks over its lifetime, both parallel and sequentially.

If you enable dynamic allocation, after the stage including the InputSplits 
gets submitted, Spark will try to request an appropriate number of executors.

The memory in the YARN resource requests is --executor-memory + what's set for 
spark.yarn.executor.memoryOverhead, which defaults to 10% of --executor-memory.
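For example (illustrative numbers): with --executor-memory 4g, each container
request comes to roughly 4g + 0.10 * 4g ≈ 4.4g; for small executors the overhead
is instead subject to a floor of about 384 MB.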

-Sandy

On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu 
> wrote:
Hi Spark experts,

I'm coming across these terminologies and having some confusions, could you 
please help me understand them better?

For instance, I have implemented a Hadoop InputFormat to load my external data 
in Spark; in turn my custom InputFormat will create a bunch of InputSplits. My 
questions are:

# Each InputSplit will exactly map to a Spark partition, is that correct?

# If I run on Yarn, how does Spark executor/task map to Yarn container?

# because I already have a bunch of InputSplits, do I still need to specify the 
number of executors to get processing parallelized?

# How does --executor-memory map to the memory requirement in Yarn's resource 
request?

--
--Anfernee



Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-24 Thread satish chandra j
HI All,
As it is for SQL purposes, I understand I need to go ahead with the custom case
class approach.
Could anybody share sample code for creating such a custom case class? That
would be really helpful.

Regards,
Satish Chandra

On Thu, Sep 24, 2015 at 2:51 PM, Adrian Tanase  wrote:

> +1 on grouping the case classes and creating a hierarchy – as long as you
> use the data programatically. For DataFrames / SQL the other ideas probably
> scale better…
>
> From: Ted Yu
> Date: Wednesday, September 23, 2015 at 7:07 AM
> To: satish chandra j
> Cc: user
> Subject: Re: Scala Limitation - Case Class definition with more than 22
> arguments
>
> Can you switch to 2.11 ?
>
> The following has been fixed in 2.11:
> https://issues.scala-lang.org/browse/SI-7296
>
> Otherwise consider packaging related values into a case class of their own.
>
> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> Hi All,
>> Do we have any alternative solutions in Scala to avoid the limitation on
>> defining a case class having more than 22 arguments?
>>
>> We are using Scala version 2.10.2. Currently I need to define a case
>> class with 37 arguments but I get the error "*error: Implementation
>> restriction: case classes cannot have more than 22 parameters.*"
>>
>> Any inputs on this would be a great help.
>>
>> Regards,
>> Satish Chandra
>>
>>
>>
>


Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-24 Thread Sabarish Sasidharan
A little caution is needed, as one executor per node may not always be ideal,
especially when your nodes have lots of RAM. But yes, using a smaller number of
executors has benefits like more efficient broadcasts.

Regards
Sab
On 24-Sep-2015 2:57 pm, "Adrian Tanase"  wrote:

> RE: # because I already have a bunch of InputSplits, do I still need to
> specify the number of executors to get processing parallelized?
>
> I would say it’s best practice to have as many executors as data nodes and
> as many cores as you can get from the cluster – if YARN has enough
>  resources it will deploy the executors distributed across the cluster,
> then each of them will try to process the data locally (check the spark ui
> for NODE_LOCAL), with as many splits in parallel as you defined in
> spark.executor.cores
>
> -adrian
>
> From: Sandy Ryza
> Date: Thursday, September 24, 2015 at 2:43 AM
> To: Anfernee Xu
> Cc: "user@spark.apache.org"
> Subject: Re: Custom Hadoop InputSplit, Spark partitions, spark
> executors/task and Yarn containers
>
> Hi Anfernee,
>
> That's correct that each InputSplit will map to exactly a Spark partition.
>
> On YARN, each Spark executor maps to a single YARN container.  Each
> executor can run multiple tasks over its lifetime, both parallel and
> sequentially.
>
> If you enable dynamic allocation, after the stage including the
> InputSplits gets submitted, Spark will try to request an appropriate number
> of executors.
>
> The memory in the YARN resource requests is --executor-memory + what's set
> for spark.yarn.executor.memoryOverhead, which defaults to 10% of
> --executor-memory.
>
> -Sandy
>
> On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu 
> wrote:
>
>> Hi Spark experts,
>>
>> I'm coming across these terminologies and having some confusions, could
>> you please help me understand them better?
>>
>> For instance I have implemented a Hadoop InputFormat to load my external
>> data in Spark, in turn my custom InputFormat will create a bunch of
>> InputSplit's, my questions is about
>>
>> # Each InputSplit will exactly map to a Spark partition, is that correct?
>>
>> # If I run on Yarn, how does Spark executor/task map to Yarn container?
>>
>> # because I already have a bunch of InputSplits, do I still need to
>> specify the number of executors to get processing parallelized?
>>
>> # How does -executor-memory map to the memory requirement in Yarn's
>> resource request?
>>
>> --
>> --Anfernee
>>
>
>


Exception during SaveAstextFile Stage

2015-09-24 Thread Chirag Dewan
Hi,

I have 2 stages in my job: map and save-as-text-file. During the save-as-text-file 
stage I am getting an exception:

15/09/24 15:38:16 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

It might be too early to ask this since I haven't dug at all into why it is 
occurring, but does anyone have any idea about this?

Thanks in advance,

Chirag


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-24 Thread satish chandra j
HI All,

In addition to the case class limitation in Scala, I am finding a tuple limitation
too; please find the explanation below.

//Query to pull data from Source Table

var SQL_RDD = new JdbcRDD( sc, () =>
DriverManager.getConnection(url, user, pass), "select col1, col2,
col3 ... col37 from schema.Table LIMIT ? OFFSET ?", 100, 0, 1, (r:
ResultSet) => (r.getInt("col1"), r.getInt("col2") ... r.getInt("col37")))


//Define Case Class

case class sqlrow(col1: Int, col2: Int ... col37)


var SchSQL = SQL_RDD.map(p => new sqlrow(p._1, p._2 ... p._37))


followed by applying CreateSchema to the RDD and then registerTempTable to
define a table for use in the SQL context in Spark.

As per the above SQL query I need to fetch 37 columns from the source
table, but it seems Scala's tuple restriction hits the tuple I build from the r:
ResultSet variable in the code above; please let me know if there is any workaround
for the same.

Regards,
Satish Chandra

On Thu, Sep 24, 2015 at 3:18 PM, satish chandra j 
wrote:

> HI All,
> As it is for SQL purpose I understand, need to go ahead with Custom Case
> Class approach
> Could anybody have a sample code for creating Custom Case Class to refer
> which would be really helpful
>
> Regards,
> Satish Chandra
>
> On Thu, Sep 24, 2015 at 2:51 PM, Adrian Tanase  wrote:
>
>> +1 on grouping the case classes and creating a hierarchy – as long as you
>> use the data programatically. For DataFrames / SQL the other ideas probably
>> scale better…
>>
>> From: Ted Yu
>> Date: Wednesday, September 23, 2015 at 7:07 AM
>> To: satish chandra j
>> Cc: user
>> Subject: Re: Scala Limitation - Case Class definition with more than 22
>> arguments
>>
>> Can you switch to 2.11 ?
>>
>> The following has been fixed in 2.11:
>> https://issues.scala-lang.org/browse/SI-7296
>>
>> Otherwise consider packaging related values into a case class of their
>> own.
>>
>> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> Do we have any alternative solutions in Scala to avoid limitation in
>>> defining a Case Class having more than 22 arguments
>>>
>>> We are using Scala version 2.10.2, currently I need to define a case
>>> class with 37 arguments but getting an error as "*error:Implementation
>>> restriction:caseclasses cannot have more than 22parameters.*"
>>>
>>> It would be a great help if any inputs on the same
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>>
>>>
>>
>
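
Pulling the suggestions above together, a minimal sketch of the grouping
approach (the column groupings below are hypothetical, not the actual schema).
Building the case class directly inside mapRow also sidesteps the Tuple22
limit, because no 37-element tuple is ever created, and nested case classes
should show up as nested struct columns once the RDD is registered as a table:

import java.sql.ResultSet

// Hypothetical grouping of the 37 columns into case classes that each stay
// well under the 22-parameter limit of Scala 2.10.
case class KeyCols(col1: Int, col2: Int, col3: Int)
case class MetricCols(col4: Int, col5: Int, col6: Int)   // ... through col20
case class OtherCols(col21: Int, col22: Int, col23: Int) // ... through col37
case class TableRow(keys: KeyCols, metrics: MetricCols, other: OtherCols)

// Used as the mapRow function of JdbcRDD: the row object is built straight
// from the ResultSet, so no intermediate tuple is needed.
def mapRow(r: ResultSet): TableRow = TableRow(
  KeyCols(r.getInt("col1"), r.getInt("col2"), r.getInt("col3")),
  MetricCols(r.getInt("col4"), r.getInt("col5"), r.getInt("col6")),
  OtherCols(r.getInt("col21"), r.getInt("col22"), r.getInt("col23")))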


Re: GroupBy Java objects in Java Spark

2015-09-24 Thread Sabarish Sasidharan
By java class objects if you mean your custom Java objects, yes of course.
That will work.
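
For example, a minimal sketch in Scala (assuming a SparkContext named sc; the
same holds for the Java API, where the key class must provide consistent
equals/hashCode and be serializable):

// Hypothetical key type; a Scala case class gets equals/hashCode and
// Serializable for free, whereas a hand-written Java key class must
// override equals() and hashCode() itself and implement java.io.Serializable.
case class UserKey(id: Long, country: String)

val pairs = sc.parallelize(Seq(
  (UserKey(1L, "US"), "click"),
  (UserKey(1L, "US"), "view"),
  (UserKey(2L, "DE"), "click")))

// Equal keys hash to the same partition and compare equal, so grouping works.
val grouped = pairs.groupByKey()      // RDD[(UserKey, Iterable[String])]
grouped.mapValues(_.size).collect()   // counts per key, e.g. UserKey(1,US) -> 2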

Regards
Sab
On 24-Sep-2015 3:36 pm, "Ramkumar V"  wrote:

> Hi,
>
> I want to know whether grouping by java class objects is possible or not
> in java Spark.
>
> I have Tuple2< JavaObject, JavaObject>. i want to groupbyKey and then i'll
> do some operations in values after grouping.
>
>
> *Thanks*,
> 
>
>


Legacy Python code

2015-09-24 Thread Joshua Fox
I managed to put together a first  Spark application on top of my existing
codebase.

But I am still puzzled by the best way to deploy legacy Python code.

Can't I just put my codebase in some directory on the slave machines?

Existing solutions:

1.  Rewrite everything in terms of Spark primitives (map,
reduce, filter, count, etc.). But I have a large legacy codebase, and I
need to call some coarse-grained functionality inside it. Rewriting would
take too much development time. Also, complex functionality is most easily
expressed, and runs fastest, as ordinary code rather than as a
combination of hundreds of Spark primitives.

2. Bundle it as a zip and use the pyFiles parameter on SparkContext. But
some of the code loads resources from its own code-path, and so ordinary
file access fails when the source code is in a zip file. In any case, it
seems inefficient to  transmit a large codebase on distributed invocations.

3. Put all the code in Python's site-packages directory. But that directory
is more suited to pip packages than my own code.

4. Set the PYTHONPATH on the slave. But the Python worker code does not
seem to pick this up.

What is the best approach to large legacy Python codebases?

Thanks,

Joshua


Re: Re: How to fix some WARN when submit job on spark 1.5 YARN

2015-09-24 Thread r7raul1...@163.com
Thank you



r7raul1...@163.com
 
From: Sean Owen
Date: 2015-09-24 16:18
To: r7raul1...@163.com
CC: user
Subject: Re: How to fix some WARN when submit job on spark 1.5 YARN
You can ignore all of these. Various libraries can take advantage of
native acceleration if libs are available but it's no problem if they
don't.
 
On Thu, Sep 24, 2015 at 3:25 AM, r7raul1...@163.com  wrote:
> 1 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 2 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
> 3 WARN  Unable to load native-hadoop library for your platform
> 
> r7raul1...@163.com
 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
 


Not fetching all records from Cassandra DB

2015-09-24 Thread satish chandra j
HI All,

Not sure why all records are not retrieved from Cassandra even though
there are no conditions applied in the SQL query executed on the Cassandra SQL
context in Spark 1.2.2.

Note: It is a simple lookup table which has only 10 to 15 records.

Please let me know if you have any inputs on the above.

Regards,
Satish Chandra


Re: No space left on device when running graphx job

2015-09-24 Thread Ted Yu
Andy:
Can you show the complete stack trace?

Have you checked that there are enough free inodes on the .129 machine?

Cheers

> On Sep 23, 2015, at 11:43 PM, Andy Huang  wrote:
> 
> Hi Jack,
> 
> Are you writing out to disk? Or it sounds like Spark is spilling to disk (RAM 
> filled up) and it's running out of disk space.
> 
> Cheers
> Andy
> 
>> On Thu, Sep 24, 2015 at 4:29 PM, Jack Yang  wrote:
>> Hi folk,
>> 
>>  
>> 
>> I have an issue of graphx. (spark: 1.4.0 + 4 machines + 4G memory + 4 CPU 
>> cores)
>> 
>> Basically, I load data using GraphLoader.edgeListFile mthod and then count 
>> number of nodes using: graph.vertices.count() method.
>> 
>> The problem is :
>> 
>>  
>> 
>> Lost task 11972.0 in stage 6.0 (TID 54585, 192.168.70.129): 
>> java.io.IOException: No space left on device
>> 
>> at java.io.FileOutputStream.writeBytes(Native Method)
>> 
>> at java.io.FileOutputStream.write(FileOutputStream.java:345)
>> 
>>  
>> 
>> when I try a small amount of data, the code is working. So I guess the error 
>> comes from the amount of data.
>> 
>> This is how I submit the job:
>> 
>>  
>> 
>> spark-submit --class "myclass"
>> 
>> --master spark://hadoopmaster:7077  (I am using standalone)
>> 
>> --executor-memory 2048M
>> 
>> --driver-java-options "-XX:MaxPermSize=2G" 
>> 
>> --total-executor-cores 4  my.jar
>> 
>>  
>> 
>>  
>> 
>> Any thoughts?
>> 
>> Best regards,
>> 
>> Jack
>> 
> 
> 
> 
> -- 
> Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 | f: 02 
> 9376 0730| m: 0433221979


Re: spark + parquet + schema name and metadata

2015-09-24 Thread Borisa Zivkovic
Hi,

your suggestion works nicely. I was able to attach metadata to columns and
read that metadata from Spark and by using ParquetFileReader.
It would be nice if we had a way to manipulate Parquet metadata directly
from DataFrames though.
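
For anyone searching later, a minimal sketch of that footer-level read path
(the part-file name is hypothetical; this is the parquet-mr 1.6/1.7-style
readFooter API, which may be deprecated in newer releases):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical part file written by Spark under the target directory.
val partFile = new Path("/tmp/parquet/meta/part-r-00000.gz.parquet")
val footer = ParquetFileReader.readFooter(new Configuration(), partFile)

// Spark stores its schema (including per-column metadata) as a JSON string
// under this key in the Parquet footer's key/value metadata.
val kv = footer.getFileMetaData.getKeyValueMetaData
println(kv.get("org.apache.spark.sql.parquet.row.metadata"))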

regards

On Wed, 23 Sep 2015 at 09:25 Borisa Zivkovic 
wrote:

> Hi,
>
> thanks a lot for this! I will try it out to see if this works ok.
>
> I am planning to use "stable" metadata - so those will be same across all
> parquet files inside directory hierarchy...
>
>
>
> On Tue, 22 Sep 2015 at 18:54 Cheng Lian  wrote:
>
>> Michael reminded me that although we don't support direct manipulation
>> over Parquet metadata, you can still save/query metadata to/from Parquet
>> via DataFrame per-column metadata. For example:
>>
>> import sqlContext.implicits._
>> import org.apache.spark.sql.types.MetadataBuilder
>>
>> val path = "file:///tmp/parquet/meta"
>>
>> // Saving metadata
>> val meta = new MetadataBuilder().putString("appVersion", "1.0.2").build()
>> sqlContext.range(10).select($"id".as("id",
>> meta)).coalesce(1).write.mode("overwrite").parquet(path)
>>
>> // Querying metadata
>>
>> sqlContext.read.parquet(path).schema("id").metadata.getString("appVersion")
>>
>> The metadata is saved together with Spark SQL schema as a JSON string.
>> For example, the above code generates the following Parquet metadata
>> (inspected with parquet-meta):
>>
>> file:
>> file:/private/tmp/parquet/meta/part-r-0-77cb2237-e6a8-4cb6-a452-ae205ba7b660.gz.parquet
>> creator: parquet-mr version 1.6.0
>> extra:   org.apache.spark.sql.parquet.row.metadata =
>> {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,
>> *"metadata":{"appVersion":"1.0.2"}*}]}
>>
>>
>> Cheng
>>
>>
>> On 9/22/15 9:37 AM, Cheng Lian wrote:
>>
>> I see, this makes sense. We should probably add this in Spark SQL.
>>
>> However, there's one corner case to note about user-defined Parquet
>> metadata. When committing a write job, ParquetOutputCommitter writes
>> Parquet summary files (_metadata and _common_metadata), and user-defined
>> key-value metadata written in all Parquet part-files get merged here. The
>> problem is that, if a single key is associated with multiple values,
>> Parquet doesn't know how to reconcile this situation, and simply gives up
>> writing summary files. This can be particular annoying for appending. In
>> general, users should avoid storing "unstable" values like timestamps as
>> Parquet metadata.
>>
>> Cheng
>>
>> On 9/22/15 1:58 AM, Borisa Zivkovic wrote:
>>
>> thanks for answer.
>>
>> I need this in order to be able to track schema metadata.
>>
>> basically when I create parquet files from Spark I want to be able to
>> "tag" them in some way (giving the schema appropriate name or attaching
>> some key/values) and then it is fairly easy to get basic metadata about
>> parquet files when processing and discovering those later on.
>>
>> On Mon, 21 Sep 2015 at 18:17 Cheng Lian  wrote:
>>
>>> Currently Spark SQL doesn't support customizing schema name and
>>> metadata. May I know why these two matters in your use case? Some
>>> Parquet data models, like parquet-avro, do support it, while some others
>>> don't (e.g. parquet-hive).
>>>
>>> Cheng
>>>
>>> On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
>>> > Hi,
>>> >
>>> > I am trying to figure out how to write parquet metadata when
>>> > persisting DataFrames to parquet using Spark (1.4.1)
>>> >
>>> > I could not find a way to change schema name (which seems to be
>>> > hardcoded to root) and also how to add data to key/value metadata in
>>> > parquet footer.
>>> >
>>> > org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>>> >
>>> > org.apache.parquet.schema.Type#getName
>>> >
>>> > thanks
>>> >
>>> >
>>>
>>>
>>
>>


Re: Spark on YARN / aws - executor lost on node restart

2015-09-24 Thread Adrian Tanase
Closing the loop, I’ve submitted this issue – TD, cc-ing you since it’s spark 
streaming, not sure who oversees the Yarn module.
https://issues.apache.org/jira/browse/SPARK-10792

-adrian

From: Adrian Tanase
Date: Friday, September 18, 2015 at 6:18 PM
To: "user@spark.apache.org"
Subject: Re: Spark on YARN / aws - executor lost on node restart

Hi guys,

Digging up this question after spending some more time trying to replicate it.
It seems to be an issue with the YARN – spark integration, wondering if there 
is a bug already tracking this?

If I just kill the process on the machine, YARN detects the container is dead 
and the spark framework requests a new container to be deployed.
If the machine goes away completely, spark sees that the executor is lost but 
YarnAllocator never tries to request the container again. Wondering if there’s 
an implicit assumption that it would be notified by YARN, which might not 
happen if the node dies completely?

If there are no ideas on the list, I’ll prepare some logs and follow up with an 
issue.

Thanks,
-adrian

From: Adrian Tanase
Date: Wednesday, September 16, 2015 at 6:01 PM
To: "user@spark.apache.org"
Subject: Spark on YARN / aws - executor lost on node restart

Hi all,

We’re using spark streaming (1.4.0), deployed on AWS through yarn. It’s a 
stateful app that reads from kafka (with the new direct API) and we’re 
checkpointing to HDFS.

During some resilience testing, we restarted one of the machines and brought it 
back online. During the offline period, the Yarn cluster would not have 
resources to re-create the missing executor.
After starting all the services on the machine, it correctly joined the Yarn 
cluster, however the spark streaming app does not seem to notice that the 
resources are back and has not re-created the missing executor.

The app is correctly running with 6 out of 7 executors, however it’s running 
under capacity.
If we manually kill the driver and re-submit the app to yarn, all the state is 
correctly recreated from the checkpoint and all 7 executors are back online; 
however this seems like a brutal workaround.

So, here are some questions:

  *   Isn't the driver supposed to auto-heal after a machine is completely lost 
and then comes back after some time?
  *   Are there any configuration settings that influence how the Spark driver should 
poll YARN to check back on resources being available again?
  *   Is there a tool one can run to “force” the driver to re-create missing 
workers/executors?

Lastly, another issue was that the driver also crashed and yarn successfully 
restarted it – I’m not sure yet if it’s because of some retry setting or 
another exception, will post the logs after I recreate the problem.

Thanks in advance for any ideas,
-adrian


Re: Querying on multiple Hive stores using Apache Spark

2015-09-24 Thread Karthik
Any ideas or suggestions?

Thanks,
Karthik.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-on-multiple-Hive-stores-using-Apache-Spark-tp24765p24797.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



No space left on device when running graphx job

2015-09-24 Thread Jack Yang
Hi folk,

I have an issue of graphx. (spark: 1.4.0 + 4 machines + 4G memory + 4 CPU cores)
Basically, I load data using the GraphLoader.edgeListFile method and then count 
the number of nodes using the graph.vertices.count() method.
The problem is :

Lost task 11972.0 in stage 6.0 (TID 54585, 192.168.70.129): 
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)

when I try a small amount of data, the code is working. So I guess the error 
comes from the amount of data.
This is how I submit the job:

spark-submit --class "myclass"
--master spark://hadoopmaster:7077  (I am using standalone)
--executor-memory 2048M
--driver-java-options "-XX:MaxPermSize=2G"
--total-executor-cores 4  my.jar


Any thoughts?
Best regards,
Jack



Re: Join two dataframe - Timeout after 5 minutes

2015-09-24 Thread Shixiong Zhu
You can change "spark.sql.broadcastTimeout" to increase the timeout. The
default value is 300 seconds.
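
For example, a minimal sketch (the property name is the same from PySpark; the
value is in seconds, and 1200 is just an illustrative choice):

// Raise the broadcast-join timeout above the 300-second default.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")
// The same property can also be passed at submit time with
// --conf spark.sql.broadcastTimeout=1200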

Best Regards,
Shixiong Zhu

2015-09-24 15:16 GMT+08:00 Eyad Sibai :

> I am trying to join two tables using dataframes using python 3.4 and I am
> getting the following error
>
>
> I ran it on my localhost machine with 2 workers, spark 1.5
>
>
> I always get timeout if the job takes more than 5 minutes.
>
>
>
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 33 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
>
>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
>  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
>  at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
>  at scala.concurrent.Await$.result(package.scala:107)
>
>  at
> org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 41 more
>
>
> 2015-09-23 15:44:09,536 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,537 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution/json,null}
>
> 2015-09-23 15:44:09,538 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/json,null}
>
> 2015-09-23 

Re: Hbase Spark streaming issue.

2015-09-24 Thread Shixiong Zhu
Looks like you have an incompatible hbase-default.xml somewhere on the classpath. You
can use the following code to find the location of "hbase-default.xml":

println(Thread.currentThread().getContextClassLoader().getResource("hbase-default.xml"))

Best Regards,
Shixiong Zhu

2015-09-21 15:46 GMT+08:00 Siva :

> Hi,
>
> I am seeing a strange error while inserting data from Spark Streaming into
> HBase.
>
> I am able to write the data from Spark (without streaming) to HBase
> successfully, but when I use the same code to write a DStream I am seeing the
> error below.
>
> I tried setting the parameters below, but it still didn't help. Did anyone face a
> similar issue?
>
> conf.set("hbase.defaults.for.version.skip", "true")
> conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")
>
> 15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 16)
> java.lang.RuntimeException: hbase-default.xml file seems to be for and old
> version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
> at
> org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
> at
> $line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID
> 16, localhost): java.lang.RuntimeException: hbase-default.xml file seems to
> be for and old version of HBase (null), this version is
> 0.98.4.2.2.4.2-2-hadoop2
>
>
> Thanks,
> Siva.
>


Re: How to make Group By/reduceByKey more efficient?

2015-09-24 Thread Adrian Tanase
All the *ByKey aggregations perform an efficient shuffle and preserve 
partitioning on the output. If all you need is to call reduceByKey, then don’t 
bother with groupBy. You should use groupBy if you really need all the 
datapoints from a key for a very custom operation.


From the docs:

Note: If you are grouping in order to perform an aggregation (such as a sum or 
average) over each key, using reduceByKey or aggregateByKey will yield much 
better performance. 


What you should worry about in more complex pipelines is whether you’re actually 
preserving the partitioner between stages. For example, if you use a custom 
partitioner between a partitionBy and an updateStateByKey. Or if you use .map 
or .flatMap instead of .mapValues and .flatMapValues.

By the way, learn to use the Spark UI to understand the DAG / Execution plan 
and try to navigate the source code - I found the comments and the various 
preservePartitioner options very educational.
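
As a small illustration (hypothetical (key, value) pairs, assuming a
SparkContext named sc):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Ships every value for a key across the network, then sums on one node.
val viaGroupBy = pairs.groupByKey().mapValues(_.sum)

// Combines partially on each partition before the shuffle, so far less traffic.
val viaReduce = pairs.reduceByKey(_ + _)

// mapValues keeps the parent's partitioner; map does not.
val partitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(4))
val keeps = partitioned.mapValues(_ * 10)                  // partitioner preserved
val drops = partitioned.map { case (k, v) => (k, v * 10) } // partitioner lost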

-adrian





On 9/23/15, 8:43 AM, "swetha"  wrote:

>Hi,
>
>How to make Group By more efficient? Is it recommended to use a custom
>partitioner and then do a Group By? And can we use a custom partitioner and
>then use a  reduceByKey for optimization?
>
>
>Thanks,
>Swetha
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark 1.5.0 on YARN dynamicAllocation - Initial job has not accepted any resources

2015-09-24 Thread Jonathan Kelly
I cut https://issues.apache.org/jira/browse/SPARK-10790 for this issue.

On Wed, Sep 23, 2015 at 8:38 PM, Jonathan Kelly 
wrote:

> AHA! I figured it out, but it required some tedious remote debugging of
> the Spark ApplicationMaster. (But now I understand the Spark codebase a
> little better than before, so I guess I'm not too put out. =P)
>
> Here's what's happening...
>
> I am setting spark.dynamicAllocation.minExecutors=1 but am not setting
> spark.dynamicAllocation.initialExecutors, so it's remaining at the default
> of spark.dynamicAllocation.minExecutors. However, ExecutorAllocationManager
> doesn't actually request any executors while the application is still
> initializing (see comment here
> ),
> but it still sets numExecutorsTarget to
> spark.dynamicAllocation.initialExecutors (i.e., 1).
>
> The JavaWordCount example I've been trying to run is only operating on a
> very small file, so its first stage only has a single task and thus should
> request a single executor once the polling loop comes along.
>
> Then on this line
> ,
> it returns numExecutorsTarget (1) - oldNumExecutorsTarget (still 1, even
> though there aren't any executors running yet) = 0, for the number of
> executors it should request. Then the app hangs forever because it never
> requests any executors.
>
> I verified this further by setting
> spark.dynamicAllocation.minExecutors=100 and trying to run my SparkPi
> example I mentioned earlier (which runs 100 tasks in its first stage
> because that's the number I'm passing to the driver). Then it would hang in
> the same way as my JavaWordCount example. If I run it again, passing 101
> (so that it has 101 tasks), it works, and if I pass 99, it hangs again.
>
> So it seems that I have found a bug in that if you set
> spark.dynamicAllocation.minExecutors (or, presumably,
> spark.dynamicAllocation.initialExecutors), and the number of tasks in your
> first stage is less than or equal to this min/init number of executors, it
> won't actually request any executors and will just hang indefinitely.
>
> I can't seem to find a JIRA for this, so shall I file one, or has anybody
> else seen anything like this?
>
> ~ Jonathan
>
> On Wed, Sep 23, 2015 at 7:08 PM, Jonathan Kelly 
> wrote:
>
>> Another update that doesn't make much sense:
>>
>> The SparkPi example does work on yarn-cluster mode with dynamicAllocation.
>>
>> That is, the following command works (as well as with yarn-client mode):
>>
>> spark-submit --deploy-mode cluster --class
>> org.apache.spark.examples.SparkPi spark-examples.jar 100
>>
>> But the following one does not work (nor does it work for yarn-client
>> mode):
>>
>> spark-submit --deploy-mode cluster --class
>> org.apache.spark.examples.JavaWordCount spark-examples.jar
>> /tmp/word-count-input.txt
>>
>> So this JavaWordCount example hangs on requesting executors, while
>> SparkPi and spark-shell do work.
>>
>> ~ Jonathan
>>
>> On Wed, Sep 23, 2015 at 6:22 PM, Jonathan Kelly 
>> wrote:
>>
>>> Thanks for the quick response!
>>>
>>> spark-shell is indeed using yarn-client. I forgot to mention that I also
>>> have "spark.master yarn-client" in my spark-defaults.conf file too.
>>>
>>> The working spark-shell and my non-working example application both
>>> display spark.scheduler.mode=FIFO on the Spark UI. Is that what you are
>>> asking about? I haven't actually messed around with different scheduler
>>> modes yet.
>>>
>>> One more thing I should mention is that the YARN ResourceManager tells
>>> me the following on my 5-node cluster, with one node being the master and
>>> not running a NodeManager:
>>> Memory Used: 1.50 GB (this is the running ApplicationMaster that's
>>> waiting and waiting for the executors to start up)
>>> Memory Total: 45 GB (11.25 from each of the 4 slave nodes)
>>> VCores Used: 1
>>> VCores Total: 32
>>> Active Nodes: 4
>>>
>>> ~ Jonathan
>>>
>>> On Wed, Sep 23, 2015 at 6:10 PM, Andrew Duffy 
>>> wrote:
>>>
 What pool is the spark shell being put into? (You can see this through
 the YARN UI under scheduler)

 Are you certain you're starting spark-shell up on YARN? By default it
 uses a local spark executor, so if it "just works" then it's because it's
 not using dynamic allocation.


 On Wed, Sep 23, 2015 at 18:04 Jonathan Kelly 
 wrote:

> I'm running into a problem with YARN dynamicAllocation on Spark 1.5.0
> after using it successfully on an identically configured cluster with 
> Spark
> 1.4.1.
>
> I'm getting the dreaded warning "YarnClusterScheduler: Initial job has
> not accepted any resources; check your cluster UI to 

Re: How to fix some WARN when submit job on spark 1.5 YARN

2015-09-24 Thread Sean Owen
You can ignore all of these. Various libraries can take advantage of
native acceleration if libs are available but it's no problem if they
don't.

On Thu, Sep 24, 2015 at 3:25 AM, r7raul1...@163.com  wrote:
> 1 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 2 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
> 3 WARN  Unable to load native-hadoop library for your platform
> 
> r7raul1...@163.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Spark streaming DStream state on worker

2015-09-24 Thread Shixiong Zhu
+user, -dev

It's not clear which `compute` you mean in your question. There are two `compute`
methods here.

1. DStream.compute: it always runs in the driver, and all RDDs are created
in the driver. E.g.,

DStream.foreachRDD(rdd => rdd.count())

"rdd.count()" is called in the driver.

2. RDD.compute: this will run in the executor and the location is not
guaranteed. E.g.,

DStream.foreachRDD(rdd => rdd.foreach { v =>
println(v)
})

"println(v)" is called in the executor.


Best Regards,
Shixiong Zhu

2015-09-17 3:47 GMT+08:00 Renyi Xiong :

> Hi,
>
> I want to do a temporal join operation on a DStream across RDDs. My question
> is: are RDDs from the same DStream always computed on the same worker (except
> during failover)?
>
> thanks,
> Renyi.
>


Re: No space left on device when running graphx job

2015-09-24 Thread Andy Huang
Hi Jack,

Are you writing out to disk? Or it sounds like Spark is spilling to disk
(RAM filled up) and it's running out of disk space.

Cheers
Andy

On Thu, Sep 24, 2015 at 4:29 PM, Jack Yang  wrote:

> Hi folk,
>
>
>
> I have an issue of graphx. (spark: 1.4.0 + 4 machines + 4G memory + 4 CPU
> cores)
>
> Basically, I load data using GraphLoader.edgeListFile mthod and then count
> number of nodes using: graph.vertices.count() method.
>
> The problem is :
>
>
>
> *Lost task 11972.0 in stage 6.0 (TID 54585, 192.168.70.129):
> java.io.IOException: No space left on device*
>
> *at java.io.FileOutputStream.writeBytes(Native Method)*
>
> *at java.io.FileOutputStream.write(FileOutputStream.java:345)*
>
>
>
> when I try a small amount of data, the code is working. So I guess the
> error comes from the amount of data.
>
> This is how I submit the job:
>
>
>
> spark-submit --class "myclass"
>
> --master spark://hadoopmaster:7077  (I am using standalone)
>
> --executor-memory 2048M
>
> --driver-java-options "-XX:MaxPermSize=2G"
>
> --total-executor-cores 4  my.jar
>
>
>
>
>
> Any thoughts?
>
> Best regards,
>
> Jack
>
>
>



-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Re: Creating BlockMatrix with java API

2015-09-24 Thread Sabarish Sasidharan
What I meant is that something like this would work. Yes, it's less than
elegant but it works.

List<Tuple2<Tuple2<Object, Object>, Matrix>> blocks =
    new ArrayList<Tuple2<Tuple2<Object, Object>, Matrix>>();
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(0, 0),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(0, 1),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(1, 0),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
blocks.add(
    new Tuple2<Tuple2<Object, Object>, Matrix>(
        new Tuple2<Object, Object>(1, 1),
        Matrices.dense(2, 2, new double[] {0.0D, 1.1D, 2.0D, 3.1D})));
BlockMatrix bm = new BlockMatrix(
    CurrentContext.getCurrentContext().parallelize(blocks).rdd(), 2, 2);


Regards
Sab

On Thu, Sep 24, 2015 at 2:35 AM, Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi YiZhi,
>
> Actually I was not able to try it out to see if it was working. I sent the
> previous reply assuming that Sabarish's solution would work :). Sorry if
> there was any confusion.
>
> Best Regards,
> Pulasthi
>
> On Wed, Sep 23, 2015 at 6:47 AM, YiZhi Liu  wrote:
>
>> Hi Pulasthi,
>>
>> Are you sure this worked? When I applied rdd.rdd() to the constructor
>> of BlockMatrix, the compiler complained
>>
>> [error]
>> spark/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVD.java:38:
>> error: incompatible types: RDD<Tuple2<Tuple2<Integer, Integer>, Matrix>>
>> cannot be converted to RDD<Tuple2<Tuple2<Object, Object>, Matrix>>
>> [error] BlockMatrix blockMatrix = new BlockMatrix(rdd.rdd(), 2, 2);
>>
>> It must be caused by the type erasure from Scala to Java. To make it
>> work, we have to define 'rdd' as JavaRDD<Tuple2<Tuple2<Object, Object>, Matrix>>
>>
>> As Yanbo has mentioned, I think a Java friendly constructor is still in
>> demand.
>>
>> 2015-09-23 13:14 GMT+08:00 Pulasthi Supun Wickramasinghe
>> :
>> > Hi Sabarish
>> >
>> > Thanks, that would indeed solve my problem
>> >
>> > Best Regards,
>> > Pulasthi
>> >
>> > On Wed, Sep 23, 2015 at 12:55 AM, Sabarish Sasidharan
>> >  wrote:
>> >>
>> >> Hi Pulasthi
>> >>
>> >> You can always use JavaRDD.rdd() to get the scala rdd. So in your case,
>> >>
>> >> new BlockMatrix(rdd.rdd(), 2, 2)
>> >>
>> >> should work.
>> >>
>> >> Regards
>> >> Sab
>> >>
>> >> On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe
>> >>  wrote:
>> >>>
>> >>> Hi Yanbo,
>> >>>
>> >>> Thanks for the reply. I thought i might be missing something. Anyway i
>> >>> moved to using scala since it is the complete API.
>> >>>
>> >>> Best Regards,
>> >>> Pulasthi
>> >>>
>> >>> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang 
>> wrote:
>> 
>>  This is because distributed matrices like
>>  BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do not
>>  provide Java-friendly constructors. I have filed SPARK-10757 to track this issue.
>> 
>>  2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe
>>  :
>> >
>> > Hi All,
>> >
>> > I am new to Spark and i am trying to do some BlockMatrix operations
>> > with the Mllib API's. But i can't seem to create a BlockMatrix with
>> the java
>> > API. I tried the following
>> >
>> > Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>> > List<Tuple2<Tuple2<Integer, Integer>, Matrix>> list = new
>> > ArrayList<Tuple2<Tuple2<Integer, Integer>, Matrix>>();
>> > Tuple2<Integer, Integer> intTuple = new Tuple2<Integer, Integer>(0, 0);
>> > Tuple2<Tuple2<Integer, Integer>, Matrix> tuple2MatrixTuple2 = new
>> > Tuple2<Tuple2<Integer, Integer>, Matrix>(intTuple, matrixa);
>> > list.add(tuple2MatrixTuple2);
>> > JavaRDD<Tuple2<Tuple2<Integer, Integer>, Matrix>> rdd =
>> > sc.parallelize(list);
>> >
>> > BlockMatrix blockMatrix = new BlockMatrix(rdd, 2, 2);
>> >
>> >
>> > but since BlockMatrix only takes
>> >
>> > "RDD<Tuple2<Tuple2<Object, Object>, Matrix>>"
>> > this code does not work. sc.parallelize() returns a JavaRDD so the
>> two are
>> > not compatible. I also couldn't find any code samples for this. Any
>> help on
>> > this would be highly appreciated.
>> >
>> > Best Regards,
>> > Pulasthi
>> > --
>> > Pulasthi S. Wickramasinghe
>> > Graduate Student  | Research Assistant
>> > School of Informatics and Computing | Digital Science Center
>> > Indiana University, Bloomington
>> > cell: 224-386-9035
>> 
>> 
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Pulasthi S. Wickramasinghe
>> >>> Graduate Student  | Research Assistant
>> >>> School of Informatics and Computing | Digital Science 

Re: JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-24 Thread Shixiong Zhu
Looks like you return a "Some(null)" in "compute". If you don't want to
create an RDD, it should return None. If you want to return an empty RDD, it
should return "Some(sc.emptyRDD)".

Best Regards,
Shixiong Zhu

2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi,
>
> I sent this message to the user list a few weeks ago with no luck, so I'm
> forwarding it to the dev list in case someone could give a hand with this.
> Thanks a lot in advance
>
>
> I've developed a ScalaCheck property for testing Spark Streaming
> transformations. To do that I had to develop a custom InputDStream, which
> is very similar to QueueInputDStream but has a method for adding new test
> cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
> You can see the code at
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
> I have developed a few properties that run in local mode
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
> The problem is that when the batch interval is too small, and the machine
> cannot complete the batches fast enough, I get the following exceptions in
> the Spark log
>
> 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
> 1440580922500 ms
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> 

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-24 Thread Adrian Tanase
+1 on grouping the case classes and creating a hierarchy – as long as you use 
the data programmatically. For DataFrames / SQL the other ideas probably scale 
better…

From: Ted Yu
Date: Wednesday, September 23, 2015 at 7:07 AM
To: satish chandra j
Cc: user
Subject: Re: Scala Limitation - Case Class definition with more than 22 
arguments

Can you switch to 2.11 ?

The following has been fixed in 2.11:
https://issues.scala-lang.org/browse/SI-7296

Otherwise consider packaging related values into a case class of their own.
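
For illustration, a minimal sketch of that grouping approach (all field names 
below are hypothetical, not taken from the original code):

// Instead of one case class with 37 parameters, group related fields into
// nested case classes, each staying under the 22-parameter limit.
case class CustomerInfo(id: Long, name: String, email: String)
case class OrderInfo(orderId: Long, amount: Double, currency: String)
case class ShippingInfo(address: String, city: String, country: String)

// The top-level case class then needs only a handful of parameters.
case class Record(customer: CustomerInfo, order: OrderInfo, shipping: ShippingInfo)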

On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j 
> wrote:
HI All,
Do we have any alternative solutions in Scala to avoid the limitation on defining 
a case class with more than 22 arguments?

We are using Scala version 2.10.2. Currently I need to define a case class with 
37 arguments, but I am getting the error "error: Implementation restriction: case 
classes cannot have more than 22 parameters."

Any inputs on this would be a great help.

Regards,
Satish Chandra





kafka direct streaming with checkpointing

2015-09-24 Thread Radu Brumariu
Hi,
in my application I use Kafka direct streaming and I have also enabled
checkpointing.
This seems to work fine if the application is restarted. However if I
change the code and resubmit the application, it cannot start because of
the checkpointed data being of different class versions.
Is there any way I can use checkpointing that can survive across
application version changes?

Thanks,
Radu


Re: Java Heap Space Error

2015-09-24 Thread Yusuf Can Gürkan
@Jingyu
Yes, it works without regex and concatenation as the query below:

So, what can we understand from this? When I run it like that, the shuffle read 
sizes are equally distributed between partitions.

val usersInputDF = sqlContext.sql(
s"""
 |  select userid from landing where dt='2015-9' and userid != '' and 
userid is not null and userid is not NULL and pagetype = 'productDetail' group 
by userid

   """.stripMargin)

@java8964

I tried with sql.shuffle.partitions = 1 but no luck. Again, one of the 
partitions' shuffle size is huge and the others are very small.


——
So how can I balance the shuffle read size between partitions?
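
(For reference, a sketch of how that setting is usually changed per SQLContext; 
the value 400 is arbitrary and this is not a confirmed fix for the skew described 
above:)

// Raise the number of post-shuffle partitions used by Spark SQL aggregations/joins.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")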


> On 24 Sep 2015, at 03:35, Zhang, Jingyu  wrote:
> 
> Is you sql works if do not runs a regex on strings and concatenates them, I 
> mean just Select the stuff without String operations?
> 
> On 24 September 2015 at 10:11, java8964  > wrote:
> Try to increase partitions count, that will make each partition has less data.
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com 
> Date: Thu, 24 Sep 2015 00:32:47 +0300
> CC: user@spark.apache.org 
> To: java8...@hotmail.com 
> 
> 
> Yes, it’s possible. I use S3 as data source. My external tables has 
> partitioned. Belowed task is 193/200. Job has 2 stages and its 193. task of 
> 200 in 2.stage because of sql.shuffle.partitions. 
> 
> How can i avoid this situation, this is my query:
> 
> select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 
> NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
>  ') inputlist from landing where dt='2015-9' and userid != '' and userid 
> is not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid
> 
> On 23 Sep 2015, at 23:55, java8964  > wrote:
> 
> Based on your description, you job shouldn't have any shuffle then, as you 
> just apply regex and concatenation on the column, but there is one partition 
> having 4.3M records to be read, vs less than 1M records for other partitions.
> 
> Is that possible? It depends on what is the source of your data.
> 
> If there is shuffle in your query (More than 2 stages generated by your 
> query, and this is my guess of what happening), then it simple means that one 
> partition having way more data than the rest of partitions.
> 
> Yong
> 
> From: yu...@useinsider.com 
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org 
> 
> What can cause this issue in the attached picture? I’m running and sql query 
> which runs a regex on strings and concatenates them. Because of this task, my 
> job gives java heap space error.
> 
> 
> 
> 
> 
> This message and its attachments may contain legally privileged or 
> confidential information. It is intended solely for the named addressee. If 
> you are not the addressee indicated in this message or responsible for 
> delivery of the message to the addressee, you may not copy or deliver this 
> message or its attachments to anyone. Rather, you should permanently delete 
> this message and its attachments and kindly notify the sender by reply 
> e-mail. Any content of this message and its attachments which does not relate 
> to the official business of the sending company must be taken not to have 
> been sent or endorsed by that company or any of its related entities. No 
> warranty is made that the e-mail or attachments are free from computer virus 
> or other defect.



RE: Java Heap Space Error

2015-09-24 Thread java8964
This is interesting.
So you mean that query as 
"select userid from landing where dt='2015-9' and userid != '' and userid is 
not null and userid is not NULL and pagetype = 'productDetail' group by userid"
works in your cluster?
In this case, do you also see one task with way more data than the rest, as 
happened when you used regex and concatenation?
It is hard to believe that just adding "regex" and "concatenation" would make the 
distribution more equal across partitions. In your query, the distribution across 
the partitions simply depends on the Hash partitioner of "userid".
Can you show us the query after you add "regex" and "concatenation"?
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 15:34:48 +0300
CC: user@spark.apache.org
To: jingyu.zh...@news.com.au; java8...@hotmail.com

@JingyuYes, it works without regex and concatenation as the query below:
So, what we can understand from this? Because when i do like that, shuffle read 
sizes are equally distributed between partitions.
val usersInputDF = sqlContext.sql(s""" |  select userid from landing 
where dt='2015-9' and userid != '' and userid is not null and userid is not 
NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)
@java8964
I tried with sql.shuffle.partitions = 1 but no luck. It’s again one of the 
partitions shuffle size is huge and the others are very small.

——So how can i balance this shuffle read size between partitions?

On 24 Sep 2015, at 03:35, Zhang, Jingyu  wrote:Is you 
sql works if do not runs a regex on strings and concatenates them, I mean just 
Select the stuff without String operations?

On 24 September 2015 at 10:11, java8964  wrote:



Try to increase partitions count, that will make each partition has less data.
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 00:32:47 +0300
CC: user@spark.apache.org
To: java8...@hotmail.com

Yes, it’s possible. I use S3 as data source. My external tables has 
partitioned. Belowed task is 193/200. Job has 2 stages and its 193. task of 200 
in 2.stage because of sql.shuffle.partitions. 
How can i avoid this situation, this is my query:
select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 
NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
 ') inputlist from landing where dt='2015-9' and userid != '' and userid is 
not null and userid is not NULL and pagetype = 'productDetail' group by userid

On 23 Sep 2015, at 23:55, java8964  wrote:
Based on your description, you job shouldn't have any shuffle then, as you just 
apply regex and concatenation on the column, but there is one partition having 
4.3M records to be read, vs less than 1M records for other partitions.
Is that possible? It depends on what is the source of your data.
If there is shuffle in your query (More than 2 stages generated by your query, 
and this is my guess of what happening), then it simple means that one 
partition having way more data than the rest of partitions.
Yong
From: yu...@useinsider.com
Subject: Java Heap Space Error
Date: Wed, 23 Sep 2015 23:07:17 +0300
To: user@spark.apache.org

What can cause this issue in the attached picture? I’m running and sql query 
which runs a regex on strings and concatenates them. Because of this task, my 
job gives java heap space error.

  





This message and its attachments may contain legally privileged or confidential 
information. It is intended solely for the named addressee. If you are not the 
addressee indicated in this message or responsible for delivery of the message 
to the addressee, you may not copy or deliver this message or its attachments 
to anyone. Rather, you should permanently delete this message and its 
attachments and kindly notify the sender by reply e-mail. Any content of this 
message and its attachments which does not relate to the official business of 
the sending company must be taken not to have been sent or endorsed by that 
company or any of its related entities. No warranty is made that the e-mail or 
attachments are free from computer virus or other defect.
  

Re: reduceByKeyAndWindow confusion

2015-09-24 Thread Adrian Tanase
Let me take a stab at your questions – can you clarify some of the points 
below? I’m wondering if you’re using the streaming concepts as they were 
intended…

1. Windowed operations

First, I just want to confirm that it is your intention to split the original 
kafka stream into multiple Dstreams – and that grouping by key or 
repartitioning using your Map[Int, List[String]] is not enough.
I have never come across a situation where I needed to do this, so I’m 
wondering if what you need is a simple groupBy / reduceByKey or similar.

Putting that aside, your code below suggests that you’re “windowing” (is that a 
word? :)) the stream twice. You call window on the kafka stream and then you 
“reduce by key and window” the resulting stream.
Again, wondering if your intention is not simply to “reduce by key and window”.

Another thing – you’re computing your RDDs based on the slide duration of the 
window, but that is not correct. The number of RDDs is determined by the “batch 
interval”, which is constant across the streaming context (e.g. 10 seconds, 1 
minute, whatever). Both window duration and slide interval need to be multiples 
of this.

2. Checkpointing

First of all, you should call checkpoint on the streaming context 
ssc.checkpoint(checkpointDir) - where checkpoint dir needs to be a folder in 
local mode and HDFS in cluster mode.
This is probably where the error comes from.

Second – Kafka is backed by durable storage so you don’t need to checkpoint 
its contents, as it can always replay events in case of failure. You could do it 
if you go through the same data multiple times, as a performance enhancement, 
but you don’t have to.

Third – the windowed operation NEEDS to checkpoint data, as it’s stateful – all 
the stateful operations call persist internally as you’ve seen, to avoid 
recreating the full state from original events in case of failure. Doing this 
for a window of 1 hour could take way too long.

Last but not least – when you call checkpoint(interval) you can choose to 
checkpoint more often or less often than the default value. See the 
checkpointing docs for more info:


For stateful transformations that require RDD checkpointing, the default 
interval is a multiple of the batch interval that is at least 10 seconds. It 
can be set by using dstream.checkpoint(checkpointInterval). Typically, a 
checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting 
to try.

Hope this helps,
-adrian
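
A minimal sketch tying these points together (assuming the Spark Streaming 1.x 
Scala API; the socket source, durations and checkpoint path are placeholders, 
not taken from the original code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Batch interval: every window duration and slide duration must be a multiple of this.
val conf = new SparkConf().setAppName("windowed-counts").setMaster("local[2]")
val ssc = new StreamingContext(conf, Minutes(1))

// Checkpointing is enabled on the streaming context (local path here, HDFS on a cluster).
ssc.checkpoint("/tmp/streaming-checkpoint")

// Placeholder source; in the original mail this would be the Kafka direct stream.
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Reduce by key over a 16-minute window sliding every minute,
// instead of windowing the source stream and then reducing again.
val counts = words
  .map(word => (word, 1L))
  .reduceByKeyAndWindow(_ + _, Minutes(16), Minutes(1))

// Optionally set an explicit checkpoint interval on the windowed stream, as discussed above.
counts.checkpoint(Minutes(4))
counts.print()

ssc.start()
ssc.awaitTermination()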

From: srungarapu vamsi
Date: Wednesday, September 23, 2015 at 10:51 PM
To: user
Subject: reduceByKeyAndWindow confusion

I create a stream from Kafka as below:

val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent,
    StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
  .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int,List[String]].
Using this map I am filtering the stream and finally converting it into a
Map[Int,DStream[KafkaGenericEvent]]

1.
Now on this map, for each and every value (which is a 
DStream[KafkaGenericEvent])
I am applying the reduceByKeyAndWindow operation.
But since we have to give a window duration and slide duration even in 
reduceByKeyAndWindow, does that imply that on every window of the given 
DStream, reduceByKeyAndWindow can be applied with a different window duration 
and slide duration?
i.e. Let's say the windowed DStream is created with window duration -> 16 minutes,
slide duration -> 1 minute, so I have one RDD for every window.
For reduceByKeyAndWindow, if we have a window duration of 4 minutes and a slide 
duration of 1 minute, will I then get 4 RDDs, since 
windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4?

2.
As suggested in the Spark docs, I am trying to set a checkpointing interval on the 
kafkaDStream created in the block shown above in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:
"WindowedDStream has been marked for checkpointing but the storage level has 
not been set to enable persisting. Please use DStream.persist() to set the 
storage level to use memory for better checkpointing performance"
But when I went through the implementation of the checkpoint function in 
DStream.scala, I see a call to the persist() function.
Do I then really have to call persist on the WindowedDStream?
Just to give it a shot, I made a call to the persist method on the windowedDStream 
and then made a call to checkpoint(interval). Even then I am facing the above 
mentioned error.
How do I solve this?
--
/Vamsi



Re: kafka direct streaming with checkpointing

2015-09-24 Thread Cody Koeninger
No, you can't use checkpointing across code changes. Either store offsets
yourself, or start up your new app code and let it catch up before killing
the old one.
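
For reference, a sketch of the "store offsets yourself" approach with the Spark 1.x 
Kafka direct API (saveOffsets and the broker/topic names are hypothetical; the 
offset store itself, e.g. a database or ZooKeeper, is up to you):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical helper backed by your own store.
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-direct"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

stream.foreachRDD { rdd =>
  // Grab the offset ranges for this batch before any transformation loses them.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  saveOffsets(ranges) // persist offsets only after processing succeeds
}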

On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu  wrote:

> Hi,
> in my application I use Kafka direct streaming and I have also enabled
> checkpointing.
> This seems to work fine if the application is restarted. However if I
> change the code and resubmit the application, it cannot start because of
> the checkpointed data being of different class versions.
> Is there any way I can use checkpointing that can survive across
> application version changes?
>
> Thanks,
> Radu
>
>


Re: Spark ClosureCleaner or java serializer OOM when trying to grow

2015-09-24 Thread Ted Yu
Please decrease spark.serializer.objectStreamReset for your queries. The
default value is 100.

I logged SPARK-10787 for improvement.

Cheers
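
For example, the setting can be lowered when building the context (a sketch; the 
value 50 is an arbitrary choice, not a recommendation):

import org.apache.spark.{SparkConf, SparkContext}

// Lower spark.serializer.objectStreamReset (default 100) so the Java serializer
// resets its object cache more frequently during serialization.
val conf = new SparkConf()
  .setAppName("random-forest-job")
  .set("spark.serializer.objectStreamReset", "50")
val sc = new SparkContext(conf)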

On Wed, Sep 23, 2015 at 6:59 PM, jluan  wrote:

> I have been stuck on this problem for the last few days:
>
> I am attempting to run random forest from MLLIB, it gets through most of
> it,
> but breaks when doing a mapPartition operation. The following stack trace
> is
> shown:
>
> : An error occurred while calling o94.trainRandomForestModel.
> : java.lang.OutOfMemoryError
> at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at
>
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
>
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
>
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
>
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
>
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
>
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
>
> org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625)
> at
> org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235)
> at
>
> org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291)
> at
>
> org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> It seems to me that it's trying to serialize the mapPartitions closure, but
> runs out of space doing so. However I don't understand how it could run out
> of space when I gave the driver ~190GB for a file that's 45MB.
>
> I have a cluster setup on AWS such that my master is a r3.8xlarge along
> with
> two r3.4xlarge workers. I have the following configurations:
>
> spark version: 1.5.0
> ---
> spark.executor.memory 32000m
> spark.driver.memory 23m
> spark.driver.cores 10
> spark.executor.cores 5
> spark.executor.instances 17
> spark.driver.maxResultSize 0
> spark.storage.safetyFraction 1
> spark.storage.memoryFraction 0.9
> spark.storage.shuffleFraction 0.05
> spark.default.parallelism 128
>
> The master machine has approximately 240 GB of ram and each worker has
> about
> 120GB of ram.
>
> I load in a relatively tiny RDD of MLLIB LabeledPoint objects, with each
> holding sparse vectors inside. This RDD has a total size of roughly 45MB.
> My
> sparse vector has a total length of ~15 million while only about 3000 or so
> are non-zeros.
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-tp24796.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, 

Networking issues with Spark on EC2

2015-09-24 Thread SURAJ SHETH
Hi,

I am using Spark 1.2 and facing network related issues while performing
simple computations.

This is a custom cluster set up using EC2 machines and the Spark prebuilt
binary from the Apache site. The problem occurs only when we have workers on other
machines (networking involved). Having a single node act as both the master and the
slave works correctly.

The error log from the slave node is attached below. The job reads a textFile
from the local FS (copied to each node) and counts it. The first 30 tasks get
completed within 5 seconds. Then it takes several minutes to complete
another 10 tasks, and eventually the job dies.

Sometimes, one of the workers completes all the tasks assigned to it.
Different workers have different behavior at different
times(non-deterministic).

Is it related to something specific to EC2?



15/09/24 13:04:40 INFO Executor: Running task 117.0 in stage 0.0 (TID 117)

15/09/24 13:04:41 INFO TorrentBroadcast: Started reading broadcast variable
1

15/09/24 13:04:41 INFO SendingConnection: Initiating connection to
[master_ip:56305]

15/09/24 13:04:41 INFO SendingConnection: Connected to
[master_ip/master_ip_address:56305], 1 messages pending

15/09/24 13:05:41 INFO TorrentBroadcast: Started reading broadcast variable
1

15/09/24 13:05:41 ERROR Executor: Exception in task 77.0 in stage 0.0 (TID
77)

java.io.IOException: sendMessageReliably failed because ack was not
received within 60 sec

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)

at scala.Option.foreach(Option.scala:236)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)

at
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)

at
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)

at
io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)

at java.lang.Thread.run(Thread.java:745)

15/09/24 13:05:41 INFO CoarseGrainedExecutorBackend: Got assigned task 122

15/09/24 13:05:41 INFO Executor: Running task 3.1 in stage 0.0 (TID 122)

15/09/24 13:06:41 ERROR Executor: Exception in task 113.0 in stage 0.0 (TID
113)

java.io.IOException: sendMessageReliably failed because ack was not
received within 60 sec

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)

at scala.Option.foreach(Option.scala:236)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)

at
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)

at
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)

at
io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)

at java.lang.Thread.run(Thread.java:745)

15/09/24 13:06:41 INFO TorrentBroadcast: Started reading broadcast variable
1

15/09/24 13:06:41 INFO SendingConnection: Initiating connection to
[master_ip/master_ip_address:44427]

15/09/24 13:06:41 INFO SendingConnection: Connected to
[master_ip/master_ip_address:44427], 1 messages pending

15/09/24 13:07:41 ERROR Executor: Exception in task 37.0 in stage 0.0 (TID
37)

java.io.IOException: sendMessageReliably failed because ack was not
received within 60 sec

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)

at scala.Option.foreach(Option.scala:236)

at
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)

at
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)

at
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)

at
io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)

at java.lang.Thread.run(Thread.java:745)





I checked the network speed between the master and the slave and it is able
to scp large files at a speed of 60 MB/s.

Any leads on how this can be fixed?



Thanks and Regards,

Suraj Sheth


Re: Long running Spark Streaming Job increasing executing time per batch

2015-09-24 Thread Jeremy Smith
I found that a similar issue happens when there is a memory leak in the Spark
application (or, in my case, one of the libraries used in the Spark
application). Gradually, unclaimed objects make their way into old or
permanent generation space, reducing the available heap.  It causes GC
overhead to build up until the application is spending most of its time
trying (and failing) to free up memory.  In some cases, this eventually
leads to an exception about GC pressure or Out of Memory.  But (at least for
me) that can take hours or days to happen.

I eventually identified the memory leak by running the spark app locally and
attaching VisualVM (a free cross-platform tool; I don't want to post the
link and be marked as spam, but you should be able to find it easily). 
Using the memory profiling tab, you can see how many instances of each class
exist in memory.  Letting the application run for a while, it is easy to see
when some classes' instance count only increases forever, which in turn
makes it easier to find the memory leak.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-running-Spark-Streaming-Job-increasing-executing-time-per-batch-tp7918p24804.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Reading avro data using KafkaUtils.createDirectStream

2015-09-24 Thread Daniel Haviv
Hi,
I'm trying to use KafkaUtils.createDirectStream to read avro messages from
Kafka but something is off with my type arguments:

val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord],
GenericRecord, NullWritable, AvroInputFormat[GenericRecord]](ssc,
kafkaParams, topicSet)

I'm getting the following error:
:47: error: type arguments
[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],org.apache.avro.generic.GenericRecord,org.apache.hadoop.io.NullWritable,org.apache.avro.mapred.AvroInputFormat[org.apache.avro.generic.GenericRecord]]
conform to the bounds of none of the overloaded alternatives of
 value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](jssc:
org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass:
Class[K], valueClass: Class[V], keyDecoderClass: Class[KD],
valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String],
topics:
java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V]
 [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](ssc:
org.apache.spark.streaming.StreamingContext, kafkaParams:
Map[String,String], topics: Set[String])(implicit evidence$19:
scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V],
implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22:
scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K,
V)]

What am I doing wrong?

Thank you.
Daniel


Re: Reading avro data using KafkaUtils.createDirectStream

2015-09-24 Thread Cody Koeninger
Your third and fourth type parameters need to be subclasses of
kafka.serializer.Decoder
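
For illustration, one way to satisfy those bounds is to read the raw bytes with the 
standard decoders and deserialize the Avro payload afterwards (a sketch; ssc, 
kafkaParams and topicSet come from the original mail, and deserializeAvro is a 
hypothetical helper wrapping Avro's DatumReader):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

// DefaultDecoder yields Array[Byte] and is a kafka.serializer.Decoder, so it
// satisfies the type bounds; a custom Decoder[GenericRecord] would also work.
val rawStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder,
  DefaultDecoder](ssc, kafkaParams, topicSet)

val records = rawStream.map { case (_, bytes) =>
  deserializeAvro(bytes) // hypothetical: decode the Avro bytes into a GenericRecord
}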

On Thu, Sep 24, 2015 at 10:30 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to use KafkaUtils.createDirectStream to read avro messages from
> Kafka but something is off with my type arguments:
>
> val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord],
> GenericRecord, NullWritable, AvroInputFormat[GenericRecord]](ssc,
> kafkaParams, topicSet)
>
> I'm getting the following error:
> :47: error: type arguments
> [org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],org.apache.avro.generic.GenericRecord,org.apache.hadoop.io.NullWritable,org.apache.avro.mapred.AvroInputFormat[org.apache.avro.generic.GenericRecord]]
> conform to the bounds of none of the overloaded alternatives of
>  value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <:
> kafka.serializer.Decoder[V]](jssc:
> org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass:
> Class[K], valueClass: Class[V], keyDecoderClass: Class[KD],
> valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String],
> topics:
> java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V]
>  [K, V, KD <: kafka.serializer.Decoder[K], VD <:
> kafka.serializer.Decoder[V]](ssc:
> org.apache.spark.streaming.StreamingContext, kafkaParams:
> Map[String,String], topics: Set[String])(implicit evidence$19:
> scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V],
> implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22:
> scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K,
> V)]
>
> What am I doing wrong?
>
> Thank you.
> Daniel
>
>
>


Re: Scala api end points

2015-09-24 Thread Ndjido Ardo BAR
Hi Masoom Alam,

I successfully experimented with the following project on GitHub:
https://github.com/erisa85/WikiSparkJobServer . I do recommend it to you.

cheers,
Ardo.

On Thu, Sep 24, 2015 at 5:20 PM, masoom alam 
wrote:

> Hi everyone
>
> I am new to Scala. I have a written an application using scala in
> spark Now we want to interface it through rest api end points..what
> is the best choice with usplease share ur experiences
>
> Thanks
>


Re: Java Heap Space Error

2015-09-24 Thread Yusuf Can Gürkan
Yes, right, the query you wrote worked in the same cluster. In this case, partitions 
were equally distributed, but when I used regex and concatenations they were not, as I 
said before. The query with concatenation is below:

val usersInputDF = sqlContext.sql(
  s"""
 |  select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname 
is not 
NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
 ') inputlist from landing where 
dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid != 
'' and userid is not null and userid is not NULL and pagetype = 'productDetail' 
group by userid

   """.stripMargin)


> On 24 Sep 2015, at 16:52, java8964  wrote:
> 
> This is interesting.
> 
> So you mean that query as 
> 
> "select userid from landing where dt='2015-9' and userid != '' and userid is 
> not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid"
> 
> works in your cluster?
> 
> In this case, do you also see this one task with way more data than the rest, 
> as it happened when you use regex and concatenation?
> 
> It is hard to believe that just add "regex" and "concatenation" will make the 
> distribution more equally across partitions. In your query, the distribution 
> in the partitions simply depends on the Hash partitioner of "userid".
> 
> Can you show us the query after you add "regex" and "concatenation"?
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com
> Date: Thu, 24 Sep 2015 15:34:48 +0300
> CC: user@spark.apache.org
> To: jingyu.zh...@news.com.au; java8...@hotmail.com
> 
> @Jingyu
> Yes, it works without regex and concatenation as the query below:
> 
> So, what we can understand from this? Because when i do like that, shuffle 
> read sizes are equally distributed between partitions.
> 
> val usersInputDF = sqlContext.sql(
> s"""
>  |  select userid from landing where dt='2015-9' and userid != '' and 
> userid is not null and userid is not NULL and pagetype = 'productDetail' 
> group by userid
> 
>""".stripMargin)
> 
> @java8964
> 
> I tried with sql.shuffle.partitions = 1 but no luck. It’s again one of 
> the partitions shuffle size is huge and the others are very small.
> 
> 
> ——
> So how can i balance this shuffle read size between partitions?
> 
> 
> On 24 Sep 2015, at 03:35, Zhang, Jingyu  > wrote:
> 
> Is you sql works if do not runs a regex on strings and concatenates them, I 
> mean just Select the stuff without String operations?
> 
> On 24 September 2015 at 10:11, java8964  > wrote:
> Try to increase partitions count, that will make each partition has less data.
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com 
> Date: Thu, 24 Sep 2015 00:32:47 +0300
> CC: user@spark.apache.org 
> To: java8...@hotmail.com 
> 
> 
> Yes, it’s possible. I use S3 as data source. My external tables has 
> partitioned. Belowed task is 193/200. Job has 2 stages and its 193. task of 
> 200 in 2.stage because of sql.shuffle.partitions. 
> 
> How can i avoid this situation, this is my query:
> 
> select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 
> NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
>  ') inputlist from landing where dt='2015-9' and userid != '' and userid 
> is not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid
> 
> On 23 Sep 2015, at 23:55, java8964  > wrote:
> 
> Based on your description, you job shouldn't have any shuffle then, as you 
> just apply regex and concatenation on the column, but there is one partition 
> having 4.3M records to be read, vs less than 1M records for other partitions.
> 
> Is that possible? It depends on what is the source of your data.
> 
> If there is shuffle in your query (More than 2 stages generated by your 
> query, and this is my guess of what happening), then it simple means that one 
> partition having way more data than the rest of partitions.
> 
> Yong
> 
> From: yu...@useinsider.com 
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org 
> 
> What can cause this issue in the attached picture? I’m running and sql query 
> which runs a regex on strings and concatenates them. Because of this task, my 
> job gives java heap space error.
> 
> 
> 
> 
> 
> This message and its attachments may contain legally privileged or 
> confidential information. It is intended solely for the named addressee. If 
> you are not the addressee indicated in 

Re: Remove duplicate keys by always choosing first in file.

2015-09-24 Thread Philip Weaver
Oops, I didn't catch the suggestion to just use RDD.zipWithIndex, which I
forgot existed (and I've discovered I actually used it in another project!). I
will use that instead of the mapPartitionsWithIndex/zipWithIndex solution
that I posted originally.

On Tue, Sep 22, 2015 at 9:07 AM, Philip Weaver 
wrote:

> The indices are definitely necessary. My first solution was just
> reduceByKey { case (v, _) => v } and that didn't work. I needed to look at
> both values and see which had the lower index.
>
> On Tue, Sep 22, 2015 at 8:54 AM, Sean Owen  wrote:
>
>> The point is that this only works if you already knew the file was
>> presented in order within and across partitions, which was the
>> original problem anyway. I don't think it is in general, but in
>> practice, I do imagine it's already in the expected order from
>> textFile. Maybe under the hood this ends up being ensured by
>> TextInputFormat.
>>
>> So, adding the index and sorting on it doesn't add anything.
>>
>> On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase  wrote:
>> > just give zipWithIndex a shot, use it early in the pipeline. I think it
>> > provides exactly the info you need, as the index is the original line
>> number
>> > in the file, not the index in the partition.
>> >
>> > Sent from my iPhone
>> >
>> > On 22 Sep 2015, at 17:50, Philip Weaver 
>> wrote:
>> >
>> > Thanks. If textFile can be used in a way that preserves order, than
>> both the
>> > partition index and the index within each partition should be
>> consistent,
>> > right?
>> >
>> > I overcomplicated the question by asking about removing duplicates.
>> > Fundamentally I think my question is, how does one sort lines in a file
>> by
>> > line number.
>> >
>> > On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase 
>> wrote:
>> >>
>> >> By looking through the docs and source code, I think you can get away
>> with
>> >> rdd.zipWithIndex to get the index of each line in the file, as long as
>> you
>> >> define the parallelism upfront:
>> >> sc.textFile("README.md", 4)
>> >>
>> >> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> >> skimming through some tuples, hopefully this is clear enough.
>> >>
>> >> -adrian
>> >>
>> >> From: Philip Weaver
>> >> Date: Tuesday, September 22, 2015 at 3:26 AM
>> >> To: user
>> >> Subject: Remove duplicate keys by always choosing first in file.
>> >>
>> >> I am processing a single file and want to remove duplicate rows by some
>> >> key by always choosing the first row in the file for that key.
>> >>
>> >> The best solution I could come up with is to zip each row with the
>> >> partition index and local index, like this:
>> >>
>> >> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> >> ((partitionIndex, localIndex), row)) }
>> >> }
>> >>
>> >>
>> >> And then using reduceByKey with a min ordering on the (partitionIndex,
>> >> localIndex) pair.
>> >>
>> >> First, can i count on SparkContext.textFile to read the lines in such
>> that
>> >> the partition indexes are always increasing so that the above works?
>> >>
>> >> And, is there a better way to accomplish the same effect?
>> >>
>> >> Thanks!
>> >>
>> >> - Philip
>> >>
>> >
>>
>
>


Scala api end points

2015-09-24 Thread masoom alam
Hi everyone

I am new to Scala. I have written an application using Scala in Spark.
Now we want to interface it through REST API end points. What is the
best choice for us? Please share your experiences.

Thanks


Re: spark + parquet + schema name and metadata

2015-09-24 Thread Cheng Lian
Thanks for the feedback, just filed 
https://issues.apache.org/jira/browse/SPARK-10803 to track this issue.


Cheng

On 9/24/15 4:25 AM, Borisa Zivkovic wrote:

Hi,

Your suggestion works nicely. I was able to attach metadata to 
columns and read that metadata both from Spark and by using ParquetFileReader.
It would be nice if we had a way to manipulate parquet metadata 
directly from DataFrames though.


regards

On Wed, 23 Sep 2015 at 09:25 Borisa Zivkovic 
> wrote:


Hi,

thanks a lot for this! I will try it out to see if this works ok.

I am planning to use "stable" metadata - so those will be same
across all parquet files inside directory hierarchy...



On Tue, 22 Sep 2015 at 18:54 Cheng Lian > wrote:

Michael reminded me that although we don't support direct
manipulation over Parquet metadata, you can still save/query
metadata to/from Parquet via DataFrame per-column metadata.
For example:

import sqlContext.implicits._
import org.apache.spark.sql.types.MetadataBuilder

val path = "file:///tmp/parquet/meta"

// Saving metadata
val meta = new MetadataBuilder().putString("appVersion", "1.0.2").build()
sqlContext.range(10).select($"id".as("id", meta))
  .coalesce(1).write.mode("overwrite").parquet(path)

// Querying metadata
sqlContext.read.parquet(path).schema("id").metadata.getString("appVersion")

The metadata is saved together with Spark SQL schema as a JSON
string. For example, the above code generates the following
Parquet metadata (inspected with parquet-meta):

file:    file:/private/tmp/parquet/meta/part-r-0-77cb2237-e6a8-4cb6-a452-ae205ba7b660.gz.parquet
creator: parquet-mr version 1.6.0
extra:   org.apache.spark.sql.parquet.row.metadata =
         {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{"appVersion":"1.0.2"}}]}


Cheng


On 9/22/15 9:37 AM, Cheng Lian wrote:

I see, this makes sense. We should probably add this in Spark
SQL.

However, there's one corner case to note about user-defined
Parquet metadata. When committing a write job,
ParquetOutputCommitter writes Parquet summary files
(_metadata and _common_metadata), and user-defined key-value
metadata written in all Parquet part-files get merged here.
The problem is that, if a single key is associated with
multiple values, Parquet doesn't know how to reconcile this
situation, and simply gives up writing summary files. This
can be particular annoying for appending. In general, users
should avoid storing "unstable" values like timestamps as
Parquet metadata.

Cheng

On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

thanks for answer.

I need this in order to be able to track schema metadata.

basically when I create parquet files from Spark I want to
be able to "tag" them in some way (giving the schema
appropriate name or attaching some key/values) and then it
is fairly easy to get basic metadata about parquet files
when processing and discovering those later on.

On Mon, 21 Sep 2015 at 18:17 Cheng Lian
> wrote:

Currently Spark SQL doesn't support customizing schema
name and
metadata. May I know why these two matters in your use
case? Some
Parquet data models, like parquet-avro, do support it,
while some others
don't (e.g. parquet-hive).

Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
>
> I am trying to figure out how to write parquet
metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
>
> I could not find a way to change schema name (which
seems to be
> hardcoded to root) and also how to add data to
key/value metadata in
> parquet footer.
>
>
org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>
> org.apache.parquet.schema.Type#getName
>
> thanks
>
>









NegativeArraySizeException on Spark SQL window function

2015-09-24 Thread Bae, Jae Hyeon
Hi Spark users

Can somebody explain the following WARN with exception? I am running
Spark 1.5.0 and the job was successful, but I am wondering whether it's
totally OK to keep using the Spark SQL window function.

15/09/24 06:31:49 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID
18907, rt-mesos14-sjc1.prod.uber.internal):
java.lang.NegativeArraySizeException
at
org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:223)
at org.apache.spark.unsafe.types.UTF8String.clone(UTF8String.java:827)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEsWithMeta$1.apply(EsSpark.scala:86)
at
org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEsWithMeta$1.apply(EsSpark.scala:86)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The window function I used is,

lead(ts) over (partition by A, B, C order by ts) as next_ts,
lead(status) over (partition by A, B, C order by ts) as next_status,

Thank you
Best, Jae


Re: SparkContext declared as object variable

2015-09-24 Thread Priya Ch
object StreamJob {

  val conf = new SparkConf
  val sc = new SparkContext(conf)

 def main(args:Array[String])
 {
val baseRDD =
sc.parallelize(Array("hi","hai","hi","bye","bye","hi","hai","hi","bye","bye"))
val words = baseRDD.flatMap(line => line.split(","))
val wordPairs = words.map(word => (word,1))
val reducedWords = wordPairs.reduceByKey((a,b) => a+b)
reducedWords.print
  }
}

Please try to run this code in cluster mode, possibly YARN mode; the
Spark version is 1.3.0.

This has thrown
java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5

Regards,
Padma Ch
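
For comparison, a sketch of the same job with the context created inside main, so 
that it is only constructed on the driver (this is an assumption about the cause of 
the broadcast error, not a fix confirmed in this thread):

import org.apache.spark.{SparkConf, SparkContext}

object StreamJob {
  def main(args: Array[String]): Unit = {
    // Construct the context inside main rather than as an object-level val.
    val conf = new SparkConf().setAppName("StreamJob")
    val sc = new SparkContext(conf)

    val baseRDD = sc.parallelize(Array("hi", "hai", "hi", "bye", "bye",
      "hi", "hai", "hi", "bye", "bye"))
    val words = baseRDD.flatMap(line => line.split(","))
    val wordPairs = words.map(word => (word, 1))
    val reducedWords = wordPairs.reduceByKey((a, b) => a + b)

    // RDDs have no print(); collect to the driver before printing.
    reducedWords.collect().foreach(println)

    sc.stop()
  }
}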

On Thu, Sep 24, 2015 at 11:25 AM, Akhil Das 
wrote:

> It should, i don't see any reason for it to not run in cluster mode.
>
> Thanks
> Best Regards
>
> On Wed, Sep 23, 2015 at 8:56 PM, Priya Ch 
> wrote:
>
>> does it run in cluster mode ???
>>
>> On Wed, Sep 23, 2015 at 7:11 PM, Akhil Das 
>> wrote:
>>
>>> Yes of course it works.
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Sep 22, 2015 at 4:53 PM, Priya Ch 
>>> wrote:
>>>
 Parallelzing some collection (array of strings). Infact in our product
 we are reading data from kafka using KafkaUtils.createStream and applying
 some transformations.

 Is creating sparContext at object level instead of creating in main
 doesn't work 

 On Tue, Sep 22, 2015 at 2:59 PM, Akhil Das 
 wrote:

> Its a "value" not a variable, and what are you parallelizing here?
>
> Thanks
> Best Regards
>
> On Fri, Sep 18, 2015 at 11:21 PM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>> Hello All,
>>
>>   Instead of declaring sparkContext in main, declared as object
>> variable as -
>>
>>  object sparkDemo
>> {
>>
>>  val conf = new SparkConf
>>  val sc = new SparkContext(conf)
>>
>>   def main(args:Array[String])
>>   {
>>
>> val baseRdd = sc.parallelize()
>>.
>>.
>>.
>>   }
>>
>> }
>>
>> But this piece of code is giving :
>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>> broadcast_5_piece0 of broadcast_5
>> at
>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>> at
>> org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.spark.SparkException: Failed to get
>> broadcast_5_piece0 of broadcast_5
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> 
>> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)

reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread swetha
Hi,

How can I use reduceByKey inside updateStateByKey? Suppose I have a bunch of
keys for which I need to compute a sum and an average inside updateStateByKey by
joining with the old state. How do I accomplish that?


Thanks,
Swetha
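
For reference, a sketch of the usual pattern (an assumption, not something confirmed 
in this thread): reduce within each batch first, then merge the per-batch result into 
the running state with updateStateByKey. `events` is a hypothetical DStream of 
(key, value) pairs, and the state keeps (sum, count) so the average is sum / count.

import org.apache.spark.streaming.dstream.DStream

def sumAndCount(events: DStream[(String, Double)]): DStream[(String, (Double, Long))] = {
  // Per-batch partial aggregate: (sum, count) for each key.
  val perBatch = events
    .mapValues(v => (v, 1L))
    .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }

  // Merge each batch's partial aggregate into the state carried across batches.
  // Note: updateStateByKey requires ssc.checkpoint(...) to be set.
  perBatch.updateStateByKey[(Double, Long)] { (batchValues, state) =>
    val (bSum, bCount) = batchValues.foldLeft((0.0, 0L)) {
      case ((s, c), (vs, vc)) => (s + vs, c + vc)
    }
    val (oldSum, oldCount) = state.getOrElse((0.0, 0L))
    Some((oldSum + bSum, oldCount + bCount)) // average = sum / count
  }
}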



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JdbcRDD Constructor

2015-09-24 Thread Deenar Toraskar
On 24 September 2015 at 17:48, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> you are interpreting the JDBCRDD API incorrectly. If you want to use
> partitions, then the column used to partition and present in the where
> clause must be numeric and the lower bound and upper bound must be the min
> and max values of the column. Spark will equally distribute the range over
> the number of partitions selected. So in your case OFFSET is the first
> placeholder and LIMIT the second
>
> numPartitions 1 - Your query will be called once with first placeholder 0
> and second placeholder 100, this explains how you get 100 rows
> select *  from schema.Table OFFSET 0 LIMIT 100
>
> numPartitions 2 - Your query will be called twice with first placeholder
> 0 and second placeholder 50, and second time with 51,100. Again this
> explains why you get 150 records
>
> select *  from schema.Table OFFSET 0 LIMIT 50
> select *  from schema.Table OFFSET 51 LIMIT 100
>
> numPartitions 3 - Your query will be called thrice
>
> select *  from schema.Table OFFSET 0 LIMIT 34
> select *  from schema.Table OFFSET 35 LIMIT 67
> select *  from schema.Table OFFSET 68 LIMIT 100
>
> That explains why you get 201 records. You need to amend the query and
> provide correct lower and upper bounds aligned to the column used in the
> where clause.
>
> See
> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/
>
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 24 September 2015 at 11:55, satish chandra j 
> wrote:
>
>> HI Deenar,
>>
>> Please find the SQL query below:
>>
>> var SQL_RDD= new JdbcRDD( sc, ()=>
>> DriverManager.getConnection(url,user,pass),"select col1, col2,
>> col3..col 37 from schema.Table LIMIT ? OFFSET ?",100,0,1,(r:
>> ResultSet) => (r.getInt("col1"),r.getInt("col2")...r.getInt("col37")))
>>
>>
>> When I have the above 100,0,1 I am getting SQL_RDD.count as 100
>> When set to 100,0,2 I am getting SQL_RDD.count as 151
>> When set to 100,0,3 I am getting SQL_RDD.count as 201
>>
>> But where as I expect every execution count should be 100, let me know if
>> I am missing anything here
>>
>> Regards,
>> Satish Chandra
>>
>>
>> On Thu, Sep 24, 2015 at 12:48 AM, Deenar Toraskar <
>> deenar.toras...@thinkreactive.co.uk> wrote:
>>
>>> Satish
>>>
>>> Can you post the SQL query you are using?
>>>
>>> The SQL query must have 2 placeholders and both of them should be an
>>> inclusive range (<= and >=)..
>>>
>>> e.g. select title, author from books where ? <= id and id <= ?
>>>
>>> Are you doing this?
>>>
>>> Deenar
>>>
>>>
>>>
>>>
>>> *Think Reactive Ltd*
>>> deenar.toras...@thinkreactive.co.uk
>>> 07714140812
>>>
>>>
>>>
>>> On 23 September 2015 at 13:47, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI,
 Could anybody provide inputs if they have came across similar issue

 @Rishitesh
 Could you provide if any sample code to use JdbcRDDSuite


 Regards,
 Satish Chandra

 On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra <
 rishi80.mis...@gmail.com> wrote:

> I am using Spark 1.5. I always get count = 100, irrespective of num
> partitions.
>
> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI,
>> Currently using Spark 1.2.2, could you please let me know correct
>> results output count which you got it by using JdbcRDDSuite
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> Which version of Spark you are using ??  I can get correct results
>>> using JdbcRDD. Infact there is a test suite precisely for this (
>>> JdbcRDDSuite) .
>>> I changed according to your input and got correct results from this
>>> test suite.
>>>
>>> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI All,

 JdbcRDD constructor has following parameters,

 JdbcRDD(SparkContext sc,
         scala.Function0<java.sql.Connection> getConnection,
         String sql, long lowerBound, long upperBound, int numPartitions,
         scala.Function1<java.sql.ResultSet,T> mapRow,
         scala.reflect.ClassTag<T> evidence$1)
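
For reference, a minimal sketch of the partition-bounds pattern described earlier in 
this thread, using inclusive bounds on a numeric column (the table name, column names 
and bounds are illustrative; sc, url, user and pass come from the original mail):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rowsRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, user, pass),
  "select col1, col2 from schema.Table where ? <= id and id <= ?",
  1L,      // lowerBound: min(id) in the table
  100000L, // upperBound: max(id) in the table
  4,       // numPartitions: the [1, 100000] id range is split into 4 sub-ranges
  (r: ResultSet) => (r.getInt("col1"), r.getInt("col2")))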


why more than more jobs in a batch in spark streaming ?

2015-09-24 Thread Shenghua(Daniel) Wan
Hi,
I noticed that in my streaming application reading from Kafka using
multiple receivers, there are 3 jobs in one batch (via web UI).
According to the DAG there are two stages; job 0 executes both stages, but jobs
1 and 2 only execute stage 2. There is a disconnect between my
understanding and reality. I have gone over the book and done some googling,
but I still could not find the relationship between batches and jobs. Could
anyone share insights?
Thanks a lot!

-- 

Regards,
Shenghua (Daniel) Wan


Re: Java Heap Space Error

2015-09-24 Thread Yusuf Can Gürkan
Thank you very much. This makes sense. I will write back after I try your solution.
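
For reference, a sketch of the per-key count check suggested in the quoted reply 
below (the table and filters come from the original query; the limit of 20 is 
arbitrary):

// Find the heaviest userid values to see how skewed the grouping key is.
val perUserCounts = sqlContext.sql(
  """select userid, count(*) as cnt
    |from landing
    |where dt = '2015-9' and pagetype = 'productDetail'
    |  and userid is not null and userid != ''
    |group by userid
    |order by cnt desc
    |limit 20""".stripMargin)

perUserCounts.show()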

> On 24 Sep 2015, at 22:43, java8964  wrote:
> 
> I can understand why your first query will finish without OOM, but the new 
> one will fail with OOM.
> 
> In the new query, you are asking for a groupByKey/cogroup operation, which will 
> force all the productName + productCategory values per user id to be sent to the same 
> reducer. This could easily blow out the reducer's memory if you have one user id 
> with a lot of productName and productCategory values.
> 
> Keep in mind that Spark on the reducer side still uses a hash to merge all the 
> data from different mappers, so the memory on the reduce side has to be able 
> to merge all the productName + productCategory values for the most frequently 
> occurring user id (at least), and I don't know why you want all the 
> productName and productCategory values per user id (maybe a distinct could be 
> enough?).
> 
> Imagine you have one user id that shows up 1M times in your dataset, with 0.5M 
> productname values of 'A' and 0.5M of 'B'; your query will push 1M 
> 'A's and 'B's into the same reducer and ask Spark to merge them in the 
> HashMap for you for that user id. This will cause an OOM.
> 
> Above all, you need to find out what is the max count per user id in your 
> data: select max(count(*)) from land where . group by userid
> 
> Your memory has to support that amount of productName and productCategory data, 
> even if your partition number is as high as your unique count of user ids, if 
> that is really what you want: to consolidate all the 
> productName and productCategory values together, without even considering the 
> removal of duplicates.
> 
> But both queries should still push a similar record count per partition, just 
> with very different data volumes.
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com
> Date: Thu, 24 Sep 2015 18:56:51 +0300
> CC: jingyu.zh...@news.com.au; user@spark.apache.org
> To: java8...@hotmail.com
> 
> Yes right, the query you wrote worked in same cluster. In this case, 
> partitions were equally distributed but when i used regex and concetanations 
> it’s not as i said before. Query with concetanation is below:
> 
> val usersInputDF = sqlContext.sql(
>   s"""
>  |  select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname 
> is not 
> NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
>  ') inputlist from landing where 
> dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid 
> != '' and userid is not null and userid is not NULL and pagetype = 
> 'productDetail' group by userid
> 
>""".stripMargin)
> 
> 
> On 24 Sep 2015, at 16:52, java8964  > wrote:
> 
> This is interesting.
> 
> So you mean that query as 
> 
> "select userid from landing where dt='2015-9' and userid != '' and userid is 
> not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid"
> 
> works in your cluster?
> 
> In that case, do you also see one task with way more data than the rest, 
> as happened when you used regex and concatenation?
> 
> It is hard to believe that just adding "regex" and "concatenation" would change 
> how equally the data is distributed across partitions. In your query, the 
> distribution across partitions simply depends on the hash partitioning of "userid".
> 
> Can you show us the query after you add "regex" and "concatenation"?
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com 
> Date: Thu, 24 Sep 2015 15:34:48 +0300
> CC: user@spark.apache.org 
> To: jingyu.zh...@news.com.au ; 
> java8...@hotmail.com 
> 
> @Jingyu
> Yes, it works without regex and concatenation as the query below:
> 
> So, what can we understand from this? Because when I do it like that, shuffle 
> read sizes are equally distributed between partitions.
> 
> val usersInputDF = sqlContext.sql(
> s"""
>  |  select userid from landing where dt='2015-9' and userid != '' and 
> userid is not null and userid is not NULL and pagetype = 'productDetail' 
> group by userid
> 
>""".stripMargin)
> 
> @java8964
> 
> I tried with sql.shuffle.partitions = 1 but no luck. Again, the shuffle 
> size of one of the partitions is huge and the others are very small.
> 
> 
> ——
> So how can I balance the shuffle read size between partitions?
> 
> 
> On 24 Sep 2015, at 03:35, Zhang, Jingyu  > wrote:
> 
> Does your SQL work if it does not run a regex on strings and concatenate them? I 
> mean, just select the fields without string operations.
> 
> On 24 September 2015 at 10:11, java8964  > wrote:
> Try to 

Re: Querying on multiple Hive stores using Apache Spark

2015-09-24 Thread Michael Armbrust
This is not supported yet, though we laid a lot of the groundwork for
doing this in Spark 1.4.

On Wed, Sep 23, 2015 at 11:17 PM, Karthik 
wrote:

> Any ideas or suggestions?
>
> Thanks,
> Karthik.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Querying-on-multiple-Hive-stores-using-Apache-Spark-tp24765p24797.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: why more than more jobs in a batch in spark streaming ?

2015-09-24 Thread Tathagata Das
Are you using DStream.print()? Or something that boils down to RDD.take()?
That can lead to an unpredictable number of jobs. There are other cases as
well, but this one is common.

On Thu, Sep 24, 2015 at 12:04 PM, Shenghua(Daniel) Wan <
wansheng...@gmail.com> wrote:

> Hi,
> I noticed that in my streaming application reading from Kafka using
> multiple receivers, there are 3 jobs in one batch (via web UI).
> According to DAG there are two stages, job 0 execute both 2 stages, but
> job 1 and job 2 only execute stage 2. There is a disconnection between my
> understanding and reality. I have gone over the book and did some googling,
> but still I could not find the relationship between batch and jobs. Could
> anyone share insights?
> Thanks a lot!
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>


Re: kafka direct streaming with checkpointing

2015-09-24 Thread Radu Brumariu
It seems to me that this scenario I'm facing is quite common for
Spark jobs using Kafka.
Is there a ticket to add this sort of semantics to checkpointing? Does it
even make sense to add it there?

Thanks,
Radu


On Thursday, September 24, 2015, Cody Koeninger  wrote:

> No, you cant use checkpointing across code changes.  Either store offsets
> yourself, or start up your new app code and let it catch up before killing
> the old one.
>
> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu  > wrote:
>
>> Hi,
>> in my application I use Kafka direct streaming and I have also enabled
>> checkpointing.
>> This seems to work fine if the application is restarted. However if I
>> change the code and resubmit the application, it cannot start because of
>> the checkpointed data being of different class versions.
>> Is there any way I can use checkpointing that can survive across
>> application version changes?
>>
>> Thanks,
>> Radu
>>
>>
>


Re: Potential racing condition in DAGScheduler when Spark 1.5 caching

2015-09-24 Thread Mark Hamstra
Where do you see a race in the DAGScheduler?  On a quick look at your stack
trace, this just looks to me like a Job where a Stage failed and then the
DAGScheduler aborted the failed Job.

On Thu, Sep 24, 2015 at 12:00 PM, robin_up  wrote:

> Hi
>
> After upgrade to 1.5, we found a possible racing condition in DAGScheduler
> similar to https://issues.apache.org/jira/browse/SPARK-4454.
>
> Here is the code creating the problem:
>
>
> app_cpm_load = sc.textFile("/user/a/app_ecpm.txt").map(lambda x:
> x.split(',')).map(lambda p: Row(app_id=str(p[0]), loc_filter=str(p[1]),
> cpm_required=float(p[2]) ))
> app_cpm = sqlContext.createDataFrame(app_cpm_load)
> app_cpm.registerTempTable("app_cpm")
>
> app_rev_cpm_sql = '''select loc_filter from app_cpm'''
> app_rev_cpm = sqlContext.sql(app_rev_cpm_sql)
> app_rev_cpm.cache()
> app_rev_cpm.show()
>
>
>
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/opt/spark/python/pyspark/sql/dataframe.py", line 256, in show
> print(self._jdf.showString(n, truncate))
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/opt/spark/python/pyspark/sql/utils.py", line 36, in deco
> return f(*a, **kw)
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o46.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 1.0
> (TID 4, spark-yarn-dn02): java.util.NoSuchElementException: key not found:
> UK
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> at
>
> org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
> at
>
> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
> at
>
> org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
> at
>
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
> at
>
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
>
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
> at
>
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
> at
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
> at
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
> at
>
> 

Re: kafka direct streaming with checkpointing

2015-09-24 Thread Cody Koeninger
This has been discussed numerous times; TD's response has consistently been
that it's unlikely to be possible.
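
For reference, a minimal sketch of the "store the offsets yourself" option mentioned in the quoted reply below, using the Scala direct API (Spark 1.3+). The broker list, topic name and the saveOffsets helper are assumptions for illustration, not from the thread.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical helper: persist offsets to ZK, a DB, etc. only after the batch's work succeeds.
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

val ssc = new StreamingContext(new SparkConf().setAppName("offsets-demo"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // assumption
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))                             // assumption

stream.foreachRDD { rdd =>
  // The Kafka offsets covered by this batch; available because the direct
  // stream's RDDs implement HasOffsetRanges.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  saveOffsets(ranges)
}

ssc.start()
ssc.awaitTermination()

On restart with new application code, the job would then seed fromOffsets from that store instead of relying on the checkpoint.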

On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu  wrote:

> It seems to me that this scenario that I'm facing, is quite common for
> spark jobs using Kafka.
> Is there a ticket to add this sort of semantics to checkpointing ? Does it
> even make sense to add it there ?
>
> Thanks,
> Radu
>
>
> On Thursday, September 24, 2015, Cody Koeninger 
> wrote:
>
>> No, you cant use checkpointing across code changes.  Either store offsets
>> yourself, or start up your new app code and let it catch up before killing
>> the old one.
>>
>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu  wrote:
>>
>>> Hi,
>>> in my application I use Kafka direct streaming and I have also enabled
>>> checkpointing.
>>> This seems to work fine if the application is restarted. However if I
>>> change the code and resubmit the application, it cannot start because of
>>> the checkpointed data being of different class versions.
>>> Is there any way I can use checkpointing that can survive across
>>> application version changes?
>>>
>>> Thanks,
>>> Radu
>>>
>>>
>>


Large number of conf broadcasts

2015-09-24 Thread Anders Arpteg
Hi,

I am running Spark 1.5.0 in yarn-client mode, and am curious why so
many broadcasts are being done when loading datasets with a large number of
partitions/files. I have datasets with thousands of partitions, i.e. hdfs
files in the avro folder, and sometimes load hundreds of these large
datasets. I believe I have located the broadcast to line
SparkContext.scala:1006. It seems to just broadcast the Hadoop
configuration, and I don't see why it should be necessary to broadcast that
for EVERY file. Wouldn't it be possible to reuse the same broadcast
configuration? It is hardly the case that the configuration would be different
between files in a single dataset. This seems to waste lots of memory
and to persist unnecessarily to disk (see below again).

Thanks,
Anders

15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to
disk  [19/49086]15/09/24
17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
10.254.35.24:49428 (size: 23.1 KB)
15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as
bytes in memory (estimated size 23.1 KB, free 2.4 KB)
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
hadoopFile at AvroRelation.scala:121
15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block broadcast_4804 in memory
.
15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
broadcast_4804 in memory! (computed 496.0 B so far)
15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B
(scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
limit = 530.3 MB.
15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk
instead.
15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
curMem=556036460, maxMem=556038881
15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0
from memory
15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to
disk


Re: Potential racing condition in DAGScheduler when Spark 1.5 caching

2015-09-24 Thread Josh Rosen
I believe that this is an instance of
https://issues.apache.org/jira/browse/SPARK-10422, which should be fixed in
upcoming 1.5.1 release.

On Thu, Sep 24, 2015 at 12:52 PM, Mark Hamstra 
wrote:

> Where do you see a race in the DAGScheduler?  On a quick look at your
> stack trace, this just looks to me like a Job where a Stage failed and then
> the DAGScheduler aborted the failed Job.
>
> On Thu, Sep 24, 2015 at 12:00 PM, robin_up  wrote:
>
>> Hi
>>
>> After upgrade to 1.5, we found a possible racing condition in DAGScheduler
>> similar to https://issues.apache.org/jira/browse/SPARK-4454.
>>
>> Here is the code creating the problem:
>>
>>
>> app_cpm_load = sc.textFile("/user/a/app_ecpm.txt").map(lambda x:
>> x.split(',')).map(lambda p: Row(app_id=str(p[0]), loc_filter=str(p[1]),
>> cpm_required=float(p[2]) ))
>> app_cpm = sqlContext.createDataFrame(app_cpm_load)
>> app_cpm.registerTempTable("app_cpm")
>>
>> app_rev_cpm_sql = '''select loc_filter from app_cpm'''
>> app_rev_cpm = sqlContext.sql(app_rev_cpm_sql)
>> app_rev_cpm.cache()
>> app_rev_cpm.show()
>>
>>
>>
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "/opt/spark/python/pyspark/sql/dataframe.py", line 256, in show
>> print(self._jdf.showString(n, truncate))
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File "/opt/spark/python/pyspark/sql/utils.py", line 36, in deco
>> return f(*a, **kw)
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
>> 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o46.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 0
>> in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 1.0
>> (TID 4, spark-yarn-dn02): java.util.NoSuchElementException: key not found:
>> UK
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:58)
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>> at
>>
>> org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
>> at
>>
>> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
>> at
>>
>> org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
>> at
>>
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
>> at
>>
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>>
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
>> at
>>
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
>> at
>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
>> at
>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> 

Re: reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread Adrian Tanase
The 2 operations can't be used inside one another.

If you need something like an all-time average then you need to keep a 
(sum, count) tuple to which you add all the new values that come in every batch. The 
average is then just a map on the state DStream.

Does that make sense? Have I guessed your use case?
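
A minimal sketch of that (sum, count) pattern, assuming `events` is a DStream[(String, Double)] and that ssc.checkpoint(...) has been set (required by updateStateByKey); the names are illustrative, not from the thread.

// Per batch, pre-aggregate each key's values into a (sum, count) pair.
val perBatch = events
  .mapValues(v => (v, 1L))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }

// Fold each batch's (sum, count) into the running state.
val state = perBatch.updateStateByKey[(Double, Long)] { (batchValues, old) =>
  val (s0, c0) = old.getOrElse((0.0, 0L))
  val (s1, c1) = batchValues.foldLeft((0.0, 0L)) { case ((s, c), (bs, bc)) => (s + bs, c + bc) }
  Some((s0 + s1, c0 + c1))
}

// The all-time average is then just a map over the state DStream.
val allTimeAvg = state.mapValues { case (sum, count) => sum / count }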

Sent from my iPhone

> On 24 Sep 2015, at 19:47, swetha  wrote:
> 
> Hi,
> 
> How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of
> keys for which I need to do sum and average inside the  updateStateByKey by
> joining with old state. How do I accomplish that?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Join over many small files

2015-09-24 Thread Tracewski, Lukasz
Thanks for the answer!

Why sequence files though; why not work directly on RDDs? My input files are 
CSVs and often contain some garbage both at the beginning and end of a file. 
Mind that I am working in Python; I am not sure it will be as efficient as 
intended.

Any examples in PySpark will be much welcomed.

Thanks!
Lucas

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: 24 September 2015 00:19
To: Tracewski, Lukasz (KFDB 3)
Cc: user@spark.apache.org
Subject: Re: Join over many small files

I think this can be a good case for using the sequence file format to pack the many 
files into a few sequence files with file name as key and content as value. Then 
read them as an RDD and produce tuples like you mentioned (key=fileno+id, 
value=value). After that, it is a simple map operation to generate the diff 
(broadcasting the special file is the right idea).
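
A rough sketch of the broadcast-diff idea, in Scala (the asker is on PySpark, where wholeTextFiles, broadcast and the same map logic also exist). The paths, the '|' delimiter and the header row are assumptions based on the examples quoted below, and garbage-line filtering is omitted.

// Build a small lookup map from the Special file and broadcast it.
val special: Map[String, Double] = sc.textFile("hdfs:///data/special.csv")
  .map(_.split('|').map(_.trim))
  .collect { case Array(id, value) if id != "ID" => (id, value.toDouble) }
  .collectAsMap()
  .toMap
val specialB = sc.broadcast(special)

// One record per input file: (file name, diffed contents).
val results = sc.wholeTextFiles("hdfs:///data/files/*.csv").map { case (fileName, content) =>
  val diffed = content.split("\n")
    .map(_.split('|').map(_.trim))
    .collect {
      case Array(id, value) if specialB.value.contains(id) =>
        s"$id|${specialB.value(id) - value.toDouble}"
    }
  (fileName, diffed.mkString("\n"))
}

Each element of `results` can then be written out separately, which matches the "one result file per input file" requirement.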

On Thu, Sep 24, 2015 at 7:31 AM, Tracewski, Lukasz 
> 
wrote:
Hi all,

I would like to ask for advice on how to efficiently perform a join 
operation in Spark with tens of thousands of tiny files. A single file has a 
few KB and ~50 rows. In another scenario they might have 200 KB and 2000 rows.

To give you impression how they look like:

File 01
ID | VALUE
01 | 10
02 | 12
03 | 55
…

File 02
ID | VALUE
01 | 33
02 | 21
03 | 53
…

and so on… ID is unique in a file, but repeats in every file. There is also a 
Special file which has the same form:

File Special
ID | VALUE
01 | 21
02 | 23
03 | 54
…

What I would like to get is a join of File 01..1 with File Special to get a 
difference between values:

File Result 01 = File Special – File 01
ID | VALUE
01 | 21-10
02 | 23-12
03 | 54-53
…

And save result to a csv, meaning 1 new files. What’s the best way of doing 
this?

My idea was the following:

1.   Read all Files with wholeTextFiles, each to a separate partition

2.   Perform map-side join with broadcast variable inside mapPartitions 
(the “Special” file will be broadcasted).

I am on Spark 1.3, but it can be upgraded if needed. Perhaps this could be done 
better in a dataframe? Then I would create one large dataframe, with additional 
“filename” key, i.e.:
File | ID | Value
01 | 01 | 10
01 | 02 | 12
01 | 03 | 55
02 | 01 | 21
02 | 02 | 23
…

What would then be a way to make an efficient query over such a dataframe?

Any advice will be appreciated.

Best regards,
Lucas




--
Best Regards,
Ayan Guha




Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Here is the code snippet, starting line 365 in KafkaCluster.scala:

type Err = ArrayBuffer[Throwable]

/** If the result is right, return it, otherwise throw SparkException */
def checkErrors[T](result: Either[Err, T]): T = {
  result.fold(
errs => throw new SparkException(errs.mkString("Throwing this errir\n")),
ok => ok
  )
}



On Thu, Sep 24, 2015 at 3:00 PM, Sourabh Chandak 
wrote:

> I was able to get pass this issue. I was pointing the SSL port whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
> at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger 
> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>>
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak 
>> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak 
>>> wrote:
>>>
 Hi,

 I have ported receiver less spark streaming for kafka to Spark 1.2 and
 am trying to run a spark streaming job to consume data form my broker, but
 I am getting the following error:

 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
 352518400
 java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at
 kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
 at
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
 at
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
 at
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
 at
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
 at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at org.apache.spark.streaming.kafka.KafkaCluster.org
 $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
 at
 org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
 at
 org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
 at
 

Re: Reading Hive Tables using SQLContext

2015-09-24 Thread Michael Armbrust
No, you have to use a HiveContext.
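
A minimal sketch for Spark 1.x: a HiveContext is a drop-in superset of SQLContext, so the rest of the code can stay the same. The table name here is hypothetical.

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc) // sc: an existing SparkContext
val df = hiveContext.sql("SELECT * FROM my_hive_table")
df.show()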

On Thu, Sep 24, 2015 at 2:47 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hello,
>
> Is it possible to access Hive tables directly from SQLContext instead of
> HiveContext? I am facing with errors while doing it.
>
> Please let me know
>
>
> Thanks
>
> Sathish
>


Re: Networking issues with Spark on EC2

2015-09-24 Thread Ankur Srivastava
Hi Suraj,

Spark uses a lot of ports to communicate between nodes. Probably your
security group is restrictive and does not allow instances to communicate
on all ports. The easiest way to resolve it is to add a rule that allows
all inbound traffic on all ports (0-65535) to instances in the same security
group, like this:

Type: All TCP
Protocol: TCP
Port Range: 0 - 65535
Source: your security group

Hope this helps!!

Thanks
Ankur

On Thu, Sep 24, 2015 at 7:09 AM SURAJ SHETH  wrote:

> Hi,
>
> I am using Spark 1.2 and facing network related issues while performing
> simple computations.
>
> This is a custom cluster set up using ec2 machines and spark prebuilt
> binary from apache site. The problem is only when we have workers on other
> machines(networking involved). Having a single node for the master and the
> slave works correctly.
>
> The error log from slave node is attached below. It is reading textFile
> from local FS(copied each node) and counting it. The first 30 tasks get
> completed within 5 seconds. Then, it takes several minutes to complete
> another 10 tasks and eventually dies.
>
> Sometimes, one of the workers completes all the tasks assigned to it.
> Different workers have different behavior at different
> times(non-deterministic).
>
> Is it related to something specific to EC2?
>
>
>
> 15/09/24 13:04:40 INFO Executor: Running task 117.0 in stage 0.0 (TID 117)
>
> 15/09/24 13:04:41 INFO TorrentBroadcast: Started reading broadcast
> variable 1
>
> 15/09/24 13:04:41 INFO SendingConnection: Initiating connection to
> [master_ip:56305]
>
> 15/09/24 13:04:41 INFO SendingConnection: Connected to
> [master_ip/master_ip_address:56305], 1 messages pending
>
> 15/09/24 13:05:41 INFO TorrentBroadcast: Started reading broadcast
> variable 1
>
> 15/09/24 13:05:41 ERROR Executor: Exception in task 77.0 in stage 0.0 (TID
> 77)
>
> java.io.IOException: sendMessageReliably failed because ack was not
> received within 60 sec
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>
> at
> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/09/24 13:05:41 INFO CoarseGrainedExecutorBackend: Got assigned task 122
>
> 15/09/24 13:05:41 INFO Executor: Running task 3.1 in stage 0.0 (TID 122)
>
> 15/09/24 13:06:41 ERROR Executor: Exception in task 113.0 in stage 0.0
> (TID 113)
>
> java.io.IOException: sendMessageReliably failed because ack was not
> received within 60 sec
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>
> at
> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/09/24 13:06:41 INFO TorrentBroadcast: Started reading broadcast
> variable 1
>
> 15/09/24 13:06:41 INFO SendingConnection: Initiating connection to
> [master_ip/master_ip_address:44427]
>
> 15/09/24 13:06:41 INFO SendingConnection: Connected to
> [master_ip/master_ip_address:44427], 1 messages pending
>
> 15/09/24 13:07:41 ERROR Executor: Exception in task 37.0 in stage 0.0 (TID
> 37)
>
> java.io.IOException: sendMessageReliably failed because ack was not
> received within 60 sec
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>
> at
> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>
> at
> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>
> at 

Reasonable performance numbers?

2015-09-24 Thread Young, Matthew T
Hello,

I am doing performance testing with Spark Streaming. I want to know if the 
throughput numbers I am encountering are reasonable for the power of my cluster 
and Spark's performance characteristics.

My job has the following processing steps (a rough code sketch follows the list):

1.  Read 600 Byte JSON strings from a 7 broker / 48 partition Kafka cluster 
via the Kafka Direct API

2.  Parse the JSON with play-json or lift-json (no significant performance 
difference)

3.  Read one integer value out of the JSON

4.  Compute the average of this integer value across all records in the 
batch with DoubleRDD.mean

5.  Write the average for the batch back to a different Kafka topic
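
A rough Scala sketch of that pipeline, for reference. parseValue and sendToKafka are hypothetical stand-ins for the play-json/lift-json parsing and the Kafka producer code; the broker list, topic names and batch interval are assumptions.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def parseValue(json: String): Int = ???                 // hypothetical: extract the integer field
def sendToKafka(topic: String, msg: String): Unit = ??? // hypothetical: producer call

val ssc = new StreamingContext(new SparkConf().setAppName("mean-bench"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("input-topic"))

stream
  .map { case (_, json) => parseValue(json) }           // steps 2-3: parse and pick the value
  .foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val avg = rdd.map(_.toDouble).mean()              // step 4: DoubleRDD.mean over the batch
      sendToKafka("output-topic", avg.toString)         // step 5: write the batch average back
    }
  }

ssc.start()
ssc.awaitTermination()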

I have tried 2, 4, and 10 second batch intervals. The best throughput I can 
sustain is about 75,000 records/second for the whole cluster.

The Spark cluster is in a VM environment with 3 VMs. Each VM has 32 GB of RAM 
and 16 cores. The systems are networked with 10 GB NICs. I started testing with 
Spark 1.3.1 and switched to Spark 1.5 to see if there was improvement (none 
significant). When I look at the event timeline in the WebUI I see that the 
majority of the processing time for each batch is "Executor Computing Time" in 
the foreachRDD that computes the average, not the transform that does the JSON 
parsing.

CPU util hovers around 40% across the cluster, and RAM has plenty of free space 
remaining as well. Network comes nowhere close to being saturated.

My colleague implementing similar functionality in Storm is able to exceed a 
quarter million records per second with the same hardware.

Is 75K records/seconds reasonable for a cluster of this size? What kind of 
performance would you expect for this job?


Thanks,

-- Matthew


Re: Potential racing condition in DAGScheduler when Spark 1.5 caching

2015-09-24 Thread Robin Li
Josh

Looking closer, I think you are correct; it's not a race condition. This only
shows up when persisting strings; other data formats look fine.

Also, when we reverted to 1.4 the issue was gone.

Thanks

On Thursday, 24 September 2015, Josh Rosen  wrote:

> I believe that this is an instance of
> https://issues.apache.org/jira/browse/SPARK-10422, which should be fixed
> in upcoming 1.5.1 release.
>
> On Thu, Sep 24, 2015 at 12:52 PM, Mark Hamstra  > wrote:
>
>> Where do you see a race in the DAGScheduler?  On a quick look at your
>> stack trace, this just looks to me like a Job where a Stage failed and then
>> the DAGScheduler aborted the failed Job.
>>
>> On Thu, Sep 24, 2015 at 12:00 PM, robin_up > > wrote:
>>
>>> Hi
>>>
>>> After upgrade to 1.5, we found a possible racing condition in
>>> DAGScheduler
>>> similar to https://issues.apache.org/jira/browse/SPARK-4454.
>>>
>>> Here is the code creating the problem:
>>>
>>>
>>> app_cpm_load = sc.textFile("/user/a/app_ecpm.txt").map(lambda x:
>>> x.split(',')).map(lambda p: Row(app_id=str(p[0]), loc_filter=str(p[1]),
>>> cpm_required=float(p[2]) ))
>>> app_cpm = sqlContext.createDataFrame(app_cpm_load)
>>> app_cpm.registerTempTable("app_cpm")
>>>
>>> app_rev_cpm_sql = '''select loc_filter from app_cpm'''
>>> app_rev_cpm = sqlContext.sql(app_rev_cpm_sql)
>>> app_rev_cpm.cache()
>>> app_rev_cpm.show()
>>>
>>>
>>>
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File "/opt/spark/python/pyspark/sql/dataframe.py", line 256, in show
>>> print(self._jdf.showString(n, truncate))
>>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>>> line 538, in __call__
>>>   File "/opt/spark/python/pyspark/sql/utils.py", line 36, in deco
>>> return f(*a, **kw)
>>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>> line
>>> 300, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o46.showString.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 0
>>> in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>> 1.0
>>> (TID 4, spark-yarn-dn02): java.util.NoSuchElementException: key not
>>> found:
>>> UK
>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> at scala.collection.AbstractMap.default(Map.scala:58)
>>> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>> at
>>>
>>> org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
>>> at
>>>
>>> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
>>> at
>>>
>>> org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
>>> at
>>>
>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
>>> at
>>>
>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
>>> at
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at
>>>
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at
>>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>> at
>>>
>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
>>> at
>>>
>>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
>>> at
>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
>>> at
>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>>> at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> 

Reading Hive Tables using SQLContext

2015-09-24 Thread Sathish Kumaran Vairavelu
Hello,

Is it possible to access Hive tables directly from SQLContext instead of
HiveContext? I am facing with errors while doing it.

Please let me know


Thanks

Sathish


Re: kafka direct streaming with checkpointing

2015-09-24 Thread Radu Brumariu
Would changing the direct stream API to support committing the offsets to
Kafka's ZK (like a regular consumer) as a fallback mechanism, in case
recovering from a checkpoint fails, be an accepted solution?

On Thursday, September 24, 2015, Cody Koeninger  wrote:

> This has been discussed numerous times, TD's response has consistently
> been that it's unlikely to be possible
>
> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu  > wrote:
>
>> It seems to me that this scenario that I'm facing, is quite common for
>> spark jobs using Kafka.
>> Is there a ticket to add this sort of semantics to checkpointing ? Does
>> it even make sense to add it there ?
>>
>> Thanks,
>> Radu
>>
>>
>> On Thursday, September 24, 2015, Cody Koeninger > > wrote:
>>
>>> No, you cant use checkpointing across code changes.  Either store
>>> offsets yourself, or start up your new app code and let it catch up before
>>> killing the old one.
>>>
>>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu  wrote:
>>>
 Hi,
 in my application I use Kafka direct streaming and I have also enabled
 checkpointing.
 This seems to work fine if the application is restarted. However if I
 change the code and resubmit the application, it cannot start because of
 the checkpointed data being of different class versions.
 Is there any way I can use checkpointing that can survive across
 application version changes?

 Thanks,
 Radu


>>>
>


executor-cores setting does not work under Yarn

2015-09-24 Thread Gavin Yue
Running Spark app over Yarn 2.7

Here is my sparksubmit setting:
--master yarn-cluster \
 --num-executors 100 \
 --executor-cores 3 \
 --executor-memory 20g \
 --driver-memory 20g \
 --driver-cores 2 \

But the executor-cores setting is not working. It always assigns only one
vcore to each container, according to the cluster metrics on the YARN resource
manager web UI.

And yarn setting for container is
min:   max: 

I have tried changing num-executors and executor memory. It even ignores
the min vCores setting and always assigns one core per container.

Any advice?

Thank you!


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Cody Koeninger
That looks like the OOM is in the driver, when getting partition metadata
to create the direct stream.  In that case, executor memory allocation
doesn't matter.

Allocate more driver memory, or put a profiler on it to see what's taking
up heap.



On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak 
wrote:

> Adding Cody and Sriharsha
>
> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak 
> wrote:
>
>> Hi,
>>
>> I have ported receiver less spark streaming for kafka to Spark 1.2 and am
>> trying to run a spark streaming job to consume data form my broker, but I
>> am getting the following error:
>>
>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
>> java.lang.OutOfMemoryError: Java heap space
>> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>> at
>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>> at
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>> at
>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> at
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> at
>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>> at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>> at
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>> at org.apache.spark.streaming.kafka.KafkaCluster.org
>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>> at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> I have tried allocating 100G of memory with 1 executor but it is still
>> failing.
>>
>> Spark version: 1.2.2
>> Kafka version ported: 0.8.2
>> Kafka server version: trunk version with SSL enabled
>>
>> Can someone please help me debug this.
>>
>> Thanks,
>> Sourabh
>>
>
>


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
I was able to get past this issue. I was pointing at the SSL port, whereas
SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
am getting the following error:

Exception in thread "main" org.apache.spark.SparkException:
java.nio.BufferUnderflowException
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
at
org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Sourabh

On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger  wrote:

> That looks like the OOM is in the driver, when getting partition metadata
> to create the direct stream.  In that case, executor memory allocation
> doesn't matter.
>
> Allocate more driver memory, or put a profiler on it to see what's taking
> up heap.
>
>
>
> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak 
> wrote:
>
>> Adding Cody and Sriharsha
>>
>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak 
>> wrote:
>>
>>> Hi,
>>>
>>> I have ported receiver less spark streaming for kafka to Spark 1.2 and
>>> am trying to run a spark streaming job to consume data form my broker, but
>>> I am getting the following error:
>>>
>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>> 352518400
>>> java.lang.OutOfMemoryError: Java heap space
>>> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>> at
>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>> at
>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>> at
>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>> at
>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>> at
>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>> at
>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>> at
>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> at
>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>> at org.apache.spark.streaming.kafka.KafkaCluster.org
>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>> at
>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> 

Why Checkpoint is throwing "actor.OneForOneStrategy: NullPointerException"

2015-09-24 Thread Uthayan Suthakar
Hello all,

My streaming job is throwing the below exception at every interval. It first
deletes the checkpoint file and then tries to checkpoint; is
this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
this issue?

15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
*15/09/24 16:35:55 INFO streaming.CheckpointWriter:
Deleting 
hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/checkpoint-144310422*
*15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time
144310422 ms saved to file
'hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/*
checkpoint-144310422', took 10696 bytes and 108 ms
15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data for
time 144310422 ms
15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data for
time 144310422 ms
15/09/24 16:35:55 ERROR actor.OneForOneStrategy:
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Cheers,

Uthay


Re: Why Checkpoint is throwing "actor.OneForOneStrategy: NullPointerException"

2015-09-24 Thread Tathagata Das
Are you by any chance setting DStream.remember() with null?

On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
uthayan.sutha...@gmail.com> wrote:

> Hello all,
>
> My Stream job is throwing below exception at every interval. It is first
> deleting the the checkpoint file and then it's trying to checkpoint, is
> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
> this issue?
>
> 15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in
> stage 84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
> *15/09/24 16:35:55 INFO streaming.CheckpointWriter:
> Deleting 
> hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/checkpoint-144310422*
> *15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time
> 144310422 ms saved to file
> 'hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/*
> checkpoint-144310422', took 10696 bytes and 108 ms
> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data
> for time 144310422 ms
> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data for
> time 144310422 ms
> 15/09/24 16:35:55 ERROR actor.OneForOneStrategy:
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Cheers,
>
> Uthay
>
>


Re: Unable to start spark-shell on YARN

2015-09-24 Thread Doug Balog
The error is because the shell is trying to resolve hdp.version and can't.
To fix this, you need to put a file called java-opts in your conf directory
that has something like this:

-Dhdp.version=2.x.x.x

Where 2.x.x.x is there version of hdp that you are using.
Cheers,

Doug

> On Sep 24, 2015, at 6:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:
> 
> Spark 1.4.1
> YARN
> Hadoop version: 2.7.1.2.3.1.0-2574
> ./bin/spark-shell  --master yarn
> Hadoop cluster setup using Ambari.
> 
> 
> Shell fails as YARN job failed. Any suggestions ? 
> 
> LOGS:
> 
> 15/09/24 15:07:51 INFO impl.YarnClientImpl: Submitted application 
> application_1443126834156_0016
> 15/09/24 15:07:52 INFO yarn.Client: Application report for 
> application_1443126834156_0016 (state: ACCEPTED)
> 15/09/24 15:07:52 INFO yarn.Client: 
>client token: N/A
>diagnostics: N/A
>ApplicationMaster host: N/A
>ApplicationMaster RPC port: -1
>queue: default
>start time: 1443132471179
>final status: UNDEFINED
>tracking URL: http://host:8088/proxy/application_1443126834156_0016/
>user: zeppelin
> 15/09/24 15:07:53 INFO yarn.Client: Application report for 
> application_1443126834156_0016 (state: ACCEPTED)
> 15/09/24 15:07:54 INFO yarn.Client: Application report for 
> application_1443126834156_0016 (state: ACCEPTED)
> 15/09/24 15:07:55 INFO yarn.Client: Application report for 
> application_1443126834156_0016 (state: ACCEPTED)
> 15/09/24 15:07:56 INFO yarn.Client: Application report for 
> application_1443126834156_0016 (state: FAILED)
> 15/09/24 15:07:56 INFO yarn.Client: 
>client token: N/A
>diagnostics: Application application_1443126834156_0016 failed 2 times 
> due to AM Container for appattempt_1443126834156_0016_02 exited with  
> exitCode: 1
> For more detailed output, check application tracking 
> page:http://host:8088/cluster/app/application_1443126834156_0016Then, click 
> on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e03_1443126834156_0016_02_01
> Exit code: 1
> Exception message: 
> /hadoop/yarn/local/usercache/zeppelin/appcache/application_1443126834156_0016/container_e03_1443126834156_0016_02_01/launch_container.sh:
>  line 24: 
> $PWD:$PWD/__hadoop_conf__:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
>  bad substitution
> 
> Stack trace: ExitCodeException exitCode=1: 
> /hadoop/yarn/local/usercache/zeppelin/appcache/application_1443126834156_0016/container_e03_1443126834156_0016_02_01/launch_container.sh:
>  line 24: 
> $PWD:$PWD/__hadoop_conf__:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
>  bad substitution
> 
>   at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
>   at org.apache.hadoop.util.Shell.run(Shell.java:456)
>   at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>   at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> 

Stop a Dstream computation

2015-09-24 Thread Samya
Hi Team,

I have a code piece as follows.

try {
  someDstream.someaction(...)   // Step1
} catch {
  case ex: Exception =>
    someDstream.someaction(...)   // Step2
}

When I get an exception for the current batch, Step2 executes as expected.

But for the next batches, when there is no exception, both Step1 and Step2 still
execute. In this scenario I want only Step1 to execute.
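
A pattern that keeps the fallback scoped to the failing batch (a sketch with hypothetical
helper names, not from this thread) is to do the work and the error handling inside a
single output operation such as foreachRDD, so only one action is ever registered on the
DStream:

someDstream.foreachRDD { rdd =>
  try {
    primaryAction(rdd)      // Step1: hypothetical per-batch work that may fail
  } catch {
    case ex: Exception =>
      fallbackAction(rdd)   // Step2: runs only for the batch that actually failed
  }
}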

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stop-a-Dstream-computation-tp24816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: No space left on device when running graphx job

2015-09-24 Thread Jack Yang
Hi all,
I resolved the problems.
Thanks, folks.
Jack

From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Friday, 25 September 2015 9:57 AM
To: Ted Yu; Andy Huang
Cc: user@spark.apache.org
Subject: RE: No space left on device when running graphx job

Also, please see the screenshot below from spark web ui:
This is the snapshot just 5 seconds (I guess) before the job crashed.

[inline screenshot of the Spark web UI omitted]

From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Friday, 25 September 2015 9:55 AM
To: Ted Yu; Andy Huang
Cc: user@spark.apache.org
Subject: RE: No space left on device when running graphx job

Hi, here is the full stack trace:

15/09/25 09:50:14 WARN scheduler.TaskSetManager: Lost task 21088.0 in stage 6.0 
(TID 62230, 192.168.70.129): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:86)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:84)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:84)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:168)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:84)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockResolver.scala:80)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockResolver.scala:80)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFile(IndexShuffleBlockResolver.scala:88)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


I am using the df -i command to monitor the inode usage, which shows the following all
the time:

Filesystem   Inodes  IUsed   IFree IUse% Mounted on
/dev/sda1   1245184 275424  969760   23% /
udev         382148    484  381664    1% /dev
tmpfs        384505    366  384139    1% /run
none         384505      3  384502    1% /run/lock
none         384505      1  384504    1% /run/shm
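
Since the inode counts above look healthy, the "No space left on device" is more likely
plain disk exhaustion from shuffle spill under spark.local.dir (often /tmp by default).
A possible mitigation (a sketch; the paths are assumptions) is to point the scratch
directories at a larger volume, e.g. in conf/spark-defaults.conf:

spark.local.dir   /data1/spark-tmp,/data2/spark-tmp

and to watch free blocks, not just inodes, with df -h while the job runs.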



From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, 24 September 2015 9:12 PM
To: Andy Huang
Cc: Jack Yang; user@spark.apache.org
Subject: Re: No space left on device when running graphx job

Andy:
Can you show complete stack trace ?

Have you checked there are enough free inode on the .129 machine ?

Cheers

On Sep 23, 2015, at 11:43 PM, Andy Huang 
> wrote:
Hi Jack,

Are you writing out to disk? Or it sounds like Spark is spilling to disk (RAM 
filled up) and it's running out of disk space.

Cheers
Andy

On Thu, Sep 24, 2015 at 4:29 PM, Jack Yang 
> wrote:
Hi folk,

I have an issue with GraphX (Spark 1.4.0 + 4 machines + 4G memory + 4 CPU cores).
Basically, I load data using the GraphLoader.edgeListFile method and then count the
number of nodes using the graph.vertices.count() method.
The problem is :

Lost task 11972.0 in stage 6.0 (TID 54585, 192.168.70.129): 
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)

When I try a small amount of data, the code works, so I guess the error comes from the
amount of data.
This is how I submit the job:

spark-submit --class "myclass" \
  --master spark://hadoopmaster:7077 \
  --executor-memory 2048M \
  --driver-java-options "-XX:MaxPermSize=2G" \
  --total-executor-cores 4 \
  my.jar

(I am using the standalone cluster manager.)


Any thoughts?
Best 

Re: Unable to start spark-shell on YARN

2015-09-24 Thread ๏̯͡๏
This is resolved now
On Thu, Sep 24, 2015 at 7:47 PM Doug Balog  wrote:

> The error is because the shell is trying to resolve hdp.version and can’t.
> To fix this, you need to put a file called java-opts in your conf
> directory that  has something like this.
>
> -Dhdp.version=2.x.x.x
>
> Where 2.x.x.x is the version of HDP that you are using.
> Cheers,
>
> Doug
>
> > On Sep 24, 2015, at 6:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:
> >
> > Spark 1.4.1
> > YARN
> > Hadoop version: 2.7.1.2.3.1.0-2574
> > ./bin/spark-shell  --master yarn
> > Hadoop cluster setup using Ambari.
> >
> >
> > Shell fails as YARN job failed. Any suggestions ?
> >
> > LOGS:
> >
> > 15/09/24 15:07:51 INFO impl.YarnClientImpl: Submitted application
> application_1443126834156_0016
> > 15/09/24 15:07:52 INFO yarn.Client: Application report for
> application_1443126834156_0016 (state: ACCEPTED)
> > 15/09/24 15:07:52 INFO yarn.Client:
> >client token: N/A
> >diagnostics: N/A
> >ApplicationMaster host: N/A
> >ApplicationMaster RPC port: -1
> >queue: default
> >start time: 1443132471179
> >final status: UNDEFINED
> >tracking URL:
> http://host:8088/proxy/application_1443126834156_0016/
> >user: zeppelin
> > 15/09/24 15:07:53 INFO yarn.Client: Application report for
> application_1443126834156_0016 (state: ACCEPTED)
> > 15/09/24 15:07:54 INFO yarn.Client: Application report for
> application_1443126834156_0016 (state: ACCEPTED)
> > 15/09/24 15:07:55 INFO yarn.Client: Application report for
> application_1443126834156_0016 (state: ACCEPTED)
> > 15/09/24 15:07:56 INFO yarn.Client: Application report for
> application_1443126834156_0016 (state: FAILED)
> > 15/09/24 15:07:56 INFO yarn.Client:
> >client token: N/A
> >diagnostics: Application application_1443126834156_0016 failed 2
> times due to AM Container for appattempt_1443126834156_0016_02 exited
> with  exitCode: 1
> > For more detailed output, check application tracking page:
> http://host:8088/cluster/app/application_1443126834156_0016Then, click on
> links to logs of each attempt.
> > Diagnostics: Exception from container-launch.
> > Container id: container_e03_1443126834156_0016_02_01
> > Exit code: 1
> > Exception message:
> /hadoop/yarn/local/usercache/zeppelin/appcache/application_1443126834156_0016/container_e03_1443126834156_0016_02_01/launch_container.sh:
> line 24:
> $PWD:$PWD/__hadoop_conf__:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
> bad substitution
> >
> > Stack trace: ExitCodeException exitCode=1:
> /hadoop/yarn/local/usercache/zeppelin/appcache/application_1443126834156_0016/container_e03_1443126834156_0016_02_01/launch_container.sh:
> line 24:
> $PWD:$PWD/__hadoop_conf__:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
> bad substitution
> >
> >   at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
> >   at org.apache.hadoop.util.Shell.run(Shell.java:456)
> >   at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
> >   at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
> >   at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> >   at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> >  

Re: Spark ClosureCleaner or java serializer OOM when trying to grow

2015-09-24 Thread jluan
With spark.serializer.objectStreamReset set to 1, I ran a sample scala test
code which still seems to be crashing at the same place. If someone could
verify this independently, I would greatly appreciate it.

Scala Code:
--
import scala.util.Random
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val r = Random

var size = 1500
var count = 3000
val indptr = (1 to size by size/count).toArray
val data = Seq.fill(count)(r.nextDouble()).toArray

var dset = ArrayBuffer[LabeledPoint]()
for (i <- 1 to 10) {
dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data));
}

val distData = sc.parallelize(dset)
val splits = distData.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32

val model = RandomForest.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-tp24796p24818.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-24 Thread Utkarsh Sengar
Bumping this one up, any suggestions on the stacktrace?
spark.mesos.coarse=true is not working and the driver crashed with the
error.

On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar 
wrote:

> Missed to do a reply-all.
>
> Tim,
>
> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
> works (sorry there was a typo in my last email, I meant "when I do
> "spark.mesos.coarse=false", the job works like a charm. ").
>
> I get this exception with spark.mesos.coarse = true:
>
> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "55af4bf26750ad38a444d7cf"}, max= { "_id" : "55af5a61e8a42806f47546c1"}
>
> 15/09/22
> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
> "55af5a61e8a42806f47546c1"}, max= null
>
> Exception
> in thread "main" java.lang.OutOfMemoryError: Java heap space
>
> 
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> 
> at scala.Option.getOrElse(Option.scala:120)
>
> 
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> 
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> 
> at scala.Option.getOrElse(Option.scala:120)
>
> 
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> 
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> 
> at scala.Option.getOrElse(Option.scala:120)
>
> 
> at 

Using Map and Basic Operators yield java.lang.ClassCastException (Parquet + Hive + Spark SQL 1.5.0 + Thrift)

2015-09-24 Thread Dominic Ricard
Hi,
   I stumbled on the following today. We have Parquet files that expose a
column in a Map format. This is very convenient, as we have data parts that can
vary over time. Not knowing what the data will be, we simply split it into tuples
and insert it as a map inside one column.

Retrieving the data is very easy. Syntax looks like this:

select column.key1.key2 from table;

Column values look like this:
{}
{"key1":"value"}
{"key1":{"key2":"value"}}

But when trying to do basic operators on that column, I get the following
error:

query: select (column.key1.key2 / 30 < 1) from table

ERROR processing query/statement. Error Code: 0, SQL state:
TStatus(statusCode:ERROR_STATUS,
infoMessages:[*org.apache.hive.service.cli.HiveSQLException:java.lang.ClassCastException:
org.apache.spark.sql.types.NullType$ cannot be cast to
org.apache.spark.sql.types.MapType:26:25,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:runInternal:SparkExecuteStatementOperation.scala:259,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:run:SparkExecuteStatementOperation.scala:144,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:388,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:369,
sun.reflect.GeneratedMethodAccessor115:invoke::-1,
sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43,
java.lang.reflect.Method:invoke:Method.java:497,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78,
org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36,
org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63,
java.security.AccessController:doPrivileged:AccessController.java:-2,
javax.security.auth.Subject:doAs:Subject.java:422,
org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1628,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59,
com.sun.proxy.$Proxy39:executeStatement::-1,
org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:261,
org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:486,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1313,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1298,
org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39,
org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39,
org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56,
org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:285,
java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142,
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617,
java.lang.Thread:run:Thread.java:745], errorCode:0,
errorMessage:java.lang.ClassCastException:
org.apache.spark.sql.types.NullType$ cannot be cast to
org.apache.spark.sql.types.MapType), Query:

Trying to apply a logical or arithmetic operator on a Map value will yield the
error.

The solution I found was to simply cast it as an int and change my logical
evaluation to an equality check. (In this case, I wanted to get TRUE if the value
was less than 1, and casting it as a float or double was yielding the same error.)

select cast(cast(column.key1.key2 as int) / 30 as int) = 0 from table
(Yes, the result of the division needs to be cast as well, for some reason...)

Can anyone shed some light on why the map type has problems with basic
operators?

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Map-and-Basic-Operators-yield-java-lang-ClassCastException-Parquet-Hive-Spark-SQL-1-5-0-Thrift-tp24809.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Java Heap Space Error

2015-09-24 Thread java8964
I can understand why your first query will finish without OOM but the new one
will fail with OOM.
In the new query, you are asking for a groupByKey/cogroup operation, which forces all
the productName + productCategory values per user id to be sent to the same reducer.
This can easily blow out the reducer's memory if you have one user id with a lot of
productName and productCategory values.
Keep in mind that Spark on the reducer side still uses a hash to merge all the
data from different mappers, so the memory on the reduce side has to be able to merge
all the productName + productCategory values for (at least) the most frequent user id,
and I don't know why you want all the productName and productCategory values per user
id (maybe a distinct could be enough?).
Imagine you have one user id that shows up 1M times in your dataset, with 0.5M
product names of 'A' and 0.5M product names of 'B'; your query will push 1M rows of
'A' and 'B' into the same reducer and ask Spark to merge them in the HashMap for that
user id. This will cause OOM.
Above all, you need to find out the max count per user id in your data:
select max(count(*)) from land where . group by userid
Your memory has to support that many productName and productCategory values, even if
your partition count is as high as your unique user id count, if that is really what
you want: to consolidate all the productName and productCategory values together
without even considering removing duplicates.
Both queries should still push a similar record count per partition, just with very
different data volumes.
Yong
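
A nested form of that max-count check (a sketch; the table and filter follow the earlier
query in this thread, everything else is an assumption):

select max(cnt)
from (
  select userid, count(*) as cnt
  from landing
  where pagetype = 'productDetail' and userid is not null and userid != ''
  group by userid
) t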

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 18:56:51 +0300
CC: jingyu.zh...@news.com.au; user@spark.apache.org
To: java8...@hotmail.com

Yes, right, the query you wrote worked in the same cluster. In that case, partitions
were equally distributed, but when I used regex and concatenations it's not, as I said
before. The query with concatenation is below:
val usersInputDF = sqlContext.sql(
  s"""
 |  select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname 
is not 
NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
 ') inputlist from landing where 
dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid != 
'' and userid is not null and userid is not NULL and pagetype = 'productDetail' 
group by userid

   """.stripMargin)

On 24 Sep 2015, at 16:52, java8964  wrote:

This is interesting.
So you mean that query as 
"select userid from landing where dt='2015-9' and userid != '' and userid is 
not null and userid is not NULL and pagetype = 'productDetail' group by userid"
works in your cluster?
In this case, do you also see one task with way more data than the rest, as happened
when you used regex and concatenation?
It is hard to believe that just adding "regex" and "concatenation" would make the
distribution more equal across partitions. In your query, the distribution across
partitions simply depends on the hash partitioner of "userid".
Can you show us the query after you add "regex" and "concatenation"?
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 15:34:48 +0300
CC: user@spark.apache.org
To: jingyu.zh...@news.com.au; java8...@hotmail.com

@Jingyu Yes, it works without regex and concatenation, as in the query below:
So, what can we understand from this? When I run it like that, the shuffle read sizes
are equally distributed between partitions.
val usersInputDF = sqlContext.sql(s""" |  select userid from landing 
where dt='2015-9' and userid != '' and userid is not null and userid is not 
NULL and pagetype = 'productDetail' group by userid
   """.stripMargin)
@java8964
I tried with sql.shuffle.partitions = 1 but no luck. Again, one partition's shuffle
size is huge and the others are very small.

So how can I balance this shuffle read size between partitions?

On 24 Sep 2015, at 03:35, Zhang, Jingyu  wrote:

Does your SQL work if it does not run a regex on strings and concatenate them, I mean
just select the fields without string operations?

On 24 September 2015 at 10:11, java8964  wrote:
Try to increase the partition count; that will make each partition hold less data.
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 00:32:47 +0300
CC: user@spark.apache.org
To: java8...@hotmail.com

Yes, it's possible. I use S3 as the data source. My external tables are partitioned.
The task below is 193/200: the job has 2 stages, and it is task 193 of 200 in the 2nd
stage because of sql.shuffle.partitions.
How can I avoid this situation? This is my query:
select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 

Re: Reading Hive Tables using SQLContext

2015-09-24 Thread Sathish Kumaran Vairavelu
Thanks Michael. Just want to check if there is a roadmap to include Hive
tables from SQLContext.

-Sathish
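
For reference, a minimal sketch of the HiveContext route in Spark 1.x (the table name is
hypothetical):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)                      // picks up hive-site.xml from the classpath if present
val df = hiveContext.sql("SELECT * FROM some_hive_table")  // a plain SQLContext cannot see Hive metastore tables
df.show()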

On Thu, Sep 24, 2015 at 7:46 PM Michael Armbrust 
wrote:

> No, you have to use a HiveContext.
>
> On Thu, Sep 24, 2015 at 2:47 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Hello,
>>
>> Is it possible to access Hive tables directly from SQLContext instead of
>> HiveContext? I am facing with errors while doing it.
>>
>> Please let me know
>>
>>
>> Thanks
>>
>> Sathish
>>
>
>


Exception on save s3n file (1.4.1, hadoop 2.6)

2015-09-24 Thread Zhang, Jingyu
I got the following exception when I run
JavaPairRDD.values().saveAsTextFile("s3n://bucket");
Can anyone help me out? Thanks.


15/09/25 12:24:32 INFO SparkContext: Successfully stopped SparkContext

Exception in thread "main" java.lang.NoClassDefFoundError:
org/jets3t/service/ServiceException

at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(
NativeS3FileSystem.java:338)

at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(
NativeS3FileSystem.java:328)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

at
org.apache.spark.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:170)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:988)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)

at
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1404)

at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383)

at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1383)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)

at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1383)

at
org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:519)

at
org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:47)

at com.news.report.section.SectionSubSection.run(SectionSubSection.java:184)

at com.news.report.section.SectionSubSection.main(SectionSubSection.java:67)

Caused by: java.lang.ClassNotFoundException:
org.jets3t.service.ServiceException

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 34 more
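
For readers hitting this: the missing class lives in the JetS3t library, which the s3n://
filesystem depends on, so it has to be on the driver/executor classpath. One way to pull
it in (a sketch; the exact coordinates and version are assumptions) is:

spark-submit --packages net.java.dev.jets3t:jets3t:0.9.0 ...

or pass the jets3t jar explicitly with --jars.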



RE: Using Map and Basic Operators yield java.lang.ClassCastException (Parquet + Hive + Spark SQL 1.5.0 + Thrift)

2015-09-24 Thread Dominic Ricard
No, those were just examples of how maps can look. In my case, the key-value is either
there or not, in the form of the latter:

{"key1":{"key2":"value"}}

If key1 is present, then it will contain a tuple of key2:value, the value being an
'int'.

I guess, after some testing, that my problem is in how casting a Map value to the
primitives Float and Double is handled. Casting to INT is all good, but float and
double cause the exception.
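
In other words (restating the thread's observation as SQL):

select cast(column.key1.key2 as int) from table     -- works
select cast(column.key1.key2 as double) from table  -- raises the ClassCastException above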

Thanks.

Dominic Ricard
Triton Digital

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Thursday, September 24, 2015 5:47 PM
To: Dominic Ricard; user@spark.apache.org
Subject: Re: Using Map and Basic Operators yield java.lang.ClassCastException 
(Parquet + Hive + Spark SQL 1.5.0 + Thrift)



On 9/24/15 11:34 AM, Dominic Ricard wrote:
> Hi,
> I stumbled on the following today. We have Parquet files that 
> expose a column in a Map format. This is very convenient as we have 
> data parts that can vary in time. Not knowing what the data will be, 
> we simply split it in tuples and insert it as a map inside 1 column.
>
> Retrieving the data is very easy. Syntax looks like this:
>
> select column.key1.key2 from table;
>
> Column value look like this:
> {}
> {"key1":"value"}
> {"key1":{"key2":"value"}}
Do you mean that the value type of the map may also vary? The 2nd record has a 
string value, while the 3rd one has another nested map as its value. This isn't 
supported in Spark SQL.
>
> But when trying to do basic operators on that column, I get the 
> following
> error:
>
> query: select (column.key1.key2 / 30 < 1) from table
>
> ERROR processing query/statement. Error Code: 0, SQL state:
> TStatus(statusCode:ERROR_STATUS,
> infoMessages:[*org.apache.hive.service.cli.HiveSQLException:java.lang.ClassCastException:
> org.apache.spark.sql.types.NullType$ cannot be cast to 
> org.apache.spark.sql.types.MapType:26:25,
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
> runInternal:SparkExecuteStatementOperation.scala:259,
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
> run:SparkExecuteStatementOperation.scala:144,
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementIn
> ternal:HiveSessionImpl.java:388, 
> org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:H
> iveSessionImpl.java:369, 
> sun.reflect.GeneratedMethodAccessor115:invoke::-1,
> sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccess
> orImpl.java:43, java.lang.reflect.Method:invoke:Method.java:497,
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
> nProxy.java:78, 
> org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSe
> ssionProxy.java:36, 
> org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSession
> Proxy.java:63, 
> java.security.AccessController:doPrivileged:AccessController.java:-2,
> javax.security.auth.Subject:doAs:Subject.java:422,
> org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformat
> ion.java:1628, 
> org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
> nProxy.java:59, com.sun.proxy.$Proxy39:executeStatement::-1,
> org.apache.hive.service.cli.CLIService:executeStatement:CLIService.jav
> a:261, 
> org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:T
> hriftCLIService.java:486, 
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
> ent:getResult:TCLIService.java:1313,
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
> ent:getResult:TCLIService.java:1298,
> org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39,
> org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39,
> org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddr
> essProcessor.java:56, 
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPo
> olServer.java:285, 
> java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.j
> ava:1142, 
> java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.
> java:617, java.lang.Thread:run:Thread.java:745], errorCode:0,
> errorMessage:java.lang.ClassCastException:
> org.apache.spark.sql.types.NullType$ cannot be cast to 
> org.apache.spark.sql.types.MapType), Query:
>
> Trying to apply a Logical or Arithmetic Operator on Map value will 
> yield the error.
>
> The solution I found was to simply cast as a int and change my logical 
> evaluation to equal. (In this case, I wanted to get TRUE if the value 
> was less than 1 and casting it as a float or double was yielding the 
> same error)
>
> select cast(cast(column.key1.key2 as int) / 30 as int) = 0 from table 
> (Yes. Need to cast the remainder for some reason...)
>
> Can anyone shine some light on why map type have problem dealing with 
> basic operators?
>
> Thanks!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Map-and-Basi
> 

Re: how to submit the spark job outside the cluster

2015-09-24 Thread Zhiliang Zhu
Hi Zhan,
I have done that, with your kind help.
However, I could only use the "hadoop fs -ls/-mkdir/-rm XXX" commands from the remote
gateway machine;
the commands "hadoop fs -cat/-put XXX    YYY" would not work, with the error message
below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 
nodes instead of minReplication (=1).  There are 2 datanode(s) running and 2 
node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]...
In the cluster, all machines' /etc/hosts (all local area network IPs):
10.6.32.132  master
10.6.28.96   core1    # must this entry use a global IP, in order to operate from the remote machine?
10.6.26.160  core2

In the remote machine's /etc/hosts (all global IPs, or else no commands work at all):
42.62.77.77 master
42.62.77.81 core1   # but still -cat / -put will not work
42.62.77.83 core2

Would you help comment on this...
Thank you very much!
Zhiliang
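
A commonly used client-side setting for this symptom (an assumption on my part, not
confirmed in this thread) is to make the HDFS client connect to datanodes by hostname
instead of the internal IPs returned by the namenode, in the gateway's hdfs-site.xml:

<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>

The /etc/hosts mapping of core1/core2 to their global IPs on the remote machine then
takes effect for the data transfer as well.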
 



 On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang 
 wrote:
   

 Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in to one of your
cluster machines, find the Hadoop configuration location, for example /etc/hadoop/conf,
and copy that directory to your local machine. Typically it has hdfs-site.xml,
yarn-site.xml, etc. In Spark, the former is used to access HDFS, and the latter is used
to launch applications on top of YARN.
Then in spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
Thanks.
Zhan Zhang
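
A concrete sketch of that setup, run from the gateway machine (hostnames, paths, and the
class/jar names are assumptions):

# copy the cluster's Hadoop client configuration to the gateway
scp -r user@master:/etc/hadoop/conf /etc/hadoop/conf

# point Spark at it, e.g. in $SPARK_HOME/conf/spark-env.sh
export HADOOP_CONF_DIR=/etc/hadoop/conf

# then submit against YARN as usual
./bin/spark-submit --master yarn-cluster --class my.Main my.jar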

On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Yes, I get it now. 
I have not ever deployed hadoop configuration locally, and do not find the 
specific doc, would you help provide the doc to do that...
Thank you,Zhiliang

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang  
wrote:


There is no difference between running the client in or out of the client 
(assuming there is no firewall or network connectivity issue), as long as you 
have hadoop configuration locally.  Here is the doc for running on yarn.
http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Thanks very much for your helpful comment. I also assumed it would be similar to a
Hadoop job submit; however, I was not sure whether that holds when it comes to Spark.
Have you ever tried that for Spark... Would you give me the deployment doc for the
Hadoop and Spark gateway? Since this is the first time for me to do that, I cannot find
the specific doc for it.

Best Regards,
Zhiliang




On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang  
wrote:


It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

My Spark job runs on the cluster via YARN. The job can be submitted from a machine
inside the cluster; however, I would like to submit the job from another machine that
does not belong to the cluster. I know that for Hadoop this can be done from another
machine that has a Hadoop gateway installed, which is used to connect to the cluster.
What would be the approach for Spark, is it the same as for Hadoop... And where is the
instruction doc for installing this gateway...
Thank you very much~~
Zhiliang













  

ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Hi,

I have ported receiver-less Spark Streaming for Kafka to Spark 1.2 and am
trying to run a Spark Streaming job to consume data from my broker, but I
am getting the following error:

15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at
kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.streaming.kafka.KafkaCluster.org
$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
at
org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



I have tried allocating 100G of memory with 1 executor but it is still
failing.

Spark version: 1.2.2
Kafka version ported: 0.8.2
Kafka server version: trunk version with SSL enabled

Can someone please help me debug this.

Thanks,
Sourabh


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-24 Thread Sourabh Chandak
Adding Cody and Sriharsha

On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak 
wrote:

> Hi,
>
> I have ported receiver less spark streaming for kafka to Spark 1.2 and am
> trying to run a spark streaming job to consume data form my broker, but I
> am getting the following error:
>
> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at org.apache.spark.streaming.kafka.KafkaCluster.org
> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
> at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> I have tried allocating 100G of memory with 1 executor but it is still
> failing.
>
> Spark version: 1.2.2
> Kafka version ported: 0.8.2
> Kafka server version: trunk version with SSL enabled
>
> Can someone please help me debug this.
>
> Thanks,
> Sourabh
>


Re: Why Checkpoint is throwing "actor.OneForOneStrategy: NullPointerException"

2015-09-24 Thread Terry Hoo
I met this before: in my program, some DStreams were not initialized since
they were not in the path of an output.

You can check if you are in the same case.


Thanks!
- Terry
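
A minimal sketch of the situation Terry describes (stream names are hypothetical):

val lines  = ssc.socketTextStream("localhost", 9999)
val parsed = lines.map(_.split(","))   // transformed, but never used by any output operation

lines.count().print()                  // only `lines` is on the path to an output

// `parsed` is never tied to an output operation, so the graph never initializes it;
// per Terry's observation, checkpointing such a graph can surface as the exception below.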

On Fri, Sep 25, 2015 at 10:22 AM, Tathagata Das  wrote:

> Are you by any chance setting DStream.remember() with null?
>
> On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
> uthayan.sutha...@gmail.com> wrote:
>
>> Hello all,
>>
>> My streaming job is throwing the exception below at every interval. It is first
>> deleting the checkpoint file and then trying to checkpoint; is
>> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
>> this issue?
>>
>> 15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in
>> stage 84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter:
>> Deleting 
>> hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/checkpoint-144310422*
>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time
>> 144310422 ms saved to file
>> 'hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/*
>> checkpoint-144310422', took 10696 bytes and 108 ms
>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data
>> for time 144310422 ms
>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data
>> for time 144310422 ms
>> 15/09/24 16:35:55 ERROR actor.OneForOneStrategy:
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at
>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at
>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>> at
>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Cheers,
>>
>> Uthay
>>
>>
>