Re: Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Prabhu Joseph
Fair Scheduler is used, and the YARN queue has the entire cluster resources as
maxResources; preemption does not come into the picture during the test case,
and all the Spark jobs got the requested resources.

The concurrent jobs with different Spark Contexts run fine, so suspecting
resource contention is not correct.

The performance degrades only for concurrent jobs on a shared Spark Context.
Does SparkContext have any critical section that needs locking, with jobs
waiting to read it? I know Spark and Scala do not use the old threading model
but the Actor model, where locking does not happen, yet I still want to verify
whether old Java threading is used somewhere.



On Friday, February 19, 2016, Jörn Franke  wrote:

> How did you configure YARN queues? What scheduler? Preemption ?
>
> > On 19 Feb 2016, at 06:51, Prabhu Joseph  > wrote:
> >
> > Hi All,
> >
> >When running concurrent Spark Jobs on YARN (Spark-1.5.2) which share
> a single Spark Context, the jobs take more time to complete comparing with
> when they ran with different Spark Context.
> > The spark jobs are submitted on different threads.
> >
> > Test Case:
> >
> > A.  3 spark jobs submitted serially
> > B.  3 spark jobs submitted concurrently and with different
> SparkContext
> > C.  3 spark jobs submitted concurrently and with same Spark Context
> > D.  3 spark jobs submitted concurrently and with same Spark Context
> and tripling the resources.
> >
> > A and B takes equal time, But C and D are taking 2-3 times longer than
> A, which shows concurrency does not improve with shared Spark Context.
> [Spark Job Server]
> >
> > Thanks,
> > Prabhu Joseph
>


Re: Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Ted Yu
Is it possible to perform the tests using Spark 1.6.0 ?

Thanks

On Thu, Feb 18, 2016 at 9:51 PM, Prabhu Joseph 
wrote:

> Hi All,
>
>When running concurrent Spark Jobs on YARN (Spark-1.5.2) which share a
> single Spark Context, the jobs take more time to complete comparing with
> when they ran with different Spark Context.
> The spark jobs are submitted on different threads.
>
> Test Case:
>
> A.  3 spark jobs submitted serially
> B.  3 spark jobs submitted concurrently and with different SparkContext
> C.  3 spark jobs submitted concurrently and with same Spark Context
> D.  3 spark jobs submitted concurrently and with same Spark Context
> and tripling the resources.
>
> A and B takes equal time, But C and D are taking 2-3 times longer than A,
> which shows concurrency does not improve with shared Spark Context. [Spark
> Job Server]
>
> Thanks,
> Prabhu Joseph
>


RE: Spark JDBC connection - data writing success or failure cases

2016-02-18 Thread Mich Talebzadeh
Sorry, where is the source of the data? Are you writing to an Oracle table or
reading from one?

In general, JDBC messages will tell you about a connection failure halfway
through, or any other message received, say, from Oracle via JDBC.

 

What batch size are you using for this transaction?
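(For reference, a minimal sketch of a JDBC write that sets an explicit batch
size. Here df, the URL, the table name and the credentials are placeholders,
and the "batchsize" option is an assumption about the Spark JDBC writer rather
than something stated in this thread.)

import java.util.Properties

// hypothetical connection details
val url = "jdbc:oracle:thin:@//dbhost:1521/ORCL"
val props = new Properties()
props.setProperty("user", "scott")
props.setProperty("password", "tiger")
// rows sent per JDBC batch; treat this option as an assumption for the Spark version in use
props.setProperty("batchsize", "10000")

// write the DataFrame; a failure part-way through surfaces as an exception from this action
df.write.mode("append").jdbc(url, "TARGET_TABLE", props)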

 

HTH

 

Dr Mich Talebzadeh

 

LinkedIn   

 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.

 

 

From: Divya Gehlot [mailto:divya.htco...@gmail.com] 
Sent: 19 February 2016 02:36
To: user @spark 
Subject: Spark JDBC connection - data writing success or failure cases

 

Hi,

I am a Spark job which connects to RDBMS (in mycase its Oracle).

How can we check that complete data writing is successful?

Can I use commit in case of success or rollback in case of failure ?

 

 

 

Thanks,

Divya 



Re: Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Jörn Franke
How did you configure YARN queues? What scheduler? Preemption ?

> On 19 Feb 2016, at 06:51, Prabhu Joseph  wrote:
> 
> Hi All,
> 
>When running concurrent Spark Jobs on YARN (Spark-1.5.2) which share a 
> single Spark Context, the jobs take more time to complete comparing with when 
> they ran with different Spark Context.
> The spark jobs are submitted on different threads.
> 
> Test Case: 
>   
> A.  3 spark jobs submitted serially
> B.  3 spark jobs submitted concurrently and with different SparkContext
> C.  3 spark jobs submitted concurrently and with same Spark Context
> D.  3 spark jobs submitted concurrently and with same Spark Context and 
> tripling the resources.
> 
> A and B takes equal time, But C and D are taking 2-3 times longer than A, 
> which shows concurrency does not improve with shared Spark Context. [Spark 
> Job Server]
> 
> Thanks,
> Prabhu Joseph

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Prabhu Joseph
Hi All,

   When running concurrent Spark jobs on YARN (Spark 1.5.2) which share a
single Spark Context, the jobs take more time to complete compared with
when they run with different Spark Contexts.
The Spark jobs are submitted on different threads.

Test Case:

A.  3 spark jobs submitted serially
B.  3 spark jobs submitted concurrently and with different SparkContext
C.  3 spark jobs submitted concurrently and with same Spark Context
D.  3 spark jobs submitted concurrently and with same Spark Context and
tripling the resources.

A and B take equal time, but C and D take 2-3 times longer than A,
which shows concurrency does not improve with a shared Spark Context. [Spark
Job Server]

Thanks,
Prabhu Joseph
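(For context, a minimal sketch of the pattern described above: several jobs
submitted from separate threads against one shared SparkContext. The pool names
and the FAIR scheduler setting are assumptions, not details taken from the
original test.)

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// assumes an existing SparkContext sc, started with spark.scheduler.mode=FAIR
val jobs = (1 to 3).map { i =>
  Future {
    // each submitting thread can target its own fair-scheduler pool (hypothetical pool names)
    sc.setLocalProperty("spark.scheduler.pool", s"pool$i")
    sc.parallelize(1 to 1000000).map(_ * 2).count()
  }
}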


Logistic Regression using ML Pipeline

2016-02-18 Thread Arunkumar Pillai
Hi

I'm trying to build logistic regression using ML Pipeline

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
lr.setFitIntercept(true)
lr.setMaxIter(100)
val model = lr.fit(data)

println(model.summary)

I'm getting the coefficients, but I am not able to get the predicted and
probability values.

Please help
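(A sketch of one way to get those values, assuming "data" is the DataFrame used
for fitting and has a "features" column: the fitted model is a Transformer, so
transform should append "prediction", "probability" and "rawPrediction"
columns.)

// score the same data with the fitted model
val predictions = model.transform(data)
predictions.select("features", "probability", "prediction").show(5)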

-- 
Thanks and Regards
Arun


Re: cannot coerce class "data.frame" to a DataFrame - with spark R

2016-02-18 Thread Felix Cheung
Doesn't DESeqDataSetFromMatrix work with data.frame only? It wouldn't work with 
Spark's DataFrame - try collect(countMat) and others to convert them into 
data.frame?



_
From: roni 
Sent: Thursday, February 18, 2016 4:55 PM
Subject: cannot coerce class "data.frame" to a DataFrame - with spark R
To:  


Hi,

I am trying to convert a bioinformatics R script to use Spark R. It uses an
external Bioconductor package (DESeq2), so the only conversion I have really
made is to change the way it reads the input file.

When I call my external R library function in DESeq2 I get the error: cannot
coerce class "data.frame" to a DataFrame.

I am listing my old R code and new Spark R code below; the line giving the
problem is called out at the end.

ORIGINAL R -

library(plyr)
library(dplyr)
library(DESeq2)
library(pheatmap)
library(gplots)
library(RColorBrewer)
library(matrixStats)
library(pheatmap)
library(ggplot2)
library(hexbin)
library(corrplot)

sampleDictFile <- "/160208.txt"
sampleDict <- read.table(sampleDictFile)

peaks <- read.table("/Atlas.txt")
countMat <- read.table("/cntMatrix.txt", header = TRUE, sep = "\t")

colnames(countMat) <- sampleDict$sample
rownames(peaks) <- rownames(countMat) <- paste0(peaks$seqnames, ":", peaks$start, "-", peaks$end, "  ", peaks$symbol)
peaks$id <- rownames(peaks)

# SPARK R CODE

peaks <- read.csv("/Atlas.txt", header = TRUE, sep = "\t")
sampleDict <- read.csv("/160208.txt", header = TRUE, sep = "\t", stringsAsFactors = FALSE)
countMat <- read.csv("/cntMatrix.txt", header = TRUE, sep = "\t")

COMMON CODE for both -

countMat <- countMat[, sampleDict$sample]
colData <- sampleDict[, "group", drop = FALSE]
design <- ~ group

dds <- DESeqDataSetFromMatrix(countData = countMat, colData = colData, design = design)

This line gives the error - dds <- DESeqDataSetFromMatrix(countData = countMat, colData = (colData), design = design)

Error in DataFrame(colData, row.names = rownames(colData)) :
  cannot coerce class "data.frame" to a DataFrame

I tried as.data.frame or using DataFrame to wrap the defs, but no luck. What
can I do differently?

Thanks,
Roni

Re: Using sbt assembly

2016-02-18 Thread Brian London
You need to add the plugin to your plugins.sbt file not your build.sbt
file.  Also, I don't see a 0.13.9 version on Github.  0.14.2 is current.
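(For reference, a sketch of what project/plugins.sbt might look like with the
0.14.2 version mentioned above:)

// project/plugins.sbt
resolvers += Resolver.url("bintray-sbt-plugins",
  url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.2")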

On Thu, Feb 18, 2016 at 9:50 PM Arko Provo Mukherjee <
arkoprovomukher...@gmail.com> wrote:

> Hello,
>
> I am trying to use sbt assembly to generate a fat JAR.
>
> Here is my \project\assembly.sbt file:
> resolvers += Resolver.url("bintray-sbt-plugins",
> url("http://dl.bintray.com/sbt/sbt-plugin-releases
> "))(Resolver.ivyStylePatterns)
>
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.9")
>
>
> However, when I run sbt assembly I get the error:
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> com.eed3si9n#sbt-assembly;0.13.9: not found
>
> Anyone faced this issue before?
>
> Thanks & regards
> Arko
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Using sbt assembly

2016-02-18 Thread Arko Provo Mukherjee
Hello,

I am trying to use sbt assembly to generate a fat JAR.

Here is my \project\assembly.sbt file:
resolvers += Resolver.url("bintray-sbt-plugins",
url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.9")


However, when I run sbt assembly I get the error:
[error] (*:update) sbt.ResolveException: unresolved dependency:
com.eed3si9n#sbt-assembly;0.13.9: not found

Anyone faced this issue before?

Thanks & regards
Arko

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Divya Gehlot
Hi Sutanu ,

When you run your Spark shell, you should see lines like the below in your
console:

16/02/18 21:43:53 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4041
16/02/18 21:43:53 INFO Utils: Successfully started service 'SparkUI' on
port 4041.
16/02/18 21:43:54 INFO SparkUI: Started SparkUI at http://xx.xx.xx.xxx:4041

In my case, the UI started on port 4041 instead of the default port.

Hope this helps.

Thanks,
Divya



On 19 February 2016 at 07:09, Mich Talebzadeh  wrote:

> Is 4040 port used in your host? It should be default
>
>
>
> Example
>
>
>
> *netstat -plten|grep 4040*
>
>
>
> tcp0  0 :::4040
> :::*LISTEN  1009   42748209   *22778*/java
>
>
>
> *ps -ef|grep 22778*
>
>
>
> hduser   22778 22770  0 08:34 pts/100:01:18 /usr/java/latest/bin/java
> -cp
> /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/conf/:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/hduser/hadoop-2.6.0/etc/hadoop/
> -Dscala.usejavacp=true -Xms1G -Xmx1G -XX:MaxPermSize=256m
> org.apache.spark.deploy.SparkSubmit --master spark://50.140.197.217:7077
> --class org.apache.spark.repl.Main --name Spark shell spark-shell
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>
> *From:* Sutanu Das [mailto:sd2...@att.com]
> *Sent:* 18 February 2016 22:58
> *To:* Mich Talebzadeh ; user@spark.apache.org
>
> *Subject:* RE: Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> Hi Mich, Community - Do I need to specify it in the properties file in my
> spark-submit ?
>
>
>
> *From:* Mich Talebzadeh [mailto:m...@peridale.co.uk ]
>
> *Sent:* Thursday, February 18, 2016 4:28 PM
> *To:* Sutanu Das; user@spark.apache.org
> *Subject:* RE: Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> The jobs are normally shown under :4040/jobs/ in a normal set up
> not using any vendor’s flavoiur
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>
> *From:* Sutanu Das [mailto:sd2...@att.com ]
> *Sent:* 18 February 2016 22:22
> *To:* user@spark.apache.org
> *Subject:* Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> Hi Community,
>
>
>
> Challenged with Spark issues with *Hortonworks*  (HDP 2.3.2_Spark 1.4.1)
> – The Spark History Server is NOT showing the Spark Running Jobs in Local
> Mode
>
>
>
> The local-host:4040/app/v1 is ALSO not working
>
>
>
> How can I look at my local Spark job?
>
>
>
>
>
> # Generated by Apache Ambari. Fri Feb  5 00:37:06 2016
>
>
>
> spark.history.kerberos.keytab none
>
> spark.history.kerberos.principal none
>
> spark.history.provider
> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>
> spark.history.ui.port 18080
>
> spark.yarn.containerLauncherMaxThreads 25
>
> spark.yarn.driver.memoryOverhead 2048
>
> spark.yarn.executor.memoryOverhead 2048
>
> spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080
>
> spark.yarn.max.executor.failures 3
>
> spark.yarn.preserve.staging.files false
>
> spark.yarn.queue default
>
> spark.yarn.scheduler.heartbeat.interval-ms 5000
>
> spark.yarn.services 

Spark JDBC connection - data writing success or failure cases

2016-02-18 Thread Divya Gehlot
Hi,
I have a Spark job which connects to an RDBMS (in my case it is Oracle).
How can we check that the complete data write was successful?
Can I use commit in case of success or rollback in case of failure?



Thanks,
Divya


StreamingKMeans does not update cluster centroid locations

2016-02-18 Thread ramach1776
I have a streaming application wherein I train the model using a receiver input
stream in 4-second batches:

val stream = ssc.receiverStream(receiver) // receiver gets new data every batch
model.trainOn(stream.map(Vectors.parse))

If I use

model.latestModel.clusterCenters.foreach(println)

the values of the cluster centers remain unchanged from the initial values they
got during the first iteration (when the streaming app started).

When I use the model to predict cluster assignment with a labeled input, the
assignments change over time as expected:

  testData.transform {rdd =>
rdd.map(lp => (lp.label, model.latestModel().predict(lp.features)))
  }.print()
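(For reference, a sketch of the full setup with the model construction and a
per-batch print of the centers. The k value, the feature dimension and the
initial weight are assumptions, not values from the original application;
"stream" is the receiver DStream from the code above.)

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

// hypothetical model setup
val model = new StreamingKMeans()
  .setK(3)                  // number of clusters (assumed)
  .setDecayFactor(1.0)
  .setRandomCenters(2, 0.0) // feature dimension and initial center weight (assumed)

val training = stream.map(Vectors.parse)
model.trainOn(training)

// query latestModel() once per batch rather than once at setup time,
// so the printed centers reflect the model after each training update
training.foreachRDD { (_, time) =>
  println(s"centers at $time: " + model.latestModel().clusterCenters.mkString(", "))
}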










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: subtractByKey increases RDD size in memory - any ideas?

2016-02-18 Thread Andrew Ehrlich
There could be clues in the different RDD subclasses; rdd1 is
ParallelCollectionRDD but rdd3 is SubtractedRDD.
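(One quick way to see which RDD subclass and lineage you ended up with, using
the rdd3 from the example below:)

// prints the lineage, e.g. a SubtractedRDD built on ParallelCollectionRDDs
println(rdd3.toDebugString)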

On Thu, Feb 18, 2016 at 1:37 PM, DaPsul  wrote:

> (copy from
>
> http://stackoverflow.com/questions/35467128/spark-subtractbykey-increases-rdd-cached-memory-size
> )
>
> I've found a very strange behavior for RDD's (spark 1.6.0 with scala 2.11):
>
> When i use subtractByKey on an RDD the resulting RDD should be of equal or
> smaller size. What i get is an RDD that takes even more space in memory:
>
> //Initialize first RDD
> val rdd1 = sc.parallelize(Array((1,1),(2,2),(3,3))).cache()
>
> //dummy action to cache it => size according to webgui: 184 Bytes
> rdd1.first
>
> //Initialize RDD to subtract (empty RDD should result in no change for
> rdd1)
> val rdd2 = sc.parallelize(Array[(Int,Int)]())
>
> //perform subtraction
> val rdd3 = rdd1.subtractByKey(rdd2).cache()
>
> //dummy action to cache rdd3 => size according to webgui: 208 Bytes
> rdd3.first
>
> I frist realized this strange behaviour for an RDD of ~200k rows and size
> 1.3 GB that scaled up to more than 2 GB after subtraction
>
> Edit: Tried the example above with more values(10k) => same behaviour. The
> size increases by ~1.6 times. Also reduceByKey seems to have a similar
> effect.
>
> When i create an RDD by
>
> sc.paralellize(rdd3.collect())
>
> the size is the same as for rdd3, so the increased size carries over even
> if
> it's extracted from RDD.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/subtractByKey-increases-RDD-size-in-memory-any-ideas-tp26272.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Hive REGEXP_REPLACE use or equivalent in Spark

2016-02-18 Thread Andrew Ehrlich
Use the scala method .split(",") to split the string into a collection of
strings, and try using .replaceAll() on the field with the "?" to remove it.
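(A minimal sketch of that suggestion on the sample value from the question
below; the character class mirrors the Hive regex:)

val raw = "?2,500.00"
val cleaned = raw.replaceAll("[^\\d.]", "") // strips "?" and "," leaving "2500.00"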

On Thu, Feb 18, 2016 at 2:09 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> What is the equivalent of this Hive statement in Spark
>
>
>
> select "?2,500.00", REGEXP_REPLACE("?2,500.00",'[^\\d\\.]','');
> ++--+--+
> |_c0 |   _c1|
> ++--+--+
> | ?2,500.00  | 2500.00  |
> ++--+--+
>
> Basically I want to get rid of "?" and "," in the csv file
>
>
>
> The full csv line is
>
>
>
> scala> csv2.first
> res94: String = 360,10/02/2014,"?2,500.00",?0.00,"?2,500.00"
>
> I want to transform that string into 5 columns and use "," as the split
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>


Re: spark 1.6 new memory management - some issues with tasks not using all executors

2016-02-18 Thread Koert Kuipers
looking at the cached rdd i see a similar story:
with useLegacyMode = true the cached rdd is spread out across 10 executors,
but with useLegacyMode = false the data for the cached rdd sits on only 3
executors (the rest all show 0s). my cached RDD is a key-value RDD that got
partitioned (hash partitioner, 50 partitions) before being cached.

On Thu, Feb 18, 2016 at 6:51 PM, Koert Kuipers  wrote:

> hello all,
> we are just testing a semi-realtime application (it should return results
> in less than 20 seconds from cached RDDs) on spark 1.6.0. before this it
> used to run on spark 1.5.1
>
> in spark 1.6.0 the performance is similar to 1.5.1 if i set
> spark.memory.useLegacyMode = true, however if i switch to
> spark.memory.useLegacyMode = false the queries take about 50% to 100% more
> time.
>
> the issue becomes clear when i focus on a single stage: the individual
> tasks are not slower at all, but they run on less executors.
> in my test query i have 50 tasks and 10 executors. both with useLegacyMode
> = true and useLegacyMode = false the tasks finish in about 3 seconds and
> show as running PROCESS_LOCAL. however when  useLegacyMode = false the
> tasks run on just 3 executors out of 10, while with useLegacyMode = true
> they spread out across 10 executors. all the tasks running on just a few
> executors leads to the slower results.
>
> any idea why this would happen?
> thanks! koert
>
>
>


spark 1.6 new memory management - some issues with tasks not using all executors

2016-02-18 Thread Koert Kuipers
hello all,
we are just testing a semi-realtime application (it should return results
in less than 20 seconds from cached RDDs) on spark 1.6.0. before this it
used to run on spark 1.5.1

in spark 1.6.0 the performance is similar to 1.5.1 if i set
spark.memory.useLegacyMode = true, however if i switch to
spark.memory.useLegacyMode = false the queries take about 50% to 100% more
time.

the issue becomes clear when i focus on a single stage: the individual
tasks are not slower at all, but they run on less executors.
in my test query i have 50 tasks and 10 executors. both with useLegacyMode
= true and useLegacyMode = false the tasks finish in about 3 seconds and
show as running PROCESS_LOCAL. however when  useLegacyMode = false the
tasks run on just 3 executors out of 10, while with useLegacyMode = true
they spread out across 10 executors. all the tasks running on just a few
executors leads to the slower results.

any idea why this would happen?
thanks! koert


Re: UDAF support for DataFrames in Spark 1.5.0?

2016-02-18 Thread Ted Yu
Richard:
Please see SPARK-9664 Use sqlContext.udf to register UDAFs

Cheers
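(To illustrate what SPARK-9664 refers to, a minimal sketch of a UDAF and its
registration, assuming Spark 1.5+; the aggregate itself is just a toy sum, and
the table and column names in the usage comment are hypothetical.)

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// toy UDAF (a plain sum of a double column), just to show the required methods
class SumAsDouble extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  def evaluate(buffer: Row): Double = buffer.getDouble(0)
}

// registration, which is what SPARK-9664 added
sqlContext.udf.register("mysum", new SumAsDouble)
// usage: sqlContext.sql("SELECT mysum(amount) FROM payments")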

On Thu, Feb 18, 2016 at 3:18 PM, Kabeer Ahmed 
wrote:

> I use Spark 1.5 with CDH5.5 distribution and I see that support is present
> for UDAF. From the link:
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html,
> I read that this is an experimental feature. So it makes sense not to find
> this in the documentation.
>
> For confirmation whether it works in Spark 1.5 I quickly tried out the
> example in the link and it works. I hope this answers your question.
>
> Kabeer.
>
> On 18/02/16 16:31, Richard Cobbe wrote:
>
> I'm working on an application using DataFrames (Scala API) in Spark 1.5.0,
> and we need to define and use several custom aggregators.  I'm having
> trouble figuring out how to do this, however.
>
> First, which version of Spark did UDAF support land in?  Has it in fact
> landed at all?
> https://issues.apache.org/jira/browse/SPARK-3947 suggests that UDAFs should
> be available in 1.5.0.  However, the associated pull request includes
> classes like org.apache.spark.sql.UDAFRegistration, but these classes don't
> appear in the API docs, and I'm not able to use them from the spark shell
> ("type UDAFRegistration is not a member of package org.apache.spark.sql").
>
> I don't have access to a Spark 1.6.0 installation, but UDAFRegistration
> doesn't appear in the Scaladoc pages for 1.6.
>
> Second, assuming that this functionality is supported in some version of
> Spark, could someone point me to some documentation or an example that
> demonstrates how to define and use a custom aggregation function?
>
> Many thanks,
>
> Richard
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
Code with the SQL broadcast hint. This worked, and I was able to see that a
broadcast join was performed.

val testDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  val resultDF = recordDF.join(testDF, "Age")

  resultDF.write.format("com.databricks.spark.csv")
    .save("file:///shared/data/output/streaming/" + timestamp)
})

But for every batch this file was read and broadcast was performed.
Evaluating the entire DAG I guess.
  16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27


Then I changed the code to broadcast the dataframe. This didn't work either.
Not sure if this is what you meant by broadcasting a dataframe.

val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt"))

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  val resultDF = recordDF.join(testDF.value, "Age")

  resultDF.write.format("com.databricks.spark.csv")
    .save("file:///shared/data/output/streaming/" + timestamp)
})


On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu 
wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth  wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql broadcast hint. Question is how to
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for each
>> RDD?
>> Right now every batch the metadata file is read and the DF is broadcasted.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu 
>> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth  wrote:
>>>
 Hello,

 I have a streaming use case where I plan to keep a dataset broadcasted
 and cached on each executor.
 Every micro batch in streaming will create a DF out of the RDD and join
 the batch.
 The below code will perform the broadcast operation for each RDD. Is
 there a way to broadcast it just once?

 Alternate approachs are also welcome.

 val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)

 val metaDF =
 sqlContext.read.format("json").schema(schema1).load(file2)
   .join(DF1, "id")
 metaDF.cache


   val lines = streamingcontext.textFileStream(path)

   lines.foreachRDD( rdd => {
   val recordDF = rdd.flatMap(r => Record(r)).toDF()
   val joinedDF = recordDF.join(broadcast(metaDF), "id")

   joinedDF.rdd.foreachPartition ( partition => {
 partition.foreach( row => {
  ...
  ...
 })
   })
   })

  streamingcontext.start

 On a similar note, if the metaDF is too big for broadcast, can I
 partition it(df.repartition($"col")) and also partition each streaming RDD?
 This way I can avoid shuffling metaDF each time.

 Let me know you thoughts.

 Thanks


>>


Re: UDAF support for DataFrames in Spark 1.5.0?

2016-02-18 Thread Kabeer Ahmed
I use Spark 1.5 with CDH5.5 distribution and I see that support is present for 
UDAF. From the link: 
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html,
 I read that this is an experimental feature. So it makes sense not to find 
this in the documentation.

For confirmation whether it works in Spark 1.5 I quickly tried out the example 
in the link and it works. I hope this answers your question.

Kabeer.

On 18/02/16 16:31, Richard Cobbe wrote:

I'm working on an application using DataFrames (Scala API) in Spark 1.5.0,
and we need to define and use several custom aggregators.  I'm having
trouble figuring out how to do this, however.

First, which version of Spark did UDAF support land in?  Has it in fact
landed at all?

https://issues.apache.org/jira/browse/SPARK-3947 suggests that UDAFs should
be available in 1.5.0.  However, the associated pull request includes
classes like org.apache.spark.sql.UDAFRegistration, but these classes don't
appear in the API docs, and I'm not able to use them from the spark shell
("type UDAFRegistration is not a member of package org.apache.spark.sql").

I don't have access to a Spark 1.6.0 installation, but UDAFRegistration
doesn't appear in the Scaladoc pages for 1.6.

Second, assuming that this functionality is supported in some version of
Spark, could someone point me to some documentation or an example that
demonstrates how to define and use a custom aggregation function?

Many thanks,

Richard

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





RE: Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Mich Talebzadeh
Is port 4040 used on your host? It should be the default.

 

Example

 

netstat -plten|grep 4040

 

tcp0  0 :::4040 :::*
LISTEN  1009   42748209   22778/java

 

ps -ef|grep 22778

 

hduser   22778 22770  0 08:34 pts/100:01:18 /usr/java/latest/bin/java
-cp
/home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar:/usr/lib/spark-1.5
.2-bin-hadoop2.6/conf/:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly
-1.5.2-hadoop2.6.0.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-co
re-3.2.10.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2
.6.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/h
ome/hduser/hadoop-2.6.0/etc/hadoop/ -Dscala.usejavacp=true -Xms1G -Xmx1G
-XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master
spark://50.140.197.217:7077 --class org.apache.spark.repl.Main --name Spark
shell spark-shell

 

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 

From: Sutanu Das [mailto:sd2...@att.com] 
Sent: 18 February 2016 22:58
To: Mich Talebzadeh ; user@spark.apache.org
Subject: RE: Spark History Server NOT showing Jobs with Hortonworks

 

Hi Mich, Community - Do I need to specify it in the properties file in my
spark-submit ?

 

From: Mich Talebzadeh [mailto:m...@peridale.co.uk] 
Sent: Thursday, February 18, 2016 4:28 PM
To: Sutanu Das; user@spark.apache.org  
Subject: RE: Spark History Server NOT showing Jobs with Hortonworks

 

The jobs are normally shown under :4040/jobs/ in a normal set up
not using any vendor's flavoiur

 

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 

From: Sutanu Das [mailto:sd2...@att.com] 
Sent: 18 February 2016 22:22
To: user@spark.apache.org  
Subject: Spark History Server NOT showing Jobs with Hortonworks

 

Hi Community,

 

Challenged with Spark issues with Hortonworks  (HDP 2.3.2_Spark 1.4.1) - The
Spark History Server is NOT showing the Spark Running Jobs in Local Mode 

 

The local-host:4040/app/v1 is ALSO not working

 

How can I look at my local Spark job?

 

 

# Generated by Apache Ambari. Fri Feb  5 00:37:06 2016



spark.history.kerberos.keytab none

spark.history.kerberos.principal none

spark.history.provider
org.apache.spark.deploy.yarn.history.YarnHistoryProvider

spark.history.ui.port 18080

spark.yarn.containerLauncherMaxThreads 25

spark.yarn.driver.memoryOverhead 2048

spark.yarn.executor.memoryOverhead 2048

spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080

spark.yarn.max.executor.failures 3

spark.yarn.preserve.staging.files false

spark.yarn.queue default

spark.yarn.scheduler.heartbeat.interval-ms 5000

spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService

spark.yarn.submit.file.replication 3

 

History Server 

*   Timeline Service Location:
http://has-dal-0002.corp.wayport.net:8188/
*   Last Updated: Feb 18, 2016 10:09:12 PM UTC
*   Service Started: Feb 5, 2016 12:37:15 AM UTC
*   Current Time: Feb 18, 2016 10:10:46 PM UTC
*   Timeline Service: Timeline service is enabled
*   History Provider: Apache Hadoop YARN Timeline Service

 



RE: Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Sutanu Das
Hi Mich, Community - Do I need to specify it in the properties file in my 
spark-submit ?

From: Mich Talebzadeh [mailto:m...@peridale.co.uk]
Sent: Thursday, February 18, 2016 4:28 PM
To: Sutanu Das; user@spark.apache.org
Subject: RE: Spark History Server NOT showing Jobs with Hortonworks

The jobs are normally shown under :4040/jobs/ in a normal set up not 
using any vendor's flavoiur

Dr Mich Talebzadeh

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.


From: Sutanu Das [mailto:sd2...@att.com]
Sent: 18 February 2016 22:22
To: user@spark.apache.org
Subject: Spark History Server NOT showing Jobs with Hortonworks

Hi Community,

Challenged with Spark issues with Hortonworks  (HDP 2.3.2_Spark 1.4.1) - The 
Spark History Server is NOT showing the Spark Running Jobs in Local Mode

The local-host:4040/app/v1 is ALSO not working

How can I look at my local Spark job?


# Generated by Apache Ambari. Fri Feb  5 00:37:06 2016

spark.history.kerberos.keytab none
spark.history.kerberos.principal none
spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
spark.history.ui.port 18080
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 2048
spark.yarn.executor.memoryOverhead 2048
spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
spark.yarn.submit.file.replication 3

History Server

  *   Timeline Service Location: http://has-dal-0002.corp.wayport.net:8188/
  *   Last Updated: Feb 18, 2016 10:09:12 PM UTC
  *   Service Started: Feb 5, 2016 12:37:15 AM UTC
  *   Current Time: Feb 18, 2016 10:10:46 PM UTC
  *   Timeline Service: Timeline service is enabled
  *   History Provider: Apache Hadoop YARN Timeline Service



RE: JDBC based access to RDD

2016-02-18 Thread Mich Talebzadeh
Can you please clarify your point

 

Do you mean using JDBC to get data from other databases into Spark

 

 

val s = HiveContext.load("jdbc",

Map("url" -> _ORACLEserver,

"dbtable" -> "table”,

"user" -> _username,

"password" -> _password))

 

HTH

 

Dr Mich Talebzadeh

 

LinkedIn   

 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.

 

 

From: Shyam Sarkar [mailto:ssarkarayushnet...@gmail.com] 
Sent: 18 February 2016 22:36
To: user@spark.apache.org
Subject: JDBC based access to RDD

 

Is there any good code example for JDBC based access to RDD ?

Thanks.



JDBC based access to RDD

2016-02-18 Thread Shyam Sarkar
Is there any good code example for JDBC based access to RDD ?
Thanks.


RE: Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Mich Talebzadeh
The jobs are normally shown under :4040/jobs/ in a normal set up
not using any vendor's flavour

 

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 

From: Sutanu Das [mailto:sd2...@att.com] 
Sent: 18 February 2016 22:22
To: user@spark.apache.org
Subject: Spark History Server NOT showing Jobs with Hortonworks

 

Hi Community,

 

Challenged with Spark issues with Hortonworks  (HDP 2.3.2_Spark 1.4.1) - The
Spark History Server is NOT showing the Spark Running Jobs in Local Mode 

 

The local-host:4040/app/v1 is ALSO not working

 

How can I look at my local Spark job?

 

 

# Generated by Apache Ambari. Fri Feb  5 00:37:06 2016



spark.history.kerberos.keytab none

spark.history.kerberos.principal none

spark.history.provider
org.apache.spark.deploy.yarn.history.YarnHistoryProvider

spark.history.ui.port 18080

spark.yarn.containerLauncherMaxThreads 25

spark.yarn.driver.memoryOverhead 2048

spark.yarn.executor.memoryOverhead 2048

spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080

spark.yarn.max.executor.failures 3

spark.yarn.preserve.staging.files false

spark.yarn.queue default

spark.yarn.scheduler.heartbeat.interval-ms 5000

spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService

spark.yarn.submit.file.replication 3

 

History Server 

*   Timeline Service Location:
http://has-dal-0002.corp.wayport.net:8188/
*   Last Updated: Feb 18, 2016 10:09:12 PM UTC
*   Service Started: Feb 5, 2016 12:37:15 AM UTC
*   Current Time: Feb 18, 2016 10:10:46 PM UTC
*   Timeline Service: Timeline service is enabled
*   History Provider: Apache Hadoop YARN Timeline Service

 



Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Sutanu Das
Hi Community,

Challenged with Spark issues with Hortonworks  (HDP 2.3.2_Spark 1.4.1) - The 
Spark History Server is NOT showing the Spark Running Jobs in Local Mode

The local-host:4040/app/v1 is ALSO not working

How can I look at my local Spark job?


# Generated by Apache Ambari. Fri Feb  5 00:37:06 2016

spark.history.kerberos.keytab none
spark.history.kerberos.principal none
spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
spark.history.ui.port 18080
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 2048
spark.yarn.executor.memoryOverhead 2048
spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
spark.yarn.submit.file.replication 3

History Server

  *   Timeline Service Location: http://has-dal-0002.corp.wayport.net:8188/
  *   Last Updated: Feb 18, 2016 10:09:12 PM UTC
  *   Service Started: Feb 5, 2016 12:37:15 AM UTC
  *   Current Time: Feb 18, 2016 10:10:46 PM UTC
  *   Timeline Service: Timeline service is enabled
  *   History Provider: Apache Hadoop YARN Timeline Service



Hive REGEXP_REPLACE use or equivalent in Spark

2016-02-18 Thread Mich Talebzadeh
Hi,

What is the equivalent of this Hive statement in Spark

 

select "?2,500.00", REGEXP_REPLACE("?2,500.00",'[^\\d\\.]','');
++--+--+
|_c0 |   _c1|
++--+--+
| ?2,500.00  | 2500.00  |
++--+--+

Basically I want to get rid of "?" and "," in the csv file

 

The full csv line is

 

scala> csv2.first
res94: String = 360,10/02/2014,"?2,500.00",?0.00,"?2,500.00"

I want to transform that string into 5 columns and use "," as the split

Thanks,

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 



subtractByKey increases RDD size in memory - any ideas?

2016-02-18 Thread DaPsul
(copy from
http://stackoverflow.com/questions/35467128/spark-subtractbykey-increases-rdd-cached-memory-size)

I've found a very strange behavior for RDD's (spark 1.6.0 with scala 2.11):

When I use subtractByKey on an RDD, the resulting RDD should be of equal or
smaller size. What I get is an RDD that takes even more space in memory:

//Initialize first RDD
val rdd1 = sc.parallelize(Array((1,1),(2,2),(3,3))).cache()

//dummy action to cache it => size according to webgui: 184 Bytes
rdd1.first

//Initialize RDD to subtract (empty RDD should result in no change for rdd1)
val rdd2 = sc.parallelize(Array[(Int,Int)]())

//perform subtraction
val rdd3 = rdd1.subtractByKey(rdd2).cache()

//dummy action to cache rdd3 => size according to webgui: 208 Bytes
rdd3.first

I first realized this strange behaviour for an RDD of ~200k rows and size
1.3 GB that scaled up to more than 2 GB after subtraction.

Edit: Tried the example above with more values(10k) => same behaviour. The
size increases by ~1.6 times. Also reduceByKey seems to have a similar
effect.

When I create an RDD by

sc.paralellize(rdd3.collect())

the size is the same as for rdd3, so the increased size carries over even if
it's extracted from RDD.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/subtractByKey-increases-RDD-size-in-memory-any-ideas-tp26272.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Yarn client mode: Setting environment variables

2016-02-18 Thread Lin Zhao
Thanks for the reply. I also found that  sparkConf.setExecutorEnv works for 
yarn.
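(A small sketch of both approaches mentioned in this thread; FOO=bar is just a
placeholder variable.)

import org.apache.spark.SparkConf

// 1) programmatically, before the SparkContext is created
val conf = new SparkConf()
  .setAppName("env-example")
  .setExecutorEnv("FOO", "bar")

// 2) equivalently, in spark-defaults.conf or via --conf:
//    spark.executorEnv.FOO=bar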

From: Saisai Shao >
Date: Wednesday, February 17, 2016 at 6:02 PM
To: Lin Zhao >
Cc: "user@spark.apache.org" 
>
Subject: Re: Yarn client mode: Setting environment variables

IIUC, if for example you want to set the environment variable FOO=bar on the
executor side, you could use "spark.executorEnv.FOO=bar" in the conf file; the
AM will pick up this configuration and set it as an environment variable when
launching the container. Just list all the envs you want to set on the executor
side like spark.executorEnv.xxx=xxx.

Thanks
Saisai

On Thu, Feb 18, 2016 at 3:31 AM, Lin Zhao 
> wrote:
I've been trying to set some environment variables for the Spark executors but
haven't had much luck. I tried editing conf/spark-env.sh but it doesn't get
through to the executors. I'm running 1.6.0 and YARN; any pointer is
appreciated.

Thanks,
Lin



Re: Importing csv files into Hive ORC target table

2016-02-18 Thread Alex Dzhagriev
Hi Mich,

Try to use a regexp to parse your string instead of the split.

Thanks, Alex.
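(Alex's exact regexp isn't shown; one possible sketch is a split on commas that
fall outside double quotes, using the sample line from the thread below,
followed by the same cleanup regex used on the Hive side:)

val line = "360,10/02/2014,\"?2,500.00\",?0.00,\"?2,500.00\""
// split on "," only when it is outside a quoted field
val cols = line.split(""",(?=(?:[^"]*"[^"]*")*[^"]*$)""")
// cols has 5 elements; strip quotes, "?" and "," from the money fields
val cleaned = cols.map(_.replaceAll("[^\\d./]", ""))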

On Thu, Feb 18, 2016 at 6:35 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> thanks,
>
>
>
> I have an issue here.
>
> define rdd to read the CSV file
>
> scala> var csv = sc.textFile("/data/stg/table2")
> csv: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[69] at textFile
> at :27
>
> I then get rid of the header
>
> scala> val csv2 = csv.mapPartitionsWithIndex { (idx, iter) => if (idx ==
> 0) iter.drop(1) else iter }
> csv2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at
> mapPartitionsWithIndex at :29
>
> This is what I have now
>
> scala> csv.first
>
> res79: String = Invoice Number,Payment date,Net,VAT,Total
>
> *scala> csv2.first*
>
> *res80: String = 360,10/02/2014,"?2,500.00",?0.00,"?2,500.00"*
>
> Then I define a class based on the columns
>
> scala> case class Invoice(invoice: String, date: String, net: String, vat:
> String, total: String)
> defined class Invoice
>
> Next stage to map the data to their individual columns
>
> scala> val ttt = csv2.map(_.split(",")).map(p =>
> Invoice(p(0),p(1),p(2),p(3),p(4)))
> ttt: org.apache.spark.rdd.RDD[Invoice] = MapPartitionsRDD[74] at map at
> :33
>
> the problem now I have is that one column is missing
>
> *scala> ttt.first*
> *res81: Invoice = Invoice(360,10/02/2014,"?2,500.00",?0.00)*
>
> it seems that I am missing the last column here!
>
> I suspect the cause of the problem is the "," used in "?2,500.00" which is
> a money column of "£25" in excel.
>
> Any work around is appreciated.
>
> Thanks,
>
> Mich
>
>
>
>
>
> On 17/02/2016 22:58, Alex Dzhagriev wrote:
>
> Hi Mich,
>
> You can use data frames (
> http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes)
> to achieve that.
>
> val sqlContext = new HiveContext(sc)
>
> var rdd = sc.textFile("/data/stg/table2")
>
> //...
> //perform you business logic, cleanups, etc.
> //...
>
> sqlContext.createDataFrame(resultRdd).write.orc("..path..")
>
> Please, note that resultRdd should contain Products (e.g. case classes)
>
> Cheers, Alex.
>
>
>
> On Wed, Feb 17, 2016 at 11:43 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Hi,
>>
>> We put csv files that are zipped using bzip into a staging are on hdfs
>>
>> In Hive an external table is created as below:
>>
>> DROP TABLE IF EXISTS stg_t2;
>> CREATE EXTERNAL TABLE stg_t2 (
>>  INVOICENUMBER string
>> ,PAYMENTDATE string
>> ,NET string
>> ,VAT string
>> ,TOTAL string
>> )
>> COMMENT 'from csv file from excel sheet PayInsPeridaleTechnology'
>> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
>> STORED AS TEXTFILE
>> LOCATION '/data/stg/table2'
>> TBLPROPERTIES ("skip.header.line.count"="1")
>>
>> We have an ORC table in Hive created as below:
>>
>>
>>
>> DROP TABLE IF EXISTS t2;
>> CREATE TABLE t2 (
>>  INVOICENUMBER  INT
>> ,PAYMENTDATEtimestamp
>> ,NETDECIMAL(20,2)
>> ,VATDECIMAL(20,2)
>> ,TOTAL  DECIMAL(20,2)
>> )
>> COMMENT 'from csv file from excel sheet PayInsPeridaleTechnology'
>> STORED AS ORC
>> TBLPROPERTIES ( "orc.compress"="ZLIB" )
>> ;
>>
>> Then we insert the data from the external table into target table do some
>> conversion and ignoring empty rows
>>
>> INSERT INTO TABLE t2
>> SELECT
>>   INVOICENUMBER
>> , CAST(UNIX_TIMESTAMP(paymentdate,'DD/MM/')*1000 as timestamp)
>> --, CAST(REGEXP_REPLACE(SUBSTR(net,2,20),",","") AS DECIMAL(20,2))
>> , CAST(REGEXP_REPLACE(net,'[^\\d\\.]','') AS DECIMAL(20,2))
>> , CAST(REGEXP_REPLACE(vat,'[^\\d\\.]','') AS DECIMAL(20,2))
>> , CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2))
>> FROM
>> stg_t2
>>
>> This works OK for now.
>>
>>
>>
>> I was wondering whether this could be done using operations on rdd in
>> Spark?
>>
>> var rdd = sc.textFile("/data/stg/table2")
>>
>> I can use rdd.count to see the total rows and
>> rdd.collect.foreach(println) to see the individual rows
>>
>>
>>
>> I would like to get some ideas on how I can do CAST conversion etc on the
>> data to clean it up and store it in the said ORC table?
>>
>>
>>
>> Thanks
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees 

Lazy executors

2016-02-18 Thread Bemaze
I'm running a regular Spark job with a cluster of 50 core instances and 100
executors. The work they are doing appears to be fairly evenly distributed,
however often I will see one or two of the executors appear to be doing no
work. They are listed as having tasks active and often those become very
long running tasks that are sometimes re-driven speculatively to other
executors. The timeline graphs show them spending all that time
'computing'. I checked both the stdout and stderr logs of the executors and
couldn't see anything unusual other than they just weren't logging anything
after a certain point (they were doing work fine up to that point) and the
thread dump didn't really enlighten me as to what might be going on either,
they just appeared to be doing normal work.

Interestingly if I didn't change the configuration or source data at all,
the same executors seemed to get stuck in the same way in a completely
reproducible way.

Question 1: Is this a 'known' behavior of Spark?

Question 2: Is there an easy way to have the driver detect and restart these
stuck executors?

Extra information: Job running on an EMR cluster with Spark 1.6 with an
r3.2xlarge as driver and 50 i2.2xlarge core instances, 2 executors per
instance, 3 cores per executor, reading input data direct from S3.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lazy-executors-tp26271.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use a custom partitioner in a dataframe in Spark

2016-02-18 Thread Koert Kuipers
although it is not a bad idea to write data out partitioned, and then use a
merge join when reading it back in, this currently isn't even easily doable
with rdds because when you read an rdd from disk the partitioning info is
lost. re-introducing a partitioner at that point causes a shuffle defeating
the purpose.

On Thu, Feb 18, 2016 at 1:49 PM, Rishi Mishra  wrote:

> Michael,
> Is there any specific reason why DataFrames does not have partitioners
> like RDDs ? This will be very useful if one is writing custom datasources ,
> which keeps data in partitions. While storing data one can pre-partition
> the data at Spark level rather than at the datasource.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId. So that I don't have to deal with any performance issues because of
>> a number of small files and also to be able to scan faster.
>>
>>
>> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
>> , "userId").mode(SaveMode.Append).save("userRecords");
>>
>> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> So suppose I have a bunch of userIds and I need to save them as parquet
>>> in database. I also need to load them back and need to be able to do a join
>>> on userId. My idea is to partition by userId hashcode first and then on
>>> userId.
>>>
>>>
>>>
>>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Can you describe what you are trying to accomplish?  What would the
 custom partitioner be?

 On Tue, Feb 16, 2016 at 1:21 PM, SRK  wrote:

> Hi,
>
> How do I use a custom partitioner when I do a saveAsTable in a
> dataframe.
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>


Re: SPARK-9559

2016-02-18 Thread Igor Berman
What are you trying to solve? Killing the worker JVM is like killing the YARN
node manager... why would you do this?
Usually the worker JVM is an "agent" on each worker machine which opens
executors for each application, so it doesn't work hard or have a big memory
footprint.
Yes, it can fail, but that is rather a corner situation, which can also be
handled with monitoring, automatic restarts, etc.



On 18 February 2016 at 17:21, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> YARN may be a workaround.
>
> On Thu, Feb 18, 2016 at 4:13 PM, Ashish Soni 
> wrote:
>
>> Hi All ,
>>
>> Just wanted to know if there is any work around or resolution for below
>> issue in Stand alone mode
>>
>> https://issues.apache.org/jira/browse/SPARK-9559
>>
>> Ashish
>>
>
>


Access to broadcasted variable

2016-02-18 Thread jeff saremi




I'd like to know whether the broadcasted object gets serialized when accessed
by the executor during the execution of a task.
I know that it gets serialized from the driver to the worker. This question is
about what happens inside the worker when executor JVMs are accessing it.

thanks
Jeff
  

Re: How to use a custom partitioner in a dataframe in Spark

2016-02-18 Thread Rishi Mishra
Michael,
Is there any specific reason why DataFrames do not have partitioners like
RDDs? This would be very useful if one is writing custom data sources
which keep data in partitions. While storing data, one could pre-partition
the data at the Spark level rather than at the data source.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy  wrote:

> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId. So that I don't have to deal with any performance issues because of
> a number of small files and also to be able to scan faster.
>
>
> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
> , "userId").mode(SaveMode.Append).save("userRecords");
>
> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId.
>>
>>
>>
>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Can you describe what you are trying to accomplish?  What would the
>>> custom partitioner be?
>>>
>>> On Tue, Feb 16, 2016 at 1:21 PM, SRK  wrote:
>>>
 Hi,

 How do I use a custom partitioner when I do a saveAsTable in a
 dataframe.


 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: SparkConf does not work for spark.driver.memory

2016-02-18 Thread Marcelo Vanzin
On Thu, Feb 18, 2016 at 10:26 AM, wgtmac  wrote:
> In the code, I did following:
> val sc = new SparkContext(new
> SparkConf().setAppName("test").set("spark.driver.memory", "4g"))

You can't set the driver memory like this, in any deploy mode. When
that code runs, the driver is already running, so there's no way to
modify the JVM's command line options at that time.
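
In other words, in yarn-cluster mode the value has to reach spark-submit
before the driver container is requested, for example (a sketch; the class and
jar names are placeholders):

./bin/spark-submit --master yarn-cluster --driver-memory 4g \
  --class path.to.className yourApp.jar

or put "spark.driver.memory 4g" into conf/spark-defaults.conf.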

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkConf does not work for spark.driver.memory

2016-02-18 Thread wgtmac
Hi

I'm using spark 1.5.1. But I encountered a problem using SparkConf to set
spark.driver.memory in yarn-cluster mode.

Example 1:

In the code, I did following:
val sc = new SparkContext(new
SparkConf().setAppName("test").set("spark.driver.memory", "4g"))

And used following command to submit job:

YARN_CONF_DIR=/etc/hadoop/conf ./bin/spark-submit \
--class path.to.className \
--jars jarName \
--queue default \
--num-executors 4 \
--conf spark.eventLog.overwrite=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs://localhost:port \
--conf spark.yarn.historyServer.address=http://host:port 

Although in the web UI spark.driver.memory shows as 4g, the actual driver memory
is 1g (the default value).
And my job failed due to a shortage of driver memory, throwing a "GC overhead
limit exceeded" exception.

Example 2:
Almost same configuration except I added "--driver-memory 2g" at the end of
the command.
Then in the web UI spark.driver.memory is still 4g, but the actual driver memory
is 2g now.
My job succeeded, which proves the driver memory is different from example 1.

So my question is: in yarn-cluster mode, is it ineffective to use
SparkConf to set spark.driver.memory?
Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkConf-does-not-work-for-spark-driver-memory-tp26270.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: adding a split and union to a streaming application cause big performance hit

2016-02-18 Thread ramach1776
bq. streamingContext.remember("duration") did not help

Can you give a bit more detail on the above ?
Did you mean the job encountered OOME later on ?

Which Spark release are you using ?

I tried these 2 global settings (and restarted the app) after enabling cache
for stream1:
conf.set("spark.streaming.unpersist", "true")

streamingContext.remember(Seconds(batchDuration * 4))

batch duration is 4 sec

Using Spark 1.4.1. The application runs for about 4-5 hrs, and then we see an
out of memory error.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/adding-a-split-and-union-to-a-streaming-application-cause-big-performance-hit-tp26259p26269.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming with broadcast joins

2016-02-18 Thread Sebastian Piu
Can you paste the code where you use sc.broadcast ?

On Thu, Feb 18, 2016 at 5:32 PM Srikanth  wrote:

> Sebastian,
>
> I was able to broadcast using sql broadcast hint. Question is how to
> prevent this broadcast for each RDD.
> Is there a way where it can be broadcast once and used locally for each
> RDD?
> Right now every batch the metadata file is read and the DF is broadcasted.
> I tried sc.broadcast and that did not provide this behavior.
>
> Srikanth
>
>
> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu 
> wrote:
>
>> You should be able to broadcast that data frame using sc.broadcast and
>> join against it.
>>
>> On Wed, 17 Feb 2016, 21:13 Srikanth  wrote:
>>
>>> Hello,
>>>
>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>> and cached on each executor.
>>> Every micro batch in streaming will create a DF out of the RDD and join
>>> the batch.
>>> The below code will perform the broadcast operation for each RDD. Is
>>> there a way to broadcast it just once?
>>>
>>> Alternate approaches are also welcome.
>>>
>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>
>>> val metaDF =
>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>   .join(DF1, "id")
>>> metaDF.cache
>>>
>>>
>>>   val lines = streamingcontext.textFileStream(path)
>>>
>>>   lines.foreachRDD( rdd => {
>>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>
>>>   joinedDF.rdd.foreachPartition ( partition => {
>>> partition.foreach( row => {
>>>  ...
>>>  ...
>>> })
>>>   })
>>>   })
>>>
>>>  streamingcontext.start
>>>
>>> On a similar note, if the metaDF is too big for broadcast, can I
>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>> This way I can avoid shuffling metaDF each time.
>>>
>>> Let me know your thoughts.
>>>
>>> Thanks
>>>
>>>
>


Importing csv files into Hive ORC target table

2016-02-18 Thread Mich Talebzadeh
 

> thanks, 
> 
> I have an issue here. 
> 
> define rdd to read the CSV file 
> 
> scala> var csv = sc.textFile("/data/stg/table2")
> csv: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[69] at textFile at 
> :27 
> 
> I then get rid of the header 
> 
> scala> val csv2 = csv.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) 
> iter.drop(1) else iter }
> csv2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at 
> mapPartitionsWithIndex at :29 
> 
> This is what I have now 
> 
> scala> csv.first 
> 
> res79: String = Invoice Number,Payment date,Net,VAT,Total 
> 
> scala> csv2.first 
> 
> res80: String = 360,10/02/2014,"£2,500.00",£0.00,"£2,500.00" 
> 
> Then I define a class based on the columns 
> 
> scala> case class Invoice(invoice: String, date: String, net: String, vat: 
> String, total: String)
> defined class Invoice 
> 
> Next stage to map the data to their individual columns 
> 
> scala> val ttt = csv2.map(_.split(",")).map(p => 
> Invoice(p(0),p(1),p(2),p(3),p(4)))
> ttt: org.apache.spark.rdd.RDD[Invoice] = MapPartitionsRDD[74] at map at 
> :33 
> 
> The problem I now have is that one column is missing. 
> 
> scala> ttt.first
> res81: Invoice = Invoice(360,10/02/2014,"£2,500.00",£0.00) 
> 
> it seems that I am missing the last column here! 
> 
> I suspect the cause of the problem is the "," used in "£2,500.00", which is a 
> money (£) column in Excel. 
> 
> Any work around is appreciated. 
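
A possible workaround for the quoted money fields is to split only on commas
that fall outside double quotes. A minimal sketch, reusing csv2 and Invoice
from above (the regex assumes standard CSV quoting with no escaped quotes
inside fields):

// split on commas that are followed by an even number of double quotes,
// i.e. commas that sit outside any quoted field; -1 keeps trailing empty fields
val ttt = csv2.map(_.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1))
              .map(p => Invoice(p(0), p(1), p(2), p(3), p(4)))

Alternatively, the spark-csv package (com.databricks:spark-csv) handles quoted
fields and the header row for you via
sqlContext.read.format("com.databricks.spark.csv").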
> 
> Thanks, 
> 
> Mich 
> 
> On 17/02/2016 22:58, Alex Dzhagriev wrote: 
> Hi Mich, 
> 
> You can use data frames 
> (http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes 
> [3]) to achieve that. 
> 
> val sqlContext = new HiveContext(sc) 
> 
> var rdd = sc.textFile("/data/stg/table2") 
> 
> //... 
> //perform you business logic, cleanups, etc. 
> //... 
> 
> sqlContext.createDataFrame(resultRdd).write.orc("..path..") 
> 
> Please, note that resultRdd should contain Products (e.g. case classes) 
> 
> Cheers, Alex. 
> 
> On Wed, Feb 17, 2016 at 11:43 PM, Mich Talebzadeh 
>  wrote:
> 
> Hi, 
> 
> We put csv files that are zipped using bzip into a staging are on hdfs 
> 
> In Hive an external table is created as below: 
> 
> DROP TABLE IF EXISTS stg_t2;
> CREATE EXTERNAL TABLE stg_t2 (
> INVOICENUMBER string
> ,PAYMENTDATE string
> ,NET string
> ,VAT string
> ,TOTAL string
> )
> COMMENT 'from csv file from excel sheet PayInsPeridaleTechnology'
> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
> STORED AS TEXTFILE
> LOCATION '/data/stg/table2'
> TBLPROPERTIES ("skip.header.line.count"="1") 
> 
> We have an ORC table in Hive created as below: 
> 
> DROP TABLE IF EXISTS t2;
> CREATE TABLE t2 (
> INVOICENUMBER INT
> ,PAYMENTDATE timestamp
> ,NET DECIMAL(20,2)
> ,VAT DECIMAL(20,2)
> ,TOTAL DECIMAL(20,2)
> )
> COMMENT 'from csv file from excel sheet PayInsPeridaleTechnology'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> ; 
> 
> Then we insert the data from the external table into target table do some 
> conversion and ignoring empty rows 
> 
> INSERT INTO TABLE t2
> SELECT
> INVOICENUMBER
> , CAST(UNIX_TIMESTAMP(paymentdate,'DD/MM/')*1000 as timestamp)
> --, CAST(REGEXP_REPLACE(SUBSTR(net,2,20),",","") AS DECIMAL(20,2))
> , CAST(REGEXP_REPLACE(net,'[^\d\.]','') AS DECIMAL(20,2))
> , CAST(REGEXP_REPLACE(vat,'[^\d\.]','') AS DECIMAL(20,2))
> , CAST(REGEXP_REPLACE(total,'[^\d\.]','') AS DECIMAL(20,2))
> FROM
> stg_t2 
> 
> This works OK for now. 
> 
> I was wondering whether this could be done using operations on rdd in Spark? 
> 
> var rdd = sc.textFile("/data/stg/table2") 
> 
> I can use rdd.count to see the total rows and rdd.collect.foreach(println) to 
> see the individual rows 
> 
> I would like to get some ideas on how I can do CAST conversion etc on the 
> data to clean it up and store it in the said ORC table? 
> 
> Thanks 
> 
> -- 
> 
> Dr Mich Talebzadeh
> 
> LinkedIn 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  [1]
> 
> http://talebzadehmich.wordpress.com [2]
> 
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, 

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
Sebastian,

I was able to broadcast using sql broadcast hint. Question is how to
prevent this broadcast for each RDD.
Is there a way where it can be broadcast once and used locally for each RDD?
Right now every batch the metadata file is read and the DF is broadcasted.
I tried sc.broadcast and that did not provide this behavior.
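
One pattern that avoids re-reading and re-broadcasting the metadata every
batch is to collect the (small) metadata once on the driver, broadcast the
collected rows, and do the lookup inside foreachRDD. A rough sketch, reusing
the names from the quoted code below (it assumes metaDF fits in driver memory
and that the join key "id" is a String):

import org.apache.spark.sql.Row

// done once on the driver, before streamingcontext.start
val metaById: Map[String, Row] =
  metaDF.collect().map(r => r.getAs[String]("id") -> r).toMap
val metaBc = sc.broadcast(metaById)

lines.foreachRDD { rdd =>
  val recordDF = rdd.flatMap(r => Record(r)).toDF()
  recordDF.rdd.foreachPartition { partition =>
    // metaBc.value is deserialized once per executor and reused across batches
    val meta = metaBc.value
    partition.foreach { row =>
      meta.get(row.getAs[String]("id")).foreach { metaRow =>
        // ... combine row and metaRow ...
      }
    }
  }
}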

Srikanth


On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu 
wrote:

> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth  wrote:
>
>> Hello,
>>
>> I have a streaming use case where I plan to keep a dataset broadcasted
>> and cached on each executor.
>> Every micro batch in streaming will create a DF out of the RDD and join
>> the batch.
>> The below code will perform the broadcast operation for each RDD. Is
>> there a way to broadcast it just once?
>>
>> Alternate approaches are also welcome.
>>
>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>
>> val metaDF =
>> sqlContext.read.format("json").schema(schema1).load(file2)
>>   .join(DF1, "id")
>> metaDF.cache
>>
>>
>>   val lines = streamingcontext.textFileStream(path)
>>
>>   lines.foreachRDD( rdd => {
>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>
>>   joinedDF.rdd.foreachPartition ( partition => {
>> partition.foreach( row => {
>>  ...
>>  ...
>> })
>>   })
>>   })
>>
>>  streamingcontext.start
>>
>> On a similar note, if the metaDF is too big for broadcast, can I
>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>> This way I can avoid shuffling metaDF each time.
>>
>> Let me know your thoughts.
>>
>> Thanks
>>
>>


Re: equalTo isin not working as expected with a constructed column with DataFrames

2016-02-18 Thread Mehdi Ben Haj Abbes
Hi,
I forgot to mention that I'm using the 1.5.1 version.
Regards,

On Thu, Feb 18, 2016 at 4:20 PM, Mehdi Ben Haj Abbes 
wrote:

> Hi folks,
>
> I have DataFrame with let's say this schema :
> -dealId,
> -ptf,
> -ts
> from it I derive another dataframe (let's call it df) to which I add an
> extra column (withColumn) which is the concatenation of the 3 existing
> columns, and I call the new column "theone".
>
> When I print the schema for the new dataframe, the "theone" column has a String
> type. And when I do
> df.where(df.col("theone").equalTo("nonExistantValue")).toJavaRDD.count, well,
> I get the initial size of df, as if the filtering did not work. But if I do
> the same query filtering on one of the original columns, I get the
> expected count, which is 0.
>
> The same goes for isin
>
> Any help will be more than appreciated.
>
> Best regards,
>
>
> --
> Mehdi BEN HAJ ABBES
>
>


-- 
Mehdi BEN HAJ ABBES


Re: SPARK REST API on YARN

2016-02-18 Thread Ricardo Paiva
You can use the yarn proxy:

http://<resourcemanager-host>:8088/proxy/<application-id>/api/v1/applications/<app-name>/executors

I have a Scala application that monitors the number of executors of some
Spark streaming jobs, and I had a similar problem, where I iterate over the
running applications and get the number of executors:

import java.util.EnumSet
import java.net.URL
import scala.io.Source.fromInputStream
import scala.util.parsing.json.JSON
import org.apache.hadoop.yarn.api.records.YarnApplicationState

// yarnClient is an initialized and started org.apache.hadoop.yarn.client.api.YarnClient
val states = EnumSet.of(
  YarnApplicationState.RUNNING,
  YarnApplicationState.ACCEPTED)
val it = yarnClient.getApplications(states).iterator()
while (it.hasNext()) {
  val app = it.next()
  // the tracking URL is the YARN proxy URL of the running application
  val strUrl = app.getTrackingUrl + "api/v1/applications/" +
    app.getName + "/executors"
  val url = new URL(strUrl)
  val urlCon = url.openConnection()
  val content = fromInputStream(urlCon.getInputStream)
    .getLines.mkString("\n")
  val j = JSON.parseFull(content)
  val currentExecutors = j.get.asInstanceOf[List[Map[String,
    String]]].filterNot(_("id") == "driver").size
}


Regards,

Ricardo

On Thu, Feb 18, 2016 at 1:56 PM, alvarobrandon [via Apache Spark User List]
 wrote:

> Hello:
>
> I wanted to access the REST API (
> http://spark.apache.org/docs/latest/monitoring.html#rest-api) of Spark to
> monitor my jobs. However I'm running my Spark Apps over YARN. When I try to
> make a request to http://localhost:4040/api/v1 as the documentation says
> I don't get any response. My question is. It is possible to access this
> REST API when you are not using Spark in Standalone mode?
>
> Thanks in advance
>
>



-- 
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-REST-API-on-YARN-tp26267p26268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I have now... So far  I think the issues I've had are not related to this,
but I wanted to be sure in case it should be something that needs to be
patched. I've had some jobs run successfully but this warning appears in
the logs.

Regards,

James

On 18 February 2016 at 12:23, Ted Yu  wrote:

> Have you seen this ?
>
> HADOOP-10988
>
> Cheers
>
> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>
>> HI,
>>
>> I am seeing warnings like this in the logs when I run Spark jobs:
>>
>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have disabled 
>> stack guard. The VM will try to fix the stack guard now.
>> It's highly recommended that you fix the library with 'execstack -c 
>> ', or link it with '-z noexecstack'.
>>
>>
>> I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
>> hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
>> some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
>> m4.large.
>>
>> Could this contribute to any problems running the jobs?
>>
>> Regards,
>>
>> James
>>
>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I'm fairly new to Spark.

The documentation suggests using the spark-ec2 script to launch clusters in
AWS, hence I used it.

Would EMR offer any advantage?

Regards,

James


On 18 February 2016 at 14:04, Gourav Sengupta 
wrote:

> Hi,
>
> Just out of sheet curiosity why are you not using EMR to start your SPARK
> cluster?
>
>
> Regards,
> Gourav
>
> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


UDAF support for DataFrames in Spark 1.5.0?

2016-02-18 Thread Richard Cobbe
I'm working on an application using DataFrames (Scala API) in Spark 1.5.0,
and we need to define and use several custom aggregators.  I'm having
trouble figuring out how to do this, however.

First, which version of Spark did UDAF support land in?  Has it in fact
landed at all?

https://issues.apache.org/jira/browse/SPARK-3947 suggests that UDAFs should
be available in 1.5.0.  However, the associated pull request includes
classes like org.apache.spark.sql.UDAFRegistration, but these classes don't
appear in the API docs, and I'm not able to use them from the spark shell
("type UDAFRegistration is not a member of package org.apache.spark.sql").

I don't have access to a Spark 1.6.0 installation, but UDAFRegistration
doesn't appear in the Scaladoc pages for 1.6.

Second, assuming that this functionality is supported in some version of
Spark, could someone point me to some documentation or an example that
demonstrates how to define and use a custom aggregation function?

Many thanks,

Richard
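
For what it's worth, the user-facing hook that shipped in 1.5.0 appears to be
org.apache.spark.sql.expressions.UserDefinedAggregateFunction (registered
through sqlContext.udf.register) rather than the UDAFRegistration class from
the pull request. A minimal sketch of a custom long-sum aggregator; the table
and column names are made up:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class LongSum extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// registration and use:
sqlContext.udf.register("longSum", new LongSum)
sqlContext.sql("SELECT key, longSum(value) FROM events GROUP BY key")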

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
If by smaller block interval you mean the value in seconds passed to the
streaming context constructor, no.  You'll still get everything from the
starting offset until now in the first batch.
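
A sketch of the two settings together (Scala; the per-partition rate is
illustrative and has to be tuned to what the cluster can absorb per batch):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // caps every batch of the direct Kafka stream, including the first catch-up
  // batch, at roughly maxRatePerPartition * batchSeconds records per partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // lets later batches adapt automatically once the backlog is drained
  .set("spark.streaming.backpressure.enabled", "true")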

On Thu, Feb 18, 2016 at 10:02 AM, praveen S  wrote:

> Sorry.. Rephrasing :
> Can this issue be resolved by having a smaller block interval?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:30, "praveen S"  wrote:
>
>> Can having a smaller block interval only resolve this?
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>>
>>> Backpressure won't help you with the first batch, you'd need 
>>> spark.streaming.kafka.maxRatePerPartition
>>> for that
>>>
>>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>>
 Have a look at

 spark.streaming.backpressure.enabled
 Property

 Regards,
 Praveen
 On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am
> trying to find a solution for a particular use case when my application 
> has
> a downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process 
> that.
>
> Is there any workaround so that when my streaming application starts
> it starts taking data for 1-2 hours, process it , then take the data for
> next 1 hour process it. Now when its done processing of previous 5 hours
> data which missed, normal streaming should start with the given slide
> interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>

>>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Sorry.. Rephrasing :
Can this issue be resolved by having a smaller block interval?

Regards,
Praveen
On 18 Feb 2016 21:30, "praveen S"  wrote:

> Can having a smaller block interval only resolve this?
>
> Regards,
> Praveen
> On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:
>
>> Backpressure won't help you with the first batch, you'd need 
>> spark.streaming.kafka.maxRatePerPartition
>> for that
>>
>> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>>
>>> Have a look at
>>>
>>> spark.streaming.backpressure.enabled
>>> Property
>>>
>>> Regards,
>>> Praveen
>>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>>
 I have a spark streaming application running in production. I am trying
 to find a solution for a particular use case when my application has a
 downtime of say 5 hours and is restarted. Now, when I start my streaming
 application after 5 hours there would be considerable amount of data then
 in the Kafka and my cluster would be unable to repartition and process 
 that.

 Is there any workaround so that when my streaming application starts it
 starts taking data for 1-2 hours, process it , then take the data for next
 1 hour process it. Now when its done processing of previous 5 hours data
 which missed, normal streaming should start with the given slide interval.

 Please suggest any ideas and feasibility of this.


 Thanks !!
 Abhi

>>>
>>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Can having a smaller block interval only resolve this?

Regards,
Praveen
On 18 Feb 2016 21:13, "Cody Koeninger"  wrote:

> Backpressure won't help you with the first batch, you'd need 
> spark.streaming.kafka.maxRatePerPartition
> for that
>
> On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:
>
>> Have a look at
>>
>> spark.streaming.backpressure.enabled
>> Property
>>
>> Regards,
>> Praveen
>> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>>
>>> I have a spark streaming application running in production. I am trying
>>> to find a solution for a particular use case when my application has a
>>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>>> application after 5 hours there would be considerable amount of data then
>>> in the Kafka and my cluster would be unable to repartition and process that.
>>>
>>> Is there any workaround so that when my streaming application starts it
>>> starts taking data for 1-2 hours, process it , then take the data for next
>>> 1 hour process it. Now when its done processing of previous 5 hours data
>>> which missed, normal streaming should start with the given slide interval.
>>>
>>> Please suggest any ideas and feasibility of this.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>
>


Re: Is stddev not a supported aggregation function in SparkSQL WindowSpec?

2016-02-18 Thread Mich Talebzadeh
 

On 18/02/2016 11:47, Mich Talebzadeh wrote: 

> It is available in Hive as well 
> 
> You can of course write your own standard deviation function 
> 
> For example sttdev for column amount_sold cann be expressed as 
> 
> SQRT((SUM(POWER(amount_sold,2))-(COUNT(1)*POWER(AVG(amount_sold),2)))/(COUNT(1)-1))
>  
> 
> COUNT(1) is the total records in the sample/table. 
> 
> HTH 
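
For anyone on 1.4/1.5 where the built-in stddev is not yet available, the same
formula can also be expressed with the window aggregates that do exist (a
sketch; window functions need a HiveContext before 1.6, and the column and
partition names are illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, count, pow, sqrt, sum}

val w = Window.partitionBy("prod_id")
val n = count(col("amount_sold")).over(w)
// sample standard deviation: sqrt((sum(x^2) - n * avg(x)^2) / (n - 1))
val sd = sqrt((sum(pow(col("amount_sold"), 2)).over(w)
  - n * pow(avg(col("amount_sold")).over(w), 2)) / (n - 1))
val withSd = df.withColumn("stddev_amount_sold", sd)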
> 
> On 18/02/2016 11:03, rok wrote: 
> There is a stddev function since 1.6: 
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.stddev
>  [1] 
> 
> If you are using spark < 1.6 you can write your own more or less easily. 
> 
> On Wed, Feb 17, 2016 at 5:06 PM, mayx [via Apache Spark User List] <[hidden 
> email]> wrote:
> I'd like to use standard deviation over window partitions on the Spark 
> dataframe, but it didn't work. Is it not supported? Looks like it supports 
> many aggregation functions, such as mean, min, etc. How can I make a feature 
> request for this? 
> 
> -
> 
> 
> -
> View this message in context: Re: Is stddev not a supported aggregation 
> function in SparkSQL WindowSpec? [4]
> Sent from the Apache Spark User List mailing list archive [5] at Nabble.com.

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 

Links:
--
[1]
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.stddev
[2]
http://apache-spark-user-list.1001560.n3.nabble.com/Is-stddev-not-a-supported-aggregation-function-in-SparkSQL-WindowSpec-tp26250.html
[3]
http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml
[4]
http://apache-spark-user-list.1001560.n3.nabble.com/Is-stddev-not-a-supported-aggregation-function-in-SparkSQL-WindowSpec-tp26250p26263.html
[5] http://apache-spark-user-list.1001560.n3.nabble.com/


SPARK REST API on YARN

2016-02-18 Thread alvarobrandon
Hello:

I wanted to access the REST API
(http://spark.apache.org/docs/latest/monitoring.html#rest-api) of Spark to
monitor my jobs. However I'm running my Spark Apps over YARN. When I try to
make a request to http://localhost:4040/api/v1 as the documentation says I
don't get any response. My question is. It is possible to access this REST
API when you are not using Spark in Standalone mode?

Thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-REST-API-on-YARN-tp26267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread Cody Koeninger
Backpressure won't help you with the first batch, you'd need
spark.streaming.kafka.maxRatePerPartition
for that

On Thu, Feb 18, 2016 at 9:40 AM, praveen S  wrote:

> Have a look at
>
> spark.streaming.backpressure.enabled
> Property
>
> Regards,
> Praveen
> On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:
>
>> I have a spark streaming application running in production. I am trying
>> to find a solution for a particular use case when my application has a
>> downtime of say 5 hours and is restarted. Now, when I start my streaming
>> application after 5 hours there would be considerable amount of data then
>> in the Kafka and my cluster would be unable to repartition and process that.
>>
>> Is there any workaround so that when my streaming application starts it
>> starts taking data for 1-2 hours, process it , then take the data for next
>> 1 hour process it. Now when its done processing of previous 5 hours data
>> which missed, normal streaming should start with the given slide interval.
>>
>> Please suggest any ideas and feasibility of this.
>>
>>
>> Thanks !!
>> Abhi
>>
>


Re: Spark Streaming with Kafka Use Case

2016-02-18 Thread praveen S
Have a look at

spark.streaming.backpressure.enabled
Property

Regards,
Praveen
On 18 Feb 2016 00:13, "Abhishek Anand"  wrote:

> I have a spark streaming application running in production. I am trying to
> find a solution for a particular use case when my application has a
> downtime of say 5 hours and is restarted. Now, when I start my streaming
> application after 5 hours there would be considerable amount of data then
> in the Kafka and my cluster would be unable to repartition and process that.
>
> Is there any workaround so that when my streaming application starts it
> starts taking data for 1-2 hours, process it , then take the data for next
> 1 hour process it. Now when its done processing of previous 5 hours data
> which missed, normal streaming should start with the given slide interval.
>
> Please suggest any ideas and feasibility of this.
>
>
> Thanks !!
> Abhi
>


SPARK-9559

2016-02-18 Thread Ashish Soni
Hi All ,

Just wanted to know if there is any work around or resolution for below
issue in Stand alone mode

https://issues.apache.org/jira/browse/SPARK-9559

Ashish


Re: How to train and predict in parallel via Spark MLlib?

2016-02-18 Thread Игорь Ляхов
Xiangrui, thanks for your answer!
Could you clarify some details?
What do you mean by "I can trigger training jobs in different threads on the
driver"? I have a 4-machine cluster (it will grow in the future), and I wish to
use the machines in parallel for training and predicting.
Do you have any example? It would be great if you could show me one.

Thanks a lot for your participation!
--Igor

2016-02-18 17:24 GMT+03:00 Xiangrui Meng :

> If you have a big cluster, you can trigger training jobs in different
> threads on the driver. Putting RDDs inside an RDD won't work. -Xiangrui
>
> On Thu, Feb 18, 2016, 4:28 AM Igor L.  wrote:
>
>> Good day, Spark team!
>> I have to solve regression problem for different restricitons. There is a
>> bunch of criteria and rules for them, I have to build model and make
>> predictions for each, combine all and save.
>> So, now my solution looks like:
>>
>> criteria2Rules: List[(String, Set[String])]
>> var result: RDD[(Id, Double)] = sc.parallelize(Array[(Id, Double)]())
>> criteria2Rules.foreach {
>>   case (criterion, rules) =>
>> val trainDataSet: RDD[LabeledPoint] = prepareTrainSet(criterion,
>> data)
>> val model: GradientBoostedTreesModel = buildModel(trainDataSet)
>> val predictionDataSet = preparePredictionDataSet(criterion, data)
>> val predictedScores = predictScores(predictionDataSet, model,
>> criterion, rules)
>> result = result.union(predictedScores)
>> }
>>
>> It works almost nice, but too slow for the reason
>> GradientBoostedTreesModel
>> training not so fast, especially in case of big amount of features,
>> samples
>> and also quite big list of using criteria.
>> I suppose it could work better, if Spark will train models and make
>> predictions in parallel.
>>
>> I've tried to use a relational way of data operation:
>>
>> val criteria2RulesRdd: RDD[(String, Set[String])]
>>
>> val cartesianCriteriaRules2DataRdd =
>> criteria2RulesRdd.cartesian(dataRdd)
>> cartesianCriteriaRules2DataRdd
>>   .aggregateByKey(List[Data]())(
>> { case (lst, tuple) => lst :+ tuple }, { case (lstL, lstR) => lstL
>> ::: lstR}
>>   )
>>   .map {
>> case (criteria, rulesSet, scorePredictionDataList) =>
>>   val trainSet = ???
>>   val model = ???
>>   val predictionSet = ???
>>   val predictedScores = ???
>>   }
>>   ...
>>
>> but it inevitably brings to situation when one RDD is produced inside
>> another RDD (GradientBoostedTreesModel is trained on RDD[LabeledPoint])
>> and
>> as far as I know it's a bad scenario, e.g.
>> toy example below doesn't work:
>> scala> sc.parallelize(1 to 100).map(x => (x,
>> sc.parallelize(Array(2)).map(_
>> * 2).collect)).collect.
>>
>> Is there any way to use Spark MLlib in parallel way?
>>
>> Thank u for attention!
>>
>> --
>> BR,
>> Junior Scala/Python Developer
>> Igor L.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-train-and-predict-in-parallel-via-Spark-MLlib-tp26261.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 
--
Best regards,
Игорь Ляхов


Re: [MLlib] What is the best way to forecast the next month page visit?

2016-02-18 Thread diplomatic Guru
Hi Jorge,

Thanks for the example. I managed to get the job to run but the results are
appalling.

The best I could get it:
Test Mean Squared Error: 684.3709679595169
Learned regression tree model:
DecisionTreeModel regressor of depth 30 with 6905 nodes

I tried tweaking maxDepth and maxBins but I couldn't get any better results.

Do you know how I could improve the results?



On 5 February 2016 at 08:34, Jorge Machado  wrote:

> Hi,
>
> For Example an array:
>
> 3 Categories : Nov,Dec, Jan.
>
> Nov = 1,0,0
> Dec = 0,1,0
> Jan = 0,0,1
> for the complete Year you would have 12 Categories.  Like  Jan =
> 1,0,0,0,0,0,0,0,0,0,0,0
> Pages:
> PageA: 0,0,0,1
> PageB: 0,0,1,0
> PageC:0,1,0,0
> PageD:1,0,0,0
>
> If you are using decisionTree I think you do not need to normalize the
> other values
>
> You should have at the end for Januar and PageA something like :
>
> LabeledPoint (label , (0,0,1,0,0,01,1.0,2.0,3.0))
>
> Pass the LabeledPoint to the ML model.
>
> test it.
>
> PS: label is what you want to predict.
>
> On 02/02/2016, at 20:44, diplomatic Guru  wrote:
>
> Hi Jorge,
>
> Unfortunately, I couldn't transform the data as you suggested.
>
> This is what I get:
>
> +---+-+-+
> | id|pageIndex|  pageVec|
> +---+-+-+
> |0.0|  3.0|(3,[],[])|
> |1.0|  0.0|(3,[0],[1.0])|
> |2.0|  2.0|(3,[2],[1.0])|
> |3.0|  1.0|(3,[1],[1.0])|
> +---+-+-+
>
>
> This is the snippets:
>
> JavaRDD jrdd = jsc.parallelize(Arrays.asList(
> RowFactory.create(0.0, "PageA", 1.0, 2.0, 3.0),
> RowFactory.create(1.0, "PageB", 4.0, 5.0, 6.0),
> RowFactory.create(2.0, "PageC", 7.0, 8.0, 9.0),
> RowFactory.create(3.0, "PageD", 10.0, 11.0, 12.0)
>
> ));
>
> StructType schema = new StructType(new StructField[] {
> new StructField("id", DataTypes.DoubleType, false,
> Metadata.empty()),
> new StructField("page", DataTypes.StringType, false,
> Metadata.empty()),
> new StructField("Nov", DataTypes.DoubleType, false,
> Metadata.empty()),
> new StructField("Dec", DataTypes.DoubleType, false,
> Metadata.empty()),
> new StructField("Jan", DataTypes.DoubleType, false,
> Metadata.empty()) });
>
> DataFrame df = sqlContext.createDataFrame(jrdd, schema);
>
> StringIndexerModel indexer = new
> StringIndexer().setInputCol("page").setInputCol("Nov")
>
> .setInputCol("Dec").setInputCol("Jan").setOutputCol("pageIndex").fit(df);
>
> OneHotEncoder encoder = new
> OneHotEncoder().setInputCol("pageIndex").setOutputCol("pageVec");
>
> DataFrame indexed = indexer.transform(df);
>
> DataFrame encoded = encoder.transform(indexed);
> encoded.select("id", "pageIndex", "pageVec").show();
>
>
> Could you please let me know what I'm doing wrong?
>
>
> PS: My cluster is running Spark 1.3.0, which doesn't support
> StringIndexer, OneHotEncoder  but for testing this I've installed the 1.6.0
> on my local machine.
>
> Cheer.
>
>
> On 2 February 2016 at 10:25, Jorge Machado  wrote:
>
>> Hi Guru,
>>
>> Any results ? :)
>>
>> On 01/02/2016, at 14:34, diplomatic Guru 
>> wrote:
>>
>> Hi Jorge,
>>
>> Thank you for the reply and your example. I'll try your suggestion and
>> will let you know the outcome.
>>
>> Cheers
>>
>>
>> On 1 February 2016 at 13:17, Jorge Machado  wrote:
>>
>>> Hi Guru,
>>>
>>> So First transform your Name pages with OneHotEncoder (
>>> https://spark.apache.org/docs/latest/ml-features.html#onehotencoder)
>>> then make the same thing for months:
>>>
>>> You will end with something like:
>>> (first tree are the pagename, the other the month,)
>>> (0,0,1,0,0,1)
>>>
>>> then you have your label that is what you want to predict. At the end
>>> you will have an LabeledPoint with (1 -> (0,0,1,0,0,1)) this will
>>> represent (1 -> (PageA, UV_NOV))
>>> After that try a regression tree with
>>>
>>> val model = DecisionTree.trainRegressor(trainingData,
>>> categoricalFeaturesInfo, impurity,maxDepth, maxBins)
>>>
>>>
>>> Regards
>>> Jorge
>>>
>>> On 01/02/2016, at 12:29, diplomatic Guru 
>>> wrote:
>>>
>>> Any suggestions please?
>>>
>>>
>>> On 29 January 2016 at 22:31, diplomatic Guru 
>>> wrote:
>>>
 Hello guys,

 I'm trying understand how I could predict the next month page views
 based on the previous access pattern.

 For example, I've collected statistics on page views:

 e.g.
 Page,UniqueView
 -
 pageA, 1
 pageB, 999
 ...
 pageZ,200

 I aggregate the statistics monthly.

 I've prepared a file containing last 3 months as this:

 e.g.
 Page,UV_NOV, UV_DEC, UV_JAN
 ---
 pageA, 1,9989,11000
 pageB, 999,500,700
 ...
 pageZ,200,50,34


 Based 

Re: Why no computations run on workers/slaves in cluster mode?

2016-02-18 Thread Gourav Sengupta
Hi,

Have you registered an application in the standalone cluster? This can also
happen if the data path that you are giving SPARK to access is only visible
on one system and not the others. For example, if I provide the data path as
"/abcd/*" and that path is available on only one system, then the workers
will only run on that system.


Regards,
Gourav Sengupta

On Wed, Feb 17, 2016 at 4:20 PM, Junjie Qian 
wrote:

> Hi all,
>
> I am new to Spark, and have one problem that, no computations run on
> workers/slave_servers in the standalone cluster mode.
>
> The Spark version is 1.6.0, and environment is CentOS. I run the example
> codes, e.g.
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala#L117
> .
>
> What I did: 1. setup slaves in ./conf/slaves, 2. setup the spark-env.sh
> file, 3. sbin/start-all.sh, 4. run the test program with spark-submit.
> Follow the link, http://spark.apache.org/docs/latest/spark-standalone.html
> .
>
> Could anyone give some suggestions on this? Or the link to how setup this?
>
> Many thanks
> Junjie Qian
>


Re: Is this likely to cause any problems?

2016-02-18 Thread Gourav Sengupta
Hi Ted/ Teng,

Just read the content in the email, which is very different from what the
facts are:
"Just to add another point, spark-ec2 is nice to keep and improve
because it allows users to run any version of spark (nightly-build for
example). EMR does not allow you to do that without a manual process."

EMR does provide different versions of SPARK to run; currently SPARK
versions 1.4.1, 1.5.0, 1.5.2 and 1.6 are all available. SPARK 1.6 was
released on Jan 4, 2016 and EMR provided SPARK 1.6 within another 20
days, production ready, scalable, and integrated into the AWS world.


Regards,
Gourav Sengupta


On Thu, Feb 18, 2016 at 2:30 PM, Ted Yu  wrote:

> Please see the last 3 posts on this thread:
>
> http://search-hadoop.com/m/q3RTtTorTf2o3UGK1=Re+spark+ec2+vs+EMR
>
> FYI
>
> On Thu, Feb 18, 2016 at 6:25 AM, Teng Qiu  wrote:
>
>> EMR is great, but I'm curiosity how are you dealing with security
>> settings with EMR, only whitelisting some IP range with security group
>> setting is really too weak.
>>
>> are there really many production system are using EMR? for me, i feel
>> using EMR means everyone in my IP range (for some ISP it may be the whole
>> town...) is able to see my spark web UI or use my running zepplin notebook
>> if they do some port scanning...
>>
>> 2016-02-18 15:04 GMT+01:00 Gourav Sengupta :
>>
>>> Hi,
>>>
>>> Just out of sheet curiosity why are you not using EMR to start your
>>> SPARK cluster?
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>>>
 Have you seen this ?

 HADOOP-10988

 Cheers

 On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton 
 wrote:

> HI,
>
> I am seeing warnings like this in the logs when I run Spark jobs:
>
> OpenJDK 64-Bit Server VM warning: You have loaded library 
> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
> disabled stack guard. The VM will try to fix the stack guard now.
> It's highly recommended that you fix the library with 'execstack -c 
> ', or link it with '-z noexecstack'.
>
>
> I used spark-ec2 to launch the cluster with the default AMI, Spark
> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
> is m4.large.
>
> Could this contribute to any problems running the jobs?
>
> Regards,
>
> James
>


>>>
>>
>


Re: Why no computations run on workers/slaves in cluster mode?

2016-02-18 Thread Xiangrui Meng
Did the test program finish and did you see any error messages from the
console? -Xiangrui

On Wed, Feb 17, 2016, 1:49 PM Junjie Qian  wrote:

> Hi all,
>
> I am new to Spark, and have one problem that, no computations run on
> workers/slave_servers in the standalone cluster mode.
>
> The Spark version is 1.6.0, and environment is CentOS. I run the example
> codes, e.g.
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala#L117
> .
>
> What I did: 1. setup slaves in ./conf/slaves, 2. setup the spark-env.sh
> file, 3. sbin/start-all.sh, 4. run the test program with spark-submit.
> Follow the link, http://spark.apache.org/docs/latest/spark-standalone.html
> .
>
> Could anyone give some suggestions on this? Or the link to how setup this?
>
> Many thanks
> Junjie Qian
>


Re: Is this likely to cause any problems?

2016-02-18 Thread Gourav Sengupta
Hi Teng,

Are you using VPC in EMR? It seems quite curious that you can lock down
traffic at the gateway, subnet and security group (using a private setup with
NAT) and still feel insecure. I will be really interested to know what
your feelings are based on. I bet the Amazon guys will also find it very
interesting.

And I am almost sure that none of the EMR hosted services (HADOOP, SPARK,
Zeppelin, etc.) are exposed to external IP addresses even if you are using
the classic setting.


Regards,
Gourav Sengupta


On Thu, Feb 18, 2016 at 2:25 PM, Teng Qiu  wrote:

> EMR is great, but I'm curiosity how are you dealing with security settings
> with EMR, only whitelisting some IP range with security group setting is
> really too weak.
>
> are there really many production system are using EMR? for me, i feel
> using EMR means everyone in my IP range (for some ISP it may be the whole
> town...) is able to see my spark web UI or use my running zepplin notebook
> if they do some port scanning...
>
> 2016-02-18 15:04 GMT+01:00 Gourav Sengupta :
>
>> Hi,
>>
>> Just out of sheet curiosity why are you not using EMR to start your SPARK
>> cluster?
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>>
>>> Have you seen this ?
>>>
>>> HADOOP-10988
>>>
>>> Cheers
>>>
>>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>>
 HI,

 I am seeing warnings like this in the logs when I run Spark jobs:

 OpenJDK 64-Bit Server VM warning: You have loaded library 
 /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
 disabled stack guard. The VM will try to fix the stack guard now.
 It's highly recommended that you fix the library with 'execstack -c 
 ', or link it with '-z noexecstack'.


 I used spark-ec2 to launch the cluster with the default AMI, Spark
 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
 written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
 is m4.large.

 Could this contribute to any problems running the jobs?

 Regards,

 James

>>>
>>>
>>
>


Re: How to train and predict in parallel via Spark MLlib?

2016-02-18 Thread Xiangrui Meng
If you have a big cluster, you can trigger training jobs in different
threads on the driver. Putting RDDs inside an RDD won't work. -Xiangrui
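
Concretely, "different threads on the driver" can be as simple as wrapping each
train-and-predict round in a Future; each Future submits its own Spark jobs and
the scheduler runs them concurrently. A rough sketch using the names from the
quoted post (criteria2Rules, data and the helper functions are assumed from
there):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// one Future per criterion; the driver threads only coordinate,
// the heavy lifting still runs on the executors
val perCriterion = criteria2Rules.map { case (criterion, rules) =>
  Future {
    val model = buildModel(prepareTrainSet(criterion, data))
    predictScores(preparePredictionDataSet(criterion, data), model, criterion, rules)
  }
}

val result = Await.result(Future.sequence(perCriterion), Duration.Inf)
  .reduce(_ union _)

Setting spark.scheduler.mode=FAIR can also help the concurrent jobs share the
executors more evenly.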

On Thu, Feb 18, 2016, 4:28 AM Igor L.  wrote:

> Good day, Spark team!
> I have to solve regression problem for different restricitons. There is a
> bunch of criteria and rules for them, I have to build model and make
> predictions for each, combine all and save.
> So, now my solution looks like:
>
> criteria2Rules: List[(String, Set[String])]
> var result: RDD[(Id, Double)] = sc.parallelize(Array[(Id, Double)]())
> criteria2Rules.foreach {
>   case (criterion, rules) =>
> val trainDataSet: RDD[LabeledPoint] = prepareTrainSet(criterion,
> data)
> val model: GradientBoostedTreesModel = buildModel(trainDataSet)
> val predictionDataSet = preparePredictionDataSet(criterion, data)
> val predictedScores = predictScores(predictionDataSet, model,
> criterion, rules)
> result = result.union(predictedScores)
> }
>
> It works almost nice, but too slow for the reason GradientBoostedTreesModel
> training not so fast, especially in case of big amount of features, samples
> and also quite big list of using criteria.
> I suppose it could work better, if Spark will train models and make
> predictions in parallel.
>
> I've tried to use a relational way of data operation:
>
> val criteria2RulesRdd: RDD[(String, Set[String])]
>
> val cartesianCriteriaRules2DataRdd =
> criteria2RulesRdd.cartesian(dataRdd)
> cartesianCriteriaRules2DataRdd
>   .aggregateByKey(List[Data]())(
> { case (lst, tuple) => lst :+ tuple }, { case (lstL, lstR) => lstL
> ::: lstR}
>   )
>   .map {
> case (criteria, rulesSet, scorePredictionDataList) =>
>   val trainSet = ???
>   val model = ???
>   val predictionSet = ???
>   val predictedScores = ???
>   }
>   ...
>
> but it inevitably brings to situation when one RDD is produced inside
> another RDD (GradientBoostedTreesModel is trained on RDD[LabeledPoint]) and
> as far as I know it's a bad scenario, e.g.
> toy example below doesn't work:
> scala> sc.parallelize(1 to 100).map(x => (x, sc.parallelize(Array(2)).map(_
> * 2).collect)).collect.
>
> Is there any way to use Spark MLlib in parallel way?
>
> Thank u for attention!
>
> --
> BR,
> Junior Scala/Python Developer
> Igor L.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-train-and-predict-in-parallel-via-Spark-MLlib-tp26261.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread Ted Yu
Please see the last 3 posts on this thread:

http://search-hadoop.com/m/q3RTtTorTf2o3UGK1=Re+spark+ec2+vs+EMR

FYI

On Thu, Feb 18, 2016 at 6:25 AM, Teng Qiu  wrote:

> EMR is great, but I'm curiosity how are you dealing with security settings
> with EMR, only whitelisting some IP range with security group setting is
> really too weak.
>
> are there really many production system are using EMR? for me, i feel
> using EMR means everyone in my IP range (for some ISP it may be the whole
> town...) is able to see my spark web UI or use my running zepplin notebook
> if they do some port scanning...
>
> 2016-02-18 15:04 GMT+01:00 Gourav Sengupta :
>
>> Hi,
>>
>> Just out of sheet curiosity why are you not using EMR to start your SPARK
>> cluster?
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>>
>>> Have you seen this ?
>>>
>>> HADOOP-10988
>>>
>>> Cheers
>>>
>>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>>
 HI,

 I am seeing warnings like this in the logs when I run Spark jobs:

 OpenJDK 64-Bit Server VM warning: You have loaded library 
 /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
 disabled stack guard. The VM will try to fix the stack guard now.
 It's highly recommended that you fix the library with 'execstack -c 
 ', or link it with '-z noexecstack'.


 I used spark-ec2 to launch the cluster with the default AMI, Spark
 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
 written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
 is m4.large.

 Could this contribute to any problems running the jobs?

 Regards,

 James

>>>
>>>
>>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread Teng Qiu
EMR is great, but I'm curious how you are dealing with security settings
with EMR; only whitelisting some IP range with a security group setting is
really too weak.

Are there really many production systems using EMR? For me, it feels like using
EMR means everyone in my IP range (for some ISPs it may be the whole
town...) is able to see my Spark web UI or use my running Zeppelin notebook
if they do some port scanning...

2016-02-18 15:04 GMT+01:00 Gourav Sengupta :

> Hi,
>
> Just out of sheet curiosity why are you not using EMR to start your SPARK
> cluster?
>
>
> Regards,
> Gourav
>
> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


Re: adding a split and union to a streaming application cause big performance hit

2016-02-18 Thread krishna ramachandran
I tried these 2 global settings (and restarted the app) after enabling
cache for stream1

conf.set("spark.streaming.unpersist", "true")

streamingContext.remember(Seconds(batchDuration * 4))

batch duration is 4 sec

Using Spark 1.4.1. The application runs for about 4-5 hrs, and then we see an
out of memory error.

regards

Krishna

On Thu, Feb 18, 2016 at 4:54 AM, Ted Yu  wrote:

> bq. streamingContext.remember("duration") did not help
>
> Can you give a bit more detail on the above ?
> Did you mean the job encountered OOME later on ?
>
> Which Spark release are you using ?
>
> Cheers
>
> On Wed, Feb 17, 2016 at 6:03 PM, ramach1776  wrote:
>
>> We have a streaming application containing approximately 12 jobs every
>> batch,
>> running in streaming mode (4 sec batches). Each  job has several
>> transformations and 1 action (output to cassandra) which causes the
>> execution of the job (DAG)
>>
>> For example the first job,
>>
>> /job 1
>> ---> receive Stream A --> map --> filter -> (union with another stream B)
>> --> map -->/ groupbykey --> transform --> reducebykey --> map
>>
>> Likewise we go thro' few more transforms and save to database (job2,
>> job3...)
>>
>> Recently we added a new transformation further downstream wherein we union
>> the output of DStream from job 1 (in italics) with output from a new
>> transformation(job 5). It appears the whole execution thus far is repeated
>> which is redundant (I can see this in execution graph & also performance
>> ->
>> processing time).
>>
>> That is, with this additional transformation (union with a stream
>> processed
>> upstream) each batch runs as much as 2.5 times slower compared to runs
>> without the union. If I cache the DStream from job 1(italics), performance
>> improves substantially but hit out of memory errors within few hours.
>>
>> What is the recommended way to cache/unpersist in such a scenario? there
>> is
>> no dstream level "unpersist"
>> setting "spark.streaming.unpersist" to true and
>> streamingContext.remember("duration") did not help.
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/adding-a-split-and-union-to-a-streaming-application-cause-big-performance-hit-tp26259.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread Gourav Sengupta
Hi,

Just out of sheer curiosity, why are you not using EMR to start your SPARK
cluster?


Regards,
Gourav

On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:

> Have you seen this ?
>
> HADOOP-10988
>
> Cheers
>
> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>
>> HI,
>>
>> I am seeing warnings like this in the logs when I run Spark jobs:
>>
>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have disabled 
>> stack guard. The VM will try to fix the stack guard now.
>> It's highly recommended that you fix the library with 'execstack -c 
>> ', or link it with '-z noexecstack'.
>>
>>
>> I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
>> hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
>> some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
>> m4.large.
>>
>> Could this contribute to any problems running the jobs?
>>
>> Regards,
>>
>> James
>>
>
>


Re: Error when executing Spark application on YARN

2016-02-18 Thread alvarobrandon
Found the solution. I was pointing to the wrong hadoop conf directory. I feel
so stupid :P



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-executing-Spark-application-on-YARN-tp26248p26266.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: adding a split and union to a streaming application cause big performance hit

2016-02-18 Thread Ted Yu
bq. streamingContext.remember("duration") did not help

Can you give a bit more detail on the above ?
Did you mean the job encountered OOME later on ?

Which Spark release are you using ?

Cheers

On Wed, Feb 17, 2016 at 6:03 PM, ramach1776  wrote:

> We have a streaming application containing approximately 12 jobs every
> batch,
> running in streaming mode (4 sec batches). Each  job has several
> transformations and 1 action (output to cassandra) which causes the
> execution of the job (DAG)
>
> For example the first job,
>
> /job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B)
> --> map -->/ groupbykey --> transform --> reducebykey --> map
>
> Likewise we go thro' few more transforms and save to database (job2,
> job3...)
>
> Recently we added a new transformation further downstream wherein we union
> the output of DStream from job 1 (in italics) with output from a new
> transformation(job 5). It appears the whole execution thus far is repeated
> which is redundant (I can see this in execution graph & also performance ->
> processing time).
>
> That is, with this additional transformation (union with a stream processed
> upstream) each batch runs as much as 2.5 times slower compared to runs
> without the union. If I cache the DStream from job 1(italics), performance
> improves substantially but hit out of memory errors within few hours.
>
> What is the recommended way to cache/unpersist in such a scenario? there is
> no dstream level "unpersist"
> setting "spark.streaming.unpersist" to true and
> streamingContext.remember("duration") did not help.
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/adding-a-split-and-union-to-a-streaming-application-cause-big-performance-hit-tp26259.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
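
One thing that might be worth trying (a sketch only, with made-up sources,
durations and transformations, not something confirmed in this thread):
persist the shared branch explicitly with a serialized, disk-spilling storage
level instead of the plain cache(), so the reused batches can spill to disk
rather than run out of memory.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("shared-branch-sketch")
val ssc = new StreamingContext(sparkConf, Seconds(4))
ssc.remember(Seconds(60))                         // keep generated RDDs around a little longer

val streamA = ssc.socketTextStream("hostA", 9999) // placeholder sources
val streamB = ssc.socketTextStream("hostB", 9999)

// the branch that several downstream jobs reuse
val shared = streamA.union(streamB).map(line => (line, 1L)).reduceByKey(_ + _)

// serialized and disk-spilling instead of cache()'s default MEMORY_ONLY_SER,
// so reuse avoids recomputation without filling the heap
shared.persist(StorageLevel.MEMORY_AND_DISK_SER)

shared.print()                                    // stand-in for the real outputs
ssc.start()
ssc.awaitTermination()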


Re: Is this likely to cause any problems?

2016-02-18 Thread Ted Yu
Have you seen this ?

HADOOP-10988

Cheers

On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:

> HI,
>
> I am seeing warnings like this in the logs when I run Spark jobs:
>
> OpenJDK 64-Bit Server VM warning: You have loaded library 
> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have disabled 
> stack guard. The VM will try to fix the stack guard now.
> It's highly recommended that you fix the library with 'execstack -c 
> ', or link it with '-z noexecstack'.
>
>
> I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
> hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
> some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
> m4.large.
>
> Could this contribute to any problems running the jobs?
>
> Regards,
>
> James
>


How do I stream in Parquet files using fileStream() and ParquetInputFormat?

2016-02-18 Thread Rory Byrne
Hi,

I'm trying to understand how to stream Parquet files into Spark using
StreamingContext.fileStream[Key, Value, Format]().

I am struggling to understand a) what should be passed as Key and Value
(assuming ParquetInputFormat - is this the correct format?), and b) how -
if at all - to configure the ParquetInputFormat with a ReadSupport class,
RecordMaterializer, etc.

I have tried setting the ReadSupportClass to GroupReadSupport (from the
examples), but I am having problems with the fact that I must also pass a
Hadoop MapReduce job - which is expected to be running and attached to a
job tracker.

Any help or reading suggestions are appreciated as I have almost no
knowledge of Hadoop so this low level use of Hadoop is very confusing for
me.
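
In case it helps, here is the rough shape this seems to take; treat it as an
unverified sketch rather than a recipe. ParquetInputFormat is keyed by Void,
with GroupReadSupport the value type is Group, and the read-support class can
apparently be set on a plain Hadoop Configuration (no running Job needed) and
passed to the fileStream overload that accepts a Configuration. The package
names assume the org.apache.parquet artifacts bundled with recent Spark (older
parquet-mr releases use the bare parquet.* packages), and the directory path
is made up.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("parquet-stream-sketch"), Seconds(30))

// tell ParquetInputFormat how to materialize records, via a plain Configuration
val hadoopConf = new Configuration()
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[GroupReadSupport].getName)

// key is Void, value is the materialized Group record
val parquetStream = ssc.fileStream[Void, Group, ParquetInputFormat[Group]](
  "hdfs:///incoming/parquet",
  (path: Path) => path.toString.endsWith(".parquet"),
  true,
  hadoopConf)

parquetStream.map(_._2.toString).print()
ssc.start()
ssc.awaitTermination()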


Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
Hi,

I am seeing warnings like this in the logs when I run Spark jobs:

OpenJDK 64-Bit Server VM warning: You have loaded library
/root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have
disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c
<libfile>', or link it with '-z noexecstack'.
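
For reference, the concrete form of the fix the warning itself suggests would
presumably be the following, using the library path from the message (I have
not run it, and it assumes execstack is available on the AMI):

execstack -c /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0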


I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
m4.large.

Could this contribute to any problems running the jobs?

Regards,

James


Re: Reading CSV file using pyspark

2016-02-18 Thread Gourav Sengupta
Hi Devesh,

you have to start your SPARK shell with the spark-csv package loaded. The
command is mentioned below (you can use pyspark instead of spark-shell); all
the required commands for this are documented at
https://github.com/databricks/spark-csv. I prefer using the 2.11 version
instead of 2.10, as there are some write issues which 2.11 resolves.
Hopefully you are using the latest release of SPARK.

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
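
If you launch pyspark instead, the same --packages flag applies, along these
lines (same package and version as above):

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0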

Regards,
Gourav Sengupta


On Thu, Feb 18, 2016 at 11:05 AM, Teng Qiu  wrote:

> download a right version of this jar
> http://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 (or
> 2.11), and append it to SPARK_CLASSPATH
>
> 2016-02-18 11:05 GMT+01:00 Devesh Raj Singh :
>
>> Hi,
>>
>> I want to read CSV file in pyspark
>>
>> I am running pyspark on pycharm
>> I am trying to load a csv using pyspark
>>
>> import os
>> import sys
>>
>> os.environ['SPARK_HOME']="/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6"
>> sys.path.append("/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6/python/")
>>
>> # Now we are ready to import Spark Modules
>> try:
>> from pyspark import SparkContext
>> from pyspark import SparkConf
>> from pyspark.mllib.fpm import FPGrowth
>> print ("Successfully imported all Spark Modules")
>> except ImportError as e:
>> print ("Error importing Spark Modules", e)
>> sys.exit(1)
>>
>>
>> sc = SparkContext('local')
>>
>> from pyspark.sql import HiveContext, SQLContext
>> from pyspark.sql import SQLContext
>>
>> df = 
>> sqlContext.read.format('com.databricks.spark.csv').options(header='true', 
>> inferschema='true').load('/Users/devesh/work/iris/iris.csv')
>>
>> I am getting the following error
>>
>> Py4JJavaError: An error occurred while calling o88.load.
>> : java.lang.ClassNotFoundException: Failed to load class for data source:
>> com.databricks.spark.csv.
>> at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
>> --
>> Warm regards,
>> Devesh.
>>
>
>


explanation for parent.slideDuration in ReducedWindowedDStream

2016-02-18 Thread Sachin Aggarwal
While reading the code, I came across parent.slideDuration in the
ReducedWindowedDStream class:

val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
  currentTime)

//      _____________________________
//     |  previous window   _________|___________________
//     |___________________|       current window        |  --> Time
//                         |_____________________________|
//
// |________ _________|          |________ _________|
//          |                             |
//          V                             V
//       old RDDs                     new RDDs
//

Here, couldn't currentWindow be expressed simply in terms of currentTime -
windowDuration? *What do we need parent.slideDuration for?*

The same thing is repeated in the further expressions as well:

val oldRDDs =
  reducedStream.slice(previousWindow.beginTime,
    currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)

// Get the RDDs of the reduced values in "new time steps"
val newRDDs =
  reducedStream.slice(previousWindow.endTime + parent.slideDuration,
    currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)


-- 

Thanks & Regards

Sachin Aggarwal
7760502772


Re: Error when executing Spark application on YARN

2016-02-18 Thread alvarobrandon
1. It happens to all the classes inside the jar package.
2. I didn't make any changes:
   - I have three nodes: one master and two slaves in the conf/slaves file
   - In spark-env.sh I just set the HADOOP_CONF_DIR parameter
   - In spark-defaults.conf I didn't change anything
3. The container doesn't even start.

It seems like there is some problem when sending the jar files. I have just
realised that I get the following message:
Diagnostics: java.io.IOException: Resource
file:/opt/spark/BenchMark-1.0-SNAPSHOT.jar changed on src filesystem
(expected 1455792343000, was 145579310





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-executing-Spark-application-on-YARN-tp26248p26264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading CSV file using pyspark

2016-02-18 Thread Teng Qiu
download the right version of this jar from
http://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 (or 2.11),
and append it to SPARK_CLASSPATH
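
For example (the local path below is made up, and note the package has
transitive dependencies, so letting --packages resolve them is often easier
than managing the jars by hand):

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/path/to/spark-csv_2.10-1.3.0.jar

# or hand the jar to the launcher instead:
$SPARK_HOME/bin/pyspark --jars /path/to/spark-csv_2.10-1.3.0.jar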

2016-02-18 11:05 GMT+01:00 Devesh Raj Singh :

> Hi,
>
> I want to read CSV file in pyspark
>
> I am running pyspark on pycharm
> I am trying to load a csv using pyspark
>
> import os
> import sys
>
> os.environ['SPARK_HOME']="/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6"
> sys.path.append("/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6/python/")
>
> # Now we are ready to import Spark Modules
> try:
> from pyspark import SparkContext
> from pyspark import SparkConf
> from pyspark.mllib.fpm import FPGrowth
> print ("Successfully imported all Spark Modules")
> except ImportError as e:
> print ("Error importing Spark Modules", e)
> sys.exit(1)
>
>
> sc = SparkContext('local')
>
> from pyspark.sql import HiveContext, SQLContext
> from pyspark.sql import SQLContext
>
> df = 
> sqlContext.read.format('com.databricks.spark.csv').options(header='true', 
> inferschema='true').load('/Users/devesh/work/iris/iris.csv')
>
> I am getting the following error
>
> Py4JJavaError: An error occurred while calling o88.load.
> : java.lang.ClassNotFoundException: Failed to load class for data source:
> com.databricks.spark.csv.
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
> --
> Warm regards,
> Devesh.
>


Re: Is stddev not a supported aggregation function in SparkSQL WindowSpec?

2016-02-18 Thread rok
There is a stddev function since 1.6:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.stddev


If you are using spark < 1.6 you can write your own more or less easily.
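
For the pre-1.6 case, here is a rough sketch of rolling your own over a window
partition using sqrt(E[x^2] - E[x]^2), the population form; the DataFrame, the
column names and the use of HiveContext (window functions on 1.4/1.5 need it)
are only for illustration:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

sc = SparkContext("local", "stddev-window-sketch")
sqlContext = HiveContext(sc)   # window functions on Spark 1.4/1.5 require HiveContext

# toy data: (group, value)
df = sqlContext.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("a", 4.0), ("b", 10.0), ("b", 20.0)],
    ["group", "value"])

w = Window.partitionBy("group")
mean_val = F.avg("value").over(w)
mean_sq = F.avg(F.col("value") * F.col("value")).over(w)

# population stddev per partition; scale by n/(n-1) if the sample version is needed
df.withColumn("stddev_pop", F.sqrt(mean_sq - mean_val * mean_val)).show()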

On Wed, Feb 17, 2016 at 5:06 PM, mayx [via Apache Spark User List] <
ml-node+s1001560n26250...@n3.nabble.com> wrote:

> I'd like to use standard deviation over window partitions on the Spark
> dataframe, but it didn't work. Is it not supported? Looks like it supports
> many aggregation functions, such as mean, min, etc. How can I make a
> feature request for this?
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-stddev-not-a-supported-aggregation-function-in-SparkSQL-WindowSpec-tp26250.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-stddev-not-a-supported-aggregation-function-in-SparkSQL-WindowSpec-tp26250p26263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

How do I stream in Parquet files using fileStream() and ParquetInputFormat

2016-02-18 Thread roryofbyrne
Hi, 

I'm trying to understand how to stream Parquet files into Spark using
StreamingContext.fileStream[Key, Value, Format](). 

I am struggling to understand a) what should be passed as Key and Value
(assuming ParquetInputFormat - is this the correct format?), and b) how - if
at all - to configure the ParquetInputFormat with a ReadSupport class,
RecordMaterializer, etc.

Any help is appreciated as I have almost no knowledge of Hadoop so this low
level use of Hadoop is very confusing for me.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stream-in-Parquet-files-using-fileStream-and-ParquetInputFormat-tp26262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Reading CSV file using pyspark

2016-02-18 Thread Devesh Raj Singh
Hi,

I want to read CSV file in pyspark

I am running pyspark on pycharm
I am trying to load a csv using pyspark

import os
import sys

os.environ['SPARK_HOME']="/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6"
sys.path.append("/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6/python/")

# Now we are ready to import Spark Modules
try:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.fpm import FPGrowth
print ("Successfully imported all Spark Modules")
except ImportError as e:
print ("Error importing Spark Modules", e)
sys.exit(1)


sc = SparkContext('local')

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # create the SQLContext used below

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('/Users/devesh/work/iris/iris.csv')

I am getting the following error

Py4JJavaError: An error occurred while calling o88.load.
: java.lang.ClassNotFoundException: Failed to load class for data source:
com.databricks.spark.csv.
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
-- 
Warm regards,
Devesh.


How to train and predict in parallel via Spark MLlib?

2016-02-18 Thread Igor L.
Good day, Spark team!
I have to solve a regression problem under different restrictions. There is a
bunch of criteria, each with its own set of rules; I have to build a model and
make predictions for each criterion, then combine everything and save it.
So, now my solution looks like:

criteria2Rules: List[(String, Set[String])]
var result: RDD[(Id, Double)] = sc.parallelize(Array[(Id, Double)]())
criteria2Rules.foreach {
  case (criterion, rules) =>
    val trainDataSet: RDD[LabeledPoint] = prepareTrainSet(criterion, data)
    val model: GradientBoostedTreesModel = buildModel(trainDataSet)
    val predictionDataSet = preparePredictionDataSet(criterion, data)
    val predictedScores = predictScores(predictionDataSet, model, criterion, rules)
    result = result.union(predictedScores)
}

It works, but it is too slow: GradientBoostedTreesModel training is not fast,
especially with a large number of features and samples and a rather long list
of criteria.
I suppose it could work better if Spark trained the models and made the
predictions in parallel.
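One idea I have not tried yet (a sketch only, reusing the helper names from
the snippet above, whose bodies are placeholders): drive the same loop from
parallel driver threads, since a single SparkContext accepts job submissions
from several threads at once.

import org.apache.spark.rdd.RDD

// untested sketch: run the per-criterion work on parallel driver threads so
// the GradientBoostedTrees jobs can be scheduled side by side (subject to
// cluster resources and the scheduler); prepareTrainSet, buildModel,
// preparePredictionDataSet, predictScores and `data` are the helpers above
val perCriterion = criteria2Rules.par.map { case (criterion, rules) =>
  val trainDataSet = prepareTrainSet(criterion, data)
  val model = buildModel(trainDataSet)
  val predictionDataSet = preparePredictionDataSet(criterion, data)
  predictScores(predictionDataSet, model, criterion, rules)
}.toList

val result: RDD[(Id, Double)] = sc.union(perCriterion)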

Separately, I have tried a relational style of data manipulation:

val criteria2RulesRdd: RDD[(String, Set[String])]

val cartesianCriteriaRules2DataRdd = criteria2RulesRdd.cartesian(dataRdd)
cartesianCriteriaRules2DataRdd
  .aggregateByKey(List[Data]())(
    { case (lst, data) => lst :+ data },
    { case (lstL, lstR) => lstL ::: lstR }
  )
  .map {
    // the key after cartesian + aggregateByKey is the (criterion, rules) pair
    case ((criterion, rules), scorePredictionDataList) =>
      val trainSet = ???
      val model = ???
      val predictionSet = ???
      val predictedScores = ???
  }
  ...

but this inevitably leads to a situation where one RDD is produced inside
another RDD (GradientBoostedTreesModel is trained on an RDD[LabeledPoint]), and
as far as I know that is not allowed; e.g. the
toy example below doesn't work:
scala> sc.parallelize(1 to 100).map(x => (x, sc.parallelize(Array(2)).map(_
* 2).collect)).collect

Is there any way to use Spark MLlib in a parallel way?

Thank you for your attention!

--
BR,
Junior Scala/Python Developer
Igor L.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-train-and-predict-in-parallel-via-Spark-MLlib-tp26261.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org