Re: Kafka + Spark streaming

2014-12-31 Thread Samya Maiti
Thanks TD.

On Wed, Dec 31, 2014 at 7:19 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 1. Of course, a single block / partition has many Kafka messages, and
 from different Kafka topics interleaved together. The message count is
 not related to the block count. Any message received within a
 particular block interval will go in the same block.
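 For illustration, a minimal receiver-based sketch (assuming spark-streaming-kafka
 is on the classpath; the ZooKeeper host, group and topic names here are made up)
 of where the block interval is set:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils

 // Every message this receiver gets during one block interval (200 ms here)
 // lands in the same block, regardless of which topic it came from.
 val conf = new SparkConf()
   .setAppName("KafkaBlocks")
   .set("spark.streaming.blockInterval", "200")
 val ssc = new StreamingContext(conf, Seconds(2))
 val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group",
   Map("topicA" -> 1, "topicB" -> 1))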

 2. Yes, the receiver will be started on another worker.

 TD


 On Tue, Dec 30, 2014 at 2:19 PM, SamyaMaiti samya.maiti2...@gmail.com
 wrote:
  Hi Experts,
 
  Few general Queries :
 
  1. Can a single block/partition in an RDD have more than 1 Kafka message, or
  will there be one and only one Kafka message per block? Put more broadly,
  is the message count related to the block in any way, or is it just that any
  message received within a particular block interval will go in the same
  block?
 
  2. If a worker that runs the Kafka receiver goes down, will the receiver be
  restarted on some other worker?
 
  Regards,
  Sam
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-streaming-tp20914.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 



FlatMapValues

2014-12-31 Thread Sanjay Subramanian
hey guys 
My dataset is like this 
025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have
created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
thanks
sanjay

pyspark.daemon not found

2014-12-31 Thread Naveen Kumar Pokala
Error from python worker:
  python: module pyspark.daemon not found
PYTHONPATH was:
  /home/npokala/data/spark-install/spark-master/python:



Can somebody please help me resolve this issue?

-Naveen


Re: How to set local property in beeline connect to the spark thrift server

2014-12-31 Thread 田毅
Hi, Xiaoyu!

In the Spark thrift server, you can use `spark.sql.thriftserver.scheduler.pool`
instead of `spark.scheduler.pool`.
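For example, in beeline (assuming a pool named "test" is defined in your
fairscheduler.xml) that would presumably be something like:

SET spark.sql.thriftserver.scheduler.pool=test;
-- queries issued in this beeline session afterwards should run in the "test" pool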



On Wed, Dec 31, 2014 at 3:55 PM, Xiaoyu Wang wangxy...@gmail.com wrote:

 Hi all!

 I use Spark SQL1.2 start the thrift server on yarn.

 I want to use fair scheduler in the thrift server.

 I set the properties in spark-defaults.conf like this:
 spark.scheduler.mode FAIR
 spark.scheduler.allocation.file
 /opt/spark-1.2.0-bin-2.4.1/conf/fairscheduler.xml

 In the thrift server UI I can see that the scheduler pool is OK.
 [image: inline image 1]

 How can I assign one SQL job to the "test" pool?
 By default the SQL jobs run in the default pool.

 In the http://spark.apache.org/docs/latest/job-scheduling.html document
 I see that sc.setLocalProperty("spark.scheduler.pool", "pool1") can be set in
 the code.

 In beeline I executed set spark.scheduler.pool=test, but it had no effect.
 So how can I set the local property in beeline?




spark stream + cassandra (execution on event)

2014-12-31 Thread Oleg Ruchovets
Hi,
   I want to use Spark Streaming to read data from Cassandra.
But in my case I need to process data based on an event (not retrieve the data
constantly from Cassandra).
Question:
   What is the way to trigger the processing with Spark Streaming from time
to time?

Thanks
Oleg.


Re: FlatMapValues

2014-12-31 Thread Raghavendra Pandey
Why don't you push "\n" instead of "\t" in your first transformation
[(fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))]
and then do saveAsTextFile?
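A sketch of that idea (field indices as in your snippet; it also prefixes the id
on every value so the emitted lines match the intended output):

reacRdd.map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))
  .map(f => Seq(1, 3, 5, 7, 9).map(i => f(0) + "," + f(i)).mkString("\n"))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)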

-Raghavendra

On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian
sanjaysubraman...@yahoo.com.invalid wrote:

 hey guys

 My dataset is like this

 025126,Chills,8.10,Injection site oedema,8.10,Injection site
 reaction,8.10,Malaise,8.10,Myalgia,8.10

 Intended output is
 ==
 025126,Chills
 025126,Injection site oedema
 025126,Injection site reaction
 025126,Malaise
 025126,Myalgia

 My code is as follows but the flatMapValues does not work even after I have 
 created the pair RDD.

 

 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
     ""
   }
 }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

 


 thanks

 sanjay



Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

2014-12-31 Thread Hafiz Mujadid
I am accessing HDFS with Spark's .textFile method, and I receive the error:

Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC
version 9 cannot communicate with client version 4


here are my dependencies 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n20925/Untitled.png 


Any suggestions?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-thread-main-org-apache-hadoop-ipc-RemoteException-Server-IPC-version-9-cannot-communica4-tp20925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

2014-12-31 Thread Sean Owen
This generally means you have packaged Hadoop 1.x classes into your
app accidentally. The most common cause is not marking the Hadoop and
Spark classes as "provided" dependencies. Your app doesn't need to
ship its own copy of these classes when you use spark-submit.
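For example, in an sbt build that could look roughly like this (the versions are
placeholders; match them to your cluster):

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0" % "provided"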

On Wed, Dec 31, 2014 at 10:47 AM, Hafiz Mujadid
hafizmujadi...@gmail.com wrote:
 I am accessing hdfs with spark .textFile method. and I receive error as

 Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC
 version 9 cannot communicate with client version 4


 here are my dependencies
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n20925/Untitled.png


 Any suggestion ?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-thread-main-org-apache-hadoop-ipc-RemoteException-Server-IPC-version-9-cannot-communica4-tp20925.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: pyspark.daemon not found

2014-12-31 Thread Naveen Kumar Pokala
Hi,

I am receiving the following error when I try to connect to the Spark cluster
(which is on Unix) from my Windows machine using the pyspark interactive shell:

 pyspark -master (spark cluster url)

Then I executed the following commands.


lines = sc.textFile("hdfs://master/data/spark/SINGLE.TXT")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

I got the following Error

14/12/31 16:20:15 INFO DAGScheduler: Job 0 failed: reduce at <stdin>:1, took 6.960438 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
7,
Error from python worker:
  python: module pyspark.daemon not found
PYTHONPATH was:
  
/home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-ins
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
at 
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:102)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:265)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Please help me to resolve this issue.


-Naveen



From: Naveen Kumar Pokala [mailto:npok...@spcapitaliq.com]
Sent: Wednesday, December 31, 2014 2:28 PM
To: user@spark.apache.org
Subject: pyspark.daemon not found

Error from python worker:
  python: 

NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream

2014-12-31 Thread Christophe Billiard
Hi all,

I am currently trying to combine Datastax's spark-cassandra-connector and
Typesafe's akka-http-experimental on Spark 1.1.1 (the spark-cassandra-connector
for Spark 1.2.0 is not out yet) and Scala 2.10.4.
I am using the Hadoop 2.4 pre-built package (build.sbt file at the end).

To solve the java.lang.NoClassDefFoundError:
com/datastax/spark/connector/mapper/ColumnMapper
and other NoClassDefFoundErrors, I have to give some jars to Spark
(build.sbt is not enough).
The connector works fine.

My spark submit looks like:
sbt clean package; bin/spark-submit   --class SimpleAppStreaming3  
--master local[*]  --jars
spark-cassandra-connector_2.10-1.1.0.jar,cassandra-driver-core-2.1.3.jar,cassandra-thrift-2.0.5.jar,joda-time-2.6.jar
target/scala-2.10/simple-project_2.10-1.0.jar

Then I try to add some akka-http/akka-stream features.
As before, I get a java.lang.NoClassDefFoundError:
akka/stream/FlowMaterializer$
Same solution: I begin to add jars.

Now my spark submit looks like:
sbt clean package; bin/spark-submit   --class SimpleAppStreaming3   --master
local[*]  --jars
spark-cassandra-connector_2.10-1.1.0.jar,cassandra-driver-core-2.1.3.jar,cassandra-thrift-2.0.5.jar,joda-time-2.6.jar,akka-stream-experimental_2.10-1.0-M2.jar
 
target/scala-2.10/simple-project_2.10-1.0.jar

Then I have a new kind of error:
Exception in thread main java.lang.NoSuchMethodError:
com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
at
akka.stream.StreamSubscriptionTimeoutSettings$.apply(FlowMaterializer.scala:256)
at akka.stream.MaterializerSettings$.apply(FlowMaterializer.scala:185)
at akka.stream.MaterializerSettings$.apply(FlowMaterializer.scala:172)
at 
akka.stream.FlowMaterializer$$anonfun$1.apply(FlowMaterializer.scala:42)
at 
akka.stream.FlowMaterializer$$anonfun$1.apply(FlowMaterializer.scala:42)
at scala.Option.getOrElse(Option.scala:120)
at akka.stream.FlowMaterializer$.apply(FlowMaterializer.scala:42)
at SimpleAppStreaming3$.main(SimpleAppStreaming3.scala:240)
at SimpleAppStreaming3.main(SimpleAppStreaming3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I can't get rid of this error.
I tried:
1) adding several jars (including config-1.2.1.jar)
2) studying the dependency tree (with
https://github.com/jrudolph/sbt-dependency-graph)
3) excluding libraryDependencies (with dependencyOverrides)

Any ideas?

Bonus question: Is there a way to avoid adding all these jars with --jars?

*My build.sbt file*

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"
//exclude("com.typesafe", "config")

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.1"

libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.3"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

libraryDependencies += "org.apache.cassandra" % "cassandra-thrift" % "2.0.5"

libraryDependencies += "joda-time" % "joda-time" % "2.6"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.8"

libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.3.8"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.2"

libraryDependencies += "org.mockito" % "mockito-all" % "1.10.17"

libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.3"

libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.1"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M2"

libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "1.0-M2"

libraryDependencies += "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2"

libraryDependencies += "com.typesafe" % "config" % "1.2.1"

dependencyOverrides += "com.typesafe" % "config" % "1.2.1"




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: building spark1.2 meet error

2014-12-31 Thread xhudik
Hi J_soft,

for me it is working; I didn't pass the -Dscala-2.10 or -X parameters. I got only
one warning, since there is no hadoop-2.5 profile to activate:

larix@kovral:~/sources/spark-1.2.0> mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0 -DskipTests clean package


Found 0 infos
Finished in 3 ms
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [ 14.177 s]
[INFO] Spark Project Networking ... SUCCESS [ 14.670 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [  9.030 s]
[INFO] Spark Project Core . SUCCESS [04:42 min]
[INFO] Spark Project Bagel  SUCCESS [ 26.184 s]
[INFO] Spark Project GraphX ... SUCCESS [01:07 min]
[INFO] Spark Project Streaming  SUCCESS [01:35 min]
[INFO] Spark Project Catalyst . SUCCESS [01:48 min]
[INFO] Spark Project SQL .. SUCCESS [01:55 min]
[INFO] Spark Project ML Library ... SUCCESS [02:17 min]
[INFO] Spark Project Tools  SUCCESS [ 15.527 s]
[INFO] Spark Project Hive . SUCCESS [01:43 min]
[INFO] Spark Project REPL . SUCCESS [ 45.154 s]
[INFO] Spark Project YARN Parent POM .. SUCCESS [  3.885 s]
[INFO] Spark Project YARN Stable API .. SUCCESS [01:00 min]
[INFO] Spark Project Assembly . SUCCESS [ 50.812 s]
[INFO] Spark Project External Twitter . SUCCESS [ 21.401 s]
[INFO] Spark Project External Flume Sink .. SUCCESS [ 25.207 s]
[INFO] Spark Project External Flume ... SUCCESS [ 34.734 s]
[INFO] Spark Project External MQTT  SUCCESS [ 22.617 s]
[INFO] Spark Project External ZeroMQ .. SUCCESS [ 22.444 s]
[INFO] Spark Project External Kafka ... SUCCESS [ 33.566 s]
[INFO] Spark Project Examples . SUCCESS [01:23 min]
[INFO] Spark Project YARN Shuffle Service . SUCCESS [  4.873 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23:20 min
[INFO] Finished at: 2014-12-31T12:02:32+01:00
[INFO] Final Memory: 76M/855M
[INFO] ------------------------------------------------------------------------
[WARNING] The requested profile "hadoop-2.5" could not be activated because
it does not exist.
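For what it's worth, the Spark 1.2 build docs list hadoop-2.4 as the profile for
Hadoop 2.4 and later, so if you want a matching profile to actually apply the
invocation would presumably be something like:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -DskipTests clean package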


If it doesn't work for you, I'd try deleting all the sources, downloading the
source code once more, and trying again ...

good luck, Tomas




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/building-spark1-2-meet-error-tp20853p20927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
    // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] – PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it 
is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled 
correctly.


The following changes in your snippet make it work as intended -


reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like "(025126,Chills)", i.e. with opening
and closing brackets "(" and ")". If you want to get rid of them, it is better to
do another map operation to map the pair to a String.
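A sketch of that extra step (purely illustrative, replacing the tail of the
snippet above):

  .flatMapValues(skus => skus.split('\t'))
  .map { case (id, reaction) => id + "," + reaction }  // "025126,Chills" rather than "(025126,Chills)"
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)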

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia


My code is as follows but the flatMapValues does not work even after I have 
created the pair RDD.



reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)



thanks

sanjay


Re: FlatMapValues

2014-12-31 Thread Fernando O.
Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually
want to filter and then apply map.
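For example, something along these lines (a sketch only, keeping the rest of the
original pipeline as it was):

reacRdd.map(_.split(','))
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))  // drop short/header rows up front
  .map(fields => (fields(0), fields(1) + "\t" + fields(3) + "\t" + fields(5) + "\t" + fields(7) + "\t" + fields(9)))
  .flatMapValues(_.split('\t'))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)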

On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik kma...@adobe.com wrote:

  Hi Sanjay,



 I tried running your code on spark shell piece by piece –



 // Setup

 val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site
 reaction,8.10,Malaise,8.10,Myalgia,8.10"

 val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site
 reaction,8.10,Malaise,8.10,Myalgia,8.10"

 val lines = Array[String](line1, line2)



 val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to
 begin with



 val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good

 val r3 = r2.map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
     // Returns a pair (String, String), good
   }
   else {
     "" // Returns a String, bad
   }
 }) // RDD[Serializable] – PROBLEM



 I was not even able to apply flatMapValues since the filtered RDD passed
 to it is RDD[Serializable] and not a pair RDD. I am surprised how your code
 compiled correctly.





 The following changes in your snippet make it work as intended -



 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
     ("", "")
   }
 }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)



 Please note that this too saves lines like (025126,Chills), i.e. with
 opening and closing brackets ( and ). If you want to get rid of them,
 better do another map operation to map pair to String.



 Kapil



 *From:* Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
 *Sent:* 31 December 2014 13:42
 *Cc:* user@spark.apache.org
 *Subject:* FlatMapValues



 hey guys



 My dataset is like this



 025126,Chills,8.10,Injection site oedema,8.10,Injection site
 reaction,8.10,Malaise,8.10,Myalgia,8.10



 Intended output is

 ==

 025126,Chills

 025126,Injection site oedema

 025126,Injection site reaction

 025126,Malaise

 025126,Myalgia



 My code is as follows but the flatMapValues does not work even after I have 
 created the pair RDD.

 

 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
     ""
   }
 }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

 



 thanks



 sanjay



Fwd: Sample Spark Program Error

2014-12-31 Thread Naveen Madhire
Hi All,

I am trying to run a sample Spark program using Scala SBT,

Below is the program,

def main(args: Array[String]) {

  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App",
    "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()

  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()

  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}


Below is the error log,


14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split:
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called
with curMem=34047, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values
in memory (estimated size 2032.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on
zealot:61452 (size: 2032.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block
rdd_1_0
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0
(TID 0). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0
(TID 1)
14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found,
computing it
14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split:
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
0.0 (TID 0) in 3507 ms on localhost (1/2)
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called
with curMem=36079, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values
in memory (estimated size 1912.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on
zealot:61452 (size: 1912.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block
rdd_1_1
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0
(TID 1). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
0.0 (TID 1) in 261 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at
Test1.scala:19) finished in 3.811 s
14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at
Test1.scala:19, took 3.997365232 s
14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at
Test1.scala:20
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at
Test1.scala:20) with 2 output partitions (allowLocal=false)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count
at Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1
(FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called
with curMem=37991, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 2.5 KB, free 267.2 MB)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 1 (FilteredRDD[3] at filter at Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
with 2 tasks
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 2, localhost, ANY, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 0.0 in stage 1.0
(TID 2)
14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_0 locally
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 1.0
(TID 2). 1731 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 3, localhost, ANY, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 1.0
(TID 3)
14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_1 locally
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 1.0
(TID 3). 1731 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
1.0 (TID 3) in 7 ms on localhost (1/2)
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
1.0 (TID 2) in 16 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 1 (count at
Test1.scala:20) finished in 0.016 s
14/12/30 23:20:21 INFO 

Re: Fwd: Sample Spark Program Error

2014-12-31 Thread RK
If you look at your program output closely, you can see the following output. 
Lines with a: 24, Lines with b: 15

The exception seems to be happening with Spark cleanup after executing your 
code. Try adding sc.stop() at the end of your program to see if the exception 
goes away.
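i.e. something along these lines (a sketch based on the program above):

val sc = new SparkContext("local", "Simple App")
try {
  val logData = sc.textFile("E:/ApacheSpark/usb/usb/spark/bin/README.md", 2).cache()
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))
} finally {
  sc.stop()  // releases the driver-side resources once the job is done
}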

 

 On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire 
vmadh...@umail.iu.edu wrote:
   

 

Hi All,
I am trying to run a sample Spark program using Scala SBT,
Below is the program,
def main(args: Array[String]) {

  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App",
    "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()

  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()

  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}

Below is the error log,

14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called with curMem=34047, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 2032.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on zealot:61452 (size: 2032.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found, computing it
14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3507 ms on localhost (1/2)
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called with curMem=36079, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 1912.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on zealot:61452 (size: 1912.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_1
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 261 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at Test1.scala:19) finished in 3.811 s
14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at Test1.scala:19, took 3.997365232 s
14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at Test1.scala:20
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at Test1.scala:20) with 2 output partitions (allowLocal=false)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called with curMem=37991, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 267.2 MB)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (FilteredRDD[3] at filter at Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, ANY, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2)
14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_0 locally
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1731 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, ANY, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_1 locally
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1731 bytes

Re: FlatMapValues

2014-12-31 Thread Sanjay Subramanian
Hey Kapil, Fernando
Thanks for your mail.
[1] Fernando, if I don't use an "if" inside the map, then lines of input data
that have fewer fields than I expect give me an ArrayIndexOutOfBounds exception,
so the "if" is there to safeguard against that.
[2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile, saying
that flatMapValues is not defined.
In fact when I use your snippet, the code still does not compile:

Error:(36, 57) value flatMapValues is not a member of
org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
      skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                            ^

My pom.xml looks like this:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.2.0</version>
</dependency>

[3] To summarize, all I want is to convert.

SUMMARY
===
when a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink
I want to output the following and currently not able to
1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks

regards
sanjay

  From: Fernando O. fot...@gmail.com
 To: Kapil Malik kma...@adobe.com 
Cc: Sanjay Subramanian sanjaysubraman...@yahoo.com; user@spark.apache.org 
user@spark.apache.org 
 Sent: Wednesday, December 31, 2014 6:06 AM
 Subject: Re: FlatMapValues
   
Hi Sanjay,
Doing an if inside a Map sounds like a bad idea, it seems like you actually 
want to filter and then apply map



On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik kma...@adobe.com wrote:

Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] – PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it
is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled
correctly.

The following changes in your snippet make it work as intended -

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("", "")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and
closing brackets ( and ). If you want to get rid of them, better do another map
operation to map pair to String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have
created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus =>
  skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

thanks
sanjay



  

Re: Fwd: Sample Spark Program Error

2014-12-31 Thread Naveen Madhire
Yes, the exception is gone now after adding stop() at the end.
Can you please tell me what this stop() does at the end? Does it disable
the Spark context?

On Wed, Dec 31, 2014 at 10:09 AM, RK prk...@yahoo.com wrote:

 If you look at your program output closely, you can see the following
 output.
 Lines with a: 24, Lines with b: 15

 The exception seems to be happening with Spark cleanup after executing
 your code. Try adding sc.stop() at the end of your program to see if the
 exception goes away.




   On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire 
 vmadh...@umail.iu.edu wrote:




 Hi All,

 I am trying to run a sample Spark program using Scala SBT,

 Below is the program,

 def main(args: Array[String]) {

   val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
   val sc = new SparkContext("local", "Simple App",
     "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
   val logData = sc.textFile(logFile, 2).cache()

   val numAs = logData.filter(line => line.contains("a")).count()
   val numBs = logData.filter(line => line.contains("b")).count()

   println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

 }


 Below is the error log,


 14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split:
 file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called
 with curMem=34047, maxMem=280248975
 14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values
 in memory (estimated size 2032.0 B, free 267.2 MB)
 14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory
 on zealot:61452 (size: 2032.0 B, free: 267.3 MB)
 14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block
 rdd_1_0
 14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0
 (TID 0). 2300 bytes result sent to driver
 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in
 stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
 14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0
 (TID 1)
 14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found,
 computing it
 14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split:
 file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in
 stage 0.0 (TID 0) in 3507 ms on localhost (1/2)
 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called
 with curMem=36079, maxMem=280248975
 14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values
 in memory (estimated size 1912.0 B, free 267.2 MB)
 14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory
 on zealot:61452 (size: 1912.0 B, free: 267.3 MB)
 14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block
 rdd_1_1
 14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0
 (TID 1). 2300 bytes result sent to driver
 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in
 stage 0.0 (TID 1) in 261 ms on localhost (2/2)
 14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
 whose tasks have all completed, from pool
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at
 Test1.scala:19) finished in 3.811 s
 14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at
 Test1.scala:19, took 3.997365232 s
 14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at
 Test1.scala:20
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at
 Test1.scala:20) with 2 output partitions (allowLocal=false)
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count
 at Test1.scala:20)
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage:
 List()
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1
 (FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
 14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called
 with curMem=37991, maxMem=280248975
 14/12/30 23:20:21 INFO storage.MemoryStore: Block broadcast_2 stored as
 values in memory (estimated size 2.5 KB, free 267.2 MB)
 14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
 from Stage 1 (FilteredRDD[3] at filter at Test1.scala:20)
 14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
 with 2 tasks
 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 0.0 in
 stage 1.0 (TID 2, localhost, ANY, 1264 bytes)
 14/12/30 23:20:21 INFO executor.Executor: Running task 0.0 in stage 1.0
 (TID 2)
 14/12/30 23:20:21 INFO storage.BlockManager: Found block rdd_1_0 locally
 14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 1.0
 (TID 2). 1731 bytes result sent to driver
 14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in
 stage 

Re: Long-running job cleanup

2014-12-31 Thread Ganelin, Ilya
The previously submitted code doesn’t actually show the problem I was trying to 
show effectively since the issue becomes clear between subsequent steps. Within 
a single step it appears things are cleared up properly.  Memory usage becomes 
evident pretty quickly.

def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 2500
  val count = 100
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2))
    .partitionBy(new HashPartitioner(200)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 100).map(s => (s, 4))
    .repartition(1).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    val results = usersFiltered.map(user => {
      val userScore = userFeatures.lookup(user).head
      val recPerUser = Array(1, 2, userScore)
      recPerUser
    })

    val mapedResults: Array[Int] = results.flatMap(scores => scores).toArray
    log("State: Computed " + mapedResults.length + " predictions for stage " + i)

    sc.parallelize(mapedResults)
    // Write to disk (left out since problem is evident even without it)
  }
}

Example broadcast variable added:

14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0piece0 in memory on 
CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)

And then if I parse the entire log looking for “free : XXX.X MB” within a 
single step memory is cleared properly:

Free 441.1 MB
Free 439.8 MB
Free 439.8 MB
Free 441.1 MB
Free 441.1 MB
Free 439.8 MB

But between steps, the amount of available memory decreases (e.g. That range 
that things oscillate between shrinks) and over the course of many hours this 
eventually reduces to zero.

Free 440.7 MB
Free 438.7 MB
Free 438.7 MB
Free 440.7 MB

Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB

Free 426.7 MB
Free 402.5 MB
Free 402.5 MB
Free 426.7 MB
Free 426.7 MB
Free 402.5 MB
Free 402.5 MB
Free 426.7 MB
From: Ganelin, Ganelin, Ilya 
ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Date: Tuesday, December 30, 2014 at 7:30 PM
To: Ilya Ganelin ilgan...@gmail.commailto:ilgan...@gmail.com, Patrick 
Wendell pwend...@gmail.commailto:pwend...@gmail.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick, to follow up on the below discussion, I am including a short code 
snippet that produces the problem on 1.1. This is kind of stupid code since 
it’s a greatly simplified version of what I’m actually doing but it has a 
number of the key components in place. I’m also including some example log 
output. Thank you.


def showMemoryUsage(sc : SparkContext) = {

  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures : RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures : RDD[(Int, Int)] = sc.parallelize(1 to 5000)
    .map(s => (s, 4)).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)

      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}

Example broadcast variable added:

14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
innovationclientnode01.cof.ds.capitalone.com:54640 (size: 794.0 B, free: 441.9 
MB)


And then if I parse the entire log looking for “free : XXX.X MB” I see the 
available memory slowly ticking away:

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

…

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

And so on.


Clearly the above code is not persisting the intermediate RDD (mult), yet 
memory is never being properly freed up.

From: Ilya Ganelin ilgan...@gmail.commailto:ilgan...@gmail.com
Date: Sunday, December 28, 2014 at 4:02 PM
To: Patrick Wendell pwend...@gmail.commailto:pwend...@gmail.com, Ganelin, 
Ilya ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick - is that cleanup present in 1.1?

The overhead I am talking about is with regards to what I believe is shuffle 
related metadata. If I watch the execution log I see small broadcast variables 
created for every stage of execution, a few KB at a time, 

RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

Oh yes .. flatMapValues is defined in PairRDDFunctions, and you need to
import org.apache.spark.SparkContext._ to use it
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
 )

@Sean, yes indeed flatMap / flatMapValues both can be used.
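A minimal illustration (Spark 1.2; the import brings the implicit conversion to
PairRDDFunctions into scope):

import org.apache.spark.SparkContext._  // makes flatMapValues & co. available on RDD[(K, V)]

val pairs = sc.parallelize(Seq(("1", "red\tblue\tgreen"), ("2", "yellow\tviolet\tpink")))
pairs.flatMapValues(_.split('\t')).collect()
// Array((1,red), (1,blue), (1,green), (2,yellow), (2,violet), (2,pink))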

Regards,

Kapil 

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling 
flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so 
your RDD is a bunch of Any, which is not at all what you want. You need to 
return a tuple in both cases, which is what Kapil pointed out.

However it's still not quite what you want. Your input is basically [key value1 
value2 value3] so you want to flatMap that to (key,value1)
(key,value2) (key,value3). flatMapValues does not come into play.
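A sketch of that (field indices as in the original code, emitting the final
strings directly):

reacRdd.map(_.split(','))
  .filter(fields => fields.length >= 11 && !fields(0).contains("VAERS_ID"))
  .flatMap(fields => Seq(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i)))
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)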

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian 
sanjaysubraman...@yahoo.com wrote:
 My understanding is as follows

 STEP 1 (This would create a pair RDD)
 ===

 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
     ""
   }
 })

 STEP 2
 ===
 Since previous step created a pair RDD, I thought flatMapValues method 
 will be applicable.
 But the code does not even compile saying that flatMapValues is not 
 applicable to RDD :-(


 reacRdd.map(line => line.split(',')).map(fields => {
   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
   }
   else {
     ""
   }
 }).flatMapValues(skus =>
   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)


 SUMMARY
 ===
 when a dataset looks like the following

 1,red,blue,green
 2,yellow,violet,pink

 I want to output the following and I am asking how do I do that ? 
 Perhaps my code is 100% wrong. Please correct me and educate me :-)

 1,red
 1,blue
 1,green
 2,yellow
 2,violet
 2,pink






RE: Fwd: Sample Spark Program Error

2014-12-31 Thread Kapil Malik
Hi Naveen,

Quoting 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
SparkContext is Main entry point for Spark functionality. A SparkContext 
represents the connection to a Spark cluster, and can be used to create RDDs, 
accumulators and broadcast variables on that cluster.

Only one SparkContext may be active per JVM. You must stop() the active 
SparkContext before creating a new one

So stop() shuts down the connection between the driver program and the Spark
master, and does some cleanup. Indeed, after calling it, you cannot do any
operation on the context or on any RDD created via this context.
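For instance (just a sketch):

val sc1 = new SparkContext("local", "first")
// ... run some jobs with sc1 ...
sc1.stop()  // must happen before another context is created in this JVM
val sc2 = new SparkContext("local", "second")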

Regards,

Kapil

From: Naveen Madhire [mailto:vmadh...@umail.iu.edu]
Sent: 31 December 2014 22:08
To: RK
Cc: user@spark.apache.org
Subject: Re: Fwd: Sample Spark Program Error

Yes. The exception is gone now after adding stop() at the end.
Can you please tell me what this stop() does at the end. Does it disable the 
spark context.

On Wed, Dec 31, 2014 at 10:09 AM, RK 
prk...@yahoo.commailto:prk...@yahoo.com wrote:
If you look at your program output closely, you can see the following output.
Lines with a: 24, Lines with b: 15

The exception seems to be happening with Spark cleanup after executing your 
code. Try adding sc.stop() at the end of your program to see if the exception 
goes away.



On Wednesday, December 31, 2014 6:40 AM, Naveen Madhire 
vmadh...@umail.iu.edumailto:vmadh...@umail.iu.edu wrote:


Hi All,

I am trying to run a sample Spark program using Scala SBT,

Below is the program,

def main(args: Array[String]) {

  val logFile = "E:/ApacheSpark/usb/usb/spark/bin/README.md" // Should be some file on your system
  val sc = new SparkContext("local", "Simple App",
    "E:/ApacheSpark/usb/usb/spark/bin", List("target/scala-2.10/sbt2_2.10-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()

  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()

  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

}


Below is the error log,


14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: 
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:0+673
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2032) called with 
curMem=34047, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_0 stored as values in 
memory (estimated size 2032.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_0 in memory on 
zealot:61452 (size: 2032.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_0
14/12/30 23:20:21 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 
0). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 
(TID 1, localhost, PROCESS_LOCAL, 1264 bytes)
14/12/30 23:20:21 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/30 23:20:21 INFO spark.CacheManager: Partition rdd_1_1 not found, 
computing it
14/12/30 23:20:21 INFO rdd.HadoopRDD: Input split: 
file:/E:/ApacheSpark/usb/usb/spark/bin/README.md:673+673
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 
(TID 0) in 3507 ms on localhost (1/2)
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(1912) called with 
curMem=36079, maxMem=280248975
14/12/30 23:20:21 INFO storage.MemoryStore: Block rdd_1_1 stored as values in 
memory (estimated size 1912.0 B, free 267.2 MB)
14/12/30 23:20:21 INFO storage.BlockManagerInfo: Added rdd_1_1 in memory on 
zealot:61452 (size: 1912.0 B, free: 267.3 MB)
14/12/30 23:20:21 INFO storage.BlockManagerMaster: Updated info of block rdd_1_1
14/12/30 23:20:21 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 
1). 2300 bytes result sent to driver
14/12/30 23:20:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 
(TID 1) in 261 ms on localhost (2/2)
14/12/30 23:20:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose 
tasks have all completed, from pool
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Stage 0 (count at 
Test1.scala:19) finished in 3.811 s
14/12/30 23:20:21 INFO spark.SparkContext: Job finished: count at 
Test1.scala:19, took 3.997365232 s
14/12/30 23:20:21 INFO spark.SparkContext: Starting job: count at Test1.scala:20
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Got job 1 (count at 
Test1.scala:20) with 2 output partitions (allowLocal=false)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at 
Test1.scala:20)
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/30 23:20:21 INFO scheduler.DAGScheduler: Submitting Stage 1 
(FilteredRDD[3] at filter at Test1.scala:20), which has no missing parents
14/12/30 23:20:21 INFO storage.MemoryStore: ensureFreeSpace(2600) called with 
curMem=37991, maxMem=280248975
14/12/30 

Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Sean Owen
-dev, +user

A decent guess: Does your 'save' function entail collecting data back
to the driver? and are you running this from a machine that's not in
your Spark cluster? Then in client mode you're shipping data back to a
less-nearby machine, compared to with cluster mode. That could explain
the bottleneck.
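
(For illustration, a hypothetical 'save' that collects to the driver versus
one that writes directly from the executors could look like the following
minimal Scala sketch; the function names and bucket path are made up.)

import org.apache.spark.rdd.RDD

// Collecting to the driver: every record is shipped back to the machine
// running the driver before anything is written.
def saveViaDriver(rdd: RDD[String]): Unit = {
  val all = rdd.collect()   // network hop to wherever the driver runs
  all.foreach(println)      // then handled locally on the driver
}

// Distributed save: each executor writes its own partition; no driver round trip.
def saveDistributed(rdd: RDD[String]): Unit = {
  rdd.saveAsTextFile("s3n://some.bucket/path")
}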

On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
 Hi,

 I have a very, very simple streaming job. When I deploy this on the exact
 same cluster, with the exact same parameters, I see big (40%) performance
 difference between client and cluster deployment mode. This seems a bit
 surprising.. Is this expected?

 The streaming job is:

 val msgStream = kafkaStream
   .map { case (k, v) => v }
   .map(DatatypeConverter.printBase64Binary)
   .foreachRDD(save)
   .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

 I tried several times, but the job deployed with client mode can only
 write at 60% throughput of the job deployed with cluster mode and this
 happens consistently. I'm logging at INFO level, but my application code
 doesn't log anything so it's only Spark logs. The logs I see in client
 mode doesn't seem like a crazy amount.

 The setup is:
 spark-ec2 [...] \
   --copy-aws-credentials \
   --instance-type=m3.2xlarge \
   -s 2 launch test_cluster

 And all the deployment was done from the master machine.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Also the job was deployed from the master machine in the cluster.

On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

  Oh sorry, that was an edit mistake. The code is essentially:

   val msgStream = kafkaStream
     .map { case (k, v) => v }
     .map(DatatypeConverter.printBase64Binary)
     .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems a
 bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and this
  happens consistently. I'm logging at INFO level, but my application code
  doesn't log anything so it's only Spark logs. The logs I see in client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 





Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Oh sorry, that was an edit mistake. The code is essentially:

 val msgStream = kafkaStream
   .map { case (k, v) => v }
   .map(DatatypeConverter.printBase64Binary)
   .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

I.e. there is essentially no original code (I was calling saveAsTextFile in
a save function but that was just a remnant from previous debugging).



On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the exact
  same cluster, with the exact same parameters, I see big (40%) performance
  difference between client and cluster deployment mode. This seems a
 bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and this
  happens consistently. I'm logging at INFO level, but my application code
  doesn't log anything so it's only Spark logs. The logs I see in client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 



Re: building spark1.2 meet error

2014-12-31 Thread Jacek Laskowski
Hi,

Where does the following path that appears in the logs below come from?

/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar

Did you somehow point at the local maven repository that's H:\Soft\Maven?

Jacek
On 31 Dec 2014 at 01:48, j_soft zsof...@gmail.com wrote:

 no, it still fails using mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0
 -Dscala-2.10 -X -DskipTests clean package

 ...
 [DEBUG] /opt/xdsp/spark-1.2.0/core/src/main/scala
 [DEBUG] includes = [**/*.scala,**/*.java,]
 [DEBUG] excludes = []
 [WARNING] Zinc server is not available at port 3030 - reverting to normal
 incremental compile
 [INFO] Using incremental compilation
 [DEBUG] Setup = {
 [DEBUG]scala compiler =

 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar
 [DEBUG]scala library =

 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar
 [DEBUG]scala extra = {
 [DEBUG]

 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.jar
 [DEBUG]}
 [DEBUG]sbt interface =

 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/sbt-interface/0.13.5/sbt-interface-0.13.5.jar
 [DEBUG]compiler interface sources =

 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/compiler-interface/0.13.5/compiler-interface-0.13.5-sources.jar
 [DEBUG]java home =
 [DEBUG]fork java = false
 [DEBUG]cache directory = /root/.zinc/0.3.5
 [DEBUG] }
 [INFO] 'compiler-interface' not yet compiled for Scala 2.10.4. Compiling...
 [DEBUG] Plain interface to Scala compiler 2.10.4  with arguments:
 -nowarn
 -d
 /tmp/sbt_8b816650
 -bootclasspath


 /opt/jdk1.7/jre/lib/resources.jar:/opt/jdk1.7/jre/lib/rt.jar:/opt/jdk1.7/jre/lib/sunrsasign.jar:/opt/jdk1.7/jre/lib/jsse.jar:/opt/jdk1.7/jre/lib/jce.jar:/opt/jdk1.7/jre/lib/charsets.jar:/opt/jdk1.7/jre/lib/jfr.jar:/opt/jdk1.7/jre/classes:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar
 -classpath


 /opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/sbt-interface/0.13.5/sbt-interface-0.13.5.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/com/typesafe/sbt/compiler-interface/0.13.5/compiler-interface-0.13.5-sources.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-compiler/2.10.4/scala-compiler-2.10.4.jar:/opt/xdsp/spark-1.2.0/H:\Soft\Maven\repository/org/scala-lang/scala-reflect/2.10.4/scala-reflect-2.10.4.jar
 /tmp/sbt_b9456a7b/xsbt/API.scala
 /tmp/sbt_b9456a7b/xsbt/Analyzer.scala
 /tmp/sbt_b9456a7b/xsbt/Command.scala
 /tmp/sbt_b9456a7b/xsbt/Compat.scala
 /tmp/sbt_b9456a7b/xsbt/CompilerInterface.scala
 /tmp/sbt_b9456a7b/xsbt/ConsoleInterface.scala
 /tmp/sbt_b9456a7b/xsbt/DelegatingReporter.scala
 /tmp/sbt_b9456a7b/xsbt/Dependency.scala
 /tmp/sbt_b9456a7b/xsbt/ExtractAPI.scala
 /tmp/sbt_b9456a7b/xsbt/ExtractUsedNames.scala
 /tmp/sbt_b9456a7b/xsbt/LocateClassFile.scala
 /tmp/sbt_b9456a7b/xsbt/Log.scala
 /tmp/sbt_b9456a7b/xsbt/Message.scala
 /tmp/sbt_b9456a7b/xsbt/ScaladocInterface.scala
 error: scala.reflect.internal.MissingRequirementError: object scala.runtime
 in compiler mirror not found.
 at

 scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
 at

 scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
 at
 scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
 at
 scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
 at
 scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
 at
 scala.reflect.internal.Mirrors$RootsBase.getPackage(Mirrors.scala:172)
 at

 scala.reflect.internal.Mirrors$RootsBase.getRequiredPackage(Mirrors.scala:175)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage$lzycompute(Definitions.scala:183)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage(Definitions.scala:183)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass$lzycompute(Definitions.scala:184)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass(Definitions.scala:184)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr$lzycompute(Definitions.scala:1024)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr(Definitions.scala:1023)
 at

 scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses$lzycompute(Definitions.scala:1153)
 at

 

Re: Why the major.minor version of the new hive-exec is 51.0?

2014-12-31 Thread Michael Armbrust
We actually do publish our own version of this jar, because the version
that the hive team publishes is an uber jar and this breaks all kinds of
things.  As a result I'd file the JIRA against Spark.

On Wed, Dec 31, 2014 at 12:55 PM, Ted Yu yuzhih...@gmail.com wrote:

 Michael:
 hive-exec-0.12.0-protobuf-2.5.jar is not generated from Spark source code,
 right ?

 What would be done after the JIRA is opened ?

 Cheers

 On Wed, Dec 31, 2014 at 12:16 PM, Michael Armbrust mich...@databricks.com
  wrote:

 This was not intended, can you open a JIRA?

 On Tue, Dec 30, 2014 at 8:40 PM, Ted Yu yuzhih...@gmail.com wrote:

 I extracted org/apache/hadoop/hive/common/CompressionUtils.class from the
 jar and used hexdump to view the class file.
 Bytes 6 and 7 are 00 and 33, respectively.

 According to http://en.wikipedia.org/wiki/Java_class_file, the jar was
 produced using Java 7.

 FYI
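
(For anyone who wants to reproduce that check, a minimal Scala sketch; the
class file path is just an example.)

import java.nio.file.{Files, Paths}

val bytes = Files.readAllBytes(Paths.get("CompressionUtils.class"))  // extracted from the jar
val major = ((bytes(6) & 0xff) << 8) | (bytes(7) & 0xff)             // bytes 6-7 hold the major version
println(s"class file major version: $major (50 = Java 6, 51 = Java 7)")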

 On Tue, Dec 30, 2014 at 8:09 PM, Shixiong Zhu zsxw...@gmail.com wrote:

  The major.minor version of the new org.spark-project.hive.hive-exec is
  51.0, so it will require people use JDK7. Is it intentional?
 
  <dependency>
    <groupId>org.spark-project.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.12.0-protobuf-2.5</version>
  </dependency>
 
  You can use the following steps to reproduce it (Need to use JDK6):
 
  1. Create a Test.java file with the following content:
 
  public class Test {
 
  public static void main(String[] args) throws Exception{
  Class.forName("org.apache.hadoop.hive.conf.HiveConf");
  }
 
  }
 
  2. javac Test.java
  3. java -classpath
 
 
 ~/.m2/repository/org/spark-project/hive/hive-exec/0.12.0-protobuf-2.5/hive-exec-0.12.0-protobuf-2.5.jar:.
  Test
 
  Exception in thread main java.lang.UnsupportedClassVersionError:
  org/apache/hadoop/hive/conf/HiveConf : Unsupported major.minor version
 51.0
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
  at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
  at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:169)
  at Test.main(Test.java:5)
 
 
  Best Regards,
  Shixiong Zhu
 






Re: Why the major.minor version of the new hive-exec is 51.0?

2014-12-31 Thread Ted Yu
I see.

I logged SPARK-5041 which references this thread.

Thanks

On Wed, Dec 31, 2014 at 12:57 PM, Michael Armbrust mich...@databricks.com
wrote:

 We actually do publish our own version of this jar, because the version
 that the hive team publishes is an uber jar and this breaks all kinds of
 things.  As a result I'd file the JIRA against Spark.

 On Wed, Dec 31, 2014 at 12:55 PM, Ted Yu yuzhih...@gmail.com wrote:

 Michael:
 hive-exec-0.12.0-protobuf-2.5.jar is not generated from Spark source
 code, right ?

 What would be done after the JIRA is opened ?

 Cheers

 On Wed, Dec 31, 2014 at 12:16 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 This was not intended, can you open a JIRA?

 On Tue, Dec 30, 2014 at 8:40 PM, Ted Yu yuzhih...@gmail.com wrote:

 I extracted org/apache/hadoop/hive/common/CompressionUtils.class from
 the
 jar and used hexdump to view the class file.
 Bytes 6 and 7 are 00 and 33, respectively.

 According to http://en.wikipedia.org/wiki/Java_class_file, the jar was
 produced using Java 7.

 FYI

 On Tue, Dec 30, 2014 at 8:09 PM, Shixiong Zhu zsxw...@gmail.com
 wrote:

  The major.minor version of the new org.spark-project.hive.hive-exec is
  51.0, so it will require people use JDK7. Is it intentional?
 
   <dependency>
     <groupId>org.spark-project.hive</groupId>
     <artifactId>hive-exec</artifactId>
     <version>0.12.0-protobuf-2.5</version>
   </dependency>
 
  You can use the following steps to reproduce it (Need to use JDK6):
 
  1. Create a Test.java file with the following content:
 
  public class Test {
 
  public static void main(String[] args) throws Exception{
  Class.forName("org.apache.hadoop.hive.conf.HiveConf");
  }
 
  }
 
  2. javac Test.java
  3. java -classpath
 
 
 ~/.m2/repository/org/spark-project/hive/hive-exec/0.12.0-protobuf-2.5/hive-exec-0.12.0-protobuf-2.5.jar:.
  Test
 
  Exception in thread main java.lang.UnsupportedClassVersionError:
  org/apache/hadoop/hive/conf/HiveConf : Unsupported major.minor
 version 51.0
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
  at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
  at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:169)
  at Test.main(Test.java:5)
 
 
  Best Regards,
  Shixiong Zhu
 







Re: UpdateStateByKey persist to Tachyon

2014-12-31 Thread amkcom
bumping this thread up



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-persist-to-Tachyon-tp20798p20930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Tathagata Das
What are your spark-submit commands in both cases? Is it Spark Standalone or
YARN (both support client and cluster)? And accordingly, how many
executors/cores are requested?

TD

On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote:

 Also the job was deployed from the master machine in the cluster.

 On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

 Oh sorry that was a edit mistake. The code is essentially:

  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems
 a bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and
 this
  happens consistently. I'm logging at INFO level, but my application
 code
  doesn't log anything so it's only Spark logs. The logs I see in
 client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 






limit vs sample for indexing a small amount of data quickly?

2014-12-31 Thread Kevin Burton
Is there a limit function which just returns the first N records?

Sample is nice but I’m trying to do this so it’s super fast and just to
test the functionality of an algorithm.

With sample I’d have to compute the % that would yield 1000 results first…

Kevin

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com


Re: limit vs sample for indexing a small amount of data quickly?

2014-12-31 Thread Fernando O.
There's a take method that might do what you need:

def take(num: Int): Array[T]

Take the first num elements of the RDD.
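
For example (a minimal sketch, assuming an existing SparkContext named sc;
the path is just an example):

val rdd = sc.textFile("hdfs:///some/example/path")
val firstThousand: Array[String] = rdd.take(1000)  // at most the first 1000 elements, returned to the driver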
On Jan 1, 2015 12:02 AM, Kevin Burton bur...@spinn3r.com wrote:

 Is there a limit function which just returns the first N records?

 Sample is nice but I’m trying to do this so it’s super fast and just to
 test the functionality of an algorithm.

 With sample I’d have to compute the % that would yield 1000 results first…

 Kevin

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




NullPointerException

2014-12-31 Thread rapelly kartheek
Hi,
I get the following exception when I submit a Spark application that
calculates the frequency of characters in a file. I face this problem
especially when I increase the size of the data.

Exception in thread Thread-47 org.apache.spark.SparkException: Job
aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
failure: Exception failure in TID 295 on host s1:
java.lang.NullPointerException
org.apache.spark.storage.BlockManager.org
$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)
org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Any help?

Thank you!


Re: NullPointerException

2014-12-31 Thread Josh Rosen
Which version of Spark are you using?

On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi,
 I get this following Exception when I submit spark application that
 calculates the frequency of characters in a file. Especially, when I
 increase the size of data, I face this problem.

 Exception in thread Thread-47 org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
 failure: Exception failure in TID 295 on host s1:
 java.lang.NullPointerException
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)
 org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any help?

 Thank you!



spark ignoring all memory settings and defaulting to 512MB?

2014-12-31 Thread Kevin Burton
This is really weird and I’m surprised no one has found this issue yet.

I’ve spent about an hour or more trying to debug this :-(

My spark install is ignoring ALL my memory settings.  And of course my job
is running out of memory.

The default is 512MB so pretty darn small.

The worker and master start up and both use 512M

This alone is very weird and poor documentation IMO because:

 SPARK_WORKER_MEMORY, to set how much total memory workers have to give
executors (e.g. 1000m, 2g)”

… so if it’s giving it to executors, AKA the memory executors run with,
then it should be SPARK_EXECUTOR_MEMORY…

… and the worker actually uses SPARK_DAEMON memory.

but actually I’m right.  It IS SPARK_EXECUTOR_MEMORY… according to
bin/spark-class

… but, that’s not actually being used :-(

that setting is just flat out being ignored and it’s just using 512MB.  So
all my jobs fail.

… and I added an ‘echo’ so I could trace the spark-class script and see what
the daemons are actually being run with, but spark-class wasn’t being called
at all and nothing is logged for the coarse-grained executor.  I guess it’s
just inheriting the JVM opts from its parent and Java is launching the
process directly?

This is a nightmare :(

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com


Re: NullPointerException

2014-12-31 Thread rapelly kartheek
spark-1.0.0

On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Which version of Spark are you using?

 On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi,
 I get this following Exception when I submit spark application that
 calculates the frequency of characters in a file. Especially, when I
 increase the size of data, I face this problem.

 Exception in thread Thread-47 org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
 failure: Exception failure in TID 295 on host s1:
 java.lang.NullPointerException
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)

 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)
 org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any help?

 Thank you!





Fwd: NullPointerException

2014-12-31 Thread rapelly kartheek
-- Forwarded message --
From: rapelly kartheek kartheek.m...@gmail.com
Date: Thu, Jan 1, 2015 at 12:05 PM
Subject: Re: NullPointerException
To: Josh Rosen rosenvi...@gmail.com, user@spark.apache.org


spark-1.0.0

On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Which version of Spark are you using?

 On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi,
 I get this following Exception when I submit spark application that
 calculates the frequency of characters in a file. Especially, when I
 increase the size of data, I face this problem.

 Exception in thread Thread-47 org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
 failure: Exception failure in TID 295 on host s1:
 java.lang.NullPointerException
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)

 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)
 org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any help?

 Thank you!





Re: spark ignoring all memory settings and defaulting to 512MB?

2014-12-31 Thread Kevin Burton
wow. Just figured it out:

conf.set("spark.executor.memory", "2g");

I have to set it in the job… that’s really counterintuitive.  Especially
because the documentation in spark-env.sh says the exact opposite.

What’s the resolution here?  This seems like a mess. I’d propose a solution
to clean it up but I don’t know where to begin.
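
For reference, the setting that finally took effect looks roughly like this
in the driver program (a minimal sketch; the app name is made up):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MemoryTest")               // hypothetical app name
  .set("spark.executor.memory", "2g")     // heap requested for each executor
val sc = new SparkContext(conf)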

On Wed, Dec 31, 2014 at 10:35 PM, Kevin Burton bur...@spinn3r.com wrote:

 This is really weird and I’m surprised no one has found this issue yet.

 I’ve spent about an hour or more trying to debug this :-(

 My spark install is ignoring ALL my memory settings.  And of course my job
 is running out of memory.

 The default is 512MB so pretty darn small.

 The worker and master start up and both use 512M

 This alone is very weird and poor documentation IMO because:

  SPARK_WORKER_MEMORY, to set how much total memory workers have to give
 executors (e.g. 1000m, 2g)”

 … so if it’s giving it to executors, AKA the memory executors run with,
 then it should be SPARK_EXECUTOR_MEMORY…

 … and the worker actually uses SPARK_DAEMON memory.

 but actually I’m right.  It IS SPARK_EXECUTOR_MEMORY… according to
 bin/spark-class

 … but, that’s not actually being used :-(

 that setting is just flat out begin ignored and it’s just using 512MB.  So
 all my jobs fail.

 … and I write an ‘echo’ so I could trace the spark-class script to see
 what the daemons are actually being run with and spark-class wasn’t being
 called with and nothing is logged for the coarse grained executor.  I guess
 it’s just inheriting the JVM opts from it’s parent and Java is launching
 the process directly?

 This is a nightmare :(

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com


Re: NullPointerException

2014-12-31 Thread Josh Rosen
It looks like 'null' might be selected as a block replication peer?
https://github.com/apache/spark/blob/v1.0.0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L786

I know that we fixed some replication bugs in newer versions of Spark (such
as https://github.com/apache/spark/pull/2366), so it's possible that this
issue would be resolved by updating.  Can you try re-running your job with
a newer Spark version to see whether you still see the same error?

On Wed, Dec 31, 2014 at 10:35 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 spark-1.0.0

 On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Which version of Spark are you using?

 On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi,
 I get this following Exception when I submit spark application that
 calculates the frequency of characters in a file. Especially, when I
 increase the size of data, I face this problem.

 Exception in thread Thread-47 org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
 failure: Exception failure in TID 295 on host s1:
 java.lang.NullPointerException
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)

 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)
 org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)

 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any help?

 Thank you!






Re: NullPointerException

2014-12-31 Thread rapelly kartheek
OK. Let me try it out on a newer version.

Thank you!!

On Thu, Jan 1, 2015 at 12:17 PM, Josh Rosen rosenvi...@gmail.com wrote:

 It looks like 'null' might be selected as a block replication peer?
 https://github.com/apache/spark/blob/v1.0.0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L786

 I know that we fixed some replication bugs in newer versions of Spark
 (such as https://github.com/apache/spark/pull/2366), so it's possible
 that this issue would be resolved by updating.  Can you try re-running your
 job with a newer Spark version to see whether you still see the same error?

 On Wed, Dec 31, 2014 at 10:35 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 spark-1.0.0

 On Thu, Jan 1, 2015 at 12:04 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Which version of Spark are you using?

 On Wed, Dec 31, 2014 at 10:24 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi,
 I get this following Exception when I submit spark application that
 calculates the frequency of characters in a file. Especially, when I
 increase the size of data, I face this problem.

 Exception in thread Thread-47 org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 11.0:10 failed 4 times, most recent
 failure: Exception failure in TID 295 on host s1:
 java.lang.NullPointerException
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$replicate(BlockManager.scala:786)

 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:752)

 org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)

 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:108)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any help?

 Thank you!







Re: spark ignoring all memory settings and defaulting to 512MB?

2014-12-31 Thread Ilya Ganelin
Welcome to Spark. What's more fun is that this setting controls memory on the
executors, but if you want to set a memory limit on the driver you need to
configure it as a parameter of the spark-submit script. You also set
num-executors and executor-cores on the spark-submit call (see the sketch
below).

See both the Spark tuning guide and the Spark configuration page for more
discussion of stuff like this.

W.r.t. the Spark memory option, my understanding is that the parameter (the
SPARK_EXE_MEM one) has been deprecated and the documentation is probably
stale. A good starting point for cleanup would probably be to update that :-).
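
For example, a rough sketch of a submit command with those knobs (the class,
jar and values are made up, and some flags such as --num-executors only apply
to YARN):

spark-submit --class com.example.Test \
  --master yarn-cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  --executor-cores 8 \
  example.jar arguments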
On Thu, Jan 1, 2015 at 1:45 AM Kevin Burton bur...@spinn3r.com wrote:

 wow. Just figured it out:

 conf.set( spark.executor.memory, 2g);

 I have to set it in the Job… that’s really counter intuitive.  Especially
 because the documentation in spark-env.sh says the exact opposite.

 What’s the resolution here.  This seems like a mess. I’d propose a
 solution to clean it up but I don’t know where to begin.

 On Wed, Dec 31, 2014 at 10:35 PM, Kevin Burton bur...@spinn3r.com wrote:

 This is really weird and I’m surprised no one has found this issue yet.

 I’ve spent about an hour or more trying to debug this :-(

 My spark install is ignoring ALL my memory settings.  And of course my
 job is running out of memory.

 The default is 512MB so pretty darn small.

 The worker and master start up and both use 512M

 This alone is very weird and poor documentation IMO because:

  SPARK_WORKER_MEMORY, to set how much total memory workers have to give
 executors (e.g. 1000m, 2g)”

 … so if it’s giving it to executors, AKA the memory executors run with,
 then it should be SPARK_EXECUTOR_MEMORY…

 … and the worker actually uses SPARK_DAEMON memory.

 but actually I’m right.  It IS SPARK_EXECUTOR_MEMORY… according to
 bin/spark-class

 … but, that’s not actually being used :-(

 that setting is just flat out begin ignored and it’s just using 512MB.
 So all my jobs fail.

 … and I write an ‘echo’ so I could trace the spark-class script to see
 what the daemons are actually being run with and spark-class wasn’t being
 called with and nothing is logged for the coarse grained executor.  I guess
 it’s just inheriting the JVM opts from it’s parent and Java is launching
 the process directly?

 This is a nightmare :(

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Hi Tathagata,

It's a standalone cluster. The submit commands are:

== CLIENT
spark-submit --class com.fake.Test \
--deploy-mode client --master spark://fake.com:7077 \
fake.jar arguments

== CLUSTER
 spark-submit --class com.fake.Test \
 --deploy-mode cluster --master spark://fake.com:7077 \
 s3n://fake.jar arguments

And they are both occupying all available slots (8 slots * 2 machines = 16 slots).



On Thu, Jan 1, 2015 at 12:21 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Whats your spark-submit commands in both cases? Is it Spark Standalone or
 YARN (both support client and cluster)? Accordingly what is the number of
 executors/cores requested?

 TD

 On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote:

 Also the job was deployed from the master machine in the cluster.

 On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

 Oh sorry that was a edit mistake. The code is essentially:

  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems
 a bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can
 only
  write at 60% throughput of the job deployed with cluster mode and
 this
  happens consistently. I'm logging at INFO level, but my application
 code
  doesn't log anything so it's only Spark logs. The logs I see in
 client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 