Re: Is there any plan to develop SPARK with C++?

2016-02-03 Thread Benjamin Kim
Hi DaeJin,

The closest thing I can think of is this.

https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

Cheers,
Ben

> On Feb 3, 2016, at 9:49 PM, DaeJin Jung  wrote:
> 
> Hello everyone,
> I have a short question.
> 
> I would like to improve the performance of the SPARK framework using Intel native 
> instructions, etc. So, I wonder if there are any plans to develop SPARK 
> with C++ or C in the near future.
> 
> Please let me know if you have any information.
> 
> Best Regards,
> Daejin Jung



Re: Re: About cache table performance in spark sql

2016-02-03 Thread fightf...@163.com
Hi, 
Thanks a lot for your explanation. I understand now that the slow processing is mainly 
caused by GC pressure; your advice made the difference clear. 

I set each executor's memory to 6GB and tried to cache the table. 
I have 3 executors, and I can now see the following info on the Storage page of the 
Spark job UI: 
 

RDD Name:                    In-memory table video1203
Storage Level:               Memory Deserialized 1x Replicated
Cached Partitions:           251
Fraction Cached:             100%
Size in Memory:              18.1 GB
Size in ExternalBlockStore:  0.0 B
Size on Disk:                23.6 GB

I can see that Spark SQL tries to cache the data in memory, and when I run 
queries over this table video1203 I get fast responses. Another thing that 
confuses me is the above data sizes (in memory and on disk). The in-memory 
size is 18.1GB, which almost equals the sum of my executor memory. But why is 
the disk size 23.6GB? From Impala I get an overall parquet file size of about 
24.59GB. It would be good to have some clarification on 
this. 

Best,
Sun.



fightf...@163.com
 
From: Prabhu Joseph
Date: 2016-02-04 14:35
To: fightf...@163.com
CC: user
Subject: Re: About cache table performance in spark sql
Sun,

   When the executor doesn't have enough memory and tries to cache the data, it 
spends a lot of time on GC and hence the job will be slow. Either:

 1. allocate enough memory to cache all RDDs, so the job completes fast, or
 2. don't use cache when there is not enough executor memory.

  To check the GC time, use  --conf 
"spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" 
while submitting the job; SPARK_WORKER_DIR will then contain stdout files with the GC output.
The stdout will show many "Full GC" events when cache is used and the executor 
does not have enough heap.


Thanks,
Prabhu Joseph

On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com  wrote:
Hi, 

I want to make sure that caching a table would indeed accelerate SQL queries. 
Here is one of my use cases: 
  Impala table size: 24.59GB, no partitions, with about 1 billion+ rows.
I use sqlContext.sql to run queries over this table and tried the cache and 
uncache commands to see if there is any performance disparity. I ran the 
following query: select * from video1203 where id > 10 and id < 20 and added_year != 1989
I see the following results: 

1  If I do not cache the table and just run sqlContext.sql(), the above query 
takes about 25 seconds.
2  If I first run sqlContext.cacheTable("video1203"), the query runs super 
slow and causes a driver OOM exception, but I do get the final results after 
about 9 minutes of running. 

Could any expert explain this for me? I can see that cacheTable causes OOM 
because the in-memory columnar storage 
cannot hold the 24.59GB+ table in memory. But why is the performance so 
different and even so bad? 

Best,
Sun.



fightf...@163.com



Re: spark streaming web ui not showing the events - direct kafka api

2016-02-03 Thread vimal dinakaran
No, I am using DSE 4.8, which has Spark 1.4. Is this a known issue?
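
For reference, a stripped-down sketch of the setup described below: a direct Kafka
stream whose only output action is a Cassandra write via foreachRDD. The broker list,
topic, keyspace, table, and column names are placeholders, and an existing
StreamingContext ssc plus the spark-cassandra-connector are assumed.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

// foreachRDD is a regular output action, so no print() is needed for the job to run.
stream.map { case (_, value) => Tuple1(value) }
  .foreachRDD { rdd => rdd.saveToCassandra("my_ks", "my_table", SomeColumns("body")) }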

On Wed, Jan 27, 2016 at 11:52 PM, Cody Koeninger  wrote:

> Have you tried spark 1.5?
>
> On Wed, Jan 27, 2016 at 11:14 AM, vimal dinakaran 
> wrote:
>
>> Hi ,
>>  I am using Spark 1.4 with the direct Kafka API. In my streaming UI, I am
>> able to see the events listed only if I add stream.print() statements;
>> otherwise the event rate and input events remain at 0 even though the events
>> get processed.
>>
>> Without print statements, I have the saveToCassandra action on the dstream.
>>
>> Any reason why this is not working?
>>
>> Thanks
>> Vimal
>>
>
>


Re: About cache table performance in spark sql

2016-02-03 Thread Prabhu Joseph
Sun,

   When the executor doesn't have enough memory and tries to cache the
data, it spends a lot of time on GC and hence the job will be slow. Either:

 1. allocate enough memory to cache all RDDs, so the job completes fast, or
 2. don't use cache when there is not enough executor memory.

  To check the GC time, use  --conf
"spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps" while submitting the job; SPARK_WORKER_DIR will
then contain stdout files with the GC output.
The stdout will show many "Full GC" events when cache is used and the
executor does not have enough heap.
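
For illustration, a minimal submit command with those flags (the main class and
application jar are placeholders):

spark-submit \
  --class com.example.MyJob \
  --master yarn \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  my-job.jar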


Thanks,
Prabhu Joseph

On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com 
wrote:

> Hi,
>
> I want to make sure that the cache table indeed would accelerate sql
> queries. Here is one of my use case :
>   impala table size : 24.59GB,  no partitions, with about 1 billion+ rows.
> I use sqlContext.sql to run queries over this table and try to do cache
> and uncache command to see if there
> is any performance disparity. I ran the following query :
> select * from video1203 where id > 10 and id < 20 and added_year != 1989
> I can see the following results :
>
> 1  If I did not run cache table and just ran sqlContext.sql(), I can see
> the above query run about 25 seconds.
> 2  If I firstly run sqlContext.cacheTable("video1203"), the query runs
> super slow and would cause driver OOM exception, but I can
> get final results with about running 9 minuts.
>
> Would any expert can explain this for me ? I can see that cacheTable cause
> OOM just because the in-memory columnar storage
> cannot hold the 24.59GB+ table size into memory. But why the performance
> is so different and even so bad ?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>


About cache table performance in spark sql

2016-02-03 Thread fightf...@163.com
Hi, 

I want to make sure that caching a table would indeed accelerate SQL queries. 
Here is one of my use cases: 
  Impala table size: 24.59GB, no partitions, with about 1 billion+ rows.
I use sqlContext.sql to run queries over this table and tried the cache and 
uncache commands to see if there is any performance disparity. I ran the 
following query: select * from video1203 where id > 10 and id < 20 and added_year != 1989
I see the following results: 

1  If I do not cache the table and just run sqlContext.sql(), the above query 
takes about 25 seconds.
2  If I first run sqlContext.cacheTable("video1203"), the query runs super 
slow and causes a driver OOM exception, but I do get the final results after 
about 9 minutes of running. 

Could any expert explain this for me? I can see that cacheTable causes OOM 
because the in-memory columnar storage 
cannot hold the 24.59GB+ table in memory. But why is the performance so 
different and even so bad? 

Best,
Sun.



fightf...@163.com


Is there any plan to develop SPARK with C++?

2016-02-03 Thread DaeJin Jung
Hello everyone,
I have a short question.

I would like to improve the performance of the SPARK framework using Intel native
instructions, etc. So, I wonder if there are any plans to develop SPARK
with C++ or C in the near future.

Please let me know if you have any information.

Best Regards,
Daejin Jung


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Ahh.. missed that.

I see that you have used the "first" function. 'first' returns the first row it
has found. On a single executor it may return the right results. But on
multiple executors it will return the first row of any of the executors,
which may not be the first row once the results are combined.

I believe that if you change your query like this, you will get the right
results:

ordrd_emp_df.groupBy("DeptNo").
agg($"DeptNo", max("Sal").as("HighestSal"))

But as you can see, you then get the highest Sal and not the EmpId with the highest
Sal. To get the EmpId with the highest Sal, you will have to change your query
to add filters or subqueries. See the following thread:

http://stackoverflow.com/questions/6841605/get-top-1-row-of-each-group
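
For reference, a minimal sketch of the join-back pattern that the linked thread
describes, using the emp_df from the original message (column names as above; if two
employees tie for the top Sal in a department, both rows come back):

import org.apache.spark.sql.functions.max

// Max salary per department.
val maxSal = emp_df.groupBy("DeptNo").agg(max("Sal").as("MaxSal"))

// Join back to the original frame to recover the EmpId holding that salary.
val topEarners = emp_df.join(maxSal,
    emp_df("DeptNo") === maxSal("DeptNo") && emp_df("Sal") === maxSal("MaxSal"))
  .select(emp_df("DeptNo"), emp_df("EmpId"))

topEarners.show()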

Hemant Bhanawat
SnappyData (http://snappydata.io/)


On Wed, Feb 3, 2016 at 4:33 PM, satish chandra j 
wrote:

> Hi Hemant,
> My dataframe "ordrd_emp_df" consists of data in order, as I have applied orderBy
> in the first step.
> I also tried having the "orderBy" method before "groupBy", but I still get
> different results in each iteration.
>
> Regards,
> Satish Chandra
>
>
> On Wed, Feb 3, 2016 at 4:28 PM, Hemant Bhanawat 
> wrote:
>
>> Missing order by?
>>
>> Hemant Bhanawat
>> SnappyData (http://snappydata.io/)
>>
>>
>> On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> I have data in a emp_df (DataFrame) as mentioned below:
>>>
>>> EmpId   Sal   DeptNo
>>> 001   100   10
>>> 002   120   20
>>> 003   130   10
>>> 004   140   20
>>> 005   150   10
>>>
>>> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
>>> below:
>>>
>>> DeptNo  Sal   EmpId
>>> 10 150   005
>>> 10 130   003
>>> 10 100   001
>>> 20 140   004
>>> 20 120   002
>>>
>>> Now I want to pick highest paid EmpId of each DeptNo.,hence applied agg
>>> First method as below
>>>
>>>
>>> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>>>
>>> Expected output is   DeptNo  TopSal
>>>                      10      005
>>>                      20      004
>>>
>>> But my output varies for each iteration, such as:
>>>
>>> First iteration results in    Dept  TopSal
>>>                               10    003
>>>                               20    004
>>>
>>> Second iteration results in   Dept  TopSal
>>>                               10    005
>>>                               20    004
>>>
>>> Third iteration results in    Dept  TopSal
>>>                               10    003
>>>                               20    002
>>>
>>> Not sure why output varies on each iteration as no change in code and
>>> values in DataFrame
>>>
>>> Please let me know if any inputs on this
>>>
>>> Regards,
>>> Satish Chandra J
>>>
>>
>>
>


Re: Re: clear cache using spark sql cli

2016-02-03 Thread fightf...@163.com
No. That is not my case. Actually I am running spark-sql, i.e. the spark-sql CLI mode, and 
executing SQL queries against my Hive tables. In the spark-sql CLI there seems to be no 
existing sqlContext or sparkContext; I can only run select/create/insert/delete 
operations. 

Best,
Sun.



fightf...@163.com
 
From: Ted Yu
Date: 2016-02-04 11:49
To: fightf...@163.com
CC: user
Subject: Re: Re: clear cache using spark sql cli
In spark-shell, I can do:

scala> sqlContext.clearCache()

Is that not the case for you ?

On Wed, Feb 3, 2016 at 7:35 PM, fightf...@163.com  wrote:
Hi, Ted
Yes. I had seen that issue. But it seems that in spark-sql cli cannot do 
command like :
   sqlContext.clearCache() 
Is this right ? In spark-sql cli I can only run some sql queries. So I want to 
see if there 
are any available options to reach this. 

Best,
Sun.



fightf...@163.com
 
From: Ted Yu
Date: 2016-02-04 11:22
To: fightf...@163.com
CC: user
Subject: Re: clear cache using spark sql cli
Have you looked at 
SPARK-5909 Add a clearCache command to Spark SQL's cache manager

On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com  wrote:
Hi,
How could I clear cache (execute sql query without any cache) using spark sql 
cli ? 
Is there any command available ? 
Best,
Sun.



fightf...@163.com




RE: sparkR not able to create /append new columns

2016-02-03 Thread Sun, Rui
Devesh,

Note that DataFrame is immutable. withColumn returns a new DataFrame instead of 
adding a column in-place to the DataFrame being operated on.

So, you can modify the for loop like:

for (j in 1:lev)

{

   dummy.df.new<-withColumn(df,
   paste0(colnames(cat.column),j),
   ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j],1,0) )

   df<-dummy.df.new
}

As you can see, withColumn supports adding only one column at a time; it would be more 
convenient if withColumn supported adding multiple columns at once. There is a 
JIRA requesting such a feature 
(https://issues.apache.org/jira/browse/SPARK-12225) which is still under 
discussion. If you would like this feature, you could comment on it.

From: Franc Carter [mailto:franc.car...@gmail.com]
Sent: Wednesday, February 3, 2016 7:40 PM
To: Devesh Raj Singh
Cc: user@spark.apache.org
Subject: Re: sparkR not able to create /append new columns


Yes, I didn't work out how to solve that - sorry


On 3 February 2016 at 22:37, Devesh Raj Singh wrote:
Hi,

but "withColumn" will only add once, if i want to add columns to the same 
dataframe in a loop it will keep overwriting the added column and in the end 
the last added column( in the loop) will be the added column. like in my code 
above.

On Wed, Feb 3, 2016 at 5:05 PM, Franc Carter wrote:

I had problems doing this as well - I ended up using 'withColumn'; it's not 
particularly graceful but it worked (1.5.2 on AWS EMR).

Cheers

On 3 February 2016 at 22:06, Devesh Raj Singh wrote:
Hi,

I am trying to create dummy variables in SparkR by creating new columns for 
categorical variables, but it is not appending the columns.


df <- createDataFrame(sqlContext, iris)
class(dtypes(df))

cat.column<-vector(mode="character",length=nrow(df))
cat.column<-collect(select(df,df$Species))
lev<-length(levels(as.factor(unlist(cat.column))))
varb.names<-vector(mode="character",length=lev)
for (i in 1:lev){

  varb.names[i]<-paste0(colnames(cat.column),i)

}

for (j in 1:lev)

{

   dummy.df.new<-withColumn(df, paste0(colnames(cat.column), j),
   ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j], 1, 0))

}

I am getting the below output for

head(dummy.df.new)

output:

  Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
1          5.1         3.5          1.4         0.2   setosa        1
2          4.9         3.0          1.4         0.2   setosa        1
3          4.7         3.2          1.3         0.2   setosa        1
4          4.6         3.1          1.5         0.2   setosa        1
5          5.0         3.6          1.4         0.2   setosa        1
6          5.4         3.9          1.7         0.4   setosa        1

Problem: Species2 and Species3 column are not getting added to the dataframe

--
Warm regards,
Devesh.



--
Franc



--
Warm regards,
Devesh.



--
Franc


Re: Re: clear cache using spark sql cli

2016-02-03 Thread Ted Yu
In spark-shell, I can do:

scala> sqlContext.clearCache()

Is that not the case for you ?

On Wed, Feb 3, 2016 at 7:35 PM, fightf...@163.com  wrote:

> Hi, Ted
> Yes. I had seen that issue. But it seems that in spark-sql cli cannot do
> command like :
>sqlContext.clearCache()
> Is this right ? In spark-sql cli I can only run some sql queries. So I
> want to see if there
> are any available options to reach this.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Ted Yu 
> *Date:* 2016-02-04 11:22
> *To:* fightf...@163.com
> *CC:* user 
> *Subject:* Re: clear cache using spark sql cli
> Have you looked at
> SPARK-5909 Add a clearCache command to Spark SQL's cache manager
>
> On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com 
> wrote:
>
>> Hi,
>> How could I clear cache (execute sql query without any cache) using spark
>> sql cli ?
>> Is there any command available ?
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>


Re: Re: clear cache using spark sql cli

2016-02-03 Thread fightf...@163.com
Hi, Ted
Yes, I have seen that issue. But it seems that in the spark-sql CLI I cannot run a 
command like:
   sqlContext.clearCache() 
Is this right? In the spark-sql CLI I can only run SQL queries, so I want to 
see if there 
are any available options to achieve this. 

Best,
Sun.



fightf...@163.com
 
From: Ted Yu
Date: 2016-02-04 11:22
To: fightf...@163.com
CC: user
Subject: Re: clear cache using spark sql cli
Have you looked at 
SPARK-5909 Add a clearCache command to Spark SQL's cache manager

On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com  wrote:
Hi,
How could I clear cache (execute sql query without any cache) using spark sql 
cli ? 
Is there any command available ? 
Best,
Sun.



fightf...@163.com



Re: clear cache using spark sql cli

2016-02-03 Thread Ted Yu
Have you looked at
SPARK-5909 Add a clearCache command to Spark SQL's cache manager
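
For reference, the cache-related statements look roughly like this in the spark-sql
CLI (CLEAR CACHE assumes a build that includes SPARK-5909; the table name is a
placeholder):

CACHE TABLE my_table;     -- eagerly cache a table in the in-memory columnar store
UNCACHE TABLE my_table;   -- drop a single table from the cache
CLEAR CACHE;              -- drop everything from the cache (SPARK-5909)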

On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com  wrote:

> Hi,
> How could I clear cache (execute sql query without any cache) using spark
> sql cli ?
> Is there any command available ?
> Best,
> Sun.
>
> --
> fightf...@163.com
>


clear cache using spark sql cli

2016-02-03 Thread fightf...@163.com
Hi,
How could I clear the cache (i.e. execute a SQL query without any cache) using the 
spark-sql CLI? 
Is there any command available? 
Best,
Sun.



fightf...@163.com


Re: How parquet file decide task number?

2016-02-03 Thread Gavin Yue
Found the answer. It is the block size.

Thanks.
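
For anyone hitting the same thing, a small sketch for checking and reducing the
number of read partitions (the path and partition count are placeholders):

val df = sqlContext.read.parquet("/path/to/parquet")

// The task count of a simple scan matches the partition count of the underlying RDD.
println(df.rdd.partitions.length)

// coalesce() reduces the partition (and task) count without a full shuffle.
val fewer = df.coalesce(200)
fewer.count()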

On Wed, Feb 3, 2016 at 5:05 PM, Gavin Yue  wrote:

> I am doing a simple count like:
>
> sqlContext.read.parquet("path").count
>
> I have only 5000 parquet files.  But generate over 2 tasks.
>
> Each parquet file is converted from one gz text file.
>
> Please give some advice.
>
> Thanks
>
>
>
>


Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Ted Yu
Created a pull request:
https://github.com/apache/spark/pull/11066

FYI

On Wed, Feb 3, 2016 at 1:27 PM, Shipper, Jay [USA] 
wrote:

> It was just renamed recently: https://github.com/apache/spark/pull/10981
>
> As SessionState is entirely managed by Spark’s code, it still seems like
> this is a bug with Spark 1.6.0, and not with how our application is using
> HiveContext.  But I’d feel more confident filing a bug if someone else
> could confirm they’re having this issue with Spark 1.6.0.  Ideally, we
> should also have some simple proof of concept that can be posted with the
> bug.
>
> From: Ted Yu 
> Date: Wednesday, February 3, 2016 at 3:57 PM
> To: Jay Shipper 
> Cc: "user@spark.apache.org" 
> Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE
>
> In ClientWrapper.scala, the SessionState.get().getConf call might have
> been executed ahead of SessionState.start(state) at line 194.
>
> This was the JIRA:
>
> [SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
>
> In master branch, there is no more ClientWrapper.scala
>
> FYI
>
> On Wed, Feb 3, 2016 at 11:15 AM, Shipper, Jay [USA] 
> wrote:
>
>> One quick update on this: The NPE is not happening with Spark 1.5.2, so
>> this problem seems specific to Spark 1.6.0.
>>
>> From: Jay Shipper 
>> Date: Wednesday, February 3, 2016 at 12:06 PM
>> To: "user@spark.apache.org" 
>> Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE
>>
>> Right, I could already tell that from the stack trace and looking at
>> Spark’s code.  What I’m trying to determine is why that’s coming back as
>> null now, just from upgrading Spark to 1.6.0.
>>
>> From: Ted Yu 
>> Date: Wednesday, February 3, 2016 at 12:04 PM
>> To: Jay Shipper 
>> Cc: "user@spark.apache.org" 
>> Subject: [External] Re: Spark 1.6.0 HiveContext NPE
>>
>> Looks like the NPE came from this line:
>>   def conf: HiveConf = SessionState.get().getConf
>>
>> Meaning SessionState.get() returned null.
>>
>> On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
>> wrote:
>>
>>> I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m
>>> getting a NullPointerException from HiveContext.  It’s happening while it
>>> tries to load some tables via JDBC from an external database (not Hive),
>>> using context.read().jdbc():
>>>
>>> —
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
>>> at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
>>> at
>>> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
>>> at
>>> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
>>> at
>>> org.apache.spark.sql.hive.HiveContext$$anon$2.(HiveContext.scala:457)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
>>> at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
>>> at
>>> org.apache.spark.sql.hive.HiveContext$$anon$3.(HiveContext.scala:473)
>>> at
>>> org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
>>> at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
>>> at
>>> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>>> at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
>>> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
>>> at
>>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
>>> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
>>> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
>>> —
>>>
>>> Even though the application is not using Hive, HiveContext is used
>>> instead of SQLContext, for the additional functionality it provides.
>>> There’s no hive-site.xml for the application, but this did not cause an
>>> issue for Spark 1.4.1.
>>>
>>> Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that
>>> could explain this NPE?  The only obvious change I’ve noticed for
>>> HiveContext is that the default warehouse location is different (1.4.1 -
>>> current directory, 1.6.0 - /user/hive/warehouse), but I verified that this
>>> NPE happens even when /user/hive/warehouse e

How parquet file decide task number?

2016-02-03 Thread Gavin Yue
I am doing a simple count like:

sqlContext.read.parquet("path").count

I have only 5000 parquet files.  But generate over 2 tasks.

Each parquet file is converted from one gz text file.

Please give some advice.

Thanks


Re: spark metrics question

2016-02-03 Thread Yiannis Gkoufas
Hi Matt,

does the custom class you want to package report metrics for each executor?

Thanks

On 3 February 2016 at 15:56, Matt K  wrote:

> Thanks for sharing Yiannis, looks very promising!
>
> Do you know if I can package a custom class with my application, or does
> it have to be pre-deployed on all Executor nodes?
>
> On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas 
> wrote:
>
>> Hi Matt,
>>
>> there is some related work I recently did in IBM Research for visualizing
>> the metrics produced.
>> You can read about it here
>> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
>> We recently opensourced it if you are interested to have a deeper look to
>> it: https://github.com/ibm-research-ireland/sparkoscope
>>
>> Thanks,
>> Yiannis
>>
>> On 3 February 2016 at 13:32, Matt K  wrote:
>>
>>> Hi guys,
>>>
>>> I'm looking to create a custom sink based on Spark's Metrics System:
>>>
>>> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>>>
>>> If I want to collect metrics from the Driver, Master, and Executor
>>> nodes, should the jar with the custom class be installed on Driver, Master,
>>> and Executor nodes?
>>>
>>> Also, on Executor nodes, does the MetricsSystem run inside the
>>> Executor's JVM?
>>>
>>> Thanks,
>>> -Matt
>>>
>>
>>
>
>
> --
> www.calcmachine.com - easy online calculator.
>
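
For context, a minimal metrics.properties sketch showing how a sink class is wired up
per instance; com.example.MySink stands in for a hypothetical custom sink, while
ConsoleSink ships with Spark. Since each instance's MetricsSystem runs inside that
instance's own JVM, a custom sink class generally has to be on the classpath of every
JVM (driver, master, worker, executor) that should report through it.

# Attach a hypothetical custom sink to every instance.
*.sink.mysink.class=com.example.MySink

# Or attach Spark's built-in console sink to executors only, polling every 10 seconds.
executor.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
executor.sink.console.period=10
executor.sink.console.unit=seconds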


SparkOscope: Enabling Spark Optimization through Cross-stack Monitoring and Visualization

2016-02-03 Thread Yiannis Gkoufas
Hi all,

I recently sent a note about this contribution to the dev mailing list, but I
thought it might be useful to post it here as well, since I have seen a lot of
people asking about OS-level metrics for Spark. This is the result of the
work we have been doing recently at IBM Research around Spark.
Essentially, we have extended Spark's metrics system to use the Hyperic Sigar
library to capture OS-level metrics and modified the Web UI to visualize
those metrics per application.
The above functionalities can be configured in the metrics.properties and
spark-defaults.conf files.
We have recorded a small demo that shows those capabilities which you can
find here : https://ibm.app.box.com/s/vyaedlyb444a4zna1215c7puhxliqxdg
There is a blog post which gives more details on the functionality here:
www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
and also there is a public repo where anyone can try it:
https://github.com/ibm-research-ireland/sparkoscope

Hope someone finds it useful!

Thanks a lot!
Yiannis


RE: Cassandra BEGIN BATCH

2016-02-03 Thread Mohammed Guller
Frank,
I don’t think so. Cassandra does not support transactions in the traditional 
sense. It is not an ACID compliant database.

Mohammed
Author: Big Data Analytics with 
Spark
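
For completeness, a sketch of writing the two tables from Spark with the connector.
Each saveToCassandra call is an independent write, so there is no cross-table
atomicity; the keyspace name and the data are illustrative, and sc is an existing
SparkContext configured with spark.cassandra.connection.host.

import java.util.UUID
import com.datastax.spark.connector._

val id = UUID.fromString("16e2f240-2afa-11e4-8069-5f98e903bf02")
val statusUpdates = sc.parallelize(Seq(("dave", id, "dave update 4")))
val homeUpdates   = sc.parallelize(Seq(("alice", id, "dave", "dave update 4")))

// Two separate writes; a failure between them leaves the first one in place.
statusUpdates.saveToCassandra("my_ks", "user_status_updates",
  SomeColumns("username", "id", "body"))
homeUpdates.saveToCassandra("my_ks", "home_status_updates",
  SomeColumns("timeline_username", "status_update_id", "status_update_username", "body"))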

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, February 3, 2016 2:55 PM
To: FrankFlaherty
Cc: user
Subject: Re: Cassandra BEGIN BATCH

Seems you can find faster response on Cassandra Connector mailing list.

On Wed, Feb 3, 2016 at 1:45 PM, FrankFlaherty 
mailto:frank.flahe...@pega.com>> wrote:
Cassandra provides "BEGIN BATCH" and "APPLY BATCH" to perform atomic
execution of multiple statements as below:

BEGIN BATCH
  INSERT INTO "user_status_updates"
("username", "id", "body")
  VALUES(
'dave',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave update 4'
);

  INSERT INTO "home_status_updates" (
"timeline_username",
"status_update_id",
"status_update_username",
"body")
  VALUES (
'alice',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave',
'dave update 4'
  );
APPLY BATCH;

Is there a way to update two or more Cassandra tables atomically using the
Cassandra Connector from Spark?

Thanks,
Frank





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-BEGIN-BATCH-tp26145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Streaming: My kafka receivers are not consuming in parallel

2016-02-03 Thread Jorge Rodriguez
Please ignore this question, as I've figured out what my problem was.

In case anyone else runs into something similar: the problem was
on the Kafka side. I was using the console producer to generate the
messages going into the Kafka logs. This producer sends all of the
messages to the same partition unless you specify the "--new-producer"
parameter.

Thanks,
Jorge
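
For anyone tuning receiver parallelism, the usual pattern is to create several
receiver streams and union them; a sketch below, where the ZooKeeper address,
consumer group, and topic are placeholders and ssc is an existing StreamingContext.

import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver per createStream call; union them so several receivers pull in parallel.
val numReceivers = 2
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)
unified.count().print()   // any output action will do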

On Wed, Feb 3, 2016 at 12:44 PM, Jorge Rodriguez 
wrote:

> Hello Spark users,
>
> We are setting up our first batch of spark streaming pipelines, and I am
> running into an issue which I am not sure how to resolve, but which seems like
> it should be fairly trivial.
>
> I am using the receiver-mode Kafka consumer that comes with Spark, running
> in standalone mode.  I've set up two receivers, which are consuming from a 4
> broker, 4 partition kafka topic.
>
> If you look at the image below, you will see that even though I
> have two receivers, only one of them ever consumes data at a time.  I
> believe this to be my current bottleneck for scaling.
>
> What am I missing?
>
> To me, the order of events consumed is not important.  I just want to optimize
> for maximum throughput.
>
>
> [image: Inline image 1]
>
> Thanks in advance for any help or tips!
>
> Jorge
>


Re: Cassandra BEGIN BATCH

2016-02-03 Thread Ted Yu
Seems you can find faster response on Cassandra Connector mailing list.

On Wed, Feb 3, 2016 at 1:45 PM, FrankFlaherty 
wrote:

> Cassandra provides "BEGIN BATCH" and "APPLY BATCH" to perform atomic
> execution of multiple statements as below:
>
> BEGIN BATCH
>   INSERT INTO "user_status_updates"
> ("username", "id", "body")
>   VALUES(
> 'dave',
> 16e2f240-2afa-11e4-8069-5f98e903bf02,
> 'dave update 4'
> );
>
>   INSERT INTO "home_status_updates" (
> "timeline_username",
> "status_update_id",
> "status_update_username",
> "body")
>   VALUES (
> 'alice',
> 16e2f240-2afa-11e4-8069-5f98e903bf02,
> 'dave',
> 'dave update 4'
>   );
> APPLY BATCH;
>
> Is there a way to update two or more Cassandra tables atomically using the
> Cassandra Connector from Spark?
>
> Thanks,
> Frank
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-BEGIN-BATCH-tp26145.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark 1.5.2 memory error

2016-02-03 Thread Nirav Patel
Hi Jerry,

I agree that code + framework go hand in hand. I am totally up for tuning the
heck out of the system as well. Spark offers tremendous flexibility in that
regard. We have a real-time application that serves data in milliseconds, backed by
Spark RDDs. It took a lot of testing and tuning effort before we could get
there. We love it. But sometimes, when you can't find a solution for a
long time even with the help of experts, it gets to you. I am still working
towards a solution for my job as well. I think I am on to something with
reducing the number of cores per executor.

Regarding adapting code to a 'bad' framework: that requires a lot of rework, and
a framework should state its limitations in the first place via documentation.
That can help a developer decide whether the framework itself is
the right one for the job at hand or not.

Thanks

On Wed, Feb 3, 2016 at 2:39 PM, Ted Yu  wrote:

> There is also (deprecated) spark.storage.unrollFraction to consider
>
> On Wed, Feb 3, 2016 at 2:21 PM, Nirav Patel  wrote:
>
>> What I meant is executor.cores and task.cpus can dictate how many
>> parallel tasks will run on given executor.
>>
>> Let's take this example setting.
>>
>> spark.executor.memory = 16GB
>> spark.executor.cores = 6
>> spark.task.cpus = 1
>>
>> SO here I think spark will assign 6 tasks to One executor each using 1
>> core and 16/6=2.6GB.
>>
>> ANd out of those 2.6 gb some goes to shuffle and some goes to storage.
>>
>> spark.shuffle.memoryFraction = 0.4
>> spark.storage.memoryFraction = 0.6
>>
>> Again my speculation from some past articles I read.
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 3, 2016 at 2:09 PM, Rishabh Wadhawan 
>> wrote:
>>
>>> As of what I know, Cores won’t give you more portion of executor memory,
>>> because its just cpu cores that you are using per executor. Reducing the
>>> number of cores however would result in lack of parallel processing power.
>>> The executor memory that we specify with spark.executor.memory would be the
>>> max memory that your executor might have. But the memory that you get is
>>> less then that. I don’t clearly remember but i think its either memory/2 or
>>> memory/4. But I may be wrong as I have been out of spark for months.
>>>
>>> On Feb 3, 2016, at 2:58 PM, Nirav Patel  wrote:
>>>
>>> About OP.
>>>
>>> How many cores you assign per executor? May be reducing that number will
>>> give more portion of executor memory to each task being executed on that
>>> executor. Others please comment if that make sense.
>>>
>>>
>>>
>>> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel 
>>> wrote:
>>>
 I know it;s a strong word but when I have a case open for that with
 MapR and Databricks for a month and their only solution to change to
 DataFrame it frustrate you. I know DataFrame/Sql catalyst has internal
 optimizations but it requires lot of code change. I think there's something
 fundamentally wrong (or different from hadoop) in framework that is not
 allowing it to do robust memory management. I know my job is memory hogger,
 it does a groupBy and perform combinatorics in reducer side; uses
 additional datastructures at task levels. May be spark is running multiple
 heavier tasks on same executor and collectively they cause OOM. But
 suggesting DataFrame is NOT a Solution for me (and most others who already
 invested time with RDD and loves the type safety it provides). Not even
 sure if changing to DataFrame will for sure solve the issue.

 On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller >>> > wrote:

> Nirav,
>
> Sorry to hear about your experience with Spark; however, sucks is a
> very strong word. Many organizations are processing a lot more than 150GB
> of data  with Spark.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
> *Sent:* Wednesday, February 3, 2016 11:31 AM
> *To:* Stefan Panayotov
> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>
> *Subject:* Re: Spark 1.5.2 memory error
>
>
>
> Hi Stefan,
>
>
>
> Welcome to the OOM - heap space club. I have been struggling with
> similar errors (OOM and yarn executor being killed) and failing job or
> sending it in retry loops. I bet the same job will run perfectly fine with
> less resource on Hadoop MapReduce program. I have tested it for my program
> and it does work.
>
>
>
> Bottomline from my experience. Spark sucks with memory management when
> job is processing large (not huge) amount of data. It's failing for me 
> with
> 16gb executors, 10 executors, 6 threads each. And data its processing is
> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
> 1 million rows.
>
>
>
> Hope that saves you some trouble.

Re: Spark 1.5.2 memory error

2016-02-03 Thread Ted Yu
There is also (deprecated) spark.storage.unrollFraction to consider

On Wed, Feb 3, 2016 at 2:21 PM, Nirav Patel  wrote:

> What I meant is executor.cores and task.cpus can dictate how many parallel
> tasks will run on given executor.
>
> Let's take this example setting.
>
> spark.executor.memory = 16GB
> spark.executor.cores = 6
> spark.task.cpus = 1
>
> SO here I think spark will assign 6 tasks to One executor each using 1
> core and 16/6=2.6GB.
>
> ANd out of those 2.6 gb some goes to shuffle and some goes to storage.
>
> spark.shuffle.memoryFraction = 0.4
> spark.storage.memoryFraction = 0.6
>
> Again my speculation from some past articles I read.
>
>
>
>
>
>
>
>
> On Wed, Feb 3, 2016 at 2:09 PM, Rishabh Wadhawan 
> wrote:
>
>> As of what I know, Cores won’t give you more portion of executor memory,
>> because its just cpu cores that you are using per executor. Reducing the
>> number of cores however would result in lack of parallel processing power.
>> The executor memory that we specify with spark.executor.memory would be the
>> max memory that your executor might have. But the memory that you get is
>> less then that. I don’t clearly remember but i think its either memory/2 or
>> memory/4. But I may be wrong as I have been out of spark for months.
>>
>> On Feb 3, 2016, at 2:58 PM, Nirav Patel  wrote:
>>
>> About OP.
>>
>> How many cores you assign per executor? May be reducing that number will
>> give more portion of executor memory to each task being executed on that
>> executor. Others please comment if that make sense.
>>
>>
>>
>> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel 
>> wrote:
>>
>>> I know it;s a strong word but when I have a case open for that with MapR
>>> and Databricks for a month and their only solution to change to DataFrame
>>> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
>>> but it requires lot of code change. I think there's something fundamentally
>>> wrong (or different from hadoop) in framework that is not allowing it to do
>>> robust memory management. I know my job is memory hogger, it does a groupBy
>>> and perform combinatorics in reducer side; uses additional datastructures
>>> at task levels. May be spark is running multiple heavier tasks on same
>>> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
>>> Solution for me (and most others who already invested time with RDD and
>>> loves the type safety it provides). Not even sure if changing to DataFrame
>>> will for sure solve the issue.
>>>
>>> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
>>> wrote:
>>>
 Nirav,

 Sorry to hear about your experience with Spark; however, sucks is a
 very strong word. Many organizations are processing a lot more than 150GB
 of data  with Spark.



 Mohammed

 Author: Big Data Analytics with Spark
 



 *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
 *Sent:* Wednesday, February 3, 2016 11:31 AM
 *To:* Stefan Panayotov
 *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org

 *Subject:* Re: Spark 1.5.2 memory error



 Hi Stefan,



 Welcome to the OOM - heap space club. I have been struggling with
 similar errors (OOM and yarn executor being killed) and failing job or
 sending it in retry loops. I bet the same job will run perfectly fine with
 less resource on Hadoop MapReduce program. I have tested it for my program
 and it does work.



 Bottomline from my experience. Spark sucks with memory management when
 job is processing large (not huge) amount of data. It's failing for me with
 16gb executors, 10 executors, 6 threads each. And data its processing is
 only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
 1 million rows.



 Hope that saves you some trouble.



 Nirav







 On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
 wrote:

 I drastically increased the memory:

 spark.executor.memory = 50g
 spark.driver.memory = 8g
 spark.driver.maxResultSize = 8g
 spark.yarn.executor.memoryOverhead = 768

 I still see executors killed, but this time the memory does not seem to
 be the issue.
 The error on the Jupyter notebook is:


 Py4JJavaError: An error occurred while calling 
 z:org.apache.spark.api.python.PythonRDD.collectAndServe.

 : org.apache.spark.SparkException: Job aborted due to stage failure: 
 Exception while getting task result: java.io.IOException: Failed to 
 connect to /10.0.0.9:48755


 From nodemanagers log corresponding to worker 10.0.0.9:


 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
 (YarnShuffleService.java:initializeApplication(129)) - Initializing
 appl

Re: Spark 1.5.2 memory error

2016-02-03 Thread Jerry Lam
Hi guys,

I was processing 300GB of data with lots of joins today. I have a combination
of RDD->DataFrame->RDD due to legacy code. I had memory issues at the
beginning. After fine-tuning the configurations that many have already
suggested above, it works with 0 failed tasks. I think it is fair to say any
memory-intensive application would face similar memory issues. It is not
very fair to say it sucks just because it has memory issues. Memory
issues come in many forms: 1. bad framework, 2. bad code, 3. bad
framework and bad code. I usually blame bad code first, then the bad framework.
If it truly fails because of a bad framework (mesos + spark + fine-grained
mode = disaster), then make the code changes to adapt to the bad
framework.

I have never seen code that magically runs to 100% completion when the data is
close to a terabyte without some serious engineering effort. A framework can
only help a bit; you are still responsible for making conscious
decisions about how much memory and data you are working with. For instance, a
k-v pair whose v is 100GB while you allocate 1GB per executor is
going to blow up no matter how many times you execute it.

The memory per core is what I fine-tune most: making sure each task/core has
enough memory to execute to completion. Sometimes you really don't know
how much data you keep in memory until you profile your application
(calculating some statistics helps).

Best Regards,

Jerry



On Wed, Feb 3, 2016 at 4:58 PM, Nirav Patel  wrote:

> About OP.
>
> How many cores you assign per executor? May be reducing that number will
> give more portion of executor memory to each task being executed on that
> executor. Others please comment if that make sense.
>
>
>
> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel  wrote:
>
>> I know it;s a strong word but when I have a case open for that with MapR
>> and Databricks for a month and their only solution to change to DataFrame
>> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
>> but it requires lot of code change. I think there's something fundamentally
>> wrong (or different from hadoop) in framework that is not allowing it to do
>> robust memory management. I know my job is memory hogger, it does a groupBy
>> and perform combinatorics in reducer side; uses additional datastructures
>> at task levels. May be spark is running multiple heavier tasks on same
>> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
>> Solution for me (and most others who already invested time with RDD and
>> loves the type safety it provides). Not even sure if changing to DataFrame
>> will for sure solve the issue.
>>
>> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
>> wrote:
>>
>>> Nirav,
>>>
>>> Sorry to hear about your experience with Spark; however, sucks is a very
>>> strong word. Many organizations are processing a lot more than 150GB of
>>> data  with Spark.
>>>
>>>
>>>
>>> Mohammed
>>>
>>> Author: Big Data Analytics with Spark
>>> 
>>>
>>>
>>>
>>> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
>>> *Sent:* Wednesday, February 3, 2016 11:31 AM
>>> *To:* Stefan Panayotov
>>> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>>>
>>> *Subject:* Re: Spark 1.5.2 memory error
>>>
>>>
>>>
>>> Hi Stefan,
>>>
>>>
>>>
>>> Welcome to the OOM - heap space club. I have been struggling with
>>> similar errors (OOM and yarn executor being killed) and failing job or
>>> sending it in retry loops. I bet the same job will run perfectly fine with
>>> less resource on Hadoop MapReduce program. I have tested it for my program
>>> and it does work.
>>>
>>>
>>>
>>> Bottomline from my experience. Spark sucks with memory management when
>>> job is processing large (not huge) amount of data. It's failing for me with
>>> 16gb executors, 10 executors, 6 threads each. And data its processing is
>>> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
>>> 1 million rows.
>>>
>>>
>>>
>>> Hope that saves you some trouble.
>>>
>>>
>>>
>>> Nirav
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
>>> wrote:
>>>
>>> I drastically increased the memory:
>>>
>>> spark.executor.memory = 50g
>>> spark.driver.memory = 8g
>>> spark.driver.maxResultSize = 8g
>>> spark.yarn.executor.memoryOverhead = 768
>>>
>>> I still see executors killed, but this time the memory does not seem to
>>> be the issue.
>>> The error on the Jupyter notebook is:
>>>
>>>
>>> Py4JJavaError: An error occurred while calling 
>>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: 
>>> Exception while getting task result: java.io.IOException: Failed to connect 
>>> to /10.0.0.9:48755
>>>
>>>
>>> From nodemanagers log corresponding to worker 10.0.0.9:
>>>
>>>
>>> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
>>> (YarnShuffleService.java:in

Re: Spark 1.5.2 memory error

2016-02-03 Thread Nirav Patel
What I meant is that executor.cores and task.cpus dictate how many parallel
tasks will run on a given executor.

Let's take this example setting:

spark.executor.memory = 16GB
spark.executor.cores = 6
spark.task.cpus = 1

So here I think Spark will assign 6 tasks to one executor, each using 1 core
and 16/6 = 2.6GB.

And out of those 2.6GB, some goes to shuffle and some goes to storage:

spark.shuffle.memoryFraction = 0.4
spark.storage.memoryFraction = 0.6

Again, this is my speculation from some past articles I read.
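
For concreteness, a sketch of setting those knobs programmatically; the values are
the illustrative ones above, not recommendations.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("memory-tuning-sketch")
  .set("spark.executor.memory", "16g")
  .set("spark.executor.cores", "6")
  .set("spark.task.cpus", "1")
  .set("spark.shuffle.memoryFraction", "0.4")   // these fractions apply to the static memory manager used up to 1.5.x
  .set("spark.storage.memoryFraction", "0.6")

val sc = new SparkContext(conf)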








On Wed, Feb 3, 2016 at 2:09 PM, Rishabh Wadhawan 
wrote:

> As of what I know, Cores won’t give you more portion of executor memory,
> because its just cpu cores that you are using per executor. Reducing the
> number of cores however would result in lack of parallel processing power.
> The executor memory that we specify with spark.executor.memory would be the
> max memory that your executor might have. But the memory that you get is
> less then that. I don’t clearly remember but i think its either memory/2 or
> memory/4. But I may be wrong as I have been out of spark for months.
>
> On Feb 3, 2016, at 2:58 PM, Nirav Patel  wrote:
>
> About OP.
>
> How many cores you assign per executor? May be reducing that number will
> give more portion of executor memory to each task being executed on that
> executor. Others please comment if that make sense.
>
>
>
> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel  wrote:
>
>> I know it;s a strong word but when I have a case open for that with MapR
>> and Databricks for a month and their only solution to change to DataFrame
>> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
>> but it requires lot of code change. I think there's something fundamentally
>> wrong (or different from hadoop) in framework that is not allowing it to do
>> robust memory management. I know my job is memory hogger, it does a groupBy
>> and perform combinatorics in reducer side; uses additional datastructures
>> at task levels. May be spark is running multiple heavier tasks on same
>> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
>> Solution for me (and most others who already invested time with RDD and
>> loves the type safety it provides). Not even sure if changing to DataFrame
>> will for sure solve the issue.
>>
>> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
>> wrote:
>>
>>> Nirav,
>>>
>>> Sorry to hear about your experience with Spark; however, sucks is a very
>>> strong word. Many organizations are processing a lot more than 150GB of
>>> data  with Spark.
>>>
>>>
>>>
>>> Mohammed
>>>
>>> Author: Big Data Analytics with Spark
>>> 
>>>
>>>
>>>
>>> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
>>> *Sent:* Wednesday, February 3, 2016 11:31 AM
>>> *To:* Stefan Panayotov
>>> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>>>
>>> *Subject:* Re: Spark 1.5.2 memory error
>>>
>>>
>>>
>>> Hi Stefan,
>>>
>>>
>>>
>>> Welcome to the OOM - heap space club. I have been struggling with
>>> similar errors (OOM and yarn executor being killed) and failing job or
>>> sending it in retry loops. I bet the same job will run perfectly fine with
>>> less resource on Hadoop MapReduce program. I have tested it for my program
>>> and it does work.
>>>
>>>
>>>
>>> Bottomline from my experience. Spark sucks with memory management when
>>> job is processing large (not huge) amount of data. It's failing for me with
>>> 16gb executors, 10 executors, 6 threads each. And data its processing is
>>> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
>>> 1 million rows.
>>>
>>>
>>>
>>> Hope that saves you some trouble.
>>>
>>>
>>>
>>> Nirav
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
>>> wrote:
>>>
>>> I drastically increased the memory:
>>>
>>> spark.executor.memory = 50g
>>> spark.driver.memory = 8g
>>> spark.driver.maxResultSize = 8g
>>> spark.yarn.executor.memoryOverhead = 768
>>>
>>> I still see executors killed, but this time the memory does not seem to
>>> be the issue.
>>> The error on the Jupyter notebook is:
>>>
>>>
>>> Py4JJavaError: An error occurred while calling 
>>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: 
>>> Exception while getting task result: java.io.IOException: Failed to connect 
>>> to /10.0.0.9:48755
>>>
>>>
>>> From nodemanagers log corresponding to worker 10.0.0.9:
>>>
>>>
>>> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
>>> (YarnShuffleService.java:initializeApplication(129)) - Initializing
>>> application application_1454509557526_0014
>>>
>>>
>>>
>>> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl
>>> (ContainerImpl.java:handle(1131)) - Container
>>> container_1454509557526_0014_01_93 transitioned from LOCALIZING to
>>> LOCALIZED
>>>
>>>
>>>
>>> 2016-02-03 17:31:44,947 I

Re: Spark 1.5.2 memory error

2016-02-03 Thread Rishabh Wadhawan
As far as I know, cores won't give you a larger portion of executor memory, 
because they are just the CPU cores that you use per executor. Reducing the number 
of cores, however, would result in a lack of parallel processing power. The 
executor memory that we specify with spark.executor.memory is the max 
memory that your executor might have, but the memory that you actually get is less than 
that. I don't clearly remember, but I think it's either memory/2 or memory/4. But 
I may be wrong, as I have been out of Spark for months. 
> On Feb 3, 2016, at 2:58 PM, Nirav Patel  wrote:
> 
> About OP.
> 
> How many cores you assign per executor? May be reducing that number will give 
> more portion of executor memory to each task being executed on that executor. 
> Others please comment if that make sense.
> 
> 
> 
> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel  > wrote:
> I know it;s a strong word but when I have a case open for that with MapR and 
> Databricks for a month and their only solution to change to DataFrame it 
> frustrate you. I know DataFrame/Sql catalyst has internal optimizations but 
> it requires lot of code change. I think there's something fundamentally wrong 
> (or different from hadoop) in framework that is not allowing it to do robust 
> memory management. I know my job is memory hogger, it does a groupBy and 
> perform combinatorics in reducer side; uses additional datastructures at task 
> levels. May be spark is running multiple heavier tasks on same executor and 
> collectively they cause OOM. But suggesting DataFrame is NOT a Solution for 
> me (and most others who already invested time with RDD and loves the type 
> safety it provides). Not even sure if changing to DataFrame will for sure 
> solve the issue. 
> 
> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller  > wrote:
> Nirav,
> 
> Sorry to hear about your experience with Spark; however, sucks is a very 
> strong word. Many organizations are processing a lot more than 150GB of data  
> with Spark.
> 
>  
> 
> Mohammed
> 
> Author: Big Data Analytics with Spark 
> 
>  
> 
> From: Nirav Patel [mailto:npa...@xactlycorp.com 
> ] 
> Sent: Wednesday, February 3, 2016 11:31 AM
> To: Stefan Panayotov
> Cc: Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org 
> 
> 
> Subject: Re: Spark 1.5.2 memory error
> 
>  
> 
> Hi Stefan,
> 
>  
> 
> Welcome to the OOM - heap space club. I have been struggling with similar 
> errors (OOM and yarn executor being killed) and failing job or sending it in 
> retry loops. I bet the same job will run perfectly fine with less resource on 
> Hadoop MapReduce program. I have tested it for my program and it does work.
> 
>  
> 
> Bottomline from my experience. Spark sucks with memory management when job is 
> processing large (not huge) amount of data. It's failing for me with 16gb 
> executors, 10 executors, 6 threads each. And data its processing is only 
> 150GB! It's 1 billion rows for me. Same job works perfectly fine with 1 
> million rows. 
> 
>  
> 
> Hope that saves you some trouble.
> 
>  
> 
> Nirav
> 
>  
> 
>  
> 
>  
> 
> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov  > wrote:
> 
> I drastically increased the memory:
>  
> spark.executor.memory = 50g
> spark.driver.memory = 8g
> spark.driver.maxResultSize = 8g
> spark.yarn.executor.memoryOverhead = 768
>  
> I still see executors killed, but this time the memory does not seem to be 
> the issue.
> The error on the Jupyter notebook is:
>  
> 
> 
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: 
> Exception while getting task result: java.io.IOException: Failed to connect 
> to /10.0.0.9:48755 
>  
> From nodemanagers log corresponding to worker 10.0.0.9 :
>  
> 
> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService 
> (YarnShuffleService.java:initializeApplication(129)) - Initializing 
> application application_1454509557526_0014
> 
>  
> 
> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from LOCALIZING to 
> LOCALIZED
> 
>  
> 
> 2016-02-03 17:31:44,947 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING
> 
>  
> 
> 2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor 
> (DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: 
> [bash, 
> /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
> 
>  
> 
> 2016-02-0

Nearest neighbors in Spark with Annoy

2016-02-03 Thread apu mishra . rr
As mllib doesn't have nearest-neighbors functionality, I'm trying to use
Annoy  for Approximate Nearest Neighbors.
I try to broadcast the Annoy object and pass it to workers; however, it
does not operate as expected.

Below is complete code for reproducibility. The problem is highlighted in
the difference seen when using Annoy with vs without Spark.

from annoy import AnnoyIndex
import random
random.seed(42)

f = 40
t = AnnoyIndex(f)  # Length of item vector that will be indexed
allvectors = []
for i in xrange(20):
v = [random.gauss(0, 1) for z in xrange(f)]
t.add_item(i, v)
allvectors.append((i, v))
t.build(10) # 10 trees

# Use Annoy with Spark
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1],
n=5))
print "Five closest neighbors for first vector with Spark:",
print x.first()

# Use Annoy without Spark
print "Five closest neighbors for first vector without Spark:",
print(t.get_nns_by_vector(vector=allvectors[0][1], n=5))


Output seen:

Five closest neighbors for first vector with Spark: None

Five closest neighbors for first vector without Spark: [0, 13, 12, 6, 4]


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-03 Thread Michael Armbrust
On Wed, Feb 3, 2016 at 1:42 PM, Nirav Patel  wrote:

> Awesome! I just read design docs. That is EXACTLY what I was talking
> about! Looking forward to it!
>

Great :)

Most of the API is there in 1.6.  For the next release I would like to
unify DataFrame <-> Dataset and do a lot of work on performance.  Fair
warning: in 1.6 there are cases where Datasets are slower than RDDs, but we
are putting a lot of effort into improving that.
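
For anyone curious, a small sketch of the 1.6 Dataset API, which keeps compile-time
types while still going through Catalyst; the case class and data are made up for
illustration, and sqlContext is an existing SQLContext.

case class Emp(empId: String, sal: Int, deptNo: Int)

import sqlContext.implicits._

// A typed Dataset: transformations keep the Emp type instead of untyped Rows.
val ds = Seq(Emp("001", 100, 10), Emp("002", 120, 20)).toDS()
val wellPaid = ds.filter(_.sal > 110).map(_.empId)
wellPaid.show()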


Re: Spark 1.5.2 memory error

2016-02-03 Thread Nirav Patel
About the OP:

How many cores do you assign per executor? Maybe reducing that number will
give a larger portion of executor memory to each task being executed on that
executor. Others please comment if that makes sense.



On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel  wrote:

> I know it;s a strong word but when I have a case open for that with MapR
> and Databricks for a month and their only solution to change to DataFrame
> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
> but it requires lot of code change. I think there's something fundamentally
> wrong (or different from hadoop) in framework that is not allowing it to do
> robust memory management. I know my job is memory hogger, it does a groupBy
> and perform combinatorics in reducer side; uses additional datastructures
> at task levels. May be spark is running multiple heavier tasks on same
> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
> Solution for me (and most others who already invested time with RDD and
> loves the type safety it provides). Not even sure if changing to DataFrame
> will for sure solve the issue.
>
> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
> wrote:
>
>> Nirav,
>>
>> Sorry to hear about your experience with Spark; however, sucks is a very
>> strong word. Many organizations are processing a lot more than 150GB of
>> data  with Spark.
>>
>>
>>
>> Mohammed
>>
>> Author: Big Data Analytics with Spark
>> 
>>
>>
>>
>> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
>> *Sent:* Wednesday, February 3, 2016 11:31 AM
>> *To:* Stefan Panayotov
>> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>>
>> *Subject:* Re: Spark 1.5.2 memory error
>>
>>
>>
>> Hi Stefan,
>>
>>
>>
>> Welcome to the OOM - heap space club. I have been struggling with similar
>> errors (OOM and yarn executor being killed) and failing job or sending it
>> in retry loops. I bet the same job will run perfectly fine with less
>> resource on Hadoop MapReduce program. I have tested it for my program and
>> it does work.
>>
>>
>>
>> Bottomline from my experience. Spark sucks with memory management when
>> job is processing large (not huge) amount of data. It's failing for me with
>> 16gb executors, 10 executors, 6 threads each. And data its processing is
>> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
>> 1 million rows.
>>
>>
>>
>> Hope that saves you some trouble.
>>
>>
>>
>> Nirav
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
>> wrote:
>>
>> I drastically increased the memory:
>>
>> spark.executor.memory = 50g
>> spark.driver.memory = 8g
>> spark.driver.maxResultSize = 8g
>> spark.yarn.executor.memoryOverhead = 768
>>
>> I still see executors killed, but this time the memory does not seem to
>> be the issue.
>> The error on the Jupyter notebook is:
>>
>>
>> Py4JJavaError: An error occurred while calling 
>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>>
>> : org.apache.spark.SparkException: Job aborted due to stage failure: 
>> Exception while getting task result: java.io.IOException: Failed to connect 
>> to /10.0.0.9:48755
>>
>>
>> From nodemanagers log corresponding to worker 10.0.0.9:
>>
>>
>> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
>> (YarnShuffleService.java:initializeApplication(129)) - Initializing
>> application application_1454509557526_0014
>>
>>
>>
>> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl
>> (ContainerImpl.java:handle(1131)) - Container
>> container_1454509557526_0014_01_93 transitioned from LOCALIZING to
>> LOCALIZED
>>
>>
>>
>> 2016-02-03 17:31:44,947 INFO  container.ContainerImpl
>> (ContainerImpl.java:handle(1131)) - Container
>> container_1454509557526_0014_01_93 transitioned from LOCALIZED to
>> RUNNING
>>
>>
>>
>> 2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor
>> (DefaultContainerExecutor.java:buildCommandExecutor(267)) -
>> launchContainer: [bash,
>> /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
>>
>>
>>
>> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
>> (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for
>> container_1454509557526_0014_01_93
>>
>>
>>
>> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
>> (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for
>> container_1454509557526_0014_01_11
>>
>>
>>
>>
>>
>>
>>
>> Then I can see the memory usage increasing from 230.6 MB to 12.6 GB,
>> which is far below 50g, and the suddenly getting killed!?!
>>
>>
>>
>>
>>
>>
>>
>> 2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl
>> (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962
>> for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB
>> physical memory u

Re: Spark 1.5.2 memory error

2016-02-03 Thread Nirav Patel
I know it's a strong word, but when I have a case open for that with MapR
and Databricks for a month and their only solution is to change to DataFrame,
it frustrates you. I know the DataFrame/SQL Catalyst has internal optimizations,
but it requires a lot of code change. I think there's something fundamentally
wrong (or different from hadoop) in the framework that is not allowing it to do
robust memory management. I know my job is a memory hogger; it does a groupBy
and performs combinatorics on the reducer side, and uses additional data structures
at the task level. Maybe spark is running multiple heavier tasks on the same
executor and collectively they cause OOM. But suggesting DataFrame is NOT a
solution for me (and most others who already invested time with RDD and
love the type safety it provides). I'm not even sure whether changing to DataFrame
will for sure solve the issue.

On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
wrote:

> Nirav,
>
> Sorry to hear about your experience with Spark; however, sucks is a very
> strong word. Many organizations are processing a lot more than 150GB of
> data  with Spark.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
> *Sent:* Wednesday, February 3, 2016 11:31 AM
> *To:* Stefan Panayotov
> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>
> *Subject:* Re: Spark 1.5.2 memory error
>
>
>
> Hi Stefan,
>
>
>
> Welcome to the OOM - heap space club. I have been struggling with similar
> errors (OOM and yarn executor being killed) and failing job or sending it
> in retry loops. I bet the same job will run perfectly fine with less
> resource on Hadoop MapReduce program. I have tested it for my program and
> it does work.
>
>
>
> Bottomline from my experience. Spark sucks with memory management when job
> is processing large (not huge) amount of data. It's failing for me with
> 16gb executors, 10 executors, 6 threads each. And data its processing is
> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
> 1 million rows.
>
>
>
> Hope that saves you some trouble.
>
>
>
> Nirav
>
>
>
>
>
>
>
> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
> wrote:
>
> I drastically increased the memory:
>
> spark.executor.memory = 50g
> spark.driver.memory = 8g
> spark.driver.maxResultSize = 8g
> spark.yarn.executor.memoryOverhead = 768
>
> I still see executors killed, but this time the memory does not seem to be
> the issue.
> The error on the Jupyter notebook is:
>
>
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: 
> Exception while getting task result: java.io.IOException: Failed to connect 
> to /10.0.0.9:48755
>
>
> From nodemanagers log corresponding to worker 10.0.0.9:
>
>
> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
> (YarnShuffleService.java:initializeApplication(129)) - Initializing
> application application_1454509557526_0014
>
>
>
> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from LOCALIZING to
> LOCALIZED
>
>
>
> 2016-02-03 17:31:44,947 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from LOCALIZED to
> RUNNING
>
>
>
> 2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor
> (DefaultContainerExecutor.java:buildCommandExecutor(267)) -
> launchContainer: [bash,
> /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
>
>
>
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for
> container_1454509557526_0014_01_93
>
>
>
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for
> container_1454509557526_0014_01_11
>
>
>
>
>
>
>
> Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which
> is far below 50g, and the suddenly getting killed!?!
>
>
>
>
>
>
>
> 2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962
> for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB
> physical memory used; 52.8 GB of 107.1 GB virtual memory used
>
>
>
> 2016-02-03 17:33:17,613 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
>
>
>
> 2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch
> (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container
> container_1454509557526_0014_01_93
>
>
>
> 2016-02-03 17:33:

Cassandra BEGIN BATCH

2016-02-03 Thread FrankFlaherty
Cassandra provides "BEGIN BATCH" and "APPLY BATCH" to perform atomic
execution of multiple statements as below:

BEGIN BATCH
  INSERT INTO "user_status_updates"
("username", "id", "body")
  VALUES(
'dave',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave update 4'
);

  INSERT INTO "home_status_updates" (
"timeline_username",
"status_update_id",
"status_update_username",
"body")
  VALUES (
'alice',
16e2f240-2afa-11e4-8069-5f98e903bf02,
'dave',
'dave update 4'
  );
APPLY BATCH;

Is there a way to update two or more Cassandra tables atomically using the
Cassandra Connector from Spark?

Thanks,
Frank
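
With the connector, writing to two tables means two separate saveToCassandra calls, so as far as I can tell there is no built-in atomic multi-table batch. If a logged BATCH is really required, one option (a sketch only, using the DataStax Python driver directly inside foreachPartition rather than the connector; contact point, keyspace and row layout are placeholders) is to issue the CQL batch yourself:

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

def write_batches(rows):
    # One driver session per partition; host and keyspace are placeholders
    cluster = Cluster(["cassandra-host"])
    session = cluster.connect("my_keyspace")
    insert_status = session.prepare(
        'INSERT INTO "user_status_updates" ("username", "id", "body") VALUES (?, ?, ?)')
    insert_home = session.prepare(
        'INSERT INTO "home_status_updates" ("timeline_username", "status_update_id", '
        '"status_update_username", "body") VALUES (?, ?, ?, ?)')
    for username, update_id, body, timeline_username in rows:
        batch = BatchStatement()  # defaults to a logged batch
        batch.add(insert_status, (username, update_id, body))
        batch.add(insert_home, (timeline_username, update_id, username, body))
        session.execute(batch)
    cluster.shutdown()

rdd.foreachPartition(write_batches)  # rdd of (username, update_id, body, timeline_username)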





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-BEGIN-BATCH-tp26145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-03 Thread Nirav Patel
Awesome! I just read the design docs. That is EXACTLY what I was talking about!
Looking forward to it!

Thanks

On Wed, Feb 3, 2016 at 7:40 AM, Koert Kuipers  wrote:

> yeah there was some discussion about adding them to RDD, but it would
> break a lot. so Dataset was born.
>
> yes it seems Dataset will be the new RDD for most use cases. but i dont
> think its there yet. just keep an eye out on SPARK- for updates...
>
> On Wed, Feb 3, 2016 at 8:51 AM, Jerry Lam  wrote:
>
>> Hi Nirav,
>>
>> I don't know why those optimizations are not implemented in RDD. It is
>> either a political choice or a practical one (backward compatibility might
>> be difficult if they need to introduce these types of optimization into
>> RDD). I think this is the reasons spark now has Dataset. My understanding
>> is that Dataset is the new RDD.
>>
>>
>> Best Regards,
>>
>> Jerry
>>
>> Sent from my iPhone
>>
>> On 3 Feb, 2016, at 12:26 am, Koert Kuipers  wrote:
>>
>> with respect to joins, unfortunately not all implementations are
>> available. for example i would like to use joins where one side is
>> streaming (and the other cached). this seems to be available for DataFrame
>> but not for RDD.
>>
>> On Wed, Feb 3, 2016 at 12:19 AM, Nirav Patel 
>> wrote:
>>
>>> Hi Jerry,
>>>
>>> Yes I read that benchmark. And doesn't help in most cases. I'll give you
>>> example of one of our application. It's a memory hogger by nature since it
>>> works on groupByKey and performs combinatorics on Iterator. So it maintain
>>> few structures inside task. It works on mapreduce with half the resources I
>>> am giving it for spark and Spark keeps throwing OOM on a pre-step which is
>>> a simple join! I saw every task was done at process_local locality still
>>> join keeps failing due to container being killed. and container gets killed
>>> due to oom.  We have a case with Databricks/Mapr on that for more then a
>>> month. anyway don't wanna distract there. I can believe that changing to
>>> DataFrame and it's computing model can bring performance but I was hoping
>>> that wouldn't be your answer to every performance problem.
>>>
>>> Let me ask this - If I decide to stick with RDD do I still have
>>> flexibility to choose what Join implementation I can use? And similar
>>> underlaying construct to best execute my jobs.
>>>
>>> I said error prone because you need to write column qualifiers instead
>>> of referencing fields. i.e. 'caseObj("field1")' instead of
>>> 'caseObj.field1'; more over multiple tables having similar column names
>>> causing parsing issues; and when you start writing constants for your
>>> columns it just become another schema maintenance inside your app. It feels
>>> like thing of past. Query engine(distributed or not) is old school as I
>>> 'see' it :)
>>>
>>> Thanks for being patient.
>>> Nirav
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Feb 2, 2016 at 6:26 PM, Jerry Lam  wrote:
>>>
 Hi Nirav,
 I'm sure you read this?
 https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

 There is a benchmark in the article to show that dataframe "can"
 outperform RDD implementation by 2 times. Of course, benchmarks can be
 "made". But from the code snippet you wrote, I "think" dataframe will
 choose between different join implementation based on the data statistics.

 I cannot comment on the beauty of it because "beauty is in the eye of
 the beholder" LOL
 Regarding the comment on error prone, can you say why you think it is
 the case? Relative to what other ways?

 Best Regards,

 Jerry


 On Tue, Feb 2, 2016 at 8:59 PM, Nirav Patel 
 wrote:

> I dont understand why one thinks RDD of case object doesn't have
> types(schema) ? If spark can convert RDD to DataFrame which means it
> understood the schema. SO then from that point why one has to use SQL
> features to do further processing? If all spark need for optimizations is
> schema then what this additional SQL features buys ? If there is a way to
> avoid SQL feature using DataFrame I don't mind it. But looks like I have 
> to
> convert all my existing transformation to things like
> df1.join(df2,df1('abc') == df2('abc'), 'left_outer') .. that's plain ugly
> and error prone in my opinion.
>
> On Tue, Feb 2, 2016 at 5:49 PM, Jerry Lam 
> wrote:
>
>> Hi Michael,
>>
>> Is there a section in the spark documentation demonstrate how to
>> serialize arbitrary objects in Dataframe? The last time I did was using
>> some User Defined Type (copy from VectorUDT).
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> A principal difference between RDDs and DataFrames/Datasets is that
 the latter have a schema associated to them. This means that they 
 support
 only certain

Re: Low latency queries much slower in 1.6.0

2016-02-03 Thread Rishabh Wadhawan
Hi Younes.
When you have multiple users connected to Hive, or multiple applications
trying to access shared memory, my recommendation would be to store it
off-heap rather than on disk. Check out this link and see the RDD
Persistence section: http://spark.apache.org/docs/latest/programming-guide.html
You could also go for a disk-only option and store to HDFS, but that would
cost you more IO to read the data back for computation. Using the
thrift-server to cache is much the same as using HDFS to store the cache, as
Hive would also use HDFS to store it. Thanks
> On Feb 3, 2016, at 1:17 PM, Younes Naguib  
> wrote:
> 
> Hi all,
>  
> Since 1.6.0, low latency query are much slower now.
> This seems to be connected to the multi-user in the thrift-server.
> So on any newly created session, jobs are added to fill the session cache 
> with information related to the tables it queries.
> Here is the details for this job:
> load at LocalCache.java:3599
> org.apache.spark.sql.hive.HiveMetastoreCatalog$$anon$1.load(HiveMetastoreCatalog.scala:124)
> org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
> org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4880)
> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4898)
> org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:387)
> org.apache.spark.sql.hive.HiveContext$$anon$2.org 
> $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:457)
> org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)
> org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:457)
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:303)
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:315)
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:310)
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
>  
> 
>  
> Any ways to cache this at thrift-server instead? So it’s reusable but all 
> sessions? Other than going back to single user of course :)
>  
> Thanks,
> Younes



RE: Spark 1.5.2 memory error

2016-02-03 Thread Mohammed Guller
Nirav,
Sorry to hear about your experience with Spark; however, sucks is a very strong 
word. Many organizations are processing a lot more than 150GB of data  with 
Spark.

Mohammed
Author: Big Data Analytics with 
Spark

From: Nirav Patel [mailto:npa...@xactlycorp.com]
Sent: Wednesday, February 3, 2016 11:31 AM
To: Stefan Panayotov
Cc: Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
Subject: Re: Spark 1.5.2 memory error

Hi Stefan,

Welcome to the OOM - heap space club. I have been struggling with similar 
errors (OOM and yarn executor being killed) and failing job or sending it in 
retry loops. I bet the same job will run perfectly fine with less resource on 
Hadoop MapReduce program. I have tested it for my program and it does work.

Bottomline from my experience. Spark sucks with memory management when job is 
processing large (not huge) amount of data. It's failing for me with 16gb 
executors, 10 executors, 6 threads each. And data its processing is only 150GB! 
It's 1 billion rows for me. Same job works perfectly fine with 1 million rows.

Hope that saves you some trouble.

Nirav



On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
mailto:spanayo...@msn.com>> wrote:
I drastically increased the memory:

spark.executor.memory = 50g
spark.driver.memory = 8g
spark.driver.maxResultSize = 8g
spark.yarn.executor.memoryOverhead = 768

I still see executors killed, but this time the memory does not seem to be the 
issue.
The error on the Jupyter notebook is:



Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.

: org.apache.spark.SparkException: Job aborted due to stage failure: Exception 
while getting task result: java.io.IOException: Failed to connect to 
/10.0.0.9:48755

From nodemanagers log corresponding to worker 10.0.0.9:

2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService 
(YarnShuffleService.java:initializeApplication(129)) - Initializing application 
application_1454509557526_0014

2016-02-03 17:31:44,918 INFO  container.ContainerImpl 
(ContainerImpl.java:handle(1131)) - Container 
container_1454509557526_0014_01_93 transitioned from LOCALIZING to LOCALIZED

2016-02-03 17:31:44,947 INFO  container.ContainerImpl 
(ContainerImpl.java:handle(1131)) - Container 
container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING

2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor 
(DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: 
[bash, 
/mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]

2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl 
(ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for 
container_1454509557526_0014_01_93

2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl 
(ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for 
container_1454509557526_0014_01_11



Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which is 
far below 50g, and the suddenly getting killed!?!



2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl 
(ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962 for 
container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB physical 
memory used; 52.8 GB of 107.1 GB virtual memory used

2016-02-03 17:33:17,613 INFO  container.ContainerImpl 
(ContainerImpl.java:handle(1131)) - Container 
container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING

2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch 
(ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container 
container_1454509557526_0014_01_93

2016-02-03 17:33:17,629 WARN  nodemanager.DefaultContainerExecutor 
(DefaultContainerExecutor.java:launchContainer(223)) - Exit code from container 
container_1454509557526_0014_01_93 is : 143

2016-02-03 17:33:17,667 INFO  container.ContainerImpl 
(ContainerImpl.java:handle(1131)) - Container 
container_1454509557526_0014_01_93 transitioned from KILLING to 
CONTAINER_CLEANEDUP_AFTER_KILL

2016-02-03 17:33:17,669 INFO  nodemanager.NMAuditLogger 
(NMAuditLogger.java:logSuccess(89)) - USER=root   OPERATION=Container 
Finished - KilledTARGET=ContainerImpl RESULT=SUCCESS   
APPID=application_1454509557526_0014 
CONTAINERID=container_1454509557526_0014_01_93

2016-02-03 17:33:17,670 INFO  container.ContainerImpl 
(ContainerImpl.java:handle(1131)) - Container 
container_1454509557526_0014_01_93 transitioned from 
CONTAINER_CLEANEDUP_AFTER_KILL to DONE

2016-02-03 17:33:17,670 INFO  application.ApplicationImpl 
(ApplicationImpl.java:transition(347)) - Removing 
container_1454509557526_0014_01_93 from application 
application_1454509557526_0014

2016-02-03 17:33:17,67

Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Shipper, Jay [USA]
It was just renamed recently: https://github.com/apache/spark/pull/10981

As SessionState is entirely managed by Spark’s code, it still seems like this 
is a bug with Spark 1.6.0, and not with how our application is using 
HiveContext.  But I’d feel more confident filing a bug if someone else could 
confirm they’re having this issue with Spark 1.6.0.  Ideally, we should also 
have some simple proof of concept that can be posted with the bug.
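
For that proof of concept, a minimal PySpark repro along the lines described in this thread might look like the sketch below (the JDBC URL, table and credentials are placeholders; the point is only that a HiveContext without any hive-site.xml is asked to read over JDBC):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="hivecontext-npe-repro")
sqlContext = HiveContext(sc)  # no hive-site.xml on the classpath, as in the report

df = sqlContext.read.jdbc(
    url="jdbc:postgresql://dbhost:5432/mydb",  # placeholder external, non-Hive database
    table="some_table",
    properties={"user": "user", "password": "secret"})
df.show()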

From: Ted Yu mailto:yuzhih...@gmail.com>>
Date: Wednesday, February 3, 2016 at 3:57 PM
To: Jay Shipper mailto:shipper_...@bah.com>>
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE

In ClientWrapper.scala, the SessionState.get().getConf call might have been 
executed ahead of SessionState.start(state) at line 194.

This was the JIRA:

[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL

In master branch, there is no more ClientWrapper.scala

FYI

On Wed, Feb 3, 2016 at 11:15 AM, Shipper, Jay [USA] 
mailto:shipper_...@bah.com>> wrote:
One quick update on this: The NPE is not happening with Spark 1.5.2, so this 
problem seems specific to Spark 1.6.0.

From: Jay Shipper mailto:shipper_...@bah.com>>
Date: Wednesday, February 3, 2016 at 12:06 PM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE

Right, I could already tell that from the stack trace and looking at Spark’s 
code.  What I’m trying to determine is why that’s coming back as null now, just 
from upgrading Spark to 1.6.0.

From: Ted Yu mailto:yuzhih...@gmail.com>>
Date: Wednesday, February 3, 2016 at 12:04 PM
To: Jay Shipper mailto:shipper_...@bah.com>>
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: [External] Re: Spark 1.6.0 HiveContext NPE

Looks like the NPE came from this line:
  def conf: HiveConf = SessionState.get().getConf

Meaning SessionState.get() returned null.

On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
mailto:shipper_...@bah.com>> wrote:
I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m getting a 
NullPointerException from HiveContext.  It’s happening while it tries to load 
some tables via JDBC from an external database (not Hive), using 
context.read().jdbc():

—
java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
at org.apache.spark.sql.hive.HiveContext$$anon$2.(HiveContext.scala:457)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
at org.apache.spark.sql.hive.HiveContext$$anon$3.(HiveContext.scala:473)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
—

Even though the application is not using Hive, HiveContext is used instead of 
SQLContext, for the additional functionality it provides.  There’s no 
hive-site.xml for the application, but this did not cause an issue for Spark 
1.4.1.

Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that could 
explain this NPE?  The only obvious change I’ve noticed for HiveContext is that 
the default warehouse location is different (1.4.1 - current directory, 1.6.0 - 
/user/hive/warehouse), but I verified that this NPE happens even when 
/user/hive/warehouse exists and is readable/writeable for the application.  In 
terms of changes

Parquet StringType column readable as plain-text despite being Gzipped

2016-02-03 Thread Sung Hwan Chung
Hello,

We are using the default compression codec for Parquet when we store our
dataframes. The dataframe has a StringType column whose values can be up to
several MB in size.

The funny thing is that once it's stored, we can browse the file content
with a plain text editor and see large portions of the string contents
uncompressed, in plain text.

If we use the parquet-tool to browse the metadata, it says the column is
GZIP and the compression ratio is 2.6x, but that just doesn't seem right.

Anybody know what's going on?


RE: spark-cassandra

2016-02-03 Thread Mohammed Guller
Another thing to check is what version of the Spark-Cassandra-Connector the 
Spark Job server is passing to the workers. It looks like when you use 
spark-submit, you are sending the correct SCC jar, but the Spark Job server may 
be using a different one.

Mohammed
Author: Big Data Analytics with 
Spark

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Wednesday, February 3, 2016 4:56 AM
To: Madabhattula Rajesh Kumar
Cc: user@spark.apache.org
Subject: Re: spark-cassandra

NoSuchMethodError usually refers to a version conflict. Probably your job was 
built against a higher version of the cassandra connector than what's available 
at runtime.
Check that the versions are aligned.

-kr, Gerard.

On Wed, Feb 3, 2016 at 1:37 PM, Madabhattula Rajesh Kumar 
mailto:mrajaf...@gmail.com>> wrote:
Hi,
I am using Spark Jobserver to submit the jobs and the spark-cassandra 
connector to connect to Cassandra. I am getting the exception below through the 
Spark Jobserver.
If I submit the job through the spark-submit command it works fine.
Please let me know how to solve this issue.


Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError: 
com.datastax.driver.core.TableMetadata.getIndexes()Ljava/util/List;
at com.datastax.spark.connector.cql.Schema$.getIndexMap(Schema.scala:193)
at 
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:197)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at 
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at 
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
at 
com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at 
com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at 
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at 
com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
at 
com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
at 
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at 
com.cisco.ss.etl.utils.ETLHelper$class.persistBackupConfigDevicesData(ETLHelper.scala:79)
at com.cisco.ss.etl.Main$.persistBackupConfigDevicesData(Main.scala:13)
at 
com.cisco.ss.etl.utils.ETLHelper$class.persistByBacthes(ETLHelper.scala:43)
at com.cisco.ss.etl.Main$.persistByBacthes(Main.scala:13)
at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:48)
at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:45)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.cisco.ss.etl.Main$.runJob(Main.scala:45)
at com.cisco.ss.etl.Main$.runJob(Main.scala:13)
at 
spark.jobserver.JobManagerActor$$anonfun$spark$j

Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Ted Yu
In ClientWrapper.scala, the SessionState.get().getConf call might have been
executed ahead of SessionState.start(state) at line 194.

This was the JIRA:

[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL

In master branch, there is no more ClientWrapper.scala

FYI

On Wed, Feb 3, 2016 at 11:15 AM, Shipper, Jay [USA] 
wrote:

> One quick update on this: The NPE is not happening with Spark 1.5.2, so
> this problem seems specific to Spark 1.6.0.
>
> From: Jay Shipper 
> Date: Wednesday, February 3, 2016 at 12:06 PM
> To: "user@spark.apache.org" 
> Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE
>
> Right, I could already tell that from the stack trace and looking at
> Spark’s code.  What I’m trying to determine is why that’s coming back as
> null now, just from upgrading Spark to 1.6.0.
>
> From: Ted Yu 
> Date: Wednesday, February 3, 2016 at 12:04 PM
> To: Jay Shipper 
> Cc: "user@spark.apache.org" 
> Subject: [External] Re: Spark 1.6.0 HiveContext NPE
>
> Looks like the NPE came from this line:
>   def conf: HiveConf = SessionState.get().getConf
>
> Meaning SessionState.get() returned null.
>
> On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
> wrote:
>
>> I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m
>> getting a NullPointerException from HiveContext.  It’s happening while it
>> tries to load some tables via JDBC from an external database (not Hive),
>> using context.read().jdbc():
>>
>> —
>> java.lang.NullPointerException
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
>> at
>> org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
>> at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
>> at
>> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
>> at
>> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
>> at
>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
>> at
>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
>> at
>> org.apache.spark.sql.hive.HiveContext$$anon$2.(HiveContext.scala:457)
>> at
>> org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
>> at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
>> at
>> org.apache.spark.sql.hive.HiveContext$$anon$3.(HiveContext.scala:473)
>> at
>> org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
>> at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
>> at
>> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>> at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
>> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
>> at
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
>> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
>> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
>> —
>>
>> Even though the application is not using Hive, HiveContext is used
>> instead of SQLContext, for the additional functionality it provides.
>> There’s no hive-site.xml for the application, but this did not cause an
>> issue for Spark 1.4.1.
>>
>> Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that
>> could explain this NPE?  The only obvious change I’ve noticed for
>> HiveContext is that the default warehouse location is different (1.4.1 -
>> current directory, 1.6.0 - /user/hive/warehouse), but I verified that this
>> NPE happens even when /user/hive/warehouse exists and is readable/writeable
>> for the application.  In terms of changes to the application to work with
>> Spark 1.6.0, the only one that might be relevant to this issue is the
>> upgrade in the Hadoop dependencies to match what Spark 1.6.0 uses
>> (2.6.0-cdh5.7.0-SNAPSHOT).
>>
>> Thanks,
>> Jay
>>
>
>


Spark Streaming: My kafka receivers are not consuming in parallel

2016-02-03 Thread Jorge Rodriguez
Hello Spark users,

We are setting up our first batch of Spark Streaming pipelines, and I am
running into an issue which I am not sure how to resolve, but which seems like
it should be fairly trivial.

I am using the receiver-mode Kafka consumer that comes with Spark, and running
in standalone mode.  I've set up two receivers, which are consuming from a
4-broker, 4-partition Kafka topic.

If you will look at the image below, you will see that *even though I have
two receivers, only one of them ever consumes data at a time*.  I believe
this to be my current bottleneck for scaling.

What am I missing?

To me, order of events consumed is not important.  I just want to optimize
for maximum throughput.


[image: Inline image 1]

Thanks in advance for any help or tips!

Jorge
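
One pattern that is commonly suggested for this (a sketch only; hosts, group and topic names are placeholders): create one receiver-based DStream per receiver you want active and union them, so that each receiver runs its own consumer threads. The receiver-less direct stream (KafkaUtils.createDirectStream) is another way to read all partitions without receivers.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 5)  # 5-second batches

num_receivers = 4
streams = [
    KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", {"my-topic": 1})
    for _ in range(num_receivers)
]
unioned = ssc.union(*streams)  # downstream code sees a single DStream
unioned.count().pprint()

ssc.start()
ssc.awaitTermination()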


Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Nirav Patel
Awesome! It looks promising. Thanks Rishabh and Marcelo.

On Wed, Feb 3, 2016 at 12:09 PM, Rishabh Wadhawan 
wrote:

> Check out this link
> http://spark.apache.org/docs/latest/configuration.html and check
> spark.shuffle.service. Thanks
>
> On Feb 3, 2016, at 1:02 PM, Marcelo Vanzin  wrote:
>
> Yes, but you don't necessarily need to use dynamic allocation (just enable
> the external shuffle service).
>
> On Wed, Feb 3, 2016 at 11:53 AM, Nirav Patel 
> wrote:
>
>> Do you mean this setup?
>>
>> https://spark.apache.org/docs/1.5.2/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> On Wed, Feb 3, 2016 at 11:50 AM, Marcelo Vanzin 
>> wrote:
>>
>>> Without the exact error from the driver that caused the job to restart,
>>> it's hard to tell. But a simple way to improve things is to install the
>>> Spark shuffle service on the YARN nodes, so that even if an executor
>>> crashes, its shuffle output is still available to other executors.
>>>
>>> On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel 
>>> wrote:
>>>
 Hi,

 I have a spark job running on yarn-client mode. At some point during
 Join stage, executor(container) runs out of memory and yarn kills it. Due
 to this Entire job restarts! and it keeps doing it on every failure?

 What is the best way to checkpoint? I see there's checkpoint api and
 other option might be to persist before Join stage. Would that prevent
 retry of entire job? How about just retrying only the task that was
 distributed to that faulty executor?

 Thanks



 [image: What's New with Xactly]
 

   [image: LinkedIn]
   [image: Twitter]
   [image: Facebook]
   [image: YouTube]
 
>>>
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>>
>>
>> [image: What's New with Xactly] 
>>
>>   [image: LinkedIn]
>>   [image: Twitter]
>>   [image: Facebook]
>>   [image: YouTube]
>> 
>>
>
>
>
> --
> Marcelo
>
>
>




Low latency queries much slower in 1.6.0

2016-02-03 Thread Younes Naguib
Hi all,

Since 1.6.0, low latency queries are much slower.
This seems to be connected to the multi-user in the thrift-server.
So on any newly created session, jobs are added to fill the session cache with 
information related to the tables it queries.
Here is the details for this job:
load at LocalCache.java:3599
org.apache.spark.sql.hive.HiveMetastoreCatalog$$anon$1.load(HiveMetastoreCatalog.scala:124)
org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
org.spark-project.guava.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4880)
org.spark-project.guava.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4898)
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:387)
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:457)
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:457)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:303)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:315)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:310)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)


Any way to cache this at the thrift-server instead, so it's reusable by all 
sessions? Other than going back to single user of course :)

Thanks,
Younes



Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Rishabh Wadhawan
Check out this link http://spark.apache.org/docs/latest/configuration.html and 
check spark.shuffle.service. Thanks
> On Feb 3, 2016, at 1:02 PM, Marcelo Vanzin  wrote:
> 
> Yes, but you don't necessarily need to use dynamic allocation (just enable 
> the external shuffle service).
> 
> On Wed, Feb 3, 2016 at 11:53 AM, Nirav Patel  > wrote:
> Do you mean this setup?
> https://spark.apache.org/docs/1.5.2/job-scheduling.html#dynamic-resource-allocation
>  
> 
> 
> 
> 
> On Wed, Feb 3, 2016 at 11:50 AM, Marcelo Vanzin  > wrote:
> Without the exact error from the driver that caused the job to restart, it's 
> hard to tell. But a simple way to improve things is to install the Spark 
> shuffle service on the YARN nodes, so that even if an executor crashes, its 
> shuffle output is still available to other executors.
> 
> On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel  > wrote:
> Hi,
> 
> I have a spark job running on yarn-client mode. At some point during Join 
> stage, executor(container) runs out of memory and yarn kills it. Due to this 
> Entire job restarts! and it keeps doing it on every failure?
> 
> What is the best way to checkpoint? I see there's checkpoint api and other 
> option might be to persist before Join stage. Would that prevent retry of 
> entire job? How about just retrying only the task that was distributed to 
> that faulty executor? 
> 
> Thanks
> 
> 
> 
>  
> 
>     
>    
>       
> 
> 
> 
> -- 
> Marcelo
> 
> 
> 
> 
>  
> 
>     
>    
>       
> 
> 
> 
> -- 
> Marcelo



Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Rishabh Wadhawan
Hi Nirav
There is a difference between dynamic resource allocation and the shuffle service. 
With dynamic allocation enabled, every time you run a job Spark determines the 
number of executors required for you, decreasing the executor count when the work 
is light and requesting more executors when it is heavy. The external shuffle 
service, on the other hand, keeps the intermediate shuffle output of a task 
available on the node itself, so even if the executor that produced it dies during 
the process, the other active executors can still fetch that intermediate state 
and keep executing.
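
For reference, a sketch of how the two pieces are switched on from the application side (property names are from the Spark configuration docs; values are examples, and on YARN the shuffle service also has to be registered as an auxiliary service on the NodeManagers):

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.shuffle.service.enabled", "true")      # external shuffle service
        .set("spark.dynamicAllocation.enabled", "true")    # optional: executor autoscaling
        .set("spark.dynamicAllocation.minExecutors", "2")  # example bounds
        .set("spark.dynamicAllocation.maxExecutors", "20"))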
> On Feb 3, 2016, at 1:02 PM, Marcelo Vanzin  wrote:
> 
> Yes, but you don't necessarily need to use dynamic allocation (just enable 
> the external shuffle service).
> 
> On Wed, Feb 3, 2016 at 11:53 AM, Nirav Patel  > wrote:
> Do you mean this setup?
> https://spark.apache.org/docs/1.5.2/job-scheduling.html#dynamic-resource-allocation
>  
> 
> 
> 
> 
> On Wed, Feb 3, 2016 at 11:50 AM, Marcelo Vanzin  > wrote:
> Without the exact error from the driver that caused the job to restart, it's 
> hard to tell. But a simple way to improve things is to install the Spark 
> shuffle service on the YARN nodes, so that even if an executor crashes, its 
> shuffle output is still available to other executors.
> 
> On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel  > wrote:
> Hi,
> 
> I have a spark job running on yarn-client mode. At some point during Join 
> stage, executor(container) runs out of memory and yarn kills it. Due to this 
> Entire job restarts! and it keeps doing it on every failure?
> 
> What is the best way to checkpoint? I see there's checkpoint api and other 
> option might be to persist before Join stage. Would that prevent retry of 
> entire job? How about just retrying only the task that was distributed to 
> that faulty executor? 
> 
> Thanks
> 
> 
> 
>  
> 
>     
>    
>       
> 
> 
> 
> -- 
> Marcelo
> 
> 
> 
> 
>  
> 
>     
>    
>       
> 
> 
> 
> -- 
> Marcelo



Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Marcelo Vanzin
Yes, but you don't necessarily need to use dynamic allocation (just enable
the external shuffle service).

On Wed, Feb 3, 2016 at 11:53 AM, Nirav Patel  wrote:

> Do you mean this setup?
>
> https://spark.apache.org/docs/1.5.2/job-scheduling.html#dynamic-resource-allocation
>
>
>
> On Wed, Feb 3, 2016 at 11:50 AM, Marcelo Vanzin 
> wrote:
>
>> Without the exact error from the driver that caused the job to restart,
>> it's hard to tell. But a simple way to improve things is to install the
>> Spark shuffle service on the YARN nodes, so that even if an executor
>> crashes, its shuffle output is still available to other executors.
>>
>> On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a spark job running on yarn-client mode. At some point during
>>> Join stage, executor(container) runs out of memory and yarn kills it. Due
>>> to this Entire job restarts! and it keeps doing it on every failure?
>>>
>>> What is the best way to checkpoint? I see there's checkpoint api and
>>> other option might be to persist before Join stage. Would that prevent
>>> retry of entire job? How about just retrying only the task that was
>>> distributed to that faulty executor?
>>>
>>> Thanks
>>>
>>>
>>>
>>> [image: What's New with Xactly] 
>>>
>>>   [image: LinkedIn]
>>>   [image: Twitter]
>>>   [image: Facebook]
>>>   [image: YouTube]
>>> 
>>
>>
>>
>>
>> --
>> Marcelo
>>
>
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 
>



-- 
Marcelo


Re: Spark 1.5.2 memory error

2016-02-03 Thread Rishabh Wadhawan
Hi, I suppose you are using --master yarn-client or yarn-cluster. Can you try 
boosting spark.yarn.executor.memoryOverhead, overriding it to 0.15 * executor 
memory rather than the default 0.10? Check out this link: 
https://spark.apache.org/docs/1.5.2/running-on-yarn.html 
Also try adding SPARK_REPL_OPTS="-XX:MaxPermSize=1g" to increase the permanent 
generation size too; exhausting it is one of the reasons for OOM errors.
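
To make that concrete (the property takes megabytes): 0.15 of a 50g executor is about 7680 MB, so the override could look something like this sketch:

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.executor.memory", "50g")
        .set("spark.yarn.executor.memoryOverhead", "7680"))  # MB, roughly 15% of 50g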

> On Feb 3, 2016, at 12:30 PM, Nirav Patel  wrote:
> 
> Hi Stefan,
> 
> Welcome to the OOM - heap space club. I have been struggling with similar 
> errors (OOM and yarn executor being killed) and failing job or sending it in 
> retry loops. I bet the same job will run perfectly fine with less resource on 
> Hadoop MapReduce program. I have tested it for my program and it does work.
> 
> Bottomline from my experience. Spark sucks with memory management when job is 
> processing large (not huge) amount of data. It's failing for me with 16gb 
> executors, 10 executors, 6 threads each. And data its processing is only 
> 150GB! It's 1 billion rows for me. Same job works perfectly fine with 1 
> million rows. 
> 
> Hope that saves you some trouble.
> 
> Nirav
> 
> 
> 
> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov  > wrote:
> I drastically increased the memory:
>  
> spark.executor.memory = 50g
> spark.driver.memory = 8g
> spark.driver.maxResultSize = 8g
> spark.yarn.executor.memoryOverhead = 768
>  
> I still see executors killed, but this time the memory does not seem to be 
> the issue.
> The error on the Jupyter notebook is:
>  
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: 
> Exception while getting task result: java.io.IOException: Failed to connect 
> to /10.0.0.9:48755  
> From nodemanagers log corresponding to worker 10.0.0.9 :
>  
> 
> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService 
> (YarnShuffleService.java:initializeApplication(129)) - Initializing 
> application application_1454509557526_0014
> 
> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from LOCALIZING to 
> LOCALIZED
> 
> 2016-02-03 17:31:44,947 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING
> 
> 2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor 
> (DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: 
> [bash, 
> /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
> 
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl 
> (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for 
> container_1454509557526_0014_01_93
> 
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl 
> (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for 
> container_1454509557526_0014_01_11
> 
>  
> 
> Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which is 
> far below 50g, and the suddenly getting killed!?!
> 
>  
> 
> 2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl 
> (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962 for 
> container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB 
> physical memory used; 52.8 GB of 107.1 GB virtual memory used
> 
> 2016-02-03 17:33:17,613 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
> 
> 2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch 
> (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container 
> container_1454509557526_0014_01_93
> 
> 2016-02-03 17:33:17,629 WARN  nodemanager.DefaultContainerExecutor 
> (DefaultContainerExecutor.java:launchContainer(223)) - Exit code from 
> container container_1454509557526_0014_01_93 is : 143
> 
> 2016-02-03 17:33:17,667 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from KILLING to 
> CONTAINER_CLEANEDUP_AFTER_KILL
> 
> 2016-02-03 17:33:17,669 INFO  nodemanager.NMAuditLogger 
> (NMAuditLogger.java:logSuccess(89)) - USER=root   OPERATION=Container 
> Finished - KilledTARGET=ContainerImpl RESULT=SUCCESS   
> APPID=application_1454509557526_0014 
> CONTAINERID=container_1454509557526_0014_01_93
> 
> 2016-02-03 17:33:17,670 INFO  container.ContainerImpl 
> (ContainerImpl.java:handle(1131)) - Container 
> container_1454509557526_0014_01_93 transitioned from 
> CONTAINER_CLEANEDUP

Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Nirav Patel
Do you mean this setup?
https://spark.apache.org/docs/1.5.2/job-scheduling.html#dynamic-resource-allocation



On Wed, Feb 3, 2016 at 11:50 AM, Marcelo Vanzin  wrote:

> Without the exact error from the driver that caused the job to restart,
> it's hard to tell. But a simple way to improve things is to install the
> Spark shuffle service on the YARN nodes, so that even if an executor
> crashes, its shuffle output is still available to other executors.
>
> On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel 
> wrote:
>
>> Hi,
>>
>> I have a spark job running on yarn-client mode. At some point during Join
>> stage, executor(container) runs out of memory and yarn kills it. Due to
>> this Entire job restarts! and it keeps doing it on every failure?
>>
>> What is the best way to checkpoint? I see there's checkpoint api and
>> other option might be to persist before Join stage. Would that prevent
>> retry of entire job? How about just retrying only the task that was
>> distributed to that faulty executor?
>>
>> Thanks
>>
>>
>>
>> [image: What's New with Xactly] 
>>
>>   [image: LinkedIn]
>>   [image: Twitter]
>>   [image: Facebook]
>>   [image: YouTube]
>> 
>
>
>
>
> --
> Marcelo
>




Re: Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Marcelo Vanzin
Without the exact error from the driver that caused the job to restart,
it's hard to tell. But a simple way to improve things is to install the
Spark shuffle service on the YARN nodes, so that even if an executor
crashes, its shuffle output is still available to other executors.

On Wed, Feb 3, 2016 at 11:46 AM, Nirav Patel  wrote:

> Hi,
>
> I have a spark job running on yarn-client mode. At some point during Join
> stage, executor(container) runs out of memory and yarn kills it. Due to
> this Entire job restarts! and it keeps doing it on every failure?
>
> What is the best way to checkpoint? I see there's checkpoint api and other
> option might be to persist before Join stage. Would that prevent retry of
> entire job? How about just retrying only the task that was distributed to
> that faulty executor?
>
> Thanks
>
>
>
> [image: What's New with Xactly] 
>
>   [image: LinkedIn]
>   [image: Twitter]
>   [image: Facebook]
>   [image: YouTube]
> 




-- 
Marcelo


Spark 1.5.2 Yarn Application Master - resiliencey

2016-02-03 Thread Nirav Patel
Hi,

I have a spark job running on yarn-client mode. At some point during Join
stage, executor(container) runs out of memory and yarn kills it. Due to
this Entire job restarts! and it keeps doing it on every failure?

What is the best way to checkpoint? I see there's checkpoint api and other
option might be to persist before Join stage. Would that prevent retry of
entire job? How about just retrying only the task that was distributed to
that faulty executor?

Thanks
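
For the persist/checkpoint option, a minimal sketch (RDD names, the preceding transformation and the checkpoint path are placeholders): persisting the pre-join RDD lets a lost task be recomputed from memory or disk instead of rerunning the upstream stages, and checkpointing additionally truncates the lineage.

from pyspark import StorageLevel

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  # placeholder path

pre_join = left_rdd.map(prepare_record)  # placeholder for whatever runs before the join
pre_join.persist(StorageLevel.MEMORY_AND_DISK)
pre_join.checkpoint()  # materialized on the first action

joined = pre_join.join(right_rdd)

This helps with recovery after an executor is lost; it does not by itself stop YARN from killing a container that exceeds its memory limits.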




Re: Spark 1.5.2 memory error

2016-02-03 Thread Nirav Patel
Hi Stefan,

Welcome to the OOM - heap space club. I have been struggling with similar
errors (OOM and the yarn executor being killed), with the job failing or being
sent into retry loops. I bet the same job would run perfectly fine with fewer
resources as a Hadoop MapReduce program. I have tested that for my program and
it does work.

Bottom line from my experience: Spark sucks with memory management when a job
is processing a large (not huge) amount of data. It's failing for me with
16gb executors, 10 executors, 6 threads each. And the data it's processing is
only 150GB! It's 1 billion rows for me. The same job works perfectly fine with
1 million rows.

Hope that saves you some trouble.

Nirav



On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
wrote:

> I drastically increased the memory:
>
> spark.executor.memory = 50g
> spark.driver.memory = 8g
> spark.driver.maxResultSize = 8g
> spark.yarn.executor.memoryOverhead = 768
>
> I still see executors killed, but this time the memory does not seem to be
> the issue.
> The error on the Jupyter notebook is:
>
>
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: 
> Exception while getting task result: java.io.IOException: Failed to connect 
> to /10.0.0.9:48755
>
>
> From nodemanagers log corresponding to worker 10.0.0.9:
>
>
> 2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService
> (YarnShuffleService.java:initializeApplication(129)) - Initializing
> application application_1454509557526_0014
>
> 2016-02-03 17:31:44,918 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from LOCALIZING to
> LOCALIZED
>
> 2016-02-03 17:31:44,947 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from LOCALIZED to
> RUNNING
>
> 2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor
> (DefaultContainerExecutor.java:buildCommandExecutor(267)) -
> launchContainer: [bash,
> /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
>
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for
> container_1454509557526_0014_01_93
>
> 2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for
> container_1454509557526_0014_01_11
>
>
>
> Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which
> is far below 50g, and then suddenly getting killed!?!
>
>
>
> 2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962
> for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB
> physical memory used; 52.8 GB of 107.1 GB virtual memory used
>
> 2016-02-03 17:33:17,613 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
>
> 2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch
> (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container
> container_1454509557526_0014_01_93
>
> 2016-02-03 17:33:17,629 WARN  nodemanager.DefaultContainerExecutor
> (DefaultContainerExecutor.java:launchContainer(223)) - Exit code from
> container container_1454509557526_0014_01_93 is : 143
>
> 2016-02-03 17:33:17,667 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from KILLING to
> CONTAINER_CLEANEDUP_AFTER_KILL
>
> 2016-02-03 17:33:17,669 INFO  nodemanager.NMAuditLogger
> (NMAuditLogger.java:logSuccess(89)) - USER=root   OPERATION=Container
> Finished - KilledTARGET=ContainerImpl RESULT=SUCCESS
> APPID=application_1454509557526_0014
> CONTAINERID=container_1454509557526_0014_01_93
>
> 2016-02-03 17:33:17,670 INFO  container.ContainerImpl
> (ContainerImpl.java:handle(1131)) - Container
> container_1454509557526_0014_01_93 transitioned from
> CONTAINER_CLEANEDUP_AFTER_KILL to DONE
>
> 2016-02-03 17:33:17,670 INFO  application.ApplicationImpl
> (ApplicationImpl.java:transition(347)) - Removing
> container_1454509557526_0014_01_93 from application
> application_1454509557526_0014
>
> 2016-02-03 17:33:17,671 INFO  logaggregation.AppLogAggregatorImpl
> (AppLogAggregatorImpl.java:startContainerLogAggregation(546)) - Considering
> container container_1454509557526_0014_01_93 for log-aggregation
>
> 2016-02-03 17:33:17,671 INFO  containermanager.AuxServices
> (AuxServices.java:handle(196)) - Got event CONTAINER_STOP for appId
> application_1454509557526_0014
>
> 2016-02-03 17:33:17,671 INFO  yarn.YarnShuffleService
> (YarnShuffleService.java:stopContainer(161)) - St

Re: Product similarity with TF/IDF and Cosine similarity (DIMSUM)

2016-02-03 Thread Karl Higley
Hi Alan,

I'm slow responding, so you may have already figured this out. Just in
case, though:

val approx = mat.columnSimilarities(0.1)
approxEntries.first()
res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)

The above is returning the cosine similarity between columns 1638 and
966248. Since you're providing documents as rows, this is conceptually
something like the similarity between terms based on which documents they
occur in.

In order to get the similarity between documents based on the terms they
contain, you'd need to build a RowMatrix where each row represents one term
and each column represents one document. One way to do that would be to
construct a CoordinateMatrix from your vectors, call transpose() on it,
then convert it to a RowMatrix via toRowMatrix().
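
A rough sketch of that transpose approach (the helper name and the 0.1 threshold are mine, not from the thread), assuming tfidf: RDD[Vector] built as in the quoted code below:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

def documentSimilarities(tfidf: RDD[Vector], threshold: Double = 0.1): CoordinateMatrix = {
  // One MatrixEntry per non-zero (documentRow, termColumn, weight).
  val entries = tfidf.zipWithIndex().flatMap { case (vec, docId) =>
    vec.toArray.zipWithIndex.collect {
      case (weight, termId) if weight != 0.0 => MatrixEntry(docId, termId, weight)
    }
  }
  // Transpose so documents become columns; columnSimilarities then returns
  // entries keyed by (docId, docId) instead of (termId, termId).
  new CoordinateMatrix(entries).transpose().toRowMatrix().columnSimilarities(threshold)
}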

Hope that helps!

Best,
Karl

On Sat, Jan 30, 2016 at 4:30 PM Alan Prando  wrote:

> Hi Folks!
>
> I am trying to implement a spark job to calculate the similarity of my
> database products, using only name and descriptions.
> I would like to use TF-IDF to represent my text data and cosine similarity
> to calculate all similarities.
>
> My goal is, after job completes, get all similarities as a list.
> For example:
> Prod1 = ((Prod2, 0.98), (Prod3, 0.88))
> Prod2 = ((Prod1, 0.98), (Prod4, 0.53))
> Prod3 = ((Prod1, 0.98))
> Prod4 = ((Prod1, 0.53))
>
> However, I am new with Spark and I am having issues to use understanding
> what cosine similarity returns!
>
> My code:
> val documents: RDD[Seq[String]] = sc.textFile(filename).map(_.split("
> ").toSeq)
>
> val hashingTF = new HashingTF()
> val tf: RDD[Vector] = hashingTF.transform(documents)
> tf.cache()
>
> val idf = new IDF(minDocFreq = 2).fit(tf)
> val tfidf: RDD[Vector] = idf.transform(tf)
>
> val mat = new RowMatrix(tfidf)
>
> // Compute similar columns perfectly, with brute force.
> val exact = mat.columnSimilarities()
>
> // Compute similar columns with estimation using DIMSUM
> val approx = mat.columnSimilarities(0.1)
>
> val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) =>
> ((i, j), u) }
> val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) =>
> ((i, j), v) }
>
> The file is just products name and description in each row.
>
> The return I got:
> approxEntries.first()
> res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)
>
> How can I figure out  what row this return is about?
>
> Thanks in advance! =]
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Shipper, Jay [USA]
One quick update on this: The NPE is not happening with Spark 1.5.2, so this 
problem seems specific to Spark 1.6.0.

From: Jay Shipper mailto:shipper_...@bah.com>>
Date: Wednesday, February 3, 2016 at 12:06 PM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: [External] Re: Spark 1.6.0 HiveContext NPE

Right, I could already tell that from the stack trace and looking at Spark’s 
code.  What I’m trying to determine is why that’s coming back as null now, just 
from upgrading Spark to 1.6.0.

From: Ted Yu mailto:yuzhih...@gmail.com>>
Date: Wednesday, February 3, 2016 at 12:04 PM
To: Jay Shipper mailto:shipper_...@bah.com>>
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: [External] Re: Spark 1.6.0 HiveContext NPE

Looks like the NPE came from this line:
  def conf: HiveConf = SessionState.get().getConf

Meaning SessionState.get() returned null.

On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
mailto:shipper_...@bah.com>> wrote:
I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m getting a 
NullPointerException from HiveContext.  It’s happening while it tries to load 
some tables via JDBC from an external database (not Hive), using 
context.read().jdbc():

—
java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:457)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:473)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
—

Even though the application is not using Hive, HiveContext is used instead of 
SQLContext, for the additional functionality it provides.  There’s no 
hive-site.xml for the application, but this did not cause an issue for Spark 
1.4.1.

Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that could 
explain this NPE?  The only obvious change I’ve noticed for HiveContext is that 
the default warehouse location is different (1.4.1 - current directory, 1.6.0 - 
/user/hive/warehouse), but I verified that this NPE happens even when 
/user/hive/warehouse exists and is readable/writeable for the application.  In 
terms of changes to the application to work with Spark 1.6.0, the only one that 
might be relevant to this issue is the upgrade in the Hadoop dependencies to 
match what Spark 1.6.0 uses (2.6.0-cdh5.7.0-SNAPSHOT).

Thanks,
Jay



Connect to two different HDFS servers with different usernames

2016-02-03 Thread Wayne Song
Is there any way to get data from HDFS (e.g. with sc.textFile) with two
separate usernames in the same Spark job?  For instance, if I have a file on
hdfs-server-1.com and the alice user has permission to view it, and I have a
file on hdfs-server-2.com and the bob user has permission to view it, I'd
like to be able to do something like:



Is there any way to do something like this?  Or can Spark only connect to
HDFS with the same username that it's running as?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-two-different-HDFS-servers-with-different-usernames-tp26143.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark 1.5.2 memory error

2016-02-03 Thread Stefan Panayotov
I drastically increased the memory:
 
spark.executor.memory = 50g
spark.driver.memory = 8g
spark.driver.maxResultSize = 8g
spark.yarn.executor.memoryOverhead = 768
 
I still see executors killed, but this time the memory does not seem to be the 
issue.
The error on the Jupyter notebook is:
 
Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception 
while getting task result: java.io.IOException: Failed to connect to 
/10.0.0.9:48755 

From nodemanagers log corresponding to worker 10.0.0.9:

2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService (YarnShuffleService.java:initializeApplication(129)) - Initializing application application_1454509557526_0014
2016-02-03 17:31:44,918 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZING to LOCALIZED
2016-02-03 17:31:44,947 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING
2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: [bash, /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_11

Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which is far below 50g, and then suddenly getting killed!?!

2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962 for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB physical memory used; 52.8 GB of 107.1 GB virtual memory used
2016-02-03 17:33:17,613 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container container_1454509557526_0014_01_93
2016-02-03 17:33:17,629 WARN  nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:launchContainer(223)) - Exit code from container container_1454509557526_0014_01_93 is : 143
2016-02-03 17:33:17,667 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
2016-02-03 17:33:17,669 INFO  nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=root   OPERATION=Container Finished - Killed   TARGET=ContainerImpl   RESULT=SUCCESS   APPID=application_1454509557526_0014   CONTAINERID=container_1454509557526_0014_01_93
2016-02-03 17:33:17,670 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from CONTAINER_CLEANEDUP_AFTER_KILL to DONE
2016-02-03 17:33:17,670 INFO  application.ApplicationImpl (ApplicationImpl.java:transition(347)) - Removing container_1454509557526_0014_01_93 from application application_1454509557526_0014
2016-02-03 17:33:17,671 INFO  logaggregation.AppLogAggregatorImpl (AppLogAggregatorImpl.java:startContainerLogAggregation(546)) - Considering container container_1454509557526_0014_01_93 for log-aggregation
2016-02-03 17:33:17,671 INFO  containermanager.AuxServices (AuxServices.java:handle(196)) - Got event CONTAINER_STOP for appId application_1454509557526_0014
2016-02-03 17:33:17,671 INFO  yarn.YarnShuffleService (YarnShuffleService.java:stopContainer(161)) - Stopping container container_1454509557526_0014_01_93
2016-02-03 17:33:20,351 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:33:20,383 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 28727 for container-id container_1454509557526_0012_01_01: 319.8 MB of 1.5 GB physical memory used; 1.7 GB of 3.1 GB virtual memory used
2016-02-03 17:33:22,627 INFO  nodemanager.NodeStatusUpdaterImpl (NodeStatusUpdaterImpl.java:removeOrTrackCompletedContainersFromContext(529)) - Removed completed containers from NM context: [container_1454509557526_0014_01_93]

I'd appreciate any suggestions.

Thanks,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
s

Re: Spark with SAS

2016-02-03 Thread Benjamin Kim
You can download the Spark ODBC Driver.

https://databricks.com/spark/odbc-driver-download


> On Feb 3, 2016, at 10:09 AM, Jörn Franke  wrote:
> 
> This could be done through ODBC. Keep in mind that you can run SAS jobs 
> directly on a Hadoop cluster using the SAS embedded process engine, or dump some 
> data to a SAS LASR cluster, but you'd better ask SAS about this.
> 
>> On 03 Feb 2016, at 18:43, Sourav Mazumder  
>> wrote:
>> 
>> Hi,
>> 
>> Is anyone aware of any work going on for integrating Spark with SAS for 
>> executing queries in Spark?
>> 
>> For example calling Spark Jobs from SAS using Spark SQL through Spark SQL's 
>> JDBC/ODBC library.
>> 
>> Regards,
>> Sourav
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark with SAS

2016-02-03 Thread Jörn Franke
This could be done through ODBC. Keep in mind that you can run SAS jobs 
directly on a Hadoop cluster using the SAS embedded process engine, or dump some 
data to a SAS LASR cluster, but you'd better ask SAS about this.
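
As an illustration of the JDBC/ODBC route (not SAS-specific): Spark SQL ships a HiveServer2-compatible Thrift server (started with sbin/start-thriftserver.sh, default port 10000), and any JDBC or ODBC client can query it; a SAS client would point its ODBC DSN at the same endpoint. A sketch from the Scala/Java side, with host, table and credentials as placeholders:

import java.sql.DriverManager

// The Thrift JDBC/ODBC server speaks the HiveServer2 protocol, so the Hive
// JDBC driver is used here.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://spark-host:10000/default", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT count(*) FROM some_table")   // placeholder query
while (rs.next()) println(rs.getLong(1))
conn.close()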

> On 03 Feb 2016, at 18:43, Sourav Mazumder  wrote:
> 
> Hi,
> 
> Is anyone aware of any work going on for integrating Spark with SAS for 
> executing queries in Spark?
> 
> For example calling Spark Jobs from SAS using Spark SQL through Spark SQL's 
> JDBC/ODBC library.
> 
> Regards,
> Sourav

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-03 Thread Udo Fholl
Hi all,

I recently migrated from 'updateStateByKey' to 'mapWithState' and now I see
a huge increase in memory usage. Most of it is a massive "BlockGenerator" (which
points to a massive "ArrayBlockingQueue", which in turn points to a huge
"Object[]").

I'm pretty sure it has to do with my code, but I barely changed anything in
the code. Just adapted the function.

Did anyone run into this?

Best regards,
Udo.


Spark with SAS

2016-02-03 Thread Sourav Mazumder
Hi,

Is anyone aware of any work going on for integrating Spark with SAS for
executing queries in Spark?

For example calling Spark Jobs from SAS using Spark SQL through Spark SQL's
JDBC/ODBC library.

Regards,
Sourav


Re: [External] Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Shipper, Jay [USA]
Right, I could already tell that from the stack trace and looking at Spark’s 
code.  What I’m trying to determine is why that’s coming back as null now, just 
from upgrading Spark to 1.6.0.

From: Ted Yu mailto:yuzhih...@gmail.com>>
Date: Wednesday, February 3, 2016 at 12:04 PM
To: Jay Shipper mailto:shipper_...@bah.com>>
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: [External] Re: Spark 1.6.0 HiveContext NPE

Looks like the NPE came from this line:
  def conf: HiveConf = SessionState.get().getConf

Meaning SessionState.get() returned null.

On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
mailto:shipper_...@bah.com>> wrote:
I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m getting a 
NullPointerException from HiveContext.  It’s happening while it tries to load 
some tables via JDBC from an external database (not Hive), using 
context.read().jdbc():

—
java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:457)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:473)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
—

Even though the application is not using Hive, HiveContext is used instead of 
SQLContext, for the additional functionality it provides.  There’s no 
hive-site.xml for the application, but this did not cause an issue for Spark 
1.4.1.

Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that could 
explain this NPE?  The only obvious change I’ve noticed for HiveContext is that 
the default warehouse location is different (1.4.1 - current directory, 1.6.0 - 
/user/hive/warehouse), but I verified that this NPE happens even when 
/user/hive/warehouse exists and is readable/writeable for the application.  In 
terms of changes to the application to work with Spark 1.6.0, the only one that 
might be relevant to this issue is the upgrade in the Hadoop dependencies to 
match what Spark 1.6.0 uses (2.6.0-cdh5.7.0-SNAPSHOT).

Thanks,
Jay



Re: Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Ted Yu
Looks like the NPE came from this line:
  def conf: HiveConf = SessionState.get().getConf

Meaning SessionState.get() returned null.

On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
wrote:

> I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m
> getting a NullPointerException from HiveContext.  It’s happening while it
> tries to load some tables via JDBC from an external database (not Hive),
> using context.read().jdbc():
>
> —
> java.lang.NullPointerException
> at
> org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
> at
> org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
> at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
> at
> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
> at
> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
> at
> org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:457)
> at
> org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
> at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
> at
> org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:473)
> at
> org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
> at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
> at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
> at
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
> —
>
> Even though the application is not using Hive, HiveContext is used instead
> of SQLContext, for the additional functionality it provides.  There’s no
> hive-site.xml for the application, but this did not cause an issue for
> Spark 1.4.1.
>
> Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that
> could explain this NPE?  The only obvious change I’ve noticed for
> HiveContext is that the default warehouse location is different (1.4.1 -
> current directory, 1.6.0 - /user/hive/warehouse), but I verified that this
> NPE happens even when /user/hive/warehouse exists and is readable/writeable
> for the application.  In terms of changes to the application to work with
> Spark 1.6.0, the only one that might be relevant to this issue is the
> upgrade in the Hadoop dependencies to match what Spark 1.6.0 uses
> (2.6.0-cdh5.7.0-SNAPSHOT).
>
> Thanks,
> Jay
>


Spark 1.6.0 HiveContext NPE

2016-02-03 Thread Shipper, Jay [USA]
I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m getting a 
NullPointerException from HiveContext.  It’s happening while it tries to load 
some tables via JDBC from an external database (not Hive), using 
context.read().jdbc():

—
java.lang.NullPointerException
at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
at 
org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:457)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:473)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
—

Even though the application is not using Hive, HiveContext is used instead of 
SQLContext, for the additional functionality it provides.  There’s no 
hive-site.xml for the application, but this did not cause an issue for Spark 
1.4.1.

Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that could 
explain this NPE?  The only obvious change I’ve noticed for HiveContext is that 
the default warehouse location is different (1.4.1 - current directory, 1.6.0 - 
/user/hive/warehouse), but I verified that this NPE happens even when 
/user/hive/warehouse exists and is readable/writeable for the application.  In 
terms of changes to the application to work with Spark 1.6.0, the only one that 
might be relevant to this issue is the upgrade in the Hadoop dependencies to 
match what Spark 1.6.0 uses (2.6.0-cdh5.7.0-SNAPSHOT).

Thanks,
Jay


Re: question on spark.streaming.kafka.maxRetries

2016-02-03 Thread Cody Koeninger
KafkaRDD will use the standard kafka configuration parameter
refresh.leader.backoff.ms if it is set in the kafkaParams map passed to
createDirectStream.
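
A minimal sketch of passing that parameter through kafkaParams (broker list and topic are placeholders; the StreamingContext setup is only there to keep the snippet self-contained):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("direct-stream-backoff")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// refresh.leader.backoff.ms is a plain Kafka consumer property (milliseconds);
// the direct stream passes kafkaParams through, so raising it stretches the
// wait between successive retries after a leader failure.
val kafkaParams = Map[String, String](
  "metadata.broker.list"      -> "broker1:9092,broker2:9092",
  "refresh.leader.backoff.ms" -> "1000"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))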

On Tue, Feb 2, 2016 at 9:10 PM, Chen Song  wrote:

> For Kafka direct stream, is there a way to set the time between successive
> retries? From my testing, it looks like it is 200ms. Any way I can increase
> the time?
>
>
>


Re: Spark 1.5 Streaming + Kafka 0.9.0

2016-02-03 Thread Cody Koeninger
0.9 brokers should be backwards compatible with 0.8 simple consumer, which
is what the direct stream uses.  If you're finding any problems, please
report them.

There's work underway to add 0.9 consumer support to spark, at
https://issues.apache.org/jira/browse/SPARK-12177


On Wed, Feb 3, 2016 at 9:56 AM, Pavel Sýkora  wrote:

> Hi,
>
> According to the Spark docs, Spark Streaming 1.5 (and 1.6) is compatible
> with Kafka 0.8.2.1 (Direct Kafka API). Nevertheless, I need to use Kafka
> 0.9.0 with Spark 1.5.x streaming.
>
> I tried to use Kafka 0.9.0 as both source and output of Spark 1.5
> Streaming, but it seems to work well.
>
> Does anybody have the same experience? Or does anybody have some problems
> integrating Spark Streaming 1.5 or 1.6 with Kafka 0.9.0?
>
> Thanks in advance,
>
> Pavel
>


Re: spark metrics question

2016-02-03 Thread Matt K
Thanks for sharing Yiannis, looks very promising!

Do you know if I can package a custom class with my application, or does it
have to be pre-deployed on all Executor nodes?

On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas 
wrote:

> Hi Matt,
>
> there is some related work I recently did in IBM Research for visualizing
> the metrics produced.
> You can read about it here
> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
> We recently opensourced it if you are interested to have a deeper look to
> it: https://github.com/ibm-research-ireland/sparkoscope
>
> Thanks,
> Yiannis
>
> On 3 February 2016 at 13:32, Matt K  wrote:
>
>> Hi guys,
>>
>> I'm looking to create a custom sink based on Spark's Metrics System:
>>
>> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>>
>> If I want to collect metrics from the Driver, Master, and Executor nodes,
>> should the jar with the custom class be installed on Driver, Master, and
>> Executor nodes?
>>
>> Also, on Executor nodes, does the MetricsSystem run inside the Executor's
>> JVM?
>>
>> Thanks,
>> -Matt
>>
>
>


-- 
www.calcmachine.com - easy online calculator.


Spark 1.5 Streaming + Kafka 0.9.0

2016-02-03 Thread Pavel Sýkora
Hi,

According to the Spark docs, Spark Streaming 1.5 (and 1.6) is compatible 
with Kafka 0.8.2.1 (Direct Kafka API). Nevertheless, I need to use Kafka 
0.9.0 with Spark 1.5.x streaming.

I tried to use Kafka 0.9.0 as both source and output of Spark 1.5 Streaming,
but it seems to work well.

Does anybody have the same experience? Or does anybody have some problems 
integrating Spark Streaming 1.5 or 1.6 with Kafka 0.9.0?

Thanks in advance,

Pavel


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-03 Thread Koert Kuipers
yeah there was some discussion about adding them to RDD, but it would break
a lot. so Dataset was born.

yes it seems Dataset will be the new RDD for most use cases. but i dont
think its there yet. just keep an eye out on SPARK- for updates...

On Wed, Feb 3, 2016 at 8:51 AM, Jerry Lam  wrote:

> Hi Nirav,
>
> I don't know why those optimizations are not implemented in RDD. It is
> either a political choice or a practical one (backward compatibility might
> be difficult if they need to introduce these types of optimization into
> RDD). I think this is the reasons spark now has Dataset. My understanding
> is that Dataset is the new RDD.
>
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 3 Feb, 2016, at 12:26 am, Koert Kuipers  wrote:
>
> with respect to joins, unfortunately not all implementations are
> available. for example i would like to use joins where one side is
> streaming (and the other cached). this seems to be available for DataFrame
> but not for RDD.
>
> On Wed, Feb 3, 2016 at 12:19 AM, Nirav Patel 
> wrote:
>
>> Hi Jerry,
>>
>> Yes I read that benchmark. And doesn't help in most cases. I'll give you
>> example of one of our application. It's a memory hogger by nature since it
>> works on groupByKey and performs combinatorics on Iterator. So it maintain
>> few structures inside task. It works on mapreduce with half the resources I
>> am giving it for spark and Spark keeps throwing OOM on a pre-step which is
>> a simple join! I saw every task was done at process_local locality still
>> join keeps failing due to container being killed. and container gets killed
>> due to oom.  We have a case with Databricks/Mapr on that for more then a
>> month. anyway don't wanna distract there. I can believe that changing to
>> DataFrame and it's computing model can bring performance but I was hoping
>> that wouldn't be your answer to every performance problem.
>>
>> Let me ask this - If I decide to stick with RDD do I still have
>> flexibility to choose what Join implementation I can use? And similar
>> underlaying construct to best execute my jobs.
>>
>> I said error prone because you need to write column qualifiers instead of
>> referencing fields. i.e. 'caseObj("field1")' instead of 'caseObj.field1';
>> more over multiple tables having similar column names causing parsing
>> issues; and when you start writing constants for your columns it just
>> become another schema maintenance inside your app. It feels like thing of
>> past. Query engine(distributed or not) is old school as I 'see' it :)
>>
>> Thanks for being patient.
>> Nirav
>>
>>
>>
>>
>>
>> On Tue, Feb 2, 2016 at 6:26 PM, Jerry Lam  wrote:
>>
>>> Hi Nirav,
>>> I'm sure you read this?
>>> https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
>>>
>>> There is a benchmark in the article to show that dataframe "can"
>>> outperform RDD implementation by 2 times. Of course, benchmarks can be
>>> "made". But from the code snippet you wrote, I "think" dataframe will
>>> choose between different join implementation based on the data statistics.
>>>
>>> I cannot comment on the beauty of it because "beauty is in the eye of
>>> the beholder" LOL
>>> Regarding the comment on error prone, can you say why you think it is
>>> the case? Relative to what other ways?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Tue, Feb 2, 2016 at 8:59 PM, Nirav Patel 
>>> wrote:
>>>
 I dont understand why one thinks RDD of case object doesn't have
 types(schema) ? If spark can convert RDD to DataFrame which means it
 understood the schema. SO then from that point why one has to use SQL
 features to do further processing? If all spark need for optimizations is
 schema then what this additional SQL features buys ? If there is a way to
 avoid SQL feature using DataFrame I don't mind it. But looks like I have to
 convert all my existing transformation to things like
 df1.join(df2,df1('abc') == df2('abc'), 'left_outer') .. that's plain ugly
 and error prone in my opinion.

 On Tue, Feb 2, 2016 at 5:49 PM, Jerry Lam  wrote:

> Hi Michael,
>
> Is there a section in the spark documentation demonstrate how to
> serialize arbitrary objects in Dataframe? The last time I did was using
> some User Defined Type (copy from VectorUDT).
>
> Best Regards,
>
> Jerry
>
> On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> A principal difference between RDDs and DataFrames/Datasets is that
>>> the latter have a schema associated to them. This means that they 
>>> support
>>> only certain types (primitives, case classes and more) and that they are
>>> uniform, whereas RDDs can contain any serializable object and must not
>>> necessarily be uniform. These properties make it possible to generate 
>>> very
>>> efficient serialization and other optimizations that cannot be 

Re: spark metrics question

2016-02-03 Thread Yiannis Gkoufas
Hi Matt,

there is some related work I recently did in IBM Research for visualizing
the metrics produced.
You can read about it here
http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
We recently opensourced it if you are interested to have a deeper look to
it: https://github.com/ibm-research-ireland/sparkoscope

Thanks,
Yiannis

On 3 February 2016 at 13:32, Matt K  wrote:

> Hi guys,
>
> I'm looking to create a custom sink based on Spark's Metrics System:
>
> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>
> If I want to collect metrics from the Driver, Master, and Executor nodes,
> should the jar with the custom class be installed on Driver, Master, and
> Executor nodes?
>
> Also, on Executor nodes, does the MetricsSystem run inside the Executor's
> JVM?
>
> Thanks,
> -Matt
>


Re: java.lang.ArrayIndexOutOfBoundsException when attempting broadcastjoin

2016-02-03 Thread Alexandr Dzhagriev
Hi Sebastian,

Do you have any updates on the issue? I faced pretty much the same problem,
and disabling Kryo plus raising spark.network.timeout to 600s helped.
For my job it takes about 5 minutes to broadcast the variable (~5GB in
my case), but after that it's fast - much faster than shuffling with a regular
join anyway. Hope it helps.
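
For reference, a sketch of those two knobs on a SparkConf (600s is the value mentioned above; leaving spark.serializer unset keeps the default JavaSerializer, i.e. Kryo stays disabled):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("broadcast-join")            // illustrative name
  .set("spark.network.timeout", "600s")    // default is 120s
  // no .set("spark.serializer", ...) -> default JavaSerializer is used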


Thanks, Alex.


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-03 Thread Jerry Lam
Hi Nirav,

I don't know why those optimizations are not implemented in RDD. It is either a 
political choice or a practical one (backward compatibility might be difficult 
if they need to introduce these types of optimization into RDD). I think this 
is the reasons spark now has Dataset. My understanding is that Dataset is the 
new RDD. 


Best Regards,

Jerry

Sent from my iPhone

> On 3 Feb, 2016, at 12:26 am, Koert Kuipers  wrote:
> 
> with respect to joins, unfortunately not all implementations are available. 
> for example i would like to use joins where one side is streaming (and the 
> other cached). this seems to be available for DataFrame but not for RDD.
> 
>> On Wed, Feb 3, 2016 at 12:19 AM, Nirav Patel  wrote:
>> Hi Jerry,
>> 
>> Yes I read that benchmark. And doesn't help in most cases. I'll give you 
>> example of one of our application. It's a memory hogger by nature since it 
>> works on groupByKey and performs combinatorics on Iterator. So it maintain 
>> few structures inside task. It works on mapreduce with half the resources I 
>> am giving it for spark and Spark keeps throwing OOM on a pre-step which is a 
>> simple join! I saw every task was done at process_local locality still join 
>> keeps failing due to container being killed. and container gets killed due 
>> to oom.  We have a case with Databricks/Mapr on that for more than a month. 
>> anyway don't wanna distract there. I can believe that changing to DataFrame 
>> and it's computing model can bring performance but I was hoping that 
>> wouldn't be your answer to every performance problem.  
>> 
>> Let me ask this - If I decide to stick with RDD do I still have flexibility 
>> to choose what Join implementation I can use? And similar underlaying 
>> construct to best execute my jobs. 
>> 
>> I said error prone because you need to write column qualifiers instead of 
>> referencing fields. i.e. 'caseObj("field1")' instead of 'caseObj.field1'; 
>> more over multiple tables having similar column names causing parsing 
>> issues; and when you start writing constants for your columns it just become 
>> another schema maintenance inside your app. It feels like thing of past. 
>> Query engine(distributed or not) is old school as I 'see' it :)
>> 
>> Thanks for being patient.
>> Nirav
>> 
>> 
>> 
>> 
>> 
>>> On Tue, Feb 2, 2016 at 6:26 PM, Jerry Lam  wrote:
>>> Hi Nirav,
>>> I'm sure you read this? 
>>> https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
>>> 
>>> There is a benchmark in the article to show that dataframe "can" outperform 
>>> RDD implementation by 2 times. Of course, benchmarks can be "made". But 
>>> from the code snippet you wrote, I "think" dataframe will choose between 
>>> different join implementation based on the data statistics. 
>>> 
>>> I cannot comment on the beauty of it because "beauty is in the eye of the 
>>> beholder" LOL
>>> Regarding the comment on error prone, can you say why you think it is the 
>>> case? Relative to what other ways?
>>> 
>>> Best Regards,
>>> 
>>> Jerry
>>> 
>>> 
 On Tue, Feb 2, 2016 at 8:59 PM, Nirav Patel  wrote:
 I dont understand why one thinks RDD of case object doesn't have 
 types(schema) ? If spark can convert RDD to DataFrame which means it 
 understood the schema. SO then from that point why one has to use SQL 
 features to do further processing? If all spark need for optimizations is 
 schema then what this additional SQL features buys ? If there is a way to 
 avoid SQL feature using DataFrame I don't mind it. But looks like I have 
 to convert all my existing transformation to things like 
 df1.join(df2,df1('abc') == df2('abc'), 'left_outer') .. that's plain ugly 
 and error prone in my opinion. 
 
> On Tue, Feb 2, 2016 at 5:49 PM, Jerry Lam  wrote:
> Hi Michael,
> 
> Is there a section in the spark documentation demonstrate how to 
> serialize arbitrary objects in Dataframe? The last time I did was using 
> some User Defined Type (copy from VectorUDT). 
> 
> Best Regards,
> 
> Jerry
> 
> On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust  
> wrote:
>>> A principal difference between RDDs and DataFrames/Datasets is that the 
>>> latter have a schema associated to them. This means that they support 
>>> only certain types (primitives, case classes and more) and that they 
>>> are uniform, whereas RDDs can contain any serializable object and must 
>>> not necessarily be uniform. These properties make it possible to 
>>> generate very efficient serialization and other optimizations that 
>>> cannot be achieved with plain RDDs.
>> 
>> You can use Encoder.kryo() as well to serialize arbitrary objects, just 
>> like with RDDs.
 
 
 
 
 
 
 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
> 


spark metrics question

2016-02-03 Thread Matt K
Hi guys,

I'm looking to create a custom sink based on Spark's Metrics System:
https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

If I want to collect metrics from the Driver, Master, and Executor nodes,
should the jar with the custom class be installed on Driver, Master, and
Executor nodes?

Also, on Executor nodes, does the MetricsSystem run inside the Executor's
JVM?

Thanks,
-Matt


Re: spark-cassandra

2016-02-03 Thread Gerard Maas
NoSuchMethodError usually points to a version conflict. Probably your job
was built against a higher version of the Cassandra connector than what's
available at runtime.
Check that the versions are aligned.
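
A hypothetical build.sbt fragment showing what "aligned" looks like (the version numbers are only an example; they must match what the job server actually has on its classpath at runtime):

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                 % "1.5.2" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector"  % "1.5.0"
)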

-kr, Gerard.

On Wed, Feb 3, 2016 at 1:37 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I am using Spark Jobserver to submit the jobs. I am using the spark-cassandra
> connector to connect to Cassandra. I am getting the below exception through
> Spark Jobserver.
>
> If I submit the job through the *spark-submit* command, it works fine.
>
> Please let me know how to solve this issue
>
>
> Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError:
> com.datastax.driver.core.TableMetadata.getIndexes()Ljava/util/List;
> at
> com.datastax.spark.connector.cql.Schema$.getIndexMap(Schema.scala:193)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:197)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
> at
> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
> at
> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
> at
> com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
> at
> com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
> at
> com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
> at
> com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
> at
> com.cisco.ss.etl.utils.ETLHelper$class.persistBackupConfigDevicesData(ETLHelper.scala:79)
> at com.cisco.ss.etl.Main$.persistBackupConfigDevicesData(Main.scala:13)
> at
> com.cisco.ss.etl.utils.ETLHelper$class.persistByBacthes(ETLHelper.scala:43)
> at com.cisco.ss.etl.Main$.persistByBacthes(Main.scala:13)
> at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:48)
> at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:45)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at com.cisco.ss.etl.Main$.runJob(Main.scala:45)
> at com.cisco.ss.etl.Main$.runJob(Main.scala:13)
> at
> spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:274)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecut

spark-cassandra

2016-02-03 Thread Madabhattula Rajesh Kumar
Hi,

I am using Spark Jobserver to submit the jobs. I am using the spark-cassandra
connector to connect to Cassandra. I am getting the below exception through
Spark Jobserver.

If I submit the job through the *spark-submit* command, it works fine.

Please let me know how to solve this issue


Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError:
com.datastax.driver.core.TableMetadata.getIndexes()Ljava/util/List;
at
com.datastax.spark.connector.cql.Schema$.getIndexMap(Schema.scala:193)
at
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:197)
at
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
at
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
at
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
at
com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at
com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
at
com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
at
com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
at
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
at
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
at
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at
com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at
com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at
com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
at
com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
at
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at
com.cisco.ss.etl.utils.ETLHelper$class.persistBackupConfigDevicesData(ETLHelper.scala:79)
at com.cisco.ss.etl.Main$.persistBackupConfigDevicesData(Main.scala:13)
at
com.cisco.ss.etl.utils.ETLHelper$class.persistByBacthes(ETLHelper.scala:43)
at com.cisco.ss.etl.Main$.persistByBacthes(Main.scala:13)
at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:48)
at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:45)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.cisco.ss.etl.Main$.runJob(Main.scala:45)
at com.cisco.ss.etl.Main$.runJob(Main.scala:13)
at
spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:274)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Regards,
Rajesh


Spark Streaming: Dealing with downstream services faults

2016-02-03 Thread Udo Fholl
Hi all,

I need to send the results of our aggregations to an external service, and I
need to make sure that these results are actually sent.

My current approach is to send them in an invocation of "foreachRDD". But
how is that going to work with failures?

Should I instead use "mapWithState" then "transform (here is where I would
send)" and "mapWithState" to remove those that were successfully sent?

Thank you.

Udo.


Spark streaming archive results

2016-02-03 Thread Udo Fholl
Hi,

I want to aggregate on a small window and send downstream every 30 secs,
but I would also like to store the outcome in our archive every 20 min.

My current approach (simplified version) is:

  val stream = // input DStream
  val statedStream = stream.mapWithState(stateSpec)
  val archiveStream = statedStream

  statedStream.foreachRDD(rdd => rdd.foreachPartition(d => sendDownstream(d)))

  archiveStream.window(Minutes(20)).foreachRDD(rdd => rdd.foreachPartition(d =>
    archive(d)))


Is there a more suitable way to deal with this situation?

Thanks.

Udo


Re: sparkR not able to create /append new columns

2016-02-03 Thread Franc Carter
Yes, I didn't work out how to solve that - sorry


On 3 February 2016 at 22:37, Devesh Raj Singh 
wrote:

> Hi,
>
> but "withColumn" will only add once, if i want to add columns to the same
> dataframe in a loop it will keep overwriting the added column and in the
> end the last added column( in the loop) will be the added column. like in
> my code above.
>
> On Wed, Feb 3, 2016 at 5:05 PM, Franc Carter 
> wrote:
>
>>
>> I had problems doing this as well - I ended up using 'withColumn', it's
>> not particularly graceful but it worked (1.5.2 on AWS EMR)
>>
>> cheers
>>
>> On 3 February 2016 at 22:06, Devesh Raj Singh 
>> wrote:
>>
>>> Hi,
>>>
>>> i am trying to create dummy variables in sparkR by creating new columns
>>> for categorical variables. But it is not appending the columns
>>>
>>>
>>> df <- createDataFrame(sqlContext, iris)
>>> class(dtypes(df))
>>>
>>> cat.column<-vector(mode="character",length=nrow(df))
>>> cat.column<-collect(select(df,df$Species))
>>> lev<-length(levels(as.factor(unlist(cat.column))))
>>> varb.names<-vector(mode="character",length=lev)
>>> for (i in 1:lev){
>>>
>>>   varb.names[i]<-paste0(colnames(cat.column),i)
>>>
>>> }
>>>
>>> for (j in 1:lev)
>>>
>>> {
>>>
>>>    dummy.df.new<-withColumn(df,paste0(colnames(cat.column),j),
>>>        ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j],1,0))
>>>
>>> }
>>>
>>> I am getting the below output for
>>>
>>> head(dummy.df.new)
>>>
>>> output:
>>>
>>>   Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
>>> 1          5.1         3.5          1.4         0.2  setosa        1
>>> 2          4.9         3.0          1.4         0.2  setosa        1
>>> 3          4.7         3.2          1.3         0.2  setosa        1
>>> 4          4.6         3.1          1.5         0.2  setosa        1
>>> 5          5.0         3.6          1.4         0.2  setosa        1
>>> 6          5.4         3.9          1.7         0.4  setosa        1
>>>
>>> Problem: Species2 and Species3 column are not getting added to the
>>> dataframe
>>>
>>> --
>>> Warm regards,
>>> Devesh.
>>>
>>
>>
>>
>> --
>> Franc
>>
>
>
>
> --
> Warm regards,
> Devesh.
>



-- 
Franc


Re: sparkR not able to create /append new columns

2016-02-03 Thread Devesh Raj Singh
Hi,

but "withColumn" will only add one column at a time; if I want to add columns
to the same dataframe in a loop, it keeps overwriting the previously added
column, so in the end only the last column added in the loop remains, as in my
code above.
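
For reference, a rough Scala sketch of the accumulate-and-reassign pattern
(each withColumn is applied to the previous result rather than to the
original df; the df, level values and column names here are assumptions for
illustration, not my actual SparkR code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.when

// Assumed list of levels; in a real job these would come from the data.
val levels = Seq("setosa", "versicolor", "virginica")

// Fold over the levels so each withColumn builds on the accumulated
// DataFrame, not on the original df, and every indicator column survives.
val withDummies: DataFrame = levels.zipWithIndex.foldLeft(df) {
  case (acc, (lvl, i)) =>
    acc.withColumn(s"Species${i + 1}", when(acc("Species") === lvl, 1).otherwise(0))
}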

On Wed, Feb 3, 2016 at 5:05 PM, Franc Carter  wrote:

>
> I had problems doing this as well - I ended up using 'withColumn', it's
> not particularly graceful but it worked (1.5.2 on AWS EMR)
>
> cheers
>
> On 3 February 2016 at 22:06, Devesh Raj Singh 
> wrote:
>
>> Hi,
>>
>> i am trying to create dummy variables in sparkR by creating new columns
>> for categorical variables. But it is not appending the columns
>>
>>
>> df <- createDataFrame(sqlContext, iris)
>> class(dtypes(df))
>>
>> cat.column<-vector(mode="character",length=nrow(df))
>> cat.column<-collect(select(df,df$Species))
>> lev<-length(levels(as.factor(unlist(cat.column))))
>> varb.names<-vector(mode="character",length=lev)
>> for (i in 1:lev){
>>
>>   varb.names[i]<-paste0(colnames(cat.column),i)
>>
>> }
>>
>> for (j in 1:lev)
>>
>> {
>>
>>    dummy.df.new<-withColumn(df, paste0(colnames(cat.column), j),
>>      ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j], 1, 0))
>>
>> }
>>
>> I am getting the below output for
>>
>> head(dummy.df.new)
>>
>> output:
>>
>>   Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
>> 1          5.1         3.5          1.4         0.2  setosa        1
>> 2          4.9         3.0          1.4         0.2  setosa        1
>> 3          4.7         3.2          1.3         0.2  setosa        1
>> 4          4.6         3.1          1.5         0.2  setosa        1
>> 5          5.0         3.6          1.4         0.2  setosa        1
>> 6          5.4         3.9          1.7         0.4  setosa        1
>>
>> Problem: Species2 and Species3 column are not getting added to the
>> dataframe
>>
>> --
>> Warm regards,
>> Devesh.
>>
>
>
>
> --
> Franc
>



-- 
Warm regards,
Devesh.


Re: sparkR not able to create /append new columns

2016-02-03 Thread Franc Carter
I had problems doing this as well - I ended up using 'withColumn', it's not
particularly graceful but it worked (1.5.2 on AWS EMR)

cheers

On 3 February 2016 at 22:06, Devesh Raj Singh 
wrote:

> Hi,
>
> i am trying to create dummy variables in sparkR by creating new columns
> for categorical variables. But it is not appending the columns
>
>
> df <- createDataFrame(sqlContext, iris)
> class(dtypes(df))
>
> cat.column<-vector(mode="character",length=nrow(df))
> cat.column<-collect(select(df,df$Species))
> lev<-length(levels(as.factor(unlist(cat.column))))
> varb.names<-vector(mode="character",length=lev)
> for (i in 1:lev){
>
>   varb.names[i]<-paste0(colnames(cat.column),i)
>
> }
>
> for (j in 1:lev)
>
> {
>
>    dummy.df.new<-withColumn(df, paste0(colnames(cat.column), j),
>      ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j], 1, 0))
>
> }
>
> I am getting the below output for
>
> head(dummy.df.new)
>
> output:
>
>   Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
> 1          5.1         3.5          1.4         0.2  setosa        1
> 2          4.9         3.0          1.4         0.2  setosa        1
> 3          4.7         3.2          1.3         0.2  setosa        1
> 4          4.6         3.1          1.5         0.2  setosa        1
> 5          5.0         3.6          1.4         0.2  setosa        1
> 6          5.4         3.9          1.7         0.4  setosa        1
>
> Problem: Species2 and Species3 column are not getting added to the
> dataframe
>
> --
> Warm regards,
> Devesh.
>



-- 
Franc


sparkR not able to create /append new columns

2016-02-03 Thread Devesh Raj Singh
Hi,

i am trying to create dummy variables in sparkR by creating new columns for
categorical variables. But it is not appending the columns


df <- createDataFrame(sqlContext, iris)
class(dtypes(df))

cat.column<-vector(mode="character",length=nrow(df))
cat.column<-collect(select(df,df$Species))
lev<-length(levels(as.factor(unlist(cat.column))))
varb.names<-vector(mode="character",length=lev)
for (i in 1:lev){

  varb.names[i]<-paste0(colnames(cat.column),i)

}

for (j in 1:lev)

{

   dummy.df.new<-withColumn(df, paste0(colnames(cat.column), j),
     ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j], 1, 0))

}

I am getting the below output for

head(dummy.df.new)

output:

  Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
1          5.1         3.5          1.4         0.2  setosa        1
2          4.9         3.0          1.4         0.2  setosa        1
3          4.7         3.2          1.3         0.2  setosa        1
4          4.6         3.1          1.5         0.2  setosa        1
5          5.0         3.6          1.4         0.2  setosa        1
6          5.4         3.9          1.7         0.4  setosa        1

Problem: Species2 and Species3 column are not getting added to the dataframe

-- 
Warm regards,
Devesh.


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread satish chandra j
Hi Hemant,
My dataframe "ordrd_emp_df" contains data in order, as I applied orderBy in
the first step.
I also tried applying the "orderBy" method before "groupBy", but I am still
getting different results in each iteration.
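
From what I understand, groupBy does not preserve the ordering of its input,
so first() after an orderBy is not guaranteed to pick the top row. A rough
sketch of a deterministic alternative using a window function (assuming Spark
1.6+, where row_number() is available; not tested):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank employees within each department by salary, highest first,
// then keep only the top-ranked row per department.
val w = Window.partitionBy("DeptNo").orderBy(col("Sal").desc)
val topPaid = emp_df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .select(col("DeptNo"), col("EmpId").as("TopSal"))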

Regards,
Satish Chandra


On Wed, Feb 3, 2016 at 4:28 PM, Hemant Bhanawat 
wrote:

> Missing order by?
>
> Hemant Bhanawat
> SnappyData (http://snappydata.io/)
>
>
> On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j  > wrote:
>
>> HI All,
>> I have data in a emp_df (DataFrame) as mentioned below:
>>
>> EmpId   Sal   DeptNo
>> 001   100   10
>> 002   120   20
>> 003   130   10
>> 004   140   20
>> 005   150   10
>>
>> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
>> below:
>>
>> DeptNo  Sal   EmpId
>> 10 150   005
>> 10 130   003
>> 10 100   001
>> 20 140   004
>> 20 120   002
>>
>> Now I want to pick the highest-paid EmpId of each DeptNo, hence I applied the agg
>> First method as below
>>
>>
>> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>>
>> Expected output is DeptNo  TopSal
>>   10    005
>>20   004
>> But my output varies for each iteration such as
>>
>> First Iteration results as  Dept  TopSal
>>   10 003
>>20 004
>>
>> Second Iteration results as Dept  TopSal
>>   10 005
>>   20 004
>>
>> Third Iteration results as  Dept  TopSal
>>   10 003
>>   20 002
>>
>> Not sure why output varies on each iteration as no change in code and
>> values in DataFrame
>>
>> Please let me know if any inputs on this
>>
>> Regards,
>> Satish Chandra J
>>
>
>


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Missing order by?

Hemant Bhanawat
SnappyData (http://snappydata.io/)

On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j 
wrote:

> HI All,
> I have data in a emp_df (DataFrame) as mentioned below:
>
> EmpId   Sal   DeptNo
> 001   100   10
> 002   120   20
> 003   130   10
> 004   140   20
> 005   150   10
>
> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
> below:
>
> DeptNo  Sal   EmpId
> 10 150   005
> 10 130   003
> 10 100   001
> 20 140   004
> 20 120   002
>
> Now I want to pick the highest-paid EmpId of each DeptNo, hence I applied the agg
> First method as below
>
>
> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>
> Expected output is DeptNo  TopSal
>   10    005
>    20   004
> But my output varies for each iteration such as
>
> First Iteration results as  Dept  TopSal
>   10 003
>20 004
>
> Second Iteration results as Dept  TopSal
>   10 005
>   20 004
>
> Third Iteration results as  Dept  TopSal
>   10 003
>   20 002
>
> Not sure why output varies on each iteration as no change in code and
> values in DataFrame
>
> Please let me know if any inputs on this
>
> Regards,
> Satish Chandra J
>


Re: Guidelines for writing SPARK packages

2016-02-03 Thread Takeshi Yamamuro
Hi,

A package I maintain (https://github.com/maropu/hivemall-spark) extends
existing SparkSQL/DataFrame classes for a third-party library.
Please use this as a concrete example.
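
Roughly, the hook that sqlContext.read.format("some.package").load() resolves
to is a class named DefaultSource in that package implementing the data
sources API. A minimal sketch against the Spark 1.x sources API (the package
name and the dummy relation below are made up):

package com.example.mysource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// .format("com.example.mysource") resolves here because Spark looks for a
// class named DefaultSource in the named package.
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new DummyRelation(sqlContext)
}

// A trivial relation returning a single string column, just to show the shape.
class DummyRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(StructField("value", StringType)))
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("hello"), Row("world")))
}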

Thanks,
takeshi

On Tue, Feb 2, 2016 at 6:20 PM, Praveen Devarao 
wrote:

> Thanks David.
>
> I am looking at extending the SparkSQL library with a custom
> package...hence was looking for more detail on the specific classes to
> extend or interfaces to implement in order to redirect calls to my
> module (when using .format).
>
> If you have any info on these lines do share with me...else debugging
> through would be the way :-)
>
> Thanking You
>
> Praveen Devarao
>
>
>
> From:David Russell 
> To:Praveen Devarao/India/IBM@IBMIN
> Cc:user 
> Date:01/02/2016 07:03 pm
> Subject:Re: Guidelines for writing SPARK packages
> Sent by:marchoffo...@gmail.com
> --
>
>
>
> Hi Praveen,
>
> The basic requirements for releasing a Spark package on
> spark-packages.org are as follows:
>
> 1. The package content must be hosted by GitHub in a public repo under
> the owner's account.
> 2. The repo name must match the package name.
> 3. The master branch of the repo must contain "README.md" and "LICENSE".
>
> Per the doc on spark-packages.org site an example package that meets
> those requirements can be found at
> https://github.com/databricks/spark-avro. My own recently released
> SAMBA package also meets these requirements:
> https://github.com/onetapbeyond/lambda-spark-executor.
>
> As you can see there is nothing in this list of requirements that
> demands the implementation of specific interfaces. What you'll need to
> implement will depend entirely on what you want to accomplish. If you
> want to register a release for your package you will also need to push
> the artifacts for your package to Maven central.
>
> David
>
>
> On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao 
> wrote:
> > Hi,
> >
> > Are there any guidelines or specs for writing a Spark package? I would
> > like to implement a Spark package and would like to know the way it
> > needs to be structured (implement some interfaces etc.) so that it can
> > plug into Spark for extended functionality.
> >
> > Could anyone point me to docs or links on the above?
> >
> > Thanking You
> >
> > Praveen Devarao
>
>
>
> --
> "All that is gold does not glitter, Not all those who wander are lost."
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


-- 
---
Takeshi Yamamuro


DataFrame First method is resulting different results in each iteration

2016-02-03 Thread satish chandra j
HI All,
I have data in a emp_df (DataFrame) as mentioned below:

EmpId   Sal   DeptNo
001   100   10
002   120   20
003   130   10
004   140   20
005   150   10

ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
below:

DeptNo  Sal   EmpId
10 150   005
10 130   003
10 100   001
20 140   004
20 120   002

Now I want to pick the highest-paid EmpId of each DeptNo, hence I applied the agg
First method as below

ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")

Expected output is DeptNo  TopSal
  10    005
   20   004
But my output varies for each iteration such as

First Iteration results as  Dept  TopSal
  10 003
   20 004

Second Iteration results as Dept  TopSal
  10 005
  20 004

Third Iteration results as  Dept  TopSal
  10 003
  20 002

Not sure why the output varies on each iteration, as there is no change in the
code or in the values of the DataFrame.

Please let me know if any inputs on this

Regards,
Satish Chandra J


Re: recommendProductsForUser for a subset of user

2016-02-03 Thread Sabarish Sasidharan
You could always construct a new MatrixFactorizationModel with your
filtered set of user features and product features. I believe it's just a
stateless wrapper around the actual RDDs.
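
A rough sketch of that approach (the trained "model" and the user-id subset
are assumed; the MatrixFactorizationModel constructor taking rank,
userFeatures and productFeatures is public):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Keep only the users of interest, then wrap the filtered factors in a new
// model and ask for recommendations just for them.
val wantedUsers: Set[Int] = Set(1, 2, 3)  // assumed subset of user ids
val filteredUserFeatures =
  model.userFeatures.filter { case (id, _) => wantedUsers.contains(id) }
val subModel = new MatrixFactorizationModel(
  model.rank, filteredUserFeatures, model.productFeatures)
val topTen = subModel.recommendProductsForUsers(10)  // (userId, Array[Rating]) pairs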

Regards
Sab

On Wed, Feb 3, 2016 at 5:28 AM, Roberto Pagliari 
wrote:

> When using ALS, is it possible to use recommendProductsForUser for a
> subset of users?
>
> Currently, productFeatures and userFeatures are val. Is there a
> workaround for it? Using recommendForUser repeatedly would not work in my
> case, since it would be too slow with many users.
>
>
> Thank you,
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++