Re: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread Felix Cheung
Is it possible that your user does not have permission to write a temp file?






On Tue, Oct 6, 2015 at 10:26 AM -0700, "akhandeshi"  
wrote:
It seems it is failing at
 path <- tempfile(pattern = "backend_port")  I do not see backend_port
directory created...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help needed to reproduce bug

2015-10-06 Thread Jean-Baptiste Onofré

Hi Nick,

I will try to reproduce your issue on a couple of environments. I just 
wanted to know which kind of environment you use: Spark standalone, Spark on 
YARN, or Spark on Mesos?


For you, does it occur with any transform() on any RDD, or do you use a 
specific RDD?


I plan to use your code in a main and use spark-submit: do you use that 
kind of deployment?


Thanks !
Regards
JB

On 10/07/2015 07:18 AM, pnpritchard wrote:

Hi spark community,

I was hoping someone could help me by running a code snippet below in the
spark shell, and seeing if they see the same buggy behavior I see. Full
details of the bug can be found in this JIRA issue I filed:
https://issues.apache.org/jira/browse/SPARK-10942.

The issue was closed as "cannot reproduce"; however, I can't seem to shake
it. I have worked on this for a while, removing all known variables, and
trying different versions of Spark (1.5.0, 1.5.1, master) and different OSs
(Mac OS X, Debian Linux). My coworkers have tried as well and see the same
behavior. This has me convinced that I cannot be the only one in the
community able to reproduce this.

If you have a minute or two, please open a Spark shell and copy/paste the
code below. After 30 seconds, check the Storage tab of the Spark UI. If you see
some cached RDDs listed, then the bug has been reproduced. If not, then
there is no bug... and I may be losing my mind.

Thanks in advance!

Nick





import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val ssc = new StreamingContext(sc, Seconds(1))

val inputRDDs = mutable.Queue.tabulate(30) { i =>
   sc.parallelize(Seq(i))
}

val input = ssc.queueStream(inputRDDs)

val output = input.transform { rdd =>
   if (rdd.isEmpty()) {
 rdd
   } else {
 val rdd2 = rdd.map(identity)
 rdd2.cache()
 rdd2.setName(rdd.first().toString)
 val rdd3 = rdd2.map(identity) ++ rdd2.map(identity)
 rdd3
   }
}

output.print()

ssc.start()





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-needed-to-reproduce-bug-tp24965.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Help needed to reproduce bug

2015-10-06 Thread pnpritchard
Hi spark community,

I was hoping someone could help me by running a code snippet below in the
spark shell, and seeing if they see the same buggy behavior I see. Full
details of the bug can be found in this JIRA issue I filed:
https://issues.apache.org/jira/browse/SPARK-10942.

The issue was closed as "cannot reproduce"; however, I can't seem to shake
it. I have worked on this for a while, removing all known variables, and
trying different versions of Spark (1.5.0, 1.5.1, master) and different OSs
(Mac OS X, Debian Linux). My coworkers have tried as well and see the same
behavior. This has me convinced that I cannot be the only one in the
community able to reproduce this.

If you have a minute or two, please open a Spark shell and copy/paste the
code below. After 30 seconds, check the Storage tab of the Spark UI. If you see
some cached RDDs listed, then the bug has been reproduced. If not, then
there is no bug... and I may be losing my mind.

Thanks in advance!

Nick





import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val ssc = new StreamingContext(sc, Seconds(1))

val inputRDDs = mutable.Queue.tabulate(30) { i =>
  sc.parallelize(Seq(i))
}

val input = ssc.queueStream(inputRDDs)

val output = input.transform { rdd =>
  if (rdd.isEmpty()) {
rdd
  } else {
val rdd2 = rdd.map(identity)
rdd2.cache()
rdd2.setName(rdd.first().toString)
val rdd3 = rdd2.map(identity) ++ rdd2.map(identity)
rdd3
  }
}

output.print()

ssc.start()





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-needed-to-reproduce-bug-tp24965.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-06 Thread Hemant Bhanawat
One approach is to wrap your MutableRow in WrappedInternalRow, which is a
subclass of Row.

Hemant
www.snappydata.io
linkedin.com/company/snappydata


On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:

> Hi Guys,
> I'm upgrading to Spark 1.5.
>
> In our previous version (Spark 1.3, but it was OK on 1.4 as well) we
> created a GenericMutableRow
> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and returned it
> as org.apache.spark.sql.Row.
>
> Starting from Spark 1.5, GenericMutableRow no longer extends Row.
>
> What do you suggest to do?
> How can I convert GenericMutableRow to Row?
>
> Prompt answer will be highly appreciated!
> Thanks,
> Ophir
>


Re: Notification on Spark Streaming job failure

2015-10-06 Thread Krzysztof Zarzycki
Hi Vikram, So you give up using yarn-cluster mode for launching Spark jobs,
is that right? AFAIK when using yarn-cluster mode, the launch process
(spark-submit) monitors the job running on YARN, but if it is killed/dies, it
just stops printing the state (usually RUNNING), without influencing the
monitored job. So you cannot use monit features on the launch process (like
restart on fail, etc.).

One more thing: Monit depends on pidfiles and spark-submit (in yarn-client
mode) does not create them. Do you create them on your own?

Thanks!
Krzysiek



2015-10-07 6:37 GMT+02:00 Vikram Kone :

> We are using Monit to kick off spark streaming jobs and it seems to work fine.
>
>
> On Monday, September 28, 2015, Chen Song  wrote:
>
>> I am also interested specifically in monitoring and alerting on Spark
>> streaming jobs. It will be helpful to get some general guidelines or advice
>> on this, from people who implemented anything on this.
>>
>> On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki > > wrote:
>>
>>> Hi there Spark Community,
>>> I would like to ask you for advice: I'm running Spark Streaming jobs
>>> in production. Sometimes these jobs fail and I would like to get an email
>>> notification about it. Do you know how I can set up Spark to notify me by
>>> email if my job fails? Or do I have to use an external monitoring tool?
>>> I'm thinking of the following options:
>>> 1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked
>>> for it as well but couldn't find any YARN feature to do it.
>>> 2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban,
>>> Luigi. Those are created rather for batch jobs, not streaming, but could
>>> work. Has anyone tried that?
>>> 3. Run job driver under "monit" tool and catch the failure and send an
>>> email about it. Currently I'm deploying with yarn-cluster mode and I would
>>> need to resign from it to run under monit
>>> 4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and
>>> use Spark metrics. And then implement alerting in those. Can I get
>>> information of failed jobs in Spark metrics?
>>> 5. As 4. but implement my own custom job metrics and monitor them.
>>>
>>> What's your opinion about my options? How do you people solve this
>>> problem? Anything Spark specific?
>>> I'll be grateful for any advice in this subject.
>>> Thanks!
>>> Krzysiek
>>>
>>>
>>
>>
>> --
>> Chen Song
>>
>>
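
As a rough illustration of the driver-side alerting idea (option 3 in the list
quoted above), and hedged because it is not from the original thread: when the
driver runs in the launching process (e.g. yarn-client or local mode), a
failure can be caught around awaitTermination and reported before the process
exits. sendAlertEmail is a placeholder for whatever mailer is available.

try {
  ssc.start()
  ssc.awaitTermination()
} catch {
  case e: Throwable =>
    // placeholder helper; replace with your own notification mechanism
    sendAlertEmail("Spark Streaming job failed: " + e.getMessage)
    throw e
}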


Re: Notification on Spark Streaming job failure

2015-10-06 Thread Vikram Kone
We are using Monit to kick off spark streaming jobs and it seems to work fine.

On Monday, September 28, 2015, Chen Song  wrote:

> I am also interested specifically in monitoring and alerting on Spark
> streaming jobs. It will be helpful to get some general guidelines or advice
> on this, from people who implemented anything on this.
>
> On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki  > wrote:
>
>> Hi there Spark Community,
>> I would like to ask you for advice: I'm running Spark Streaming jobs
>> in production. Sometimes these jobs fail and I would like to get an email
>> notification about it. Do you know how I can set up Spark to notify me by
>> email if my job fails? Or do I have to use an external monitoring tool?
>> I'm thinking of the following options:
>> 1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked
>> for it as well but couldn't find any YARN feature to do it.
>> 2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban, Luigi.
>> Those are created rather for batch jobs, not streaming, but could work. Has
>> anyone tried that?
>> 3. Run job driver under "monit" tool and catch the failure and send an
>> email about it. Currently I'm deploying with yarn-cluster mode and I would
>> need to resign from it to run under monit
>> 4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and use
>> Spark metrics. And then implement alerting in those. Can I get information
>> of failed jobs in Spark metrics?
>> 5. As 4. but implement my own custom job metrics and monitor them.
>>
>> What's your opinion about my options? How do you people solve this
>> problem? Anything Spark specific?
>> I'll be grateful for any advice in this subject.
>> Thanks!
>> Krzysiek
>>
>>
>
>
> --
> Chen Song
>
>


Re: Broadcast var is null

2015-10-06 Thread Nick Peterson
This might seem silly, but...

Stop having your object extend App, and instead give it a main method.
That's worked for me recently when I've had this issue. (There was a very
old issue in Spark related to this; it would seem like a possible
regression, if this fixes it for you.)

-- Nick

On Tue, Oct 6, 2015 at 5:31 AM dpristin  wrote:

> I've reduced the code to the code below - no streaming, no Kafka, no
> checkpoint. Unfortunately the end result is the same - "broadcastVar is
> null" printed in the worker log. Any suggestion on what I'm missing would
> be
> very much appreciated !
>
>
> object BroadcastTest extends App {
>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>   logger.info("OinkSparkMain - Setup Logger")
>
>   val sparkConf = new SparkConf().setAppName("OinkSparkMain")
>   val sc : SparkContext = new SparkContext(sparkConf)
>
>   val rdd = sc.parallelize(Array(1,2,3));
>
>   val arr = Array(1, 2, 3)
>   val broadcastVar = sc.broadcast(arr)
>
>   val mappedEvents =  rdd.map(e => {
> val l = LoggerFactory.getLogger("OinkSparkMain1")
>
> if (broadcastVar == null) {
>   l.info("broadcastVar is null")
>   (e, "empty")
> }
> else {
>   val str = broadcastVar.value.mkString(" | ")
>   l.info("broadcastVar is " + str)
>   (e, str)
> }
>   })
>
>   logger.info("** Total reduced count: " +
> mappedEvents.collect().length)
> }
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927p24950.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: does KafkaCluster can be public ?

2015-10-06 Thread Ted Yu
Or maybe annotate with @DeveloperApi

Cheers

On Tue, Oct 6, 2015 at 7:24 AM, Cody Koeninger  wrote:

> I personally think KafkaCluster (or the equivalent) should be made
> public.  When I'm deploying spark I just sed out the private[spark] and
> rebuild.
>
> There's a general reluctance to make things public due to backwards
> compatibility, but if enough people ask for it... ?
>
> On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney 
> wrote:
>
>> You can put a class in the org.apache.spark namespace to access anything
>> that is private[spark]. You can then make enrichments there to access
>> whatever you need. Just beware upgrade pain :)
>>
>>
>> El martes, 6 de octubre de 2015, Erwan ALLAIN 
>> escribió:
>>
>>> Hello,
>>>
>>> I'm currently testing spark streaming with kafka.
>>> I'm creating DirectStream with KafkaUtils and everything's fine. However
>>> I would like to use the signature where I can specify my own message
>>> handler (to play with partition and offset). In this case, I need to manage
>>> offset/partition by myself to fill fromOffsets argument.
>>> I have found a Jira on this usecase
>>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
>>> telling that it's too specific.
>>> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
>>> OffsetRequest) but what I have to do is almost the same as the KafkaCluster
>>> which is private.
>>>
>>> is it possible to :
>>>  - add another signature in KafkaUtils ?
>>>  - make KafkaCluster public ?
>>>
>>> or do you have any other smart solution where I don't need to
>>> copy/paste KafkaCluster ?
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Erwan ALLAIN
>>>
>>
>


Re: does KafkaCluster can be public ?

2015-10-06 Thread Sean Owen
For what it's worth, I also use this class in an app, but it happens
to be from Java code where it acts as if it's public. So no problem
for my use case, but I suppose, another small vote for the usefulness
of this class to the caller. I end up using getLatestLeaderOffsets to
figure out how to initialize initial offsets.
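
For reference, a rough sketch of the package-placement workaround described
below (illustrative only; it assumes the Spark 1.x KafkaCluster constructor
and the getPartitions/getLatestLeaderOffsets signatures, and omits error
handling):

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

object KafkaClusterAccessor {
  // Latest offset per partition for the given topics; .right.get will throw on lookup errors.
  def latestOffsets(kafkaParams: Map[String, String],
                    topics: Set[String]): Map[TopicAndPartition, Long] = {
    val kc = new KafkaCluster(kafkaParams)
    val partitions = kc.getPartitions(topics).right.get
    kc.getLatestLeaderOffsets(partitions).right.get.mapValues(_.offset)
  }
}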

On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger  wrote:
> I personally think KafkaCluster (or the equivalent) should be made public.
> When I'm deploying spark I just sed out the private[spark] and rebuild.
>
> There's a general reluctance to make things public due to backwards
> compatibility, but if enough people ask for it... ?
>
> On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney  wrote:
>>
>> You can put a class in the org.apache.spark namespace to access anything
>> that is private[spark]. You can then make enrichments there to access
>> whatever you need. Just beware upgrade pain :)
>>
>>
>> El martes, 6 de octubre de 2015, Erwan ALLAIN 
>> escribió:
>>>
>>> Hello,
>>>
>>> I'm currently testing spark streaming with kafka.
>>> I'm creating DirectStream with KafkaUtils and everything's fine. However
>>> I would like to use the signature where I can specify my own message handler
>>> (to play with partition and offset). In this case, I need to manage
>>> offset/partition by myself to fill fromOffsets argument.
>>> I have found a Jira on this usecase
>>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
>>> telling that it's too specific.
>>> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
>>> OffsetRequest) but what I have to do is almost the same as the KafkaCluster
>>> which is private.
>>>
>>> is it possible to :
>>>  - add another signature in KafkaUtils ?
>>>  - make KafkaCluster public ?
>>>
>>> or do you have any other smart solution where I don't need to copy/paste
>>> KafkaCluster ?
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Erwan ALLAIN
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread Khandeshi, Ami
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
> 
> sc <- sparkR.init(master="local") 
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391 
Error in sparkR.init(master = "local") : 
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com] 
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()? 

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com] 
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") : 
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast var is null

2015-10-06 Thread Sean Owen
Yes, see https://issues.apache.org/jira/browse/SPARK-4170. The reason
was kind of complicated, and the 'fix' was just to warn you against
subclassing App! Yes, use a main() method.
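
For illustration, a minimal sketch of that refactoring, reusing the names from
the snippet quoted below (hedged; the logging and the original map body are
trimmed):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OinkSparkMain"))
    // With a main method (instead of extending App) the broadcast is a local value
    // captured by the closure, not a lazily-initialized object field that may be
    // null when the object is re-instantiated on an executor.
    val broadcastVar = sc.broadcast(Array(1, 2, 3))
    val mapped = sc.parallelize(Array(1, 2, 3)).map(e => (e, broadcastVar.value.mkString(" | ")))
    println("Total mapped count: " + mapped.collect().length)
  }
}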

On Tue, Oct 6, 2015 at 3:15 PM, Nick Peterson  wrote:
> This might seem silly, but...
>
> Stop having your object extend App, and instead give it a main method.
> That's worked for me recently when I've had this issue. (There was a very
> old issue in Spark related to this; it would seem like a possible
> regression, if this fixes it for you.)
>
> -- Nick
>
> On Tue, Oct 6, 2015 at 5:31 AM dpristin  wrote:
>>
>> I've reduced the code to the code below - no streaming, no Kafka, no
>> checkpoint. Unfortunately the end result is the same - "broadcastVar is
>> null" printed in the worker log. Any suggestion on what I'm missing would
>> be
>> very much appreciated !
>>
>>
>> object BroadcastTest extends App {
>>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>>   logger.info("OinkSparkMain - Setup Logger")
>>
>>   val sparkConf = new SparkConf().setAppName("OinkSparkMain")
>>   val sc : SparkContext = new SparkContext(sparkConf)
>>
>>   val rdd = sc.parallelize(Array(1,2,3));
>>
>>   val arr = Array(1, 2, 3)
>>   val broadcastVar = sc.broadcast(arr)
>>
>>   val mappedEvents =  rdd.map(e => {
>> val l = LoggerFactory.getLogger("OinkSparkMain1")
>>
>> if (broadcastVar == null) {
>>   l.info("broadcastVar is null")
>>   (e, "empty")
>> }
>> else {
>>   val str = broadcastVar.value.mkString(" | ")
>>   l.info("broadcastVar is " + str)
>>   (e, str)
>> }
>>   })
>>
>>   logger.info("** Total reduced count: " +
>> mappedEvents.collect().length)
>> }
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927p24950.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trying PCA on spark but serialization is error thrown

2015-10-06 Thread Simon Hebert
Hi,

I tried to use the PCA object in one of my projects but ended up receiving a
serialization error. Any help would be appreciated. Example taken from
https://spark.apache.org/docs/latest/mllib-feature-extraction.html#pca

My Code:
val selector = new PCA(20)
val transformer = selector.fit(discretizedData.map(_.features))
val filteredData = discretizedData.map(lp => lp.copy(features =
transformer.transform(lp.features)))

Stack trace:
scala> val filteredData = discretizedData.map(lp => lp.copy(features =
transformer.transform(lp.features)))
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.map(RDD.scala:313)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at $iwC$$iwC$$iwC$$iwC.(:51)
at $iwC$$iwC$$iwC.(:53)
at $iwC$$iwC.(:55)
at $iwC.(:57)
at (:59)
at .(:63)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.spark.mllib.feature.PCA
Serialization stack:
- object not serializable (class:
org.apache.spark.mllib.feature.PCA, value:
org.apache.spark.mllib.feature.PCA@51148636)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name:
selector, type: class org.apache.spark.mllib.feature.PCA)
- object (class 
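
One possible workaround, offered here only as a hedged sketch (it is not from
the thread): let the fitted PCAModel transform the feature RDD directly via
VectorTransformer.transform(RDD[Vector]) and zip the projected features back
onto the labeled points, so the user-written closure never references anything
that drags in the non-serializable PCA instance.

// transformer is the fitted PCAModel from the code above
val projected = transformer.transform(discretizedData.map(_.features))
// zip is safe here because projected is derived from discretizedData by map,
// so both RDDs have the same partitioning and per-partition element counts
val filteredData = discretizedData.zip(projected).map { case (lp, feats) =>
  lp.copy(features = feats)
}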

Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-06 Thread Steve Loughran

On 6 Oct 2015, at 01:23, Andrew Or 
> wrote:

Both the history server and the shuffle service are backward compatible, but 
not forward compatible. This means as long as you have the latest version of 
history server / shuffle service running in your cluster then you're fine (you 
don't need multiple of them).

FWIW I've just created a JIRA on tracking/reporting version mismatch on history 
server playback better: https://issues.apache.org/jira/browse/SPARK-10950

Even though the UI can't be expected to playback later histories, it could be 
possible to report the issue in a way that users can act on "run a later 
version", rather than raise support calls.



Re: does KafkaCluster can be public ?

2015-10-06 Thread Cody Koeninger
I personally think KafkaCluster (or the equivalent) should be made public.
When I'm deploying spark I just sed out the private[spark] and rebuild.

There's a general reluctance to make things public due to backwards
compatibility, but if enough people ask for it... ?

On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney  wrote:

> You can put a class in the org.apache.spark namespace to access anything
> that is private[spark]. You can then make enrichments there to access
> whatever you need. Just beware upgrade pain :)
>
>
> El martes, 6 de octubre de 2015, Erwan ALLAIN 
> escribió:
>
>> Hello,
>>
>> I'm currently testing spark streaming with kafka.
>> I'm creating DirectStream with KafkaUtils and everything's fine. However
>> I would like to use the signature where I can specify my own message
>> handler (to play with partition and offset). In this case, I need to manage
>> offset/partition by myself to fill fromOffsets argument.
>> I have found a Jira on this usecase
>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
>> telling that it's too specific.
>> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
>> OffsetRequest) but what I have to do is almost the same as the KafkaCluster
>> which is private.
>>
>> is it possible to :
>>  - add another signature in KafkaUtils ?
>>  - make KafkaCluster public ?
>>
>> or do you have any other smart solution where I don't need to copy/paste
>> KafkaCluster ?
>>
>> Thanks.
>>
>> Regards,
>> Erwan ALLAIN
>>
>


Re: Spark job workflow engine recommendations

2015-10-06 Thread Vikram Kone
Does Azkaban support scheduling long-running jobs like Spark streaming jobs?
Will Azkaban kill a job if it's running for a long time?

On Friday, August 7, 2015, Vikram Kone  wrote:

> Hien,
> Is Azkaban being phased out at linkedin as rumored? If so, what's linkedin
> going to use for workflow scheduling? Is there something else that's going
> to replace Azkaban?
>
> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu  > wrote:
>
>> In my opinion, choosing some particular project among its peers should
>> leave enough room for future growth (which may come faster than you
>> initially think).
>>
>> Cheers
>>
>> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu > > wrote:
>>
>>> Scalability is a known issue due to the current architecture.  However
>>> this will be applicable if you run more than 20K jobs per day.
>>>
>>> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu >> > wrote:
>>>
 From what I heard (an ex-coworker who is Oozie committer), Azkaban is
 being phased out at LinkedIn because of scalability issues (though UI-wise,
 Azkaban seems better).

 Vikram:
 I suggest you do more research in related projects (maybe using their
 mailing lists).

 Disclaimer: I don't work for LinkedIn.

 On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <
 nick.pentre...@gmail.com
 > wrote:

> Hi Vikram,
>
> We use Azkaban (2.5.0) in our production workflow scheduling. We just
> use local mode deployment and it is fairly easy to set up. It is pretty
> easy to use and has a nice scheduling and logging interface, as well as
> SLAs (like kill job and notify if it doesn't complete in 3 hours or
> whatever).
>
> However Spark support is not present directly - we run everything with
> shell scripts and spark-submit. There is a plugin interface where one 
> could
> create a Spark plugin, but I found it very cumbersome when I did
> investigate and didn't have the time to work through it to develop that.
>
> It has some quirks and while there is actually a REST API for adding
> jos and dynamically scheduling jobs, it is not documented anywhere so you
> kinda have to figure it out for yourself. But in terms of ease of use I
> found it way better than Oozie. I haven't tried Chronos, and it seemed
> quite involved to set up. Haven't tried Luigi either.
>
> Spark job server is good but as you say lacks some stuff like
> scheduling and DAG type workflows (independent of spark-defined job 
> flows).
>
>
> On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke  > wrote:
>
>> Check also falcon in combination with oozie
>>
>> Le ven. 7 août 2015 à 17:51, Hien Luu  a
>> écrit :
>>
>>> Looks like Oozie can satisfy most of your requirements.
>>>
>>>
>>>
>>> On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone >> > wrote:
>>>
 Hi,
 I'm looking for open source workflow tools/engines that allow us to
 schedule spark jobs on a datastax cassandra cluster. Since there are 
 tonnes
 of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I
 wanted to check with people here to see what they are using today.

 Some of the requirements of the workflow engine that I'm looking
 for are

 1. First class support for submitting Spark jobs on Cassandra. Not
 some wrapper Java code to submit tasks.
 2. Active open source community support and well tested at
 production scale.
 3. Should be dead easy to write job dependencices using XML or web
 interface . Ex; job A depends on Job B and Job C, so run Job A after B 
 and
 C are finished. Don't need to write full blown java applications to 
 specify
 job parameters and dependencies. Should be very simple to use.
 4. Time based  recurrent scheduling. Run the spark jobs at a given
 time every hour or day or week or month.
 5. Job monitoring, alerting on failures and email notifications on
 daily basis.

 I have looked at Ooyala's spark job server which seems to be hated
 towards making spark jobs run faster by sharing contexts between the 
 jobs
 but isn't a full blown workflow engine per se. A combination of spark 
 job
 server and workflow engine would be ideal

 Thanks for the inputs

>>>

Deep learning example using spark

2015-10-06 Thread Angel Angel
Hello Sir/Madam,

I am working on deep learning using Spark.

I have implemented some algorithms using Spark.

But now I want to use the ImageNet database in spark-1.0.4.

Can you give me some guidelines or a reference so I can handle the ImageNet
database.




Thanking You,
Sagar Jadhav.


Re: How to avoid Spark shuffle spill memory?

2015-10-06 Thread David Mitchell
Hi unk1102,

Try adding more memory to your nodes.  Are you running Spark in the cloud?
If so, increase the memory on your servers.
Do you have default parallelism set (spark.default.parallelism)?  If so,
unset it, and let Spark decide how many partitions to allocate.
You can also try refactoring your code to make it use less memory.

David
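
Related to the parallelism point above, one illustrative knob (a sketch, not
from this thread): wide operations accept an explicit partition count, so the
same shuffle can be spread over more, smaller tasks, which usually lowers
per-task spill. The RDD name and the value 400 are only examples.

// pairs stands in for whatever RDD[(K, V)] feeds the shuffle-heavy stage
val reduced = pairs.reduceByKey(_ + _, 400)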

On Tue, Oct 6, 2015 at 3:19 PM, unk1102  wrote:

> Hi, I have a Spark job which runs for around 4 hours; it shares a
> SparkContext and runs many child jobs. When I see each job in the UI I see a
> shuffle spill of around 30 to 40 GB, and because of that executors often
> get lost for using physical memory beyond limits. How do I avoid
> shuffle spill? I have tried almost all optimisations and nothing is helping. I
> don't cache anything. I am using Spark 1.4.1 and also using tungsten, codegen,
> etc. I am using spark.shuffle.storage as 0.2 and spark.storage.memory as
> 0.2.
> I tried to increase shuffle memory to 0.6 but then it halts in GC pauses,
> causing my executor to time out and then get lost eventually.
>
> Please guide. Thanks in advance.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Spark-shuffle-spill-memory-tp24960.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Re: Broadcast var is null

2015-10-06 Thread dpristin
This advice solved the problem: "Stop having your object extend App, and
instead give it a 
main method."  https://issues.apache.org/jira/browse/SPARK-4170




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927p24959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark cache memory storage

2015-10-06 Thread Lan Jiang
Hi, there

My understanding is that the cache storage is calculated as following

executor heap size * spark.storage.safetyFraction *
spark.storage.memoryFraction.

The default value for safetyFraction is 0.9 and memoryFraction is 0.6. When
I started a spark job on YARN, I set executor-memory to be 6g, thus I
expect the memory cache to be 6 * 0.9 * 0.6 = 3.24g. However, on the Spark
history server, it shows the reserved cached size for each executor is
3.1g. So it does not add up. What do I miss?

Lan


Re: How can I disable logging when running local[*]?

2015-10-06 Thread Jeff Jones
Here’s an example. I echoed JAVA_OPTS so that you can see what I’ve got. Then I 
call ‘activator run’ in the project directory.


jjones-mac:analyzer-perf jjones$ echo $JAVA_OPTS

-Xmx4g -Xmx4g 
-Dlog4j.configuration=file:/Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/conf/log4j.properties

jjones-mac:analyzer-perf jjones$ activator run

[info] Loading project definition from 
/Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/project

[info] Set current project to analyzer-perf (in build 
file:/Users/jjones/src/adaptive/adaptiveobjects/analyzer-perf/)

[info] Running com.adaptive.analyzer.perf.AnalyzerPerf

11:15:24.066 [run-main-0] INFO  org.apache.spark.SparkContext - Running Spark 
version 1.4.1

11:15:24.150 [run-main-0] DEBUG o.a.h.m.lib.MutableMetricsFactory - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, always=false, 
sampleName=Ops, type=DEFAULT, value=[Rate of successful kerberos logins and 
latency (milliseconds)], valueName=Time)

11:15:24.156 [run-main-0] DEBUG o.a.h.m.lib.MutableMetricsFactory - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, always=false, 
sampleName=Ops, type=DEFAULT, value=[Rate of failed kerberos logins and latency 
(milliseconds)], valueName=Time)

As I mentioned below but repeated for completeness, I also have this in my code.


import org.apache.log4j.PropertyConfigurator

PropertyConfigurator.configure("conf/log4j.properties")
Logger.getRootLogger().setLevel(Level.OFF)
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

And here’s my log4j.properties (note, I’ve also tried setting the level to OFF):


# Set everything to be logged to the console

log4j.rootCategory=WARN

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: 
%m%n


# Change this to set Spark log level

log4j.logger.org.apache.spark=WARN


# Silence akka remoting

log4j.logger.Remoting=WARN


# Ignore messages below warning level from Jetty, because it's a bit verbose

log4j.logger.org.eclipse.jetty=WARN


spark.log.threshold=OFF

spark.root.logger=OFF,DRFA


From: Alex Kozlov
Date: Tuesday, October 6, 2015 at 10:50 AM
To: Jeff Jones
Cc: "user@spark.apache.org"
Subject: Re: How can I disable logging when running local[*]?

Try

JAVA_OPTS='-Dlog4j.configuration=file:/'

Internally, this is just spark.driver.extraJavaOptions, which you should be 
able to set in conf/spark-defaults.conf

Can you provide more details how you invoke the driver?

On Tue, Oct 6, 2015 at 9:48 AM, Jeff Jones 
> wrote:
Thanks. Any chance you know how to pass this to a Scala app that is run via 
TypeSafe activator?

I tried putting it $JAVA_OPTS but I get:

Unrecognized option: --driver-java-options

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.


I tried a bunch of different quoting but nothing produced a good result. I also 
tried passing it directly to activator using –jvm but it still produces the 
same results with verbose logging. Is there a way I can tell if it’s picking up 
my file?



From: Alex Kozlov
Date: Monday, October 5, 2015 at 8:34 PM
To: Jeff Jones
Cc: "user@spark.apache.org"
Subject: Re: How can I disable logging when running local[*]?

Did you try “--driver-java-options 
'-Dlog4j.configuration=file:/'” and setting the 
log4j.rootLogger=FATAL,console?

On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
> wrote:
I’ve written an application that hosts the Spark driver in-process using 
“local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve also 
tried putting the following code prior to creating my SparkContext. These were 
coupled together from various posts I’ve. None of these steps have worked. I’m 
still getting a ton of logging to the console. Anything else I can try?

Thanks,
Jeff

private def disableLogging(): Unit = {
  import org.apache.log4j.PropertyConfigurator

  PropertyConfigurator.configure("conf/log4j.properties")
  Logger.getRootLogger().setLevel(Level.OFF)
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
}


This message (and any attachments) is intended only for the designated 
recipient(s). It
may contain confidential or proprietary information, or have other limitations 
on use as
indicated by the sender. If you are not a designated recipient, you may not 
review, use,
copy or distribute this message. If you 

Re: compatibility issue with Jersey2

2015-10-06 Thread Marcelo Vanzin
On Tue, Oct 6, 2015 at 12:04 PM, Gary Ogden  wrote:
> But we run unit tests differently in our build environment, which is
> throwing the error. It's setup like this:
>
> I suspect this is what you were referring to when you said I have a problem?

Yes, that is what I was referring to. But, in your test environment,
you might be able to work around the problem by setting
"spark.ui.enabled=false"; that should disable all the code that uses
Jersey, so you can use your newer version in your unit tests.
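
For example, in the test setup quoted above it would be a one-line addition
(a sketch; the surrounding configuration comes from Gary's snippet):

sparkConf.set("spark.ui.enabled", "false");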


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How can I disable logging when running local[*]?

2015-10-06 Thread Alexander Pivovarov
The easiest way to control logging in spark shell is to run Logger.setLevel
commands at the beginning of your program
e.g.

org.apache.log4j.Logger.getLogger("com.amazon").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("com.amazonaws").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("amazon.emr").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

On Tue, Oct 6, 2015 at 10:50 AM, Alex Kozlov  wrote:

> Try
>
> JAVA_OPTS='-Dlog4j.configuration=file:/'
>
> Internally, this is just spark.driver.extraJavaOptions, which you should
> be able to set in conf/spark-defaults.conf
>
> Can you provide more details how you invoke the driver?
>
> On Tue, Oct 6, 2015 at 9:48 AM, Jeff Jones 
> wrote:
>
>> Thanks. Any chance you know how to pass this to a Scala app that is run
>> via TypeSafe activator?
>>
>> I tried putting it $JAVA_OPTS but I get:
>>
>> Unrecognized option: --driver-java-options
>>
>> Error: Could not create the Java Virtual Machine.
>>
>> Error: A fatal exception has occurred. Program will exit.
>>
>>
>> I tried a bunch of different quoting but nothing produced a good result.
>> I also tried passing it directly to activator using –jvm but it still
>> produces the same results with verbose logging. Is there a way I can tell
>> if it’s picking up my file?
>>
>>
>>
>> From: Alex Kozlov
>> Date: Monday, October 5, 2015 at 8:34 PM
>> To: Jeff Jones
>> Cc: "user@spark.apache.org"
>> Subject: Re: How can I disable logging when running local[*]?
>>
>> Did you try “--driver-java-options
>> '-Dlog4j.configuration=file:/'” and setting the
>> log4j.rootLogger=FATAL,console?
>>
>> On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
>> wrote:
>>
>>> I’ve written an application that hosts the Spark driver in-process using
>>> “local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve
>>> also tried putting the following code prior to creating my SparkContext.
>>> These were coupled together from various posts I’ve. None of these steps
>>> have worked. I’m still getting a ton of logging to the console. Anything
>>> else I can try?
>>>
>>> Thanks,
>>> Jeff
>>>
>>> private def disableLogging(): Unit = {
>>>   import org.apache.log4j.PropertyConfigurator
>>>
>>>   PropertyConfigurator.configure("conf/log4j.properties")
>>>   Logger.getRootLogger().setLevel(Level.OFF)
>>>   Logger.getLogger("org").setLevel(Level.OFF)
>>>   Logger.getLogger("akka").setLevel(Level.OFF)
>>> }
>>>
>>>
>>>
>>> This message (and any attachments) is intended only for the designated
>>> recipient(s). It
>>> may contain confidential or proprietary information, or have other
>>> limitations on use as
>>> indicated by the sender. If you are not a designated recipient, you may
>>> not review, use,
>>> copy or distribute this message. If you received this in error, please
>>> notify the sender by
>>> reply e-mail and delete this message.
>>>
>>
>>
>>
>> --
>> Alex Kozlov
>> (408) 507-4987
>> (408) 830-9982 fax
>> (650) 887-2135 efax
>> ale...@gmail.com
>>
>>
>> This message (and any attachments) is intended only for the designated
>> recipient(s). It
>> may contain confidential or proprietary information, or have other
>> limitations on use as
>> indicated by the sender. If you are not a designated recipient, you may
>> not review, use,
>> copy or distribute this message. If you received this in error, please
>> notify the sender by
>> reply e-mail and delete this message.
>>
>
>


Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Umesh Kacha
Thanks Michael, so the following code written using Spark 1.5.1 should be
recognisable by the Hive table, right?

dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");

Hive console:
Create external table bla bla
stored as ORC
Location '/user/xyz/baseTable'
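
If the table is partitioned, the repair command referred to below is typically
MSCK REPAIR TABLE your_table, run from the Hive console after new partition
directories are written (an illustrative note with a placeholder table name,
not part of the original message).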

On Tue, Oct 6, 2015 at 10:54 PM, Michael Armbrust 
wrote:

> I believe this is fixed in Spark 1.5.1 as long as the table is only using
> types that hive understands and is not partitioned.  The problem with
> partitioned tables it that hive does not support dynamic discovery unless
> you manually run the repair command.
>
> On Tue, Oct 6, 2015 at 9:33 AM, Umesh Kacha  wrote:
>
>> Hi Ted, thanks, I know; I solved that by using a DataFrame for both reading
>> and writing. I am running into a different problem now: if Spark can read Hive
>> ORC files, why can't Hive read ORC files created by Spark?
>> On Oct 6, 2015 9:28 PM, "Ted Yu"  wrote:
>>
>>> See this thread:
>>> http://search-hadoop.com/m/q3RTtwwjNxXvPEe1
>>>
>>> A brief search in Spark JIRAs didn't find anything opened on this
>>> subject.
>>>
>>> On Tue, Oct 6, 2015 at 8:51 AM, unk1102  wrote:
>>>
 Hi I have a spark job which creates ORC files in partitions using the
 following code


 dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");

 The above code successfully creates ORC files which are readable in a Spark
 DataFrame.

 But when I try to load the ORC files generated using the above code into a Hive
 ORC table or Hive external table, nothing gets printed; it looks like the table
 is empty. What's wrong here? I can see the ORC files in HDFS but the Hive table
 does not read them. Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a
single filtering stage on Spark, the 'TL' part is done using the
dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together,
indicated here by 'local computation'.  As you can see, we also measure how
much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task
scheduling and consuming the data from Kafka which becomes part of the
'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

                          Slow Task    Fast Task
local computation         347.6        281.53
spark computation         6930         263
metric collection         70           138
wall clock process        7000         401
total records processed   4297         5002

(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Can you say anything more about what the job is doing?
>
> First thing I'd do is try to get some metrics on the time taken by your
> code on the executors (e.g. when processing the iterator) to see if it's
> consistent between the two situations.
>
> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We recently migrated our streaming jobs to the direct kafka receiver. Our
>> initial migration went quite fine but now we are seeing a weird zig-zag
>> performance pattern we cannot explain.
>> In alternating fashion, one task takes about 1 second to finish and the
>> next takes 7sec for a stable streaming rate.
>>
>> Here are comparable metrics for two successive tasks:
>> *Slow*:
>>
>>
>> ​
>>
>> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   22 s       3            0             3
>> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   40 s       11           0             11
>> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   49 s       10           0             10
>> *Fast*:
>>
>> ​
>>
>> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   0.6 s      4            0             4
>> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   1 s        9            0             9
>> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   1 s        11           0             11
>> We have some custom metrics that measure wall-clock time of execution of
>> certain blocks of the job, like the time it takes to do the local
>> computations (RDD.foreachPartition closure) vs total time.
>> The difference between the slow and fast executing task is on the 'spark
>> computation time' which is wall-clock for the task scheduling
>> (DStream.foreachRDD closure)
>>
>> e.g.
>> Slow task:
>>
>> local computation time: 347.6096849996, *spark computation time:
>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>
>> Fast task:
>> local computation time: 281.539042,* spark computation time: 263*,
>> metric collection: 138, total process: 401, total_records: 5002
>>
>> We are currently running Spark 1.4.1. The load and the work to be done is
>> stable -this is on a dev env with that stuff under control.
>>
>> Any ideas what this behavior could be?
>>
>> thanks in advance,  Gerard.
>>
>>
>>
>>
>>
>>
>>
>


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
Can you say anything more about what the job is doing?

First thing I'd do is try to get some metrics on the time taken by your
code on the executors (e.g. when processing the iterator) to see if it's
consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> We recently migrated our streaming jobs to the direct kafka receiver. Our
> initial migration went quite fine but now we are seeing a weird zig-zag
> performance pattern we cannot explain.
> In alternating fashion, one task takes about 1 second to finish and the
> next takes 7sec for a stable streaming rate.
>
> Here are comparable metrics for two successive tasks:
> *Slow*:
>
>
> ​
>
> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   22 s       3            0             3
> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   40 s       11           0             11
> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   49 s       10           0             10
> *Fast*:
>
> ​
>
> Executor ID                                 Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
> 20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   0.6 s      4            0             4
> 20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   1 s        9            0             9
> 20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   1 s        11           0             11
> We have some custom metrics that measure wall-clock time of execution of
> certain blocks of the job, like the time it takes to do the local
> computations (RDD.foreachPartition closure) vs total time.
> The difference between the slow and fast executing task is on the 'spark
> computation time' which is wall-clock for the task scheduling
> (DStream.foreachRDD closure)
>
> e.g.
> Slow task:
>
> local computation time: 347.6096849996, *spark computation time: 6930*,
> metric collection: 70, total process: 7000, total_records: 4297
>
> Fast task:
> local computation time: 281.539042,* spark computation time: 263*, metric
> collection: 138, total process: 401, total_records: 5002
>
> We are currently running Spark 1.4.1. The load and the work to be done is
> stable -this is on a dev env with that stuff under control.
>
> Any ideas what this behavior could be?
>
> thanks in advance,  Gerard.
>
>
>
>
>
>
>


Re: compatibility issue with Jersey2

2015-10-06 Thread Gary Ogden
In our separate environments we run it with spark-submit, so I can give
that a try.

But we run unit tests differently in our build environment, which is
throwing the error. It's setup like this:

helper = new CassandraHelper(settings.getCassandra().get());
SparkConf sparkConf = getCassSparkConf(helper);
sparkConf.setMaster("local[*]");
sparkConf.setAppName("TEST");
sparkConf.set("spark.driver.allowMultipleContexts", "true");

sc = new JavaSparkContext(sparkConf);

I suspect this is what you were referring to when you said I have a problem?



On 6 October 2015 at 15:40, Marcelo Vanzin  wrote:

> On Tue, Oct 6, 2015 at 5:57 AM, oggie  wrote:
> > We have a Java app written with spark 1.3.1. That app also uses Jersey
> 2.9
> > client to make external calls.  We see spark 1.4.1 uses Jersey 1.9.
>
> How is this app deployed? If it's run via spark-submit, you could use
> "spark.{driver,executor}.userClassPathFirst" to make your app use
> jersey 2.9 while letting Spark use the older jersey.
>
> If you're somehow embedding Spark and running everything in the same
> classloader, then you have a problem.
>
> --
> Marcelo
>


How to avoid Spark shuffle spill memory?

2015-10-06 Thread unk1102
Hi, I have a Spark job which runs for around 4 hours; it shares a
SparkContext and runs many child jobs. When I see each job in the UI I see a
shuffle spill of around 30 to 40 GB, and because of that executors often
get lost for using physical memory beyond limits. How do I avoid
shuffle spill? I have tried almost all optimisations and nothing is helping. I
don't cache anything. I am using Spark 1.4.1 and also using tungsten, codegen,
etc. I am using spark.shuffle.storage as 0.2 and spark.storage.memory as 0.2.
I tried to increase shuffle memory to 0.6 but then it halts in GC pauses,
causing my executor to time out and then get lost eventually.

Please guide. Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Spark-shuffle-spill-memory-tp24960.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
This is the ticket SPARK-10951


Cheers~

On Tue, Oct 6, 2015 at 11:33 AM, Jerry Lam  wrote:

> Hi Burak,
>
> Thank you for the tip.
> Unfortunately it does not work. It throws:
>
> java.net.MalformedURLException: unknown protocol: s3n]
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
> at
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> It looks like the meat is in the createRepoResolvers which does not
> currently support s3 repo. I will file a jira ticket for this.
>
> Best Regards,
>
> Jerry
>
> On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz  wrote:
>
>> Hi Jerry,
>>
>> The --packages feature doesn't support private repositories right now.
>> However, in the case of s3, maybe it might work. Could you please try using
>> the --repositories flag and provide the address:
>> `$ spark-submit --packages my:awesome:package --repositories
>> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>>
>> If that doesn't work, could you please file a JIRA?
>>
>> Best,
>> Burak
>>
>>
>> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam  wrote:
>>
>>> Hi spark users and developers,
>>>
>>> I'm trying to use spark-submit --packages against private s3 repository.
>>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>>> wonder how can I add this resolver into spark-submit such that --packages
>>> can resolve dependencies from private repo?
>>>
>>> Thank you!
>>>
>>> Jerry
>>>
>>
>>
>


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
I'm not clear on what you're measuring.  Can you post relevant code
snippets including the measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log
message every time a kafka rdd iterator is instantiated,

log.info(s"Computing topic ${part.topic}, partition ${part.partition} "
+

  s"offsets ${part.fromOffset} -> ${part.untilOffset}")


If you log once you're done with an iterator you should be able to see the
delta.

The other thing to try is reduce the number of parts involved in the job to
isolate it ... first thing I'd do there is take cassandra out of the
equation.
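
A hedged sketch of that suggestion (names are illustrative; process() stands
in for the existing per-record work): time each partition iterator's
consumption so the delta from the "Computing topic ..." log line can be read
off.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val start = System.currentTimeMillis()
    iter.foreach(record => process(record))   // process() is a placeholder
    println(s"partition consumed and processed in ${System.currentTimeMillis() - start} ms")
  }
}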



On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage on Spark, the 'TL' part is done using the
> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>
> We have metrics on the executor work which we collect and add together,
> indicated here by 'local computation'.  As you can see, we also measure how
> much it cost us to measure :-)
> See how 'local work'  times are comparable.  What's not visible is the
> task scheduling and consuming the data from Kafka which becomes part of the
> 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
>                          Slow Task   Fast Task
> local computation        347.6       281.53
> spark computation        6930        263
> metric collection        70          138
> wall clock process       7000        401
> total records processed  4297        5002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by your
>> code on the executors (e.g. when processing the iterator) to see if it's
>> consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>> performance pattern we cannot explain.
>>> In alternating fashion, one task takes about 1 second to finish and the
>>> next takes 7sec for a stable streaming rate.
>>>
>>> Here are comparable metrics for two successive tasks:
>>> *Slow*:
>>>
>>>
>>> ​
>>>
>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   22 s       3            0             3
>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   40 s       11           0             11
>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   49 s       10           0             10
>>> *Fast*:
>>>
>>> ​
>>>
>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   0.6 s      4            0             4
>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   1 s        9            0             9
>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   1 s        11           0             11
>>> We have some custom metrics that measure wall-clock time of execution of
>>> certain blocks of the job, like the time it takes to do the local
>>> computations (RDD.foreachPartition closure) vs total time.
>>> The difference between the slow and fast executing task is on the 'spark
>>> computation time' which is wall-clock for the task scheduling
>>> (DStream.foreachRDD closure)
>>>
>>> e.g.
>>> Slow task:
>>>
>>> local computation time: 347.6096849996, *spark computation time:
>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>
>>> Fast task:
>>> local computation time: 281.539042,* spark computation time: 263*,
>>> metric collection: 138, total process: 401, total_records: 5002
>>>
>>> We are currently running Spark 1.4.1. The load and the work to be done
>>> is stable -this is on a dev env with that stuff under control.
>>>
>>> Any ideas what this behavior could be?
>>>
>>> thanks in advance,  Gerard.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Lookup / Access of master data in spark streaming

2015-10-06 Thread Olivier Girardot
That's great !  Thanks !
So to sum up: to do some kind of "always up-to-date" lookup we can use
broadcast variables and re-broadcast when the data has changed, using
either the "transform" RDD-to-RDD transformation, "foreachRDD" or
"transformWith".
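
For later readers, a minimal self-contained sketch of that pattern; the loader, the 60-second refresh interval and the filter are all made up for illustration:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object RefData {
  // Stub loader for the lookup data; replace with a real read from HDFS, a DB, etc.
  def loadReferenceData(): Map[String, String] = Map("id-1" -> "value-1")

  @volatile private var ref: Broadcast[Map[String, String]] = _
  private var lastRefresh = 0L

  // Called on the driver (e.g. inside transform); rebuilds the broadcast
  // at most once per minute and unpersists the previous one.
  def get(ssc: StreamingContext): Broadcast[Map[String, String]] = synchronized {
    val now = System.currentTimeMillis()
    if (ref == null || now - lastRefresh > 60000L) {
      if (ref != null) ref.unpersist()
      ref = ssc.sparkContext.broadcast(loadReferenceData())
      lastRefresh = now
    }
    ref
  }
}

def withLookup(ssc: StreamingContext, input: DStream[String]): DStream[String] =
  input.transform { (rdd: RDD[String]) =>
    val lookup = RefData.get(ssc)          // runs on the driver every batch interval
    rdd.filter(key => lookup.value.contains(key))
  }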

Thank you for your time

Regards,

2015-10-05 23:49 GMT+02:00 Tathagata Das :

> Yes, when old broadcast objects are not referenced any more in the driver,
> then the associated data in the driver AND the executors will get cleared.
>
> On Mon, Oct 5, 2015 at 1:40 PM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> @td does that mean that the "old" broadcasted data will in any way be
>> "garbage collected" at some point if no RDD or transformation is using it
>> anymore ?
>>
>> Regards,
>>
>> Olivier.
>>
>> 2015-04-09 21:49 GMT+02:00 Amit Assudani :
>>
>>> Thanks a lot TD for detailed answers. The answers lead to few more
>>> questions,
>>>
>>>
>>>1. "the transform RDD-to-RDD function runs on the driver “ - I
>>>didn’t understand this, does it mean when I use transform function on
>>>DStream, it is not parallelized, surely I m missing something here.
>>>2.  updateStateByKey I think won’t work in this use case,  I have
>>>three separate attribute streams ( with different frequencies ) make up 
>>> the
>>>combined state ( i.e. Entity ) at point in time on which I want to do 
>>> some
>>>processing. Do you think otherwise ?
>>>3. transform+join seems only option so far, but any guestimate how
>>>would this perform/ react on cluster ? Assuming, master data in 100s of
>>>Gbs, and join is based on some row key. We are talking about slice of
>>>stream data to be joined with 100s of Gbs of master data continuously. Is
>>>it something can be done but should not be done ?
>>>
>>> Regards,
>>> Amit
>>>
>>> From: Tathagata Das 
>>> Date: Thursday, April 9, 2015 at 3:13 PM
>>> To: amit assudani 
>>> Cc: "user@spark.apache.org" 
>>> Subject: Re: Lookup / Access of master data in spark streaming
>>>
>>> Responses inline. Hope they help.
>>>
>>> On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani 
>>> wrote:
>>>
 Hi Friends,

 I am trying to solve a use case in spark streaming, I need help on
 getting to right approach on lookup / update the master data.

 Use case ( simplified )
 I’ve a dataset of entity with three attributes and identifier/row key
 in a persistent store.

 Each attribute along with row key come from a different stream let’s
 say, effectively 3 source streams.

 Now whenever any attribute comes up, I want to update/sync the
 persistent store and do some processing, but the processing would require
 the latest state of entity with latest values of three attributes.

 I wish if I have the all the entities cached in some sort of
 centralized cache ( like we have data in hdfs ) within spark streaming
 which may be used for data local processing. But I assume there is no such
 thing.

 potential approaches I m thinking of, I suspect first two are not
 feasible, but I want to confirm,
   1.  Is Broadcast Variables mutable ? If yes, can I use it as
 cache for all entities sizing  around 100s of GBs provided i have a cluster
 with enough RAM.

>>>
>>> Broadcast variables are not mutable. But you can always create a new
>>> broadcast variable when you want and use the "latest" broadcast variable in
>>> your computation.
>>>
>>> dstream.transform { rdd =>
>>>
>>>val latestBroacast = getLatestBroadcastVariable()  // fetch existing
>>> or update+create new and return
>>>val transformedRDD = rdd. ..  // use  latestBroacast in RDD
>>> tranformations
>>>transformedRDD
>>> }
>>>
>>> Since the transform RDD-to-RDD function runs on the driver every batch
>>> interval, it will always use the latest broadcast variable that you want.
>>> Though note that whenever you create a new broadcast, the next batch may
>>> take a little longer, as the data needs to be actually broadcast out.
>>> That can also be made asynchronous by running a simple task (to force the
>>> broadcasting out) on any new broadcast variable in a different thread as
>>> Spark Streaming batch schedule, but using the same underlying Spark Context.
>>>
>>>
>>>

1. Is there any kind of sticky partition possible, so that I route
my stream data to go through the same node where I've the corresponding
entities, subset of entire store, cached in memory within JVM / off 
 heap on
the node, this would avoid lookups from store.

 You could use updateStateByKey. That is quite sticky, but does not
>>> eliminate the possibility that it can run on a different node. In fact this
>>> is necessary for fault-tolerance - what if the node it was supposed to run
>>> goes 

Re: Spark SQL with Hive error: "Conf non-local session path expected to be non-null;"

2015-10-06 Thread er.jayants...@gmail.com
I have recently upgraded from spark 1.2 to spark 1.3.

After upgrade I made necessary changes to incorporate DataFrames instead of
JavaSchemaRDD.
Now I am getting this error message
*("org.apache.spark.sql.AnalysisException: Conf non-local session path
expected to be non-null")* while running my spark program written in Java on
my Spark cluster.

I am creating HiveContext as shown below:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
HiveContext sqlContext = new HiveContext(sparkContext.sc());

Any idea why I am getting this error?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-with-Hive-error-Conf-non-local-session-path-expected-to-be-non-null-tp24922p24942.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 1.5 Build Errors

2015-10-06 Thread Benjamin Zaitlen
Hi All,

Sean patiently worked with me in solving this issue.  The problem was
entirely my fault in settings MAVEN_OPTS env variable was set and was
overriding everything.

--Ben

On Tue, Sep 8, 2015 at 1:37 PM, Benjamin Zaitlen  wrote:

> Yes, just reran with the following
>
> (spark_build)root@ip-10-45-130-206:~/spark# export MAVEN_OPTS="-Xmx4096mb
>> -XX:MaxPermSize=1024M -XX:ReservedCodeCacheSize=1024m"
>> (spark_build)root@ip-10-45-130-206:~/spark# build/mvn -Pyarn
>> -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
>
>
> and grepping for java
>
>
> root   641  9.9  0.3 4411732 49040 pts/4   Sl+  17:35   0:01
>> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
>> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
>> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
>> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
>> com.typesafe.zinc.Nailgun 3030 0
>> root   687  226  2.0 1803664 312876 pts/4  Sl+  17:36   0:22
>> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
>> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
>> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
>> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
>> -Dmaven.multiModuleProjectDirectory=/root/spark
>> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 -Pyarn
>> -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
>
>
> On Tue, Sep 8, 2015 at 1:14 PM, Sean Owen  wrote:
>
>> MAVEN_OPTS shouldn't affect zinc as it's an unrelated application. You
>> can run "zinc -J-Xmx4g..." in general, but in the provided script,
>> ZINC_OPTS seems to be the equivalent, yes. It kind of looks like your
>> mvn process isn't getting any special memory args there. Is MAVEN_OPTS
>> really exported?
>>
>> FWIW I use my own local mvn and zinc and it works fine.
>>
>> On Tue, Sep 8, 2015 at 6:05 PM, Benjamin Zaitlen 
>> wrote:
>> > I'm running zinv while compiling.  It seems that MAVEN_OPTS doesn't
>> really
>> > change much?  Or perhaps I'm misunderstanding something -- grepping for
>> java
>> > i see
>> >
>> >> root 24355  102  8.8 4687376 1350724 pts/4 Sl   16:51  11:08
>> >> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -server -Xmx2g
>> >> -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
>> >> -Dzinc.home=/root/spark/build/zinc-0.3.5.3 -classpath
>> >>
>> /root/spark/build/zinc-0.3.5.3/lib/compiler-interface-sources.jar:/root/spark/build/zinc-0.3.5.3/lib/incremental-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/nailgun-server.jar:/root/spark/build/zinc-0.3.5.3/lib/sbt-interface.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-compiler.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-library.jar:/root/spark/build/zinc-0.3.5.3/lib/scala-reflect.jar:/root/spark/build/zinc-0.3.5.3/lib/zinc.jar
>> >> com.typesafe.zinc.Nailgun 3030 0
>> >> root 25151 22.0  3.2 2269092 495276 pts/4  Sl+  16:53   1:56
>> >> /usr/lib/jvm/java-7-openjdk-amd64/bin/java -Xms256m -Xmx512m -classpath
>> >>
>> /opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/boot/plexus-classworlds-2.5.2.jar
>> >>
>> -Dclassworlds.conf=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3/bin/m2.conf
>> >> -Dmaven.home=/opt/anaconda/envs/spark_build/share/apache-maven-3.3.3
>> >> -Dmaven.multiModuleProjectDirectory=/root/spark
>> >> org.codehaus.plexus.classworlds.launcher.Launcher -DzincPort=3030 clean
>> >> package -DskipTests -Pyarn -Phive -Phive-thriftserver -Phadoop-2.4
>> >> -Dhadoop.version=2.4.0
>> >
>> >
>> > So the heap size is still 2g even with MAVEN_OPTS set with 4g.  I
>> noticed
>> > that within build/mvn _COMPILE_JVM_OPTS is set to 2g and this is what
>> > ZINC_OPTS is set to.
>> >
>> > --Ben
>> >
>> >
>> > On Tue, Sep 8, 2015 at 11:06 AM, Ted Yu  wrote:
>> >>
>> >> Do you run Zinc while compiling ?
>> >>
>> >> Cheers
>> >>
>> >> On Tue, Sep 8, 2015 at 7:56 AM, Benjamin Zaitlen 
>> >> wrote:
>> >>>
>> >>> I'm still getting errors with 3g.  I've increase to 4g and I'll report
>> >>> back
>> >>>
>> >>> To be clear:
>> >>>
>> >>> export MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=1024M
>> >>> -XX:ReservedCodeCacheSize=1024m"
>> >>>
>>  [ERROR] GC overhead limit exceeded -> [Help 1]
>>  [ERROR]
>>  [ERROR] To see the full stack trace of the errors, re-run Maven with
>> the
>>  -e switch.
>>  [ERROR] Re-run Maven using the -X switch to enable full debug
>> logging.
>>  [ERROR]
>>  [ERROR] For more 

Help with big data operation performance

2015-10-06 Thread Saif.A.Ellafi
Hi all,

In a standalone cluster, with more than 80 GB of RAM in each node, I am trying to do the 
following (a rough sketch of the pipeline follows the list):

1.  load a partitioned JSON dataframe which weighs around 100 GB as input
2.  apply transformations such as casting some column types
3.  get some percentiles, which involves sort by, RDD transformations and lookup actions
4.  flatten many unordered rows with the HiveContext explode function
5.  compute a histogram for each column of the dataframe
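
A compressed sketch of the kind of pipeline described above; the paths, column names and the percentile/histogram details are invented for illustration, this is not the actual job, and it assumes an existing SparkContext named sc:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions._

val sqlContext = new HiveContext(sc)

// 1. load the partitioned JSON input
val raw = sqlContext.read.json("hdfs:///data/input/")

// 2. cast some column types
val typed = raw.withColumn("amount", col("amount").cast("double"))

// 3. a percentile via sort-by + lookup on the underlying RDD (95th here)
val n = typed.count()
val p95 = typed.select("amount").orderBy("amount").rdd
  .zipWithIndex()
  .filter { case (_, idx) => idx == (n * 0.95).toLong }
  .first()._1.getDouble(0)

// 4. flatten nested rows with the explode function
val flat = typed.select(col("id"), explode(col("items")).as("item"))

// 5. histogram of a column through the RDD API (20 buckets)
val (buckets, counts) = flat.select("item").rdd.map(_.getDouble(0)).histogram(20)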

I have tried many strategies, from threaded per-column transformations to a for loop over 
each column. I tried to move as many common transformations as possible out of this loop 
and to persist the original dataframe at different points. I also tried FIFO vs FAIR 
scheduling, speculation, etc.

With the dataframe limited to 50k rows, the operation is successful in local[32] in a 
reasonably short amount of time, e.g. 30 minutes for all columns, of which around 14 minutes 
is just loading the data into memory and doing a count. But when I want to go beyond that, 
I need to go into cluster mode.

The first transformations and the percentile calculations are very fast, faster than in 
local mode. But computing the histogram on the final dataframe gets stuck; it appears to be 
stuck in garbage collection and never completes. Trying the 50k-row dataframe in cluster 
mode, exactly the same thing happens where local mode succeeded. The task seems to hang at 
some random point, always within the same stage, at the very end of the histogram computation.

I have also tried repartitioning up to 1024 partitions and coalescing down to 1 partition, 
with no different result. The process always hangs in cluster mode. Local mode cannot handle 
this big operation either and ends up stuck somewhere as well. I have enabled the parallel 
GC too.

How can I proceed and diagnose this?
Any help appreciated.
Saif



Does feature parity exist between Scala and Python on Spark

2015-10-06 Thread dant
Hi,
I'm hearing a common theme running that I should only do serious programming
in Scala on Spark (1.5.1). Real power users use Scala. It is said that
Python is great for analytics but in the end the code should be written to
Scala to finalise. There are a number of reasons I'm hearing:

1. Spark is written in Scala so will always be faster than any other
language implementation on top of it.
2. Spark releases always favour more features being visible and enabled for
Scala API than Python API.

Are there any truths to the above? I'm a little sceptical.

Thanks
Dan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Scala-and-Python-on-Spark-tp24961.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does feature parity exist between Spark and PySpark

2015-10-06 Thread dant
Hi

I'm hearing a common theme running that I should only do serious programming
in Scala on Spark (1.5.1). Real power users use Scala. It is said that
Python is great for analytics but in the end the code should be written to
Scala to finalise. There are a number of reasons I'm hearing:

1. Spark is written in Scala so will always be faster than any other
language implementation on top of it.
2. Spark releases always favour more features being visible and enabled for
Scala API than Python API.

Are there any truths to the above? I'm a little sceptical.

Apologies for the duplication, my previous message was held up due to
subscription issue. Reposting now.

Thanks
Dan




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Matt Narrell
One.

I read in LZO compressed files from HDFS
Perform a map operation
cache the results of this map operation
call saveAsHadoopFile to write LZO back to HDFS.

Without the cache, the job will stall.  
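
For context, a bare-bones sketch of that flow; the paths, the LzopCodec class (from the hadoop-lzo package) and the transform function are assumptions, not taken from the actual job:

import com.hadoop.compression.lzo.LzopCodec     // assumption: hadoop-lzo is on the classpath
import org.apache.spark.storage.StorageLevel

def transform(line: String): String = line.toUpperCase   // placeholder for the real map logic

// textFile decompresses LZO transparently if the codec is configured on the cluster
val lines = sc.textFile("hdfs:///input/lzo/")

val mapped = lines.map(transform)
mapped.persist(StorageLevel.MEMORY_AND_DISK_SER)          // the cache/persist step in question

// write back to HDFS compressed with LZO
mapped.saveAsTextFile("hdfs:///output/lzo/", classOf[LzopCodec])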

mn

> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
> 
> Is there any specific reason for caching the RDD? How many passes you make 
> over the dataset? 
> 
> Mohammed
> 
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com] 
> Sent: Saturday, October 3, 2015 9:50 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> Is there any more information or best practices here?  I have the exact same 
> issues when reading large data sets from HDFS (larger than available RAM) and 
> I cannot run without setting the RDD persistence level to 
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> 
> Should I repartition this RDD to be equal to the number of cores?  
> 
> I notice that the job duration on the YARN UI is about 30 minutes longer than 
> the Spark UI.  When the job initially starts, there is no tasks shown in the 
> Spark UI..?
> 
> All I;m doing is reading records from HDFS text files with sc.textFile, and 
> rewriting them back to HDFS grouped by a timestamp.
> 
> Thanks,
> mn
> 
>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>> 
>> 1) It is not required to have the same amount of memory as data. 
>> 2) By default the # of partitions are equal to the number of HDFS 
>> blocks
>> 3) Yes, the read operation is lazy
>> 4) It is okay to have more number of partitions than number of cores. 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: davidkl [mailto:davidkl...@hotmail.com]
>> Sent: Monday, September 28, 2015 1:40 AM
>> To: user@spark.apache.org
>> Subject: laziness in textFile reading from HDFS?
>> 
>> Hello,
>> 
>> I need to process a significant amount of data every day, about 4TB. This 
>> will be processed in batches of about 140GB. The cluster this will be 
>> running on doesn't have enough memory to hold the dataset at once, so I am 
>> trying to understand how this works internally.
>> 
>> When using textFile to read an HDFS folder (containing multiple files), I 
>> understand that the number of partitions created are equal to the number of 
>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the number 
>> of blocks/partitions is larger than the number of cores/threads the Spark 
>> driver was launched with (N), are N partitions created initially and then 
>> the rest when required? Or are all those partitions created up front?
>> 
>> I want to avoid reading the whole data into memory just to spill it out to 
>> disk if there is no enough memory.
>> 
>> Thanks! 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFi
>> le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User List 
>> mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX: How can I tell if 2 nodes are connected?

2015-10-06 Thread Dino Fancellu
Ok, thanks, just wanted to make sure I wasn't missing something
obvious. I've worked with Neo4j cypher as well, where it was rather
more obvious.

e.g. http://neo4j.com/docs/milestone/query-match.html#_shortest_path
http://neo4j.com/docs/stable/cypher-refcard/
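
For completeness, one way to express the original "are these two vertices connected?" check in plain GraphX is via connected components; a toy sketch (not from this thread, assumes an existing SparkContext sc):

import org.apache.spark.graphx._

// Toy graph: 1 -- 2 -- 3 and 4 -- 5 form two separate components
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)

// Vertices sharing a component id are connected (edge direction is ignored)
val components = graph.connectedComponents().vertices
def connected(a: VertexId, b: VertexId): Boolean =
  components.lookup(a).head == components.lookup(b).head

connected(1L, 3L)   // true
connected(1L, 4L)   // false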

Dino.

On 6 October 2015 at 06:43, Robineast [via Apache Spark User List]
 wrote:
> GraphX doesn't implement Tinkerpop functionality but there is an external
> effort to provide an implementation. See
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4279
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
> 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926p24944.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: StructType has more rows, than corresponding Row has objects.

2015-10-06 Thread Eugene Morozov
Davies,

that seemed to be my issue; my colleague helped me to resolve it. The
problem was that we build the RDD and the corresponding StructType by
ourselves (no JSON, Parquet, Cassandra, etc. - we take a list of business
objects and convert them to Rows, then infer the struct type) and I had missed
one thing.
--
Be well!
Jean Morozov

On Tue, Oct 6, 2015 at 1:58 AM, Davies Liu  wrote:

> Could you tell us a way to reproduce this failure? Reading from JSON or
> Parquet?
>
> On Mon, Oct 5, 2015 at 4:28 AM, Eugene Morozov
>  wrote:
> > Hi,
> >
> > We're building our own framework on top of spark and we give users pretty
> > complex schema to work with. That requires from us to build dataframes by
> > ourselves: we transform business objects to rows and struct types and
> uses
> > these two to create dataframe.
> >
> > Everything was fine until I started to upgrade to spark 1.5.0 (from
> 1.3.1).
> > Seems to be catalyst engine has been changed and now using almost the
> same
> > code to produce rows and struct types I have the following:
> > http://ibin.co/2HzUsoe9O96l, some of rows in the end result have
> different
> > number of values and corresponding struct types.
> >
> > I'm almost sure it's my own fault, but there is always a small chance,
> that
> > something is wrong in spark codebase. If you've seen something similar
> or if
> > there is a jira for smth similar, I'd be glad to know. Thanks.
> > --
> > Be well!
> > Jean Morozov
>


Re: How can I disable logging when running local[*]?

2015-10-06 Thread Alex Kozlov
Try

JAVA_OPTS='-Dlog4j.configuration=file:/'

Internally, this is just spark.driver.extraJavaOptions, which you should be
able to set in conf/spark-defaults.conf
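
A hedged example of what that could look like; the file path and the exact logger names are placeholders:

# conf/spark-defaults.conf
spark.driver.extraJavaOptions  -Dlog4j.configuration=file:/path/to/log4j.properties

# /path/to/log4j.properties
log4j.rootLogger=FATAL, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark=FATAL
log4j.logger.akka=FATAL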

Can you provide more details how you invoke the driver?

On Tue, Oct 6, 2015 at 9:48 AM, Jeff Jones 
wrote:

> Thanks. Any chance you know how to pass this to a Scala app that is run
> via TypeSafe activator?
>
> I tried putting it in $JAVA_OPTS but I get:
>
> Unrecognized option: --driver-java-options
>
> Error: Could not create the Java Virtual Machine.
>
> Error: A fatal exception has occurred. Program will exit.
>
>
> I tried a bunch of different quoting but nothing produced a good result. I
> also tried passing it directly to activator using –jvm but it still
> produces the same results with verbose logging. Is there a way I can tell
> if it’s picking up my file?
>
>
>
> From: Alex Kozlov
> Date: Monday, October 5, 2015 at 8:34 PM
> To: Jeff Jones
> Cc: "user@spark.apache.org"
> Subject: Re: How can I disable logging when running local[*]?
>
> Did you try “--driver-java-options
> '-Dlog4j.configuration=file:/'” and setting the
> log4j.rootLogger=FATAL,console?
>
> On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
> wrote:
>
>> I’ve written an application that hosts the Spark driver in-process using
>> “local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve
>> also tried putting the following code prior to creating my SparkContext.
>> These were coupled together from various posts I’ve seen. None of these steps
>> have worked. I’m still getting a ton of logging to the console. Anything
>> else I can try?
>>
>> Thanks,
>> Jeff
>>
>> private def disableLogging(): Unit = {
>>   import org.apache.log4j.PropertyConfigurator
>>
>>   PropertyConfigurator.configure("conf/log4j.properties")
>>   Logger.getRootLogger().setLevel(Level.OFF)
>>   Logger.getLogger("org").setLevel(Level.OFF)
>>   Logger.getLogger("akka").setLevel(Level.OFF)
>> }
>>
>>
>>
>> This message (and any attachments) is intended only for the designated
>> recipient(s). It
>> may contain confidential or proprietary information, or have other
>> limitations on use as
>> indicated by the sender. If you are not a designated recipient, you may
>> not review, use,
>> copy or distribute this message. If you received this in error, please
>> notify the sender by
>> reply e-mail and delete this message.
>>
>
>
>
> --
> Alex Kozlov
> (408) 507-4987
> (408) 830-9982 fax
> (650) 887-2135 efax
> ale...@gmail.com
>
>
> This message (and any attachments) is intended only for the designated
> recipient(s). It
> may contain confidential or proprietary information, or have other
> limitations on use as
> indicated by the sender. If you are not a designated recipient, you may
> not review, use,
> copy or distribute this message. If you received this in error, please
> notify the sender by
> reply e-mail and delete this message.
>


Re: compatibility issue with Jersey2

2015-10-06 Thread Marcelo Vanzin
On Tue, Oct 6, 2015 at 5:57 AM, oggie  wrote:
> We have a Java app written with spark 1.3.1. That app also uses Jersey 2.9
> client to make external calls.  We see spark 1.4.1 uses Jersey 1.9.

How is this app deployed? If it's run via spark-submit, you could use
"spark.{driver,executor}.userClassPathFirst" to make your app use
jersey 2.9 while letting Spark use the older jersey.

If you're somehow embedding Spark and running everything in the same
classloader, then you have a problem.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Jeff Nadler
Gerard - any chance this is related to task locality waiting?  Can you
try (just as a diagnostic) something like this, and see whether the unexpected
delay goes away?

.set("spark.locality.wait", "0")


On Tue, Oct 6, 2015 at 12:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage on Spark, the 'TL' part is done using the
> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>
> We have metrics on the executor work which we collect and add together,
> indicated here by 'local computation'.  As you can see, we also measure how
> much it cost us to measure :-)
> See how 'local work'  times are comparable.  What's not visible is the
> task scheduling and consuming the data from Kafka which becomes part of the
> 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
>                          Slow Task   Fast Task
> local computation        347.6       281.53
> spark computation        6930        263
> metric collection        70          138
> wall clock process       7000        401
> total records processed  4297        5002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by your
>> code on the executors (e.g. when processing the iterator) to see if it's
>> consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>> performance pattern we cannot explain.
>>> In alternating fashion, one task takes about 1 second to finish and the
>>> next takes 7sec for a stable streaming rate.
>>>
>>> Here are comparable metrics for two successive tasks:
>>> *Slow*:
>>>
>>>
>>> ​
>>>
>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   22 s       3            0             3
>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   40 s       11           0             11
>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   49 s       10           0             10
>>> *Fast*:
>>>
>>> ​
>>>
>>> Executor ID                                Address                      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863   0.6 s      4            0             4
>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812   1 s        9            0             9
>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945   1 s        11           0             11
>>> We have some custom metrics that measure wall-clock time of execution of
>>> certain blocks of the job, like the time it takes to do the local
>>> computations (RDD.foreachPartition closure) vs total time.
>>> The difference between the slow and fast executing task is on the 'spark
>>> computation time' which is wall-clock for the task scheduling
>>> (DStream.foreachRDD closure)
>>>
>>> e.g.
>>> Slow task:
>>>
>>> local computation time: 347.6096849996, *spark computation time:
>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>
>>> Fast task:
>>> local computation time: 281.539042,* spark computation time: 263*,
>>> metric collection: 138, total process: 401, total_records: 5002
>>>
>>> We are currently running Spark 1.4.1. The load and the work to be done
>>> is stable -this is on a dev env with that stuff under control.
>>>
>>> Any ideas what this behavior could be?
>>>
>>> thanks in advance,  Gerard.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-06 Thread Alex Rovner
Thank you all for your help.

*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052

* *

On Tue, Oct 6, 2015 at 11:17 AM, Steve Loughran 
wrote:

>
> On 6 Oct 2015, at 01:23, Andrew Or  wrote:
>
> Both the history server and the shuffle service are backward compatible,
> but not forward compatible. This means as long as you have the latest
> version of history server / shuffle service running in your cluster then
> you're fine (you don't need multiple of them).
>
>
> FWIW I've just created a JIRA on tracking/reporting version mismatch on
> history server playback better:
> https://issues.apache.org/jira/browse/SPARK-10950
>
> Even though the UI can't be expected to playback later histories, it could
> be possible to report the issue in a way that users can act on "run a later
> version", rather than raise support calls.
>
>


Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread Richard Eggert
Since the Python API is built on top of the Scala implementation,  its
performance can be at best roughly the same as that of the Scala API (as in
the case of DataFrames and SQL) and at worst several orders of magnitude
slower.

Likewise, since a Scala implementation of new features necessarily
needs to be completed before they can be ported to other languages, those
other languages tend to lag behind the Scala API, though I believe the
development team has been making a conscious effort for the past few months
to get those other languages as up to date as possible before publishing
new releases.

Third party extensions to Spark are generally written in Scala but rarely
ported to other languages.

Additionally,  Scala's type system can make it a bit easier to keep track
of the structure of your data while you are implementing complex
transformations, at least for regular RDDs (not as helpful for
SQL/DataFrames), and there are a lot of nest tricks you can do with
implicits and pattern matching to improve the readability of your code.

That said,  there are plenty of people using the Python API quite
effectively,  so using Scala is by no means a requirement,  though it does
have several advantages.

Anyway,  that's my two cents.

Rich
On Oct 6, 2015 7:40 PM, "ayan guha"  wrote:

> Hi
>
> 2 cents
>
> 1. It should not be true anymore if data frames are used. The reason is
> regardless of the language DF uses same optimization engine behind the
> scene.
> 2. This is generally true in the sense Python APIs are typically  little
> behind of scala/java ones.
>
> Best
> Ayan
>
> On Wed, Oct 7, 2015 at 9:15 AM, dant  wrote:
>
>> Hi
>>
>> I'm hearing a common theme running that I should only do serious
>> programming
>> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
>> Python is great for analytics but in the end the code should be written to
>> Scala to finalise. There are a number of reasons I'm hearing:
>>
>> 1. Spark is written in Scala so will always be faster than any other
>> language implementation on top of it.
>> 2. Spark releases always favour more features being visible and enabled
>> for
>> Scala API than Python API.
>>
>> Are there any truth's to the above? I'm a little sceptical.
>>
>> Apologies for the duplication, my previous message was held up due to
>> subscription issue. Reposting now.
>>
>> Thanks
>> Dan
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread Richard Eggert
That should have read "a lot of neat tricks", not "a lot of nest tricks".
That's what I get for sending emails on my phone
On Oct 6, 2015 8:32 PM, "Richard Eggert"  wrote:

> Since the Python API is built on top of the Scala implementation,  its
> performance can be at best roughly the same as that of the Scala API (as in
> the case of DataFrames and SQL) and at worst several orders of magnitude
> slower.
>
> Likewise,  since the a Scala implementation of new features necessarily
> needs to be completed before they can be ported to other languages, those
> other languages tend to lag behind the Scala API, though I believe the
> development team has been making a conscious effort for the past few months
> to get those other languages as up to date as possible before publishing
> new releases.
>
> Third party extensions to Spark are generally written in Scala but rarely
> ported to other languages.
>
> Additionally,  Scala's type system can make it a bit easier to keep track
> of the structure of your data while you are implementing complex
> transformations, at least for regular RDDs (not as helpful for
> SQL/DataFrames), and there are a lot of nest tricks you can do with
> implicits and pattern matching to improve the readability of your code.
>
> That said,  there are plenty of people using the Python API quite
> effectively,  so using Scala is by no means a requirement,  though it does
> have several advantages.
>
> Anyway,  that's my two cents.
>
> Rich
> On Oct 6, 2015 7:40 PM, "ayan guha"  wrote:
>
>> Hi
>>
>> 2 cents
>>
>> 1. It should not be true anymore if data frames are used. The reason is
>> regardless of the language DF uses same optimization engine behind the
>> scene.
>> 2. This is generally true in the sense Python APIs are typically  little
>> behind of scala/java ones.
>>
>> Best
>> Ayan
>>
>> On Wed, Oct 7, 2015 at 9:15 AM, dant  wrote:
>>
>>> Hi
>>>
>>> I'm hearing a common theme running that I should only do serious
>>> programming
>>> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
>>> Python is great for analytics but in the end the code should be written
>>> to
>>> Scala to finalise. There are a number of reasons I'm hearing:
>>>
>>> 1. Spark is written in Scala so will always be faster than any other
>>> language implementation on top of it.
>>> 2. Spark releases always favour more features being visible and enabled
>>> for
>>> Scala API than Python API.
>>>
>>> Are there any truth's to the above? I'm a little sceptical.
>>>
>>> Apologies for the duplication, my previous message was held up due to
>>> subscription issue. Reposting now.
>>>
>>> Thanks
>>> Dan
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Does feature parity exist between Scala and Python on Spark

2015-10-06 Thread DW @ Gmail
While I have a preference for Scala ( not surprising as a Typesafe person), the 
DataFrame API gives feature and performance parity for Python. The RDD API 
gives feature parity. 

So, use what makes you most successful for other reasons ;)

Sent from my rotary phone. 


> On Oct 6, 2015, at 4:14 PM, dant  wrote:
> 
> Hi,
> I'm hearing a common theme running that I should only do serious programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be written to
> Scala to finalise. There are a number of reasons I'm hearing:
> 
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
> 
> Are there any truth's to the above? I'm a little sceptical.
> 
> Thanks
> Dan
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Scala-and-Python-on-Spark-tp24961.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread Don Drake
If you are using Dataframes in PySpark, then the performance will be the
same as Scala.  However, if you need to implement your own UDF, or run a
map() against a DataFrame in Python, then you will pay the penalty for
performance when executing those functions since all of your data has to go
through a gateway to Python and back.

In regards to API features, Scala does get better treatment, but things are
much better in the Python API than it was even 10 months ago.

-Don


On Tue, Oct 6, 2015 at 5:15 PM, dant  wrote:

> Hi
>
> I'm hearing a common theme running that I should only do serious
> programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be written to
> Scala to finalise. There are a number of reasons I'm hearing:
>
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
>
> Are there any truth's to the above? I'm a little sceptical.
>
> Apologies for the duplication, my previous message was held up due to
> subscription issue. Reposting now.
>
> Thanks
> Dan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


unresolved dependency: org.apache.spark#spark-streaming_2.10;1.5.0: not found

2015-10-06 Thread shahab
Hi,

I am trying to use Spark 1.5 with MLlib, but I keep getting
 "sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-streaming_2.10;1.5.0: not found".

It is weird that this happens, and I could not find any solution for it.
Has anyone faced the same issue?


best,
/Shahab

Here is my SBT library dependencies:

libraryDependencies ++= Seq(
  "com.google.guava" % "guava" % "16.0",
  "org.apache.spark" % "spark-unsafe_2.10" % "1.5.0",
  "org.apache.spark" % "spark-core_2.10" % "1.5.0",
  "org.apache.spark" % "spark-mllib_2.10" % "1.5.0",
  "org.apache.hadoop" % "hadoop-client" % "2.6.0",
  "net.java.dev.jets3t" % "jets3t" % "0.9.0" % "provided",
  "com.github.nscala-time" %% "nscala-time" % "1.0.0",
  "org.scalatest" % "scalatest_2.10" % "2.1.3",
  "junit" % "junit" % "4.8.1" % "test",
  "net.jpountz.lz4" % "lz4" % "1.2.0" % "provided",
  "org.clapper" %% "grizzled-slf4j" % "1.0.2",
  "net.jpountz.lz4" % "lz4" % "1.2.0" % "provided"
)
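
For what it's worth, the failing artifact (spark-streaming) does not appear in the list above; if it is actually needed, a minimal hedged addition (assuming Scala 2.10 and the default Maven Central resolver) would be:

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.0"
// or, letting sbt derive the Scala suffix from scalaVersion:
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.0"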


does KafkaCluster can be public ?

2015-10-06 Thread Erwan ALLAIN
Hello,

I'm currently testing spark streaming with kafka.
I'm creating DirectStream with KafkaUtils and everything's fine. However I
would like to use the signature where I can specify my own message handler
(to play with partition and offset). In this case, I need to manage
offset/partition by myself to fill fromOffsets argument.
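
For reference, a sketch of the signature being discussed; the broker list, topic, starting offsets and the tuple returned by the handler are placeholders, and an existing StreamingContext named ssc is assumed:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// fromOffsets has to be built by the caller, which is exactly where
// something like KafkaCluster (or the raw Kafka offset APIs) is needed.
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)

val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.topic, mmd.partition, mmd.offset, mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, Int, Long, String)](ssc, kafkaParams, fromOffsets, messageHandler)
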
I have found a Jira on this usecase
https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
telling that it's too specific.
I'm aware that it can be done using kafka api (TopicMetaDataRequest and
OffsetRequest) but what I have to do is almost the same as the KafkaCluster
which is private.

is it possible to :
 - add another signature in KafkaUtils ?
 - make KafkaCluster public ?

or do you have any other smart solution where I don't need to copy/paste
KafkaCluster ?

Thanks.

Regards,
Erwan ALLAIN


GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-06 Thread Ophir Cohen
Hi Guys,
I'm upgrading to Spark 1.5.

In our previous version (Spark 1.3, but it was OK on 1.4 as well) we created
GenericMutableRow
(org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and returned it
as org.apache.spark.sql.Row.

Starting from Spark 1.5, GenericMutableRow no longer extends Row.

What do you suggest to do?
How can I convert GenericMutableRow to Row?
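
Not an authoritative answer, but if the values are ordinary Scala/Java objects, one workaround is to build external rows directly instead of going through GenericMutableRow; a small sketch with a made-up business object:

import org.apache.spark.sql.Row

case class Business(id: Long, name: String, amount: Double)   // placeholder business object
val b = Business(1L, "acme", 10.5)

val row: Row = Row.fromSeq(Seq(b.id, b.name, b.amount))
// equivalently: Row(b.id, b.name, b.amount)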

Prompt answer will be highly appreciated!
Thanks,
Ophir


Re: Spark thrift service and Hive impersonation.

2015-10-06 Thread Steve Loughran

On 5 Oct 2015, at 22:51, Jagat Singh 
> wrote:

Hello Steve,

Thanks for confirmation.

Is there any work planned work on this.


Not that I'm aware of, though somebody may be doing it.

SparkSQL is not hive. It uses some of the libraries -the thriftserver stuff is 
subclassed to get thrift support, and so, transitively ODBC support. The 
sql/hive module uses the hive JAR to parse SQL and to get at data. And in Spark 
1.5, it's based off Hive 1.2.1, so can access hive metadata created by the 
current hive releases. But it's a different SQL engine underneath


Thanks,

Jagat Singh



On Wed, Sep 30, 2015 at 9:37 PM, Vinay Shukla 
> wrote:
Steve is right,
 The Spark thrift server does not propagate the end-user identity downstream yet.



On Wednesday, September 30, 2015, Steve Loughran 
> wrote:

On 30 Sep 2015, at 03:24, Mohammed Guller  wrote:

Does each user needs to start own thrift server to use it?

No. One of the benefits of the Spark Thrift Server is that it allows multiple 
users to share a single SparkContext.

Most likely, you have file permissions issue.



I don't think the spark hive thrift server does the multi-user stuff (yet)

Mohammed

From: Jagat Singh [mailto:jagatsi...@gmail.com]
Sent: Tuesday, September 29, 2015 5:30 PM
To: SparkUser
Subject: Spark thrift service and Hive impersonation.

Hi,

I have started the Spark thrift service using spark user.

Does each user needs to start own thrift server to use it?

Using beeline i am able to connect to server and execute show tables;

However when we try to execute some real query it runs as spark user and HDFS 
permissions does not allow them to be read.

The query fails with error

0: jdbc:hive2://localhost:1> select count(*) from mytable;
Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
mytable. java.security.AccessControlException: Permission denied: user=spark, 
access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)


And in thrift server we get log.


In the hive-site.xml we have impersonation enabled.

   
<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>

<property>
  <name>hive.server2.enable.impersonation</name>
  <value>true</value>
</property>

Is there any other configuration to be done for it to work like normal hive 
thrift server.

Thanks





RE: laziness in textFile reading from HDFS?

2015-10-06 Thread Mohammed Guller
I have not used LZO compressed files from Spark, so not sure why it stalls 
without caching. 

In general, if you are going to make just one pass over the data, there is not 
much benefit in caching it. The data gets read anyway only after the first 
action is called. If you are calling just a map operation and then a save 
operation, I don't see how caching would help.

Mohammed


-Original Message-
From: Matt Narrell [mailto:matt.narr...@gmail.com] 
Sent: Tuesday, October 6, 2015 3:32 PM
To: Mohammed Guller
Cc: davidkl; user@spark.apache.org
Subject: Re: laziness in textFile reading from HDFS?

One.

I read in LZO compressed files from HDFS, perform a map operation, cache the 
results of this map operation, and call saveAsHadoopFile to write LZO back to HDFS.

Without the cache, the job will stall.  

mn

> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
> 
> Is there any specific reason for caching the RDD? How many passes you make 
> over the dataset? 
> 
> Mohammed
> 
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com]
> Sent: Saturday, October 3, 2015 9:50 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> Is there any more information or best practices here?  I have the exact same 
> issues when reading large data sets from HDFS (larger than available RAM) and 
> I cannot run without setting the RDD persistence level to 
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> 
> Should I repartition this RDD to be equal to the number of cores?  
> 
> I notice that the job duration on the YARN UI is about 30 minutes longer than 
> the Spark UI.  When the job initially starts, there is no tasks shown in the 
> Spark UI..?
> 
> All I;m doing is reading records from HDFS text files with sc.textFile, and 
> rewriting them back to HDFS grouped by a timestamp.
> 
> Thanks,
> mn
> 
>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>> 
>> 1) It is not required to have the same amount of memory as data. 
>> 2) By default the # of partitions are equal to the number of HDFS 
>> blocks
>> 3) Yes, the read operation is lazy
>> 4) It is okay to have more number of partitions than number of cores. 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: davidkl [mailto:davidkl...@hotmail.com]
>> Sent: Monday, September 28, 2015 1:40 AM
>> To: user@spark.apache.org
>> Subject: laziness in textFile reading from HDFS?
>> 
>> Hello,
>> 
>> I need to process a significant amount of data every day, about 4TB. This 
>> will be processed in batches of about 140GB. The cluster this will be 
>> running on doesn't have enough memory to hold the dataset at once, so I am 
>> trying to understand how this works internally.
>> 
>> When using textFile to read an HDFS folder (containing multiple files), I 
>> understand that the number of partitions created are equal to the number of 
>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the number 
>> of blocks/partitions is larger than the number of cores/threads the Spark 
>> driver was launched with (N), are N partitions created initially and then 
>> the rest when required? Or are all those partitions created up front?
>> 
>> I want to avoid reading the whole data into memory just to spill it out to 
>> disk if there is no enough memory.
>> 
>> Thanks! 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textF
>> i le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User 
>> List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Matt Narrell
Agreed. This is Spark 1.2 on CDH 5.x. How do you mitigate cases where the data sets 
are larger than available memory?

My jobs stall and I get GC/heap issues all over the place.  

..via mobile

> On Oct 6, 2015, at 4:44 PM, Mohammed Guller  wrote:
> 
> I have not used LZO compressed files from Spark, so not sure why it stalls 
> without caching. 
> 
> In general, if you are going to make just one pass over the data, there is 
> not much benefit in caching it. The data gets read anyway only after the 
> first action is called. If you are calling just a map operation and then a 
> save operation, I don't see how caching would help.
> 
> Mohammed
> 
> 
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com] 
> Sent: Tuesday, October 6, 2015 3:32 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> One.
> 
> I read in LZO compressed files from HDFS, perform a map operation, cache the 
> results of this map operation, and call saveAsHadoopFile to write LZO back to HDFS.
> 
> Without the cache, the job will stall.  
> 
> mn
> 
>> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
>> 
>> Is there any specific reason for caching the RDD? How many passes you make 
>> over the dataset? 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: Matt Narrell [mailto:matt.narr...@gmail.com]
>> Sent: Saturday, October 3, 2015 9:50 PM
>> To: Mohammed Guller
>> Cc: davidkl; user@spark.apache.org
>> Subject: Re: laziness in textFile reading from HDFS?
>> 
>> Is there any more information or best practices here?  I have the exact same 
>> issues when reading large data sets from HDFS (larger than available RAM) 
>> and I cannot run without setting the RDD persistence level to 
>> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
>> 
>> Should I repartition this RDD to be equal to the number of cores?  
>> 
>> I notice that the job duration on the YARN UI is about 30 minutes longer 
>> than the Spark UI.  When the job initially starts, there is no tasks shown 
>> in the Spark UI..?
>> 
>> All I;m doing is reading records from HDFS text files with sc.textFile, and 
>> rewriting them back to HDFS grouped by a timestamp.
>> 
>> Thanks,
>> mn
>> 
>>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>>> 
>>> 1) It is not required to have the same amount of memory as data. 
>>> 2) By default the # of partitions are equal to the number of HDFS 
>>> blocks
>>> 3) Yes, the read operation is lazy
>>> 4) It is okay to have more number of partitions than number of cores. 
>>> 
>>> Mohammed
>>> 
>>> -Original Message-
>>> From: davidkl [mailto:davidkl...@hotmail.com]
>>> Sent: Monday, September 28, 2015 1:40 AM
>>> To: user@spark.apache.org
>>> Subject: laziness in textFile reading from HDFS?
>>> 
>>> Hello,
>>> 
>>> I need to process a significant amount of data every day, about 4TB. This 
>>> will be processed in batches of about 140GB. The cluster this will be 
>>> running on doesn't have enough memory to hold the dataset at once, so I am 
>>> trying to understand how this works internally.
>>> 
>>> When using textFile to read an HDFS folder (containing multiple files), I 
>>> understand that the number of partitions created are equal to the number of 
>>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the 
>>> number of blocks/partitions is larger than the number of cores/threads the 
>>> Spark driver was launched with (N), are N partitions created initially and 
>>> then the rest when required? Or are all those partitions created up front?
>>> 
>>> I want to avoid reading the whole data into memory just to spill it out to 
>>> disk if there is no enough memory.
>>> 
>>> Thanks! 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textF
>>> i le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User 
>>> List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>>> additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>>> additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Cody Koeninger
I agree getting cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count } recent versions of direct stream
shouldn't do any work at all on the executor (since the number of messages
in the rdd is known already)

do a foreachPartition and println or count the iterator manually.

On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <t...@databricks.com> wrote:

> Are you sure that this is not related to Cassandra inserts? Could you just do
> foreachRDD { _.count } instead, to keep Cassandra out of the picture, and
> then test this again.
>
> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Also check if the Kafka cluster is still balanced. Maybe one of the
>> brokers manages too many partitions; in that case all the work will stay on that
>> executor unless you repartition right after Kafka (and I'm not saying you
>> should).
>>
>> Sent from my iPhone
>>
>> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I'm not clear on what you're measuring.  Can you post relevant code
>> snippets including the measurement code?
>>
>> As far as kafka metrics, nothing currently.  There is an info-level log
>> message every time a kafka rdd iterator is instantiated,
>>
>> log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>
>>
>> If you log once you're done with an iterator you should be able to see
>> the delta.
>>
>> The other thing to try is reduce the number of parts involved in the job
>> to isolate it ... first thing I'd do there is take cassandra out of the
>> equation.
>>
>>
>>
>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> The job is doing ETL from Kafka records to Cassandra. After a
>>> single filtering stage on Spark, the 'TL' part is done using the
>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>
>>> We have metrics on the executor work which we collect and add together,
>>> indicated here by 'local computation'.  As you can see, we also measure how
>>> much it cost us to measure :-)
>>> See how 'local work'  times are comparable.  What's not visible is the
>>> task scheduling and consuming the data from Kafka which becomes part of the
>>> 'spark computation' part.
>>>
>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>
>>> Are there metrics available somehow on the Kafka reading time?
>>>
>>>                           Slow Task    Fast Task
>>> local computation         347.6        281.53
>>> spark computation         6930         263
>>> metric collection         70           138
>>> wall clock process        7000         401
>>> total records processed   4297         5002
>>>
>>> (time in ms)
>>>
>>> kr, Gerard.
>>>
>>>
>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Can you say anything more about what the job is doing?
>>>>
>>>> First thing I'd do is try to get some metrics on the time taken by your
>>>> code on the executors (e.g. when processing the iterator) to see if it's
>>>> consistent between the two situations.
>>>>
>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>>> Our initial migration went quite fine but now we are seeing a weird 
>>>>> zig-zag
>>>>> performance pattern we cannot explain.
>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>> the next takes 7sec for a stable streaming rate.
>>>>>
>>>>> Here are comparable metrics for two successive tasks:
>>>>> *Slow*:
>>>>>
>>>>>
>>>>>
>>>>> Executor ID                                Address                       Task Time   Total Tasks   Failed Tasks   Succeeded Tasks
>>>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863    22 s        3             0              3
>>>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812    40 s        11            0              11
>>>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945    49 s        10            0              10
>>>>> *Fast*:
>>>>>

Re: RDD of ImmutableList

2015-10-06 Thread Jonathan Coveney
Nobody is saying not to use immutable data structures, only that guava's
aren't natively supported.

Scala's default collections library is all immutable: List, Vector, Map.
This is what people generally use, especially in Scala code!

El martes, 6 de octubre de 2015, Jakub Dubovsky <
spark.dubovsky.ja...@seznam.cz> escribió:

> Thank you for quick reaction.
>
> I have to say this is very surprising to me. I have never received advice to
> stop using an immutable approach. The whole RDD abstraction is designed to be immutable
> (which is sort of sabotaged by not being able to (de)serialize immutable
> classes properly). I will ask on dev list if this is to be changed or not.
>
> Ok, I have let go initial feelings and now let's be pragmatic. And this is
> still for everyone not just Igor:
>
> I use a class from a library which is immutable. Now I want to use this
> class to represent my data in RDD because this saves me a huge amount of
> work. The class uses ImmutableList as one of its fields. That's why it
> fails. But isn't there a way to work around this? I ask this because I
> have exactly zero knowledge about kryo and the way how it works. So for
> example would some of these two work?
>
> 1) Change the external class so that it implements writeObject, readObject
> methods (it's java). Will these methods be used by kryo? (I can ask the
> maintainers of a library to change the class if the change is reasonable.
> Adding these methods would be while dropping immutability certainly
> wouldn't)
>
> 2) Wrap the class to scala class which would translate the data during
> (de)serialization?
>
>   Thanks!
>   Jakub Dubovsky
>
> -- Původní zpráva --
> Od: Igor Berman  >
> Komu: Jakub Dubovsky  >
> Datum: 5. 10. 2015 20:11:35
> Předmět: Re: RDD of ImmutableList
>
> kryo doesn't support guava's collections by default
> I remember encountered project in github that fixes this(not sure though).
> I've ended to stop using guava collections as soon as spark rdds are
> concerned.
>
> On 5 October 2015 at 21:04, Jakub Dubovsky  > wrote:
>
> Hi all,
>
>   I would like to have an advice on how to use ImmutableList with RDD. Small
> presentation of an essence of my problem in spark-shell with guava jar
> added:
>
> scala> import com.google.common.collect.ImmutableList
> import com.google.common.collect.ImmutableList
>
> scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
> ImmutableList.of(3,6))
> arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2],
> [2, 4], [3, 6])
>
> scala> val rdd = sc.parallelize(arr)
> rdd:
> org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] =
> ParallelCollectionRDD[0] at parallelize at <console>:24
>
> scala> rdd.count
>
>  This results in a kryo exception saying that it cannot add a new element to a
> list instance during deserialization:
>
> java.io.IOException: java.lang.UnsupportedOperationException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> at
> org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
> ...
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> ...
>
>   It somehow makes sense. But I cannot think of a workaround, and I do not
> believe that using ImmutableList with an RDD is impossible. How is this
> solved?
>
>   Thank you in advance!
>
>Jakub Dubovsky
>
>
>
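
For reference, a minimal sketch of one common workaround: registering a dedicated Kryo
serializer for Guava's immutable collections. This assumes the third-party
de.javakaffee:kryo-serializers library is on the classpath; the registrator class name
below is illustrative, not something from this thread.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer

class GuavaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // registers serializers for ImmutableList and its non-public subclasses
    ImmutableListSerializer.registerSerializers(kryo)
  }
}

// driver side, before the SparkContext is created:
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// conf.set("spark.kryo.registrator", classOf[GuavaKryoRegistrator].getName)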


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Tathagata Das
Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I agree getting cassandra out of the picture is a good first step.
>
> But if you just do foreachRDD { _.count } recent versions of direct stream
> shouldn't do any work at all on the executor (since the number of messages
> in the rdd is known already)
>
> do a foreachPartition and println or count the iterator manually.
>
> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Are you sure that this is not related to Cassandra inserts? Could you just do
>> foreachRDD { _.count } instead to keep Cassandra out of the picture and
>> then test this again.
>>
>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>> brokers manages too many partitions, all the work will stay on that
>>> executor unless you repartition right after kafka (and I'm not saying you
>>> should).
>>>
>>> Sent from my iPhone
>>>
>>> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> I'm not clear on what you're measuring.  Can you post relevant code
>>> snippets including the measurement code?
>>>
>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>> message every time a kafka rdd iterator is instantiated,
>>>
>>> log.info(s"Computing topic ${part.topic}, partition
>>> ${part.partition} " +
>>>
>>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> If you log once you're done with an iterator you should be able to see
>>> the delta.
>>>
>>> The other thing to try is reduce the number of parts involved in the job
>>> to isolate it ... first thing I'd do there is take cassandra out of the
>>> equation.
>>>
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>
>>>> We have metrics on the executor work which we collect and add together,
>>>> indicated here by 'local computation'.  As you can see, we also measure how
>>>> much it cost us to measure :-)
>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>> task scheduling and consuming the data from Kafka which becomes part of the
>>>> 'spark computation' part.
>>>>
>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>
>>>> Are there metrics available somehow on the Kafka reading time?
>>>>
>>>>                           Slow Task    Fast Task
>>>> local computation         347.6        281.53
>>>> spark computation         6930         263
>>>> metric collection         70           138
>>>> wall clock process        7000         401
>>>> total records processed   4297         5002
>>>>
>>>> (time in ms)
>>>>
>>>> kr, Gerard.
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Can you say anything more about what the job is doing?
>>>>>
>>>>> First thing I'd do is try to get some metrics on the time taken by
>>>>> your code on the executors (e.g. when processing the iterator) to see if
>>>>> it's consistent between the two situations.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>>>> Our initial migration went quite fine but now we are seeing a weird 
>>>>>> zig-zag
>>>>>> performance pattern we cannot explain.
>>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>>> the next takes 7sec for a stable streaming rate.
>>>>>>
>>>>>> Here are comparable metrics for two successive tasks:
>>>>>> *Slow*:
>>>>>>
>>>>>>
>>>>>>

Re: does KafkaCluster can be public ?

2015-10-06 Thread Tathagata Das
Given the interest, I am also inclining towards making it a public
developer API. Maybe even experimental. Cody, mind submitting a patch?


On Tue, Oct 6, 2015 at 7:45 AM, Sean Owen  wrote:

> For what it's worth, I also use this class in an app, but it happens
> to be from Java code where it acts as if it's public. So no problem
> for my use case, but I suppose, another small vote for the usefulness
> of this class to the caller. I end up using getLatestLeaderOffsets to
> figure out how to initialize initial offsets.
>
> On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger  wrote:
> > I personally think KafkaCluster (or the equivalent) should be made
> public.
> > When I'm deploying spark I just sed out the private[spark] and rebuild.
> >
> > There's a general reluctance to make things public due to backwards
> > compatibility, but if enough people ask for it... ?
> >
> > On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney 
> wrote:
> >>
> >> You can put a class in the org.apache.spark namespace to access anything
> >> that is private[spark]. You can then make enrichments there to access
> >> whatever you need. Just beware upgrade pain :)
> >>
> >>
> >> El martes, 6 de octubre de 2015, Erwan ALLAIN 
> >> escribió:
> >>>
> >>> Hello,
> >>>
> >>> I'm currently testing spark streaming with kafka.
> >>> I'm creating DirectStream with KafkaUtils and everything's fine.
> However
> >>> I would like to use the signature where I can specify my own message
> handler
> >>> (to play with partition and offset). In this case, I need to manage
> >>> offset/partition by myself to fill fromOffsets argument.
> >>> I have found a Jira on this usecase
> >>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been
> closed
> >>> telling that it's too specific.
> >>> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
> >>> OffsetRequest) but what I have to do is almost the same as the
> KafkaCluster
> >>> which is private.
> >>>
> >>> is it possible to :
> >>>  - add another signature in KafkaUtils ?
> >>>  - make KafkaCluster public ?
> >>>
> >>> or do you have any other smart solution where I don't need to
> copy/paste
> >>> KafkaCluster ?
> >>>
> >>> Thanks.
> >>>
> >>> Regards,
> >>> Erwan ALLAIN
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: laziness in textFile reading from HDFS?

2015-10-06 Thread Mohammed Guller
It is not uncommon to process datasets larger than available memory with Spark. 

I don't remember whether LZO files are splittable. Perhaps, in your case Spark 
is running into issues while decompressing a large LZO file.

See if this helps:
http://stackoverflow.com/questions/25248170/spark-hadoop-throws-exception-for-large-lzo-files


Mohammed


-Original Message-
From: Matt Narrell [mailto:matt.narr...@gmail.com] 
Sent: Tuesday, October 6, 2015 4:08 PM
To: Mohammed Guller
Cc: davidkl; user@spark.apache.org
Subject: Re: laziness in textFile reading from HDFS?

Agreed. This is Spark 1.2 on CDH 5.x. How do you mitigate cases where the data sets 
are larger than available memory?

My jobs stall and I get gc/heap issues all over the place.  

..via mobile

> On Oct 6, 2015, at 4:44 PM, Mohammed Guller  wrote:
> 
> I have not used LZO compressed files from Spark, so not sure why it stalls 
> without caching. 
> 
> In general, if you are going to make just one pass over the data, there is 
> not much benefit in caching it. The data gets read anyway only after the 
> first action is called. If you are calling just a map operation and then a 
> save operation, I don't see how caching would help.
> 
> Mohammed
> 
> 
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com]
> Sent: Tuesday, October 6, 2015 3:32 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> One.
> 
I read in LZO compressed files from HDFS, perform a map operation, cache the 
results of this map operation, and call saveAsHadoopFile to write LZO back to HDFS.
> 
> Without the cache, the job will stall.  
> 
> mn
> 
>> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
>> 
>> Is there any specific reason for caching the RDD? How many passes you make 
>> over the dataset? 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: Matt Narrell [mailto:matt.narr...@gmail.com]
>> Sent: Saturday, October 3, 2015 9:50 PM
>> To: Mohammed Guller
>> Cc: davidkl; user@spark.apache.org
>> Subject: Re: laziness in textFile reading from HDFS?
>> 
>> Is there any more information or best practices here?  I have the exact same 
>> issues when reading large data sets from HDFS (larger than available RAM) 
>> and I cannot run without setting the RDD persistence level to 
>> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
>> 
>> Should I repartition this RDD to be equal to the number of cores?  
>> 
>> I notice that the job duration on the YARN UI is about 30 minutes longer 
>> than the Spark UI.  When the job initially starts, there are no tasks shown 
>> in the Spark UI..?
>> 
>> All I'm doing is reading records from HDFS text files with sc.textFile, and 
>> rewriting them back to HDFS grouped by a timestamp.
>> 
>> Thanks,
>> mn
>> 
>>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>>> 
>>> 1) It is not required to have the same amount of memory as data. 
>>> 2) By default the # of partitions are equal to the number of HDFS 
>>> blocks
>>> 3) Yes, the read operation is lazy
>>> 4) It is okay to have more partitions than cores. 
>>> 
>>> Mohammed
>>> 
>>> -Original Message-
>>> From: davidkl [mailto:davidkl...@hotmail.com]
>>> Sent: Monday, September 28, 2015 1:40 AM
>>> To: user@spark.apache.org
>>> Subject: laziness in textFile reading from HDFS?
>>> 
>>> Hello,
>>> 
>>> I need to process a significant amount of data every day, about 4TB. This 
>>> will be processed in batches of about 140GB. The cluster this will be 
>>> running on doesn't have enough memory to hold the dataset at once, so I am 
>>> trying to understand how this works internally.
>>> 
>>> When using textFile to read an HDFS folder (containing multiple files), I 
>>> understand that the number of partitions created are equal to the number of 
>>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the 
>>> number of blocks/partitions is larger than the number of cores/threads the 
>>> Spark driver was launched with (N), are N partitions created initially and 
>>> then the rest when required? Or are all those partitions created up front?
>>> 
>>> I want to avoid reading the whole data into memory just to spill it out to 
>>> disk if there is not enough memory.
>>> 
>>> Thanks! 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> 
>>> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>>> additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> 
>>> - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 

Re: does KafkaCluster can be public ?

2015-10-06 Thread Cody Koeninger
Sure no prob.

On Tue, Oct 6, 2015 at 6:35 PM, Tathagata Das  wrote:

> Given the interest, I am also inclining towards making it a public
> developer API. Maybe even experimental. Cody, mind submitting a patch?
>
>
> On Tue, Oct 6, 2015 at 7:45 AM, Sean Owen  wrote:
>
>> For what it's worth, I also use this class in an app, but it happens
>> to be from Java code where it acts as if it's public. So no problem
>> for my use case, but I suppose, another small vote for the usefulness
>> of this class to the caller. I end up using getLatestLeaderOffsets to
>> figure out how to initialize initial offsets.
>>
>> On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger 
>> wrote:
>> > I personally think KafkaCluster (or the equivalent) should be made
>> public.
>> > When I'm deploying spark I just sed out the private[spark] and rebuild.
>> >
>> > There's a general reluctance to make things public due to backwards
>> > compatibility, but if enough people ask for it... ?
>> >
>> > On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney 
>> wrote:
>> >>
>> >> You can put a class in the org.apache.spark namespace to access
>> anything
>> >> that is private[spark]. You can then make enrichments there to access
>> >> whatever you need. Just beware upgrade pain :)
>> >>
>> >>
>> >> El martes, 6 de octubre de 2015, Erwan ALLAIN > >
>> >> escribió:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I'm currently testing spark streaming with kafka.
>> >>> I'm creating DirectStream with KafkaUtils and everything's fine.
>> However
>> >>> I would like to use the signature where I can specify my own message
>> handler
>> >>> (to play with partition and offset). In this case, I need to manage
>> >>> offset/partition by myself to fill fromOffsets argument.
>> >>> I have found a Jira on this usecase
>> >>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been
>> closed
>> >>> telling that it's too specific.
>> >>> I'm aware that it can be done using kafka api (TopicMetaDataRequest
>> and
>> >>> OffsetRequest) but what I have to do is almost the same as the
>> KafkaCluster
>> >>> which is private.
>> >>>
>> >>> is it possible to :
>> >>>  - add another signature in KafkaUtils ?
>> >>>  - make KafkaCluster public ?
>> >>>
>> >>> or do you have any other smart solution where I don't need to
>> copy/paste
>> >>> KafkaCluster ?
>> >>>
>> >>> Thanks.
>> >>>
>> >>> Regards,
>> >>> Erwan ALLAIN
>> >
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Jonathan Coveney
LZO files are not splittable by default, but there are projects with Input
and Output formats to make splittable LZO files. Check out Twitter's
elephantbird on GitHub.
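
For illustration, a minimal sketch of reading splittable LZO through elephant-bird's
input format (this assumes the elephant-bird and hadoop-lzo jars plus the native LZO
codec are installed on the cluster, and that each .lzo file has been indexed; the input
path is made up):

import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val lines = sc.newAPIHadoopFile(
    "hdfs:///data/events",            // illustrative directory of .lzo files
    classOf[LzoTextInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map(_._2.toString)                 // keep only the line text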

El miércoles, 7 de octubre de 2015, Mohammed Guller 
escribió:

> It is not uncommon to process datasets larger than available memory with
> Spark.
>
> I don't remember whether LZO files are splittable. Perhaps, in your case
> Spark is running into issues while decompressing a large LZO file.
>
> See if this helps:
>
> http://stackoverflow.com/questions/25248170/spark-hadoop-throws-exception-for-large-lzo-files
>
>
> Mohammed
>
>
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> Sent: Tuesday, October 6, 2015 4:08 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org 
> Subject: Re: laziness in textFile reading from HDFS?
>
> Agreed. This is spark 1.2 on CDH5.x. How do you mitigate where the data
> sets are larger than available memory?
>
> My jobs stall and gc/heap issues all over the place.
>
> ..via mobile
>
> > On Oct 6, 2015, at 4:44 PM, Mohammed Guller  > wrote:
> >
> > I have not used LZO compressed files from Spark, so not sure why it
> stalls without caching.
> >
> > In general, if you are going to make just one pass over the data, there
> is not much benefit in caching it. The data gets read anyway only after the
> first action is called. If you are calling just a map operation and then a
> save operation, I don't see how caching would help.
> >
> > Mohammed
> >
> >
> > -Original Message-
> > From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> > Sent: Tuesday, October 6, 2015 3:32 PM
> > To: Mohammed Guller
> > Cc: davidkl; user@spark.apache.org 
> > Subject: Re: laziness in textFile reading from HDFS?
> >
> > One.
> >
> > I read in LZO compressed files from HDFS Perform a map operation cache
> the results of this map operation call saveAsHadoopFile to write LZO back
> to HDFS.
> >
> > Without the cache, the job will stall.
> >
> > mn
> >
> >> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  > wrote:
> >>
> >> Is there any specific reason for caching the RDD? How many passes you
> make over the dataset?
> >>
> >> Mohammed
> >>
> >> -Original Message-
> >> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> >> Sent: Saturday, October 3, 2015 9:50 PM
> >> To: Mohammed Guller
> >> Cc: davidkl; user@spark.apache.org 
> >> Subject: Re: laziness in textFile reading from HDFS?
> >>
> >> Is there any more information or best practices here?  I have the exact
> same issues when reading large data sets from HDFS (larger than available
> RAM) and I cannot run without setting the RDD persistence level to
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> >>
> >> Should I repartition this RDD to be equal to the number of cores?
> >>
> >> I notice that the job duration on the YARN UI is about 30 minutes
> longer than the Spark UI.  When the job initially starts, there is no tasks
> shown in the Spark UI..?
> >>
> >> All I'm doing is reading records from HDFS text files with sc.textFile,
> and rewriting them back to HDFS grouped by a timestamp.
> >>
> >> Thanks,
> >> mn
> >>
> >>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  > wrote:
> >>>
> >>> 1) It is not required to have the same amount of memory as data.
> >>> 2) By default the # of partitions are equal to the number of HDFS
> >>> blocks
> >>> 3) Yes, the read operation is lazy
> >>> 4) It is okay to have more partitions than cores.
> >>>
> >>> Mohammed
> >>>
> >>> -Original Message-
> >>> From: davidkl [mailto:davidkl...@hotmail.com ]
> >>> Sent: Monday, September 28, 2015 1:40 AM
> >>> To: user@spark.apache.org 
> >>> Subject: laziness in textFile reading from HDFS?
> >>>
> >>> Hello,
> >>>
> >>> I need to process a significant amount of data every day, about 4TB.
> This will be processed in batches of about 140GB. The cluster this will be
> running on doesn't have enough memory to hold the dataset at once, so I am
> trying to understand how this works internally.
> >>>
> >>> When using textFile to read an HDFS folder (containing multiple
> files), I understand that the number of partitions created are equal to the
> number of HDFS blocks, correct? Are those created in a lazy way? I mean, if
> the number of blocks/partitions is larger than the number of cores/threads
> the Spark driver was launched with (N), are N partitions created initially
> and then the rest when required? Or are all those partitions created up
> front?
> >>>
> >>> I want to avoid reading the whole data into memory just to spill it
> out to disk if there is not enough memory.
> >>>
> >>> Thanks!
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>> 

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Tathagata Das
Are you sure that this is not related to Cassandra inserts? Could you just do
foreachRDD { _.count } instead to keep Cassandra out of the picture and
then test this again.

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Also check if the Kafka cluster is still balanced. Maybe one of the
> brokers manages too many partitions, all the work will stay on that
> executor unless you repartition right after kafka (and I'm not saying you
> should).
>
> Sent from my iPhone
>
> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>
> I'm not clear on what you're measuring.  Can you post relevant code
> snippets including the measurement code?
>
> As far as kafka metrics, nothing currently.  There is an info-level log
> message every time a kafka rdd iterator is instantiated,
>
> log.info(s"Computing topic ${part.topic}, partition ${part.partition}
> " +
>
>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> If you log once you're done with an iterator you should be able to see the
> delta.
>
> The other thing to try is reduce the number of parts involved in the job
> to isolate it ... first thing I'd do there is take cassandra out of the
> equation.
>
>
>
> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> The job is doing ETL from Kafka records to Cassandra. After a
>> single filtering stage on Spark, the 'TL' part is done using the
>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>
>> We have metrics on the executor work which we collect and add together,
>> indicated here by 'local computation'.  As you can see, we also measure how
>> much it cost us to measure :-)
>> See how 'local work'  times are comparable.  What's not visible is the
>> task scheduling and consuming the data from Kafka which becomes part of the
>> 'spark computation' part.
>>
>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>
>> Are there metrics available somehow on the Kafka reading time?
>>
>>                           Slow Task    Fast Task
>> local computation         347.6        281.53
>> spark computation         6930         263
>> metric collection         70           138
>> wall clock process        7000         401
>> total records processed   4297         5002
>>
>> (time in ms)
>>
>> kr, Gerard.
>>
>>
>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Can you say anything more about what the job is doing?
>>>
>>> First thing I'd do is try to get some metrics on the time taken by your
>>> code on the executors (e.g. when processing the iterator) to see if it's
>>> consistent between the two situations.
>>>
>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>>> performance pattern we cannot explain.
>>>> In alternating fashion, one task takes about 1 second to finish and the
>>>> next takes 7sec for a stable streaming rate.
>>>>
>>>> Here are comparable metrics for two successive tasks:
>>>> *Slow*:
>>>>
>>>>
>>>>
>>>> Executor ID                                Address                       Task Time   Total Tasks   Failed Tasks   Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863    22 s        3             0              3
>>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812    40 s        11            0              11
>>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945    49 s        10            0              10
>>>> *Fast*:
>>>>
>>>>
>>>> Executor ID                                Address                       Task Time   Total Tasks   Failed Tasks   Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863    0.6 s       4             0              4
>>>> 20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812    1 s         9             0              9
>>>> 20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945    1 s         11            0              11
>>>> We have some custom metrics that measure wall-clock time of execution
>>>> of certain blocks of the job, like the time it takes to do the local
>>>> computations (RDD.foreachPartition closure) vs total time.
>>>> The difference between the slow and fast executing task is on the
>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>> (DStream.foreachRDD closure)
>>>>
>>>> e.g.
>>>> Slow task:
>>>>
>>>> local computation time: 347.6096849996, *spark computation time:
>>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>>
>>>> Fast task:
>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>
>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>> is stable -this is on a dev env with that stuff under control.
>>>>
>>>> Any ideas what this behavior could be?
>>>>
>>>> thanks in advance,  Gerard.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread ayan guha
Hi

2 cents

1. It should not be true anymore if DataFrames are used: regardless of the
language, DataFrames use the same optimization engine behind the scenes (see
the small sketch below).
2. This is generally true, in the sense that the Python APIs are typically a
little behind the Scala/Java ones.
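
For illustration, a minimal sketch of point 1 (the input file name is made up); the same
logical plan and Catalyst optimizer are used whether this is written in Scala or Python:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("people.json")        // illustrative input
df.filter(df("age") > 21).groupBy("age").count().explain()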

Best
Ayan

On Wed, Oct 7, 2015 at 9:15 AM, dant  wrote:

> Hi
>
> I'm hearing a common theme running that I should only do serious
> programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be written to
> Scala to finalise. There are a number of reasons I'm hearing:
>
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
>
> Are there any truth's to the above? I'm a little sceptical.
>
> Apologies for the duplication, my previous message was held up due to
> subscription issue. Reposting now.
>
> Thanks
> Dan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


RE: DStream Transformation to save JSON in Cassandra 2.1

2015-10-06 Thread Prateek .
Thanks Ashish and Jean.

 I am using scala API, so I used the case class

case class Coordinate(id: String, ax: Double, ay: Double, az: Double, oa: 
Double, ob: Double, og:Double)

def mapToCoordinate(jsonMap: Map[String,Any]): Coordinate = {
//map the coordinate
}

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val jsonf = 
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])
val coordinate = jsonf.map(m => mapToCoordinate(m))


coordinate.saveToCassandra("android_data", "coordinate", SomeColumns("id", 
"ax","ay", "az", "oa", "ob", "og"))
// I am doing something wrong here Casssandra schema is getting "Some" type of 
value
Some(9e5ccb3d5e4e421392eb98978a6b368e) | Some(0.8437097133160075) | 
Some(0.5656238331780492) | Some(0.5235250642853548)

I am not able to figure out how to map the DStream[Coordinate] to the columns in the 
schema.
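
For illustration, a minimal sketch of one way mapToCoordinate could unwrap the nested
map for the sample JSON above (it assumes every field is present and parses as a number;
the helper d is just for brevity):

def mapToCoordinate(jsonMap: Map[String, Any]): Coordinate = {
  val c = jsonMap("coordinate").asInstanceOf[Map[String, Any]]
  def d(key: String): Double = c(key).toString.toDouble   // values arrive as strings
  Coordinate(jsonMap("id").toString, d("ax"), d("ay"), d("az"), d("oa"), d("ob"), d("og"))
}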

Thank You
Prateek

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Monday, October 05, 2015 7:58 PM
To: user@spark.apache.org
Subject: Re: DStream Transformation to save JSON in Cassandra 2.1

Hi Prateek,

I see two ways:

- using Cassandra CQL to adapt the RDD in the DStream to Cassandra
- using a Cassandra converter

You have a couple of code snippet in the examples. Let me know if you need a 
code sample.

Regards
JB

On 10/05/2015 04:14 PM, Prateek . wrote:
> Hi,
>
> I am beginner in Spark , this is sample data I get from Kafka stream:
>
> {"id":
> "9f5ccb3d5f4f421392fb98978a6b368f","coordinate":{"ax":"1.20","ay":"3.8
> 0","az":"9.90","oa":"8.03","ob":"8.8","og":"9.97"}}
>
>val lines = KafkaUtils.createStream(ssc, zkQuorum, group, 
> topicMap).map(_._2)
>val jsonf =
> lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.i
> mmutable.Map[String, Any]])
>
>I am getting a, DSTream[Map[String,Any]]. I need to store each
> coordinate values in the below Cassandra schema
>
> CREATE TABLE iotdata.coordinate (
>  id text PRIMARY KEY, ax double, ay double, az double, oa double,
> ob double, oz double
> )
>
> For this what transformations I need to apply before I execute 
> saveToCassandra().
>
> Thank You,
> Prateek
>
>
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely 
> for the use of the individual to whom it is addressed. It may contain 
> privileged or confidential information and should not be circulated or used 
> for any purpose other than for what it is intended. If you have received this 
> message in error, please notify the originator immediately. If you are not 
> the intended recipient, you are notified that you are strictly prohibited 
> from using, copying, altering, or disclosing the contents of this message. 
> Aricent accepts no responsibility for loss or damage arising from the use of 
> the information transmitted by this email including damage from virus."
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
> additional commands, e-mail: user-h...@spark.apache.org
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org

"DISCLAIMER: This message is proprietary to Aricent and is intended solely for 
the use of the individual to whom it is addressed. It may contain privileged or 
confidential information and should not be circulated or used for any purpose 
other than for what it is intended. If you have received this message in error, 
please notify the originator immediately. If you are not the intended 
recipient, you are notified that you are strictly prohibited from using, 
copying, altering, or disclosing the contents of this message. Aricent accepts 
no responsibility for loss or damage arising from the use of the information 
transmitted by this email including damage from virus."

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Graphx hangs and crashes on EdgeRDD creation

2015-10-06 Thread William Saar
Hi, I get the same problem with both the CanonicalVertexCut and 
RandomVertexCut, with the graph code as follows

val graph = Graph.fromEdgeTuples(indexedEdges, 0, None, 
StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER);
graph.partitionBy(PartitionStrategy.RandomVertexCut);
graph.connectedComponents().vertices


From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: den 5 oktober 2015 19:07
To: William Saar ; user@spark.apache.org
Subject: Re: Graphx hangs and crashes on EdgeRDD creation

Have you tried using Graph.partitionBy? e.g. using 
PartitionStrategy.RandomVertexCut?
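
A minimal sketch of that suggestion (note that partitionBy returns a new graph, so the
result has to be captured rather than called only for its side effect):

val partitioned = graph.partitionBy(PartitionStrategy.RandomVertexCut)
val components = partitioned.connectedComponents().vertices
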
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 5 Oct 2015, at 09:14, William Saar 
> wrote:

Hi,
I am trying to run a GraphX job on 20 million edges with Spark 1.5.1, but the 
job seems to hang for 30 minutes on a single executor when creating the graph 
and eventually crashes with “IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE”

I suspect this is because of a partitioning problem, but how can I control the 
partitioning of the creation of the EdgeRDD?

My graph code only does the following:
val graph = Graph.fromEdgeTuples(indexedEdges, 0, None, 
StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER);
graph.connectedComponents().vertices

The web UI shows the following while the job is hanging (I am running this 
inside a transform operation on spark streaming)
transform at MyJob.scala:62
RDD: EdgeRDD, EdgeRDD

org.apache.spark.streaming.dstream.DStream.transform(DStream.scala:649)

com.example.MyJob$.main(MyJob.scala:62)

com.example.MyJob.main(MyJob.scala)

sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

The executor thread dump while the job is hanging is the following
Thread 66: Executor task launch worker-1 (RUNNABLE)
java.lang.System.identityHashCode(Native Method)
com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:241)
com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:588)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The failure stack trace is as follows:
15/10/02 17:09:54 ERROR JobScheduler: Error generating jobs for time 

compatibility issue with Jersey2

2015-10-06 Thread oggie
I have some jersey compatibility issues when I tried to upgrade from 1.3.1 to
1.4.1..

We have a Java app written with spark 1.3.1. That app also uses Jersey 2.9
client to make external calls.  We see spark 1.4.1 uses Jersey 1.9.

In 1.3.1 we were able to add some exclusions to our pom and everything
worked fine.  But now it seems there's extra logic in Spark that needs the
classes those exclusions remove.  If I remove the exclusions, then our code that uses
Jersey2 fails. I don't really want to downgrade our code to Jersey1 though.

The error is:

java.lang.NoClassDefFoundError:
com/sun/jersey/spi/container/servlet/ServletContainer
...
at
org.apache.spark.status.api.v1.ApiRootResource$.getServletHandler(ApiRootResource.scala:174)


Is there anything that can be done in the pom to fix this? Here's what we
have right now in our pom:
  
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-server</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-json</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey.contribs</groupId>
            <artifactId>jersey-guice</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-grizzly2</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey.jersey-test-framework</groupId>
            <artifactId>jersey-test-framework-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jersey.jersey-test-framework</groupId>
            <artifactId>jersey-test-framework-grizzly2</artifactId>
        </exclusion>
    </exclusions>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.4.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.9</version>
</dependency>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/compatibility-issue-with-Jersey2-tp24951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread akhandeshi
I couldn't get this working...

I have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get 
Error in sparkR.init(master = "local") : 
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast var is null

2015-10-06 Thread dpristin
I've reduced the code to the code below - no streaming, no Kafka, no
checkpoint. Unfortunately the end result is the same - "broadcastVar is
null" printed in the worker log. Any suggestion on what I'm missing would be
very much appreciated !


object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc : SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1,2,3));

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents =  rdd.map(e => {
val l = LoggerFactory.getLogger("OinkSparkMain1")

if (broadcastVar == null) {
  l.info("broadcastVar is null")
  (e, "empty")
}
else {
  val str = broadcastVar.value.mkString(" | ")
  l.info("broadcastVar is " + str)
  (e, str)
}
  })

  logger.info("** Total reduced count: " +
mappedEvents.collect().length)
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927p24950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: does KafkaCluster can be public ?

2015-10-06 Thread Jonathan Coveney
You can put a class in the org.apache.spark namespace to access anything
that is private[spark]. You can then make enrichments there to access
whatever you need. Just beware upgrade pain :)
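
For illustration, a minimal sketch of that trick for this particular case (the object
name is made up; it simply re-exposes the private[spark] KafkaCluster from inside the
org.apache.spark namespace):

package org.apache.spark.streaming.kafka

// Compiled into your own jar, but declared in Spark's package so that
// private[spark] members are visible.
object KafkaClusterAccess {
  def apply(kafkaParams: Map[String, String]): KafkaCluster =
    new KafkaCluster(kafkaParams)
}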

El martes, 6 de octubre de 2015, Erwan ALLAIN 
escribió:

> Hello,
>
> I'm currently testing spark streaming with kafka.
> I'm creating DirectStream with KafkaUtils and everything's fine. However I
> would like to use the signature where I can specify my own message handler
> (to play with partition and offset). In this case, I need to manage
> offset/partition by myself to fill fromOffsets argument.
> I have found a Jira on this usecase
> https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
> telling that it's too specific.
> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
> OffsetRequest) but what I have to do is almost the same as the KafkaCluster
> which is private.
>
> is it possible to :
>  - add another signature in KafkaUtils ?
>  - make KafkaCluster public ?
>
> or do you have any other smart solution where I don't need to copy/paste
> KafkaCluster ?
>
> Thanks.
>
> Regards,
> Erwan ALLAIN
>


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-06 Thread Andreas Fritzler
Hi Andrew,

thanks a lot for the clarification!

Regards,
Andreas

On Tue, Oct 6, 2015 at 2:23 AM, Andrew Or  wrote:

> Hi all,
>
> Both the history server and the shuffle service are backward compatible,
> but not forward compatible. This means as long as you have the latest
> version of history server / shuffle service running in your cluster then
> you're fine (you don't need multiple of them).
>
> That said, an old shuffle service (e.g. 1.2) also happens to work with say
> Spark 1.4 because the shuffle file formats haven't changed. However, there
> are no guarantees that this will remain the case.
>
> -Andrew
>
> 2015-10-05 16:37 GMT-07:00 Alex Rovner :
>
>> We are running CDH 5.4 with Spark 1.3 as our main version and that
>> version is configured to use the external shuffling service. We have also
>> installed Spark 1.5 and have configured it not to use the external
>> shuffling service and that works well for us so far. I would be interested
>> myself how to configure multiple versions to use the same shuffling service.
>>
>> *Alex Rovner*
>> *Director, Data Engineering *
>> *o:* 646.759.0052
>>
>> * *
>>
>> On Mon, Oct 5, 2015 at 11:06 AM, Andreas Fritzler <
>> andreas.fritz...@gmail.com> wrote:
>>
>>> Hi Steve, Alex,
>>>
>>> how do you handle the distribution and configuration of
>>> the spark-*-yarn-shuffle.jar on your NodeManagers if you want to use 2
>>> different Spark versions?
>>>
>>> Regards,
>>> Andreas
>>>
>>> On Mon, Oct 5, 2015 at 4:54 PM, Steve Loughran 
>>> wrote:
>>>

 > On 5 Oct 2015, at 16:48, Alex Rovner 
 wrote:
 >
 > Hey Steve,
 >
 > Are you referring to the 1.5 version of the history server?
 >


 Yes. I should warn, however, that there's no guarantee that a history
 server running the 1.4 code will handle the histories of a 1.5+ job. In
 fact, I'm fairly confident it won't, as the events to get replayed are
 different.

>>>
>>>
>>
>


Re: compatibility issue with Jersey2

2015-10-06 Thread Ted Yu
Maybe build Spark with -Djersey.version=2.9 ?

Cheers

On Tue, Oct 6, 2015 at 5:57 AM, oggie  wrote:

> I have some jersey compatibility issues when I tried to upgrade from 1.3.1
> to
> 1.4.1..
>
> We have a Java app written with spark 1.3.1. That app also uses Jersey 2.9
> client to make external calls.  We see spark 1.4.1 uses Jersey 1.9.
>
> In 1.3.1 we were able to add some exclusions to our pom and everything
> worked fine.  But now it seems there's extra logic in Spark that now needs
> those exclusions.  If I remove the exclusions, then our code that uses
> Jersey2 fails. I don't really want to downgrade our code to Jersey1 though.
>
> The error is:
>
> java.lang.NoClassDefFoundError:
> com/sun/jersey/spi/container/servlet/ServletContainer
> ...
> at
>
> org.apache.spark.status.api.v1.ApiRootResource$.getServletHandler(ApiRootResource.scala:174)
>
>
> Is there anything that can be done in the pom to fix this? Here's what we
> have right now in our pom:
>   
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.10</artifactId>
>     <version>1.4.1</version>
>     <exclusions>
>         <exclusion>
>             <groupId>com.sun.jersey</groupId>
>             <artifactId>jersey-core</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey</groupId>
>             <artifactId>jersey-client</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey</groupId>
>             <artifactId>jersey-server</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey</groupId>
>             <artifactId>jersey-json</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey.contribs</groupId>
>             <artifactId>jersey-guice</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey</groupId>
>             <artifactId>jersey-grizzly2</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey.jersey-test-framework</groupId>
>             <artifactId>jersey-test-framework-core</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.sun.jersey.jersey-test-framework</groupId>
>             <artifactId>jersey-test-framework-grizzly2</artifactId>
>         </exclusion>
>     </exclusions>
>     <scope>provided</scope>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming_2.10</artifactId>
>     <version>1.4.1</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql_2.10</artifactId>
>     <version>1.4.1</version>
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.glassfish.jersey.core</groupId>
>     <artifactId>jersey-client</artifactId>
>     <version>2.9</version>
> </dependency>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/compatibility-issue-with-Jersey2-tp24951.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: extracting the top 100 values from an rdd and save it as text file

2015-10-06 Thread gtanguy
Hello patelmiteshn,

This could do the trick :

rdd1 = rdd.sortBy(lambda x: x[1], ascending=False)
rdd2 = rdd1.zipWithIndex().filter(lambda pair: pair[1] < 100)
rdd2.saveAsTextFile("top100_output")  # output path is illustrative




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/extracting-the-top-100-values-from-an-rdd-and-save-it-as-text-file-tp24937p24948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Enabling kryo serialization slows down machine learning app.

2015-10-06 Thread fede.sc
Hi,
my team is setting up a machine-learning framework based on Spark's MLlib that
currently uses logistic regression. I enabled Kryo serialization and enforced class
registration, so I know that all the serialized classes are registered. However, the
running times when Kryo serialization is enabled are consistently longer. This is true
both when running locally on a smaller sample (1.6 minutes vs 1.3m) and also when
running with a larger sample on AWS with two worker nodes (2h30 vs 1h50).
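
For reference, a minimal sketch of the kind of Kryo setup described above (the two
registered classes are illustrative, not the actual classes from this app):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")   // fail fast on unregistered classes
  .registerKryoClasses(Array(
    classOf[org.apache.spark.mllib.regression.LabeledPoint],
    classOf[org.apache.spark.mllib.linalg.DenseVector]))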

Using the monitoring tools suggests that Task Deserialization Times are
similar (although perhaps
slightly longer for Kryo), but Task Durations and even Scheduler Delays
increase significantly.

There is also a significant difference in memory usage: for Kryo the number
of stored RDDs is
higher (much more so on the local sample: 40 vs. 4).

Does anyone have an idea of what can be going on, or where should I focus to
find out?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Enabling-kryo-serialization-slows-down-machine-learning-app-tp24947.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread Sun, Rui
What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()? 

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com] 
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") : 
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trying PCA on spark but serialization is error thrown

2015-10-06 Thread Cukoo
Hi,

I tried to use the PCA object in one of my projects but ended up receiving a
serialization error. Any help would be appreciated. Example taken from
https://spark.apache.org/docs/latest/mllib-feature-extraction.html#pca

My Code:
val selector = new PCA(20)
val transformer = selector.fit(discretizedData.map(_.features))
val filteredData = discretizedData.map(lp => lp.copy(features =
transformer.transform(lp.features)))

Stack trace:
scala> val filteredData = discretizedData.map(lp => lp.copy(features =
transformer.transform(lp.features)))
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.map(RDD.scala:313)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.spark.mllib.feature.PCA
Serialization stack:
- object not serializable (class:
org.apache.spark.mllib.feature.PCA, value:
org.apache.spark.mllib.feature.PCA@51148636)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name:
selector, type: class org.apache.spark.mllib.feature.PCA)
- object (class 
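
The serialization stack above points at the field "selector" (the PCA instance, which is not serializable) being dragged into the task closure via the shell's line wrapper. A minimal sketch of one workaround, assuming discretizedData is an RDD[LabeledPoint] as in the snippet above: fit the PCA inside a local block so only the fitted model is ever stored in a shell variable:

import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.regression.LabeledPoint

// The PCA instance lives only inside this block, so it never becomes a field
// of the REPL line object and is never pulled into a task closure.
val transformer = {
  val selector = new PCA(20)
  selector.fit(discretizedData.map(_.features))
}

// Only the fitted PCAModel (a serializable VectorTransformer) is captured here.
val filteredData = discretizedData.map(lp =>
  lp.copy(features = transformer.transform(lp.features)))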

Re: How can I disable logging when running local[*]?

2015-10-06 Thread Jeff Jones
Thanks. Any chance you know how to pass this to a Scala app that is run via 
TypeSafe activator?

I tried putting it in $JAVA_OPTS but I get:

Unrecognized option: --driver-java-options

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.


I tried a bunch of different quoting but nothing produced a good result. I also 
tried passing it directly to activator using –jvm but it still produces the 
same results with verbose logging. Is there a way I can tell if it’s picking up 
my file?



From: Alex Kozlov
Date: Monday, October 5, 2015 at 8:34 PM
To: Jeff Jones
Cc: "user@spark.apache.org"
Subject: Re: How can I disable logging when running local[*]?

Did you try “--driver-java-options 
'-Dlog4j.configuration=file:/'” and setting the 
log4j.rootLogger=FATAL,console?
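
For reference, a minimal conf/log4j.properties along those lines might look like the following (the layout pattern is illustrative):

# Route everything to the console at FATAL only
log4j.rootLogger=FATAL, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Silence the chattiest namespaces explicitly as well
log4j.logger.org.apache.spark=FATAL
log4j.logger.akka=FATAL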

On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
> wrote:
I’ve written an application that hosts the Spark driver in-process using 
“local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve also 
tried putting the following code prior to creating my SparkContext. These were 
coupled together from various posts I’ve. None of these steps have worked. I’m 
still getting a ton of logging to the console. Anything else I can try?

Thanks,
Jeff

private def disableLogging(): Unit = {
  import org.apache.log4j.PropertyConfigurator

  PropertyConfigurator.configure("conf/log4j.properties")
  Logger.getRootLogger().setLevel(Level.OFF)
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
}


This message (and any attachments) is intended only for the designated 
recipient(s). It
may contain confidential or proprietary information, or have other limitations 
on use as
indicated by the sender. If you are not a designated recipient, you may not 
review, use,
copy or distribute this message. If you received this in error, please notify 
the sender by
reply e-mail and delete this message.



--
Alex Kozlov
(408) 507-4987
(408) 830-9982 fax
(650) 887-2135 efax
ale...@gmail.com


This message (and any attachments) is intended only for the designated 
recipient(s). It
may contain confidential or proprietary information, or have other limitations 
on use as
indicated by the sender. If you are not a designated recipient, you may not 
review, use,
copy or distribute this message. If you received this in error, please notify 
the sender by
reply e-mail and delete this message.


Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Ted Yu
See this thread:
http://search-hadoop.com/m/q3RTtwwjNxXvPEe1

A brief search in Spark JIRAs didn't find anything opened on this subject.

On Tue, Oct 6, 2015 at 8:51 AM, unk1102  wrote:

> Hi I have a spark job which creates ORC files in partitions using the
> following code
>
>
> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");
>
> Above code creates successfully orc files which is readable in Spark
> dataframe
>
> But when I try to load orc files generated using above code into hive orc
> table or hive external table nothing gets printed looks like table is empty
> what's wrong here I can see orc files in hdfs but hive table does not read
> it please guide
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


API to run spark Jobs

2015-10-06 Thread shahid qadri
Hi Folks

How can I submit my Spark app (Python) to the cluster without using
spark-submit? I actually need to invoke jobs from a UI.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.3.1 on Yarn not using all given capacity

2015-10-06 Thread Cesar Berezowski
3 cores* not 8

César.



> Le 6 oct. 2015 à 19:08, Cesar Berezowski  a écrit :
> 
> I deployed hdp 2.3.1 and got spark 1.3.1, spark 1.4 is supposed to be 
> available as technical preview I think
> 
> vendor’s forum ? you mean hortonworks' ? 
> 
> --
> Update on my info: 
> 
> Set Yarn to use 16 cores instead of 8 & set min container size to 4096mb
> Thus: 
> 12 executors, 12G of Ram and 8 cores
> 
> But same issue, still creates 3 container (+ driver), 1 core and 6.3gb each, 
> taking 16gb on yarn
> 
> César.
> 
> 
> 
>> Le 6 oct. 2015 à 19:00, Ted Yu > > a écrit :
>> 
>> Considering posting the question on vendor's forum.
>> 
>> HDP 2.3 comes with Spark 1.4 if I remember correctly.
>> 
>> On Tue, Oct 6, 2015 at 9:05 AM, czoo > > wrote:
>> Hi,
>> 
>> This post might be a duplicate with updates from another one (by me), sorry
>> in advance
>> 
>> I have an HDP 2.3 cluster running Spark 1.3.1 on 6 nodes (edge + master + 4
>> workers)
>> Each worker has 8 cores and 40G of RAM available in Yarn
>> 
>> That makes a total of 160GB and 32 cores
>> 
>> I'm running a job with the following parameters :
>> --master yarn-client
>> --num-executors 12 (-> 3 / node)
>> --executor-cores 2
>> --executor-memory 12G
>> 
>> I don't know if it's optimal but it should run (right ?)
>> 
>> However I end up with spark setting up 2 executors using 1 core & 6.2G each
>> 
>> Plus, my job is doing a cartesian product so I end up with a pretty big
>> DataFrame that inevitably ends on a GC exception...
>> It used to run on HDP2.2 / Spark 1.2.1 but I can't find any way to run it
>> now
>> 
>> Any Idea ?
>> 
>> Thanks a lot
>> 
>> Cesar
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-on-Yarn-not-using-all-given-capacity-tp24955.html
>>  
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>> .
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
> 



Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Michael Armbrust
I believe this is fixed in Spark 1.5.1 as long as the table is only using
types that hive understands and is not partitioned.  The problem with
partitioned tables is that Hive does not support dynamic partition discovery unless
you manually run the repair command.
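
For reference, a sketch of the Hive side (table name, schema and location are illustrative); the repair command scans the table's location and registers any partition directories it finds:

CREATE EXTERNAL TABLE base_table (id BIGINT, value STRING)
PARTITIONED BY (entity STRING, `date` STRING)
STORED AS ORC
LOCATION '/user/spark/baseTable';

-- Register the partition directories written by Spark
MSCK REPAIR TABLE base_table;

-- Or add a single partition explicitly
ALTER TABLE base_table ADD PARTITION (entity='foo', `date`='2015-10-06')
LOCATION '/user/spark/baseTable/entity=foo/date=2015-10-06';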

On Tue, Oct 6, 2015 at 9:33 AM, Umesh Kacha  wrote:

> Hi Ted thanks I know I solved that by using dataframe for both reading and
> writing. I am running into different problem now if spark can read hive orc
> files why can't hive read orc files created by Spark?
> On Oct 6, 2015 9:28 PM, "Ted Yu"  wrote:
>
>> See this thread:
>> http://search-hadoop.com/m/q3RTtwwjNxXvPEe1
>>
>> A brief search in Spark JIRAs didn't find anything opened on this subject.
>>
>> On Tue, Oct 6, 2015 at 8:51 AM, unk1102  wrote:
>>
>>> Hi I have a spark job which creates ORC files in partitions using the
>>> following code
>>>
>>>
>>> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");
>>>
>>> Above code creates successfully orc files which is readable in Spark
>>> dataframe
>>>
>>> But when I try to load orc files generated using above code into hive orc
>>> table or hive external table nothing gets printed looks like table is
>>> empty
>>> what's wrong here I can see orc files in hdfs but hive table does not
>>> read
>>> it please guide
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread Hossein
Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
wrote:

> > Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> > Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
> >
> > sc <- sparkR.init(master="local")
> Launching java with spark-submit command
> /C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell
> C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
> Error in sparkR.init(master = "local") :
>   JVM is not ready after 10 seconds
> In addition: Warning message:
> running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"
>  --verbose sparkr-shell
> C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had
> status 127
>
> -Original Message-
> From: Sun, Rui [mailto:rui@intel.com ]
> Sent: Tuesday, October 06, 2015 9:39 AM
> To: akhandeshi; user@spark.apache.org 
> Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio
>
> What you have done is supposed to work.  Need more debugging information
> to find the cause.
>
> Could you add the following lines before calling sparkR.init()?
>
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> Then to see if you can find any hint in the console output
>
> -Original Message-
> From: akhandeshi [mailto:ami.khande...@gmail.com ]
> Sent: Tuesday, October 6, 2015 8:21 PM
> To: user@spark.apache.org 
> Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio
>
> I couldn't get this working...
>
> I have have JAVA_HOME set.
> I have defined SPARK_HOME
> Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
> library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
> library(SparkR)
> sc<-sparkR.init(master="local")
>
> I get
> Error in sparkR.init(master = "local") :
>   JVM is not ready after 10 seconds
>
> What am I missing??
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>

-- 
--Hossein


Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our
initial migration went quite fine but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
*Slow*:

Executor ID                                 Address                      Task Time   Total Tasks   Failed Tasks   Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   22 s        3             0              3
20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   40 s        11            0              11
20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   49 s        10            0              10

*Fast*:

Executor ID                                 Address                      Task Time   Total Tasks   Failed Tasks   Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0    dnode-3.hdfs.private:36863   0.6 s       4             0              4
20151006-044141-2408867082-5050-21047-S1    dnode-0.hdfs.private:43812   1 s         9             0              9
20151006-044141-2408867082-5050-21047-S4    dnode-5.hdfs.private:59945   1 s         11            0              11
We have some custom metrics that measure wall-clock time of execution of
certain blocks of the job, like the time it takes to do the local
computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark
computation time' which is wall-clock for the task scheduling
(DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.6096849996, *spark computation time: 6930*,
metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042,* spark computation time: 263*, metric
collection: 138, total process: 401, total_records: 5002
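
For context, this kind of measurement can be sketched as follows (names and the per-record work are illustrative):

// "spark computation time" = wall-clock of the whole foreachRDD closure;
// "local computation time" = wall-clock spent inside foreachPartition.
dstream.foreachRDD { rdd =>
  val batchStart = System.currentTimeMillis()
  rdd.foreachPartition { records =>
    val localStart = System.currentTimeMillis()
    records.foreach { record =>
      // per-record processing goes here
    }
    val localMillis = System.currentTimeMillis() - localStart
    // report localMillis to the metrics sink
  }
  val totalMillis = System.currentTimeMillis() - batchStart
  // report totalMillis; the gap vs localMillis roughly corresponds to
  // scheduling and serialization overhead rather than per-partition work
}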

We are currently running Spark 1.4.1. The load and the work to be done are
stable; this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.


Re: API to run spark Jobs

2015-10-06 Thread Ted Yu
Please take a look at:
org.apache.spark.deploy.rest.RestSubmissionClient

which is used
by core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

FYI

On Tue, Oct 6, 2015 at 10:08 AM, shahid qadri 
wrote:

> hi Jeff
> Thanks
> More specifically i need the Rest api to submit pyspark job, can you point
> me to Spark submit  REST api
>
> On Oct 6, 2015, at 10:25 PM, Jeff Nadler  wrote:
>
>
> Spark standalone doesn't come with a UI for submitting jobs.   Some Hadoop
> distros might, for example EMR in AWS has a job submit UI.
>
> Spark submit just calls a REST api, you could build any UI you want on top
> of that...
>
>
> On Tue, Oct 6, 2015 at 9:37 AM, shahid qadri 
> wrote:
>
>> Hi Folks
>>
>> How i can submit my spark app(python) to the cluster without using
>> spark-submit, actually i need to invoke jobs from UI
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread akhandeshi
It seems it is failing at 
 path <- tempfile(pattern = "backend_port")  I do not see backend_port
directory created...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3.1 on Yarn not using all given capacity

2015-10-06 Thread czoo
Hi, 

This post might be a duplicate with updates from another one (by me), sorry
in advance

I have an HDP 2.3 cluster running Spark 1.3.1 on 6 nodes (edge + master + 4
workers) 
Each worker has 8 cores and 40G of RAM available in Yarn 

That makes a total of 160GB and 32 cores

I'm running a job with the following parameters : 
--master yarn-client
--num-executors 12 (-> 3 / node)
--executor-cores 2 
--executor-memory 12G 

I don't know if it's optimal but it should run (right ?)

However I end up with spark setting up 2 executors using 1 core & 6.2G each

Plus, my job is doing a cartesian product so I end up with a pretty big
DataFrame that inevitably ends on a GC exception...
It used to run on HDP2.2 / Spark 1.2.1 but I can't find any way to run it
now

Any Idea ?

Thanks a lot

Cesar
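
One sanity check (a sketch; the values simply mirror the flags above) is to put the equivalent properties in conf/spark-defaults.conf and confirm they show up in the application's Environment tab, to rule out the command-line flags being dropped on the way to YARN:

spark.executor.instances   12
spark.executor.cores       2
spark.executor.memory      12g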



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-on-Yarn-not-using-all-given-capacity-tp24955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Umesh Kacha
Hi Ted thanks I know I solved that by using dataframe for both reading and
writing. I am running into different problem now if spark can read hive orc
files why can't hive read orc files created by Spark?
On Oct 6, 2015 9:28 PM, "Ted Yu"  wrote:

> See this thread:
> http://search-hadoop.com/m/q3RTtwwjNxXvPEe1
>
> A brief search in Spark JIRAs didn't find anything opened on this subject.
>
> On Tue, Oct 6, 2015 at 8:51 AM, unk1102  wrote:
>
>> Hi I have a spark job which creates ORC files in partitions using the
>> following code
>>
>>
>> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");
>>
>> Above code creates successfully orc files which is readable in Spark
>> dataframe
>>
>> But when I try to load orc files generated using above code into hive orc
>> table or hive external table nothing gets printed looks like table is
>> empty
>> what's wrong here I can see orc files in hdfs but hive table does not read
>> it please guide
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: API to run spark Jobs

2015-10-06 Thread Jeff Nadler
Spark standalone doesn't come with a UI for submitting jobs. Some Hadoop
distros might; for example, EMR in AWS has a job-submit UI.

spark-submit just calls a REST API; you could build any UI you want on top
of that...


On Tue, Oct 6, 2015 at 9:37 AM, shahid qadri 
wrote:

> Hi Folks
>
> How i can submit my spark app(python) to the cluster without using
> spark-submit, actually i need to invoke jobs from UI
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.3.1 on Yarn not using all given capacity

2015-10-06 Thread Ted Yu
Consider posting the question on the vendor's forum.

HDP 2.3 comes with Spark 1.4 if I remember correctly.

On Tue, Oct 6, 2015 at 9:05 AM, czoo  wrote:

> Hi,
>
> This post might be a duplicate with updates from another one (by me), sorry
> in advance
>
> I have an HDP 2.3 cluster running Spark 1.3.1 on 6 nodes (edge + master + 4
> workers)
> Each worker has 8 cores and 40G of RAM available in Yarn
>
> That makes a total of 160GB and 32 cores
>
> I'm running a job with the following parameters :
> --master yarn-client
> --num-executors 12 (-> 3 / node)
> --executor-cores 2
> --executor-memory 12G
>
> I don't know if it's optimal but it should run (right ?)
>
> However I end up with spark setting up 2 executors using 1 core & 6.2G each
>
> Plus, my job is doing a cartesian product so I end up with a pretty big
> DataFrame that inevitably ends on a GC exception...
> It used to run on HDP2.2 / Spark 1.2.1 but I can't find any way to run it
> now
>
> Any Idea ?
>
> Thanks a lot
>
> Cesar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-on-Yarn-not-using-all-given-capacity-tp24955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: API to run spark Jobs

2015-10-06 Thread shahid qadri
Hi Jeff,
Thanks.
More specifically, I need the REST API to submit a PySpark job. Can you point me
to the spark-submit REST API?

> On Oct 6, 2015, at 10:25 PM, Jeff Nadler  wrote:
> 
> 
> Spark standalone doesn't come with a UI for submitting jobs.   Some Hadoop 
> distros might, for example EMR in AWS has a job submit UI.
> 
> Spark submit just calls a REST api, you could build any UI you want on top of 
> that...
> 
> 
> On Tue, Oct 6, 2015 at 9:37 AM, shahid qadri  > wrote:
> Hi Folks
> 
> How i can submit my spark app(python) to the cluster without using 
> spark-submit, actually i need to invoke jobs from UI
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: API to run spark Jobs

2015-10-06 Thread Jeff Nadler
Yeah I was going to suggest looking at the code too.   It's a shame there
isn't a page in the docs that covers the port 6066 rest api.
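
For anyone digging further: the standalone REST gateway on port 6066 is undocumented, so treat the following as a rough sketch of the request shape (host, paths and property values are illustrative) rather than a stable contract:

curl -X POST http://spark-master:6066/v1/submissions/create \
  --header "Content-Type: application/json" \
  --data '{
    "action": "CreateSubmissionRequest",
    "clientSparkVersion": "1.5.1",
    "appResource": "hdfs:///apps/my-app.jar",
    "mainClass": "com.example.MyApp",
    "appArgs": ["arg1"],
    "environmentVariables": {"SPARK_ENV_LOADED": "1"},
    "sparkProperties": {
      "spark.master": "spark://spark-master:7077",
      "spark.app.name": "MyApp",
      "spark.jars": "hdfs:///apps/my-app.jar",
      "spark.submit.deployMode": "cluster"
    }
  }'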

On Tue, Oct 6, 2015 at 10:16 AM, Ted Yu  wrote:

> Please take a look at:
> org.apache.spark.deploy.rest.RestSubmissionClient
>
> which is used
> by core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
>
> FYI
>
> On Tue, Oct 6, 2015 at 10:08 AM, shahid qadri 
> wrote:
>
>> hi Jeff
>> Thanks
>> More specifically i need the Rest api to submit pyspark job, can you
>> point me to Spark submit  REST api
>>
>> On Oct 6, 2015, at 10:25 PM, Jeff Nadler  wrote:
>>
>>
>> Spark standalone doesn't come with a UI for submitting jobs.   Some
>> Hadoop distros might, for example EMR in AWS has a job submit UI.
>>
>> Spark submit just calls a REST api, you could build any UI you want on
>> top of that...
>>
>>
>> On Tue, Oct 6, 2015 at 9:37 AM, shahid qadri 
>> wrote:
>>
>>> Hi Folks
>>>
>>> How i can submit my spark app(python) to the cluster without using
>>> spark-submit, actually i need to invoke jobs from UI
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>


Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
Hi Burak,

Thank you for the tip.
Unfortunately it does not work. It throws:

java.net.MalformedURLException: unknown protocol: s3n]
at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
at
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It looks like the meat is in createRepoResolvers, which does not
currently support s3 repos. I will file a JIRA ticket for this.

Best Regards,

Jerry

On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz  wrote:

> Hi Jerry,
>
> The --packages feature doesn't support private repositories right now.
> However, in the case of s3, maybe it might work. Could you please try using
> the --repositories flag and provide the address:
> `$ spark-submit --packages my:awesome:package --repositories
> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>
> If that doesn't work, could you please file a JIRA?
>
> Best,
> Burak
>
>
> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam  wrote:
>
>> Hi spark users and developers,
>>
>> I'm trying to use spark-submit --packages against private s3 repository.
>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>> wonder how can I add this resolver into spark-submit such that --packages
>> can resolve dependencies from private repo?
>>
>> Thank you!
>>
>> Jerry
>>
>
>


ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread unk1102
Hi, I have a Spark job which creates ORC files in partitions using the
following code:

dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");

The above code successfully creates ORC files which are readable in a Spark
DataFrame.

But when I try to load the ORC files generated by the above code into a Hive ORC
table or a Hive external table, nothing gets printed; it looks like the table is
empty. What's wrong here? I can see the ORC files in HDFS, but the Hive table does
not read them. Please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org