spark streaming kafka output

2014-05-05 Thread Weide Zhang
Hi,

Is there any code that implements a Kafka output for Spark Streaming? My use
case is that all output needs to be written back to the Kafka cluster after
the data is processed. What would be the guideline for implementing such a
function? I heard that foreachRDD will create one instance of the producer
per batch. If so, will that hurt performance?

Thanks,

Weide
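
A pattern often suggested for this kind of output is to open the producer inside foreachPartition, so that each batch creates one producer per partition rather than one per record. The sketch below shows only that lifecycle in plain Scala. `Sink`, `RecordingSink`, `writePartition`, and `newKafkaSink` are hypothetical names introduced for illustration; they stand in for a real Kafka producer and are not part of Spark or Kafka.

```scala
object PerPartitionSink {
  // Hypothetical stand-in for a Kafka producer (not a real Kafka API).
  trait Sink {
    def send(record: String): Unit
    def close(): Unit
  }

  // In-memory sink so the sketch runs without a broker.
  final class RecordingSink extends Sink {
    val sent = scala.collection.mutable.ArrayBuffer.empty[String]
    var closed = false
    def send(record: String): Unit = { sent += record }
    def close(): Unit = { closed = true }
  }

  // Open one sink for the whole partition, send every record, always close.
  // Returns the number of records written.
  def writePartition(records: Iterator[String], mkSink: () => Sink): Int = {
    val sink = mkSink()
    try {
      var n = 0
      records.foreach { r => sink.send(r); n += 1 }
      n
    } finally {
      sink.close()
    }
  }
}

// Assumed wiring inside a streaming job (dstream: DStream[String]):
//   dstream.foreachRDD { rdd =>
//     rdd.foreachPartition { iter =>
//       // newKafkaSink is hypothetical: it would wrap a real producer
//       PerPartitionSink.writePartition(iter, () => newKafkaSink())
//     }
//   }
```

The sink is created on the worker, inside the partition function, so it never has to be serialized from the driver. Whether one producer per partition per batch is cheap enough depends on the batch interval and the connection setup cost; reusing a producer per executor is a further option not shown here.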


Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-05 Thread DB Tsai
Since the breeze jar is brought into Spark by the mllib package, you may want
to add mllib as a dependency in Spark 1.0. To bring it in from your
application yourself, you can either use sbt assembly in your build project
to generate a flat myApp-assembly.jar that contains the breeze jar, or use
the Spark addJar API as Yadid said.
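
The first suggestion might look like this in build.sbt. This is a sketch, assuming the spark-mllib artifact has been published locally under the same 1.0.0-SNAPSHOT version as spark-core (the build file quoted below lists only spark-core).

```scala
// build.sbt (sketch)
name := "Build Project"

version := "1.0"

scalaVersion := "2.10.4"

// spark-mllib pulls in breeze transitively; the version is assumed to match
// the locally published Spark snapshot.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.0.0-SNAPSHOT",
  "org.apache.spark" %% "spark-mllib" % "1.0.0-SNAPSHOT"
)
```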


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sun, May 4, 2014 at 10:24 PM, wxhsdp wxh...@gmail.com wrote:

 Hi, DB, i think it's something related to sbt publishLocal

 if i remove the breeze dependency in my sbt file, breeze can not be found

 [error] /home/wxhsdp/spark/example/test/src/main/scala/test.scala:5: not
 found: object breeze
 [error] import breeze.linalg._
 [error]^

 here's my sbt file:

 name := "Build Project"

 version := "1.0"

 scalaVersion := "2.10.4"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 I ran sbt publishLocal on the Spark tree.

 But if I manually put spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar in the
 /lib directory, sbt package works, and I can run my app on the workers
 without addJar.

 What's the difference between adding the dependency in sbt after sbt
 publishLocal and manually putting
 spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar in the /lib directory?

 Why can I run my app on the workers without addJar this time?


 DB Tsai-2 wrote
  If you add the breeze dependency in your build.sbt project, it will not be
  available to all the workers.
 
  There are a couple of options: 1) use sbt assembly to package breeze into
  your application jar; 2) manually copy the breeze jar to all the nodes and
  put it on the classpath; 3) Spark 1.0 has the breeze jar in the Spark flat
  assembly jar, so you don't need to add the breeze dependency yourself.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Sun, May 4, 2014 at 4:07 AM, wxhsdp <wxhsdp@> wrote:
 
  Hi,
  I'm trying to use the breeze linalg library for matrix operations in my
  Spark code. I have already added a dependency on breeze in my build.sbt and
  packaged my code successfully.
 
  When I run in local mode (sbt run local...), everything is OK,
 
  but when I switch to standalone mode (sbt run spark://127.0.0.1:7077 ...),
  an error occurs:
 
  14/05/04 18:56:29 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.NoSuchMethodError
  java.lang.NoSuchMethodError: breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$;
 
  In my opinion, everything needed is packaged into the jar file, isn't it?
  Has anyone used breeze before? Is it good for matrix operations?
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-breeze-linalg-DenseMatrix-tp5310.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-breeze-linalg-DenseMatrix-tp5310p5355.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Check your cluster UI to ensure that workers are registered and have sufficient memory

2014-05-05 Thread Sai Prasanna
I executed the following commands to launch a Spark app in yarn-client
mode. I have Hadoop 2.3.0, Spark 0.8.1, and Scala 2.9.3.

SPARK_HADOOP_VERSION=2.3.0 SPARK_YARN=true sbt/sbt assembly

SPARK_YARN_MODE=true \
SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
MASTER=yarn-client ./spark-shell

The Spark context in the interactive shell is set up properly, but when I
then submit jobs, it reports that the application has not accepted any
resources.

LOGS:
DAGScheduler: Submitting 4 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

What have I missed? I did start the Spark master and worker, and I have
configured SPARK_MEM.

Any help would be great!


unsubscribe

2014-05-05 Thread Konstantin Kudryavtsev
unsubscribe

Thank you,
Konstantin Kudryavtsev


unsubscribe

2014-05-05 Thread Shubhabrata Roy

unsubscribe


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
.set("spark.cleaner.ttl", "120") drops broadcast_0, which causes the exception
below. This is strange, because broadcast_0 is no longer needed: I have
broadcast_3 instead, and the most recent RDD is persisted, so there should be
no need for recomputation. What is the problem? I need help.


~~~
14/05/05 17:03:12 INFO storage.MemoryStore: ensureFreeSpace(52474562) called with curMem=145126640, maxMem=1145359564
14/05/05 17:03:12 INFO storage.MemoryStore: Block broadcast_3 stored as values to memory (estimated size 50.0 MB, free 903.9 MB)
14/05/05 17:03:12 INFO scheduler.DAGScheduler: shuffleToMapStage 0 -- 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToStage 0 -- 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToJobIds 0 -- 0
~~~

Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
Caused by: org.apache.spark.SparkException: Job aborted: Task 9.0:48 failed 4 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://192.168.7.41:3503/broadcast_0)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Is any idea on architecture based on Spark + Spray + Akka

2014-05-05 Thread Quintus Zhou
Hi, Yi

Your project sounds interesting to me. I'm also working in the 3G/4G
communication domain, and I've also done a tiny project based on Hadoop that
analyzes execution logs. Recently, I've been planning to pick it up again. So,
if you don't mind, may I have an introduction to your log-analysis project?

Regards
Yuding


Sent from my iPhone

On 2014-5-5, at 11:37, ZhangYi yizh...@thoughtworks.com wrote:

 Hi all,
 
 Currently, our project is planning to adopt Spark as its big data platform. 
 For the client side, we have decided to expose a REST API based on Spray. Our 
 domain focuses on the communication field, processing data analysis and 
 statistics for 3G and 4G users. Spark + Spray is brand new for us, and we 
 can't find any best practices via Google. 
 
 In our opinion, an event-driven architecture may be a good choice for our 
 project. However, more ideas are welcome. Thanks.  
 
 -- 
 ZhangYi (张逸)
 Developer
 tel: 15023157626
 blog: agiledon.github.com
 weibo: tw张逸
 Sent with Sparrow
 


unsubscribe

2014-05-05 Thread Chhaya Vishwakarma
unsubscribe

Regards,
Chhaya Vishwakarma





java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Francis . Hu
Hi all,

We run a Spark cluster with three workers. We created a Spark Streaming
application and then ran the project with the command below:

shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo

Looking at the web UI of the workers, the jobs failed without any error or
info, but a FileNotFoundException occurred in the workers' log file, as shown
below.

Is this a known issue in Spark?

--- in workers' logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out ---

14/05/05 02:39:39 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-0000&executorId=2&logType=stdout
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:363)
at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:722)

14/05/05 02:39:41 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-0000&executorId=9&logType=stderr
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/9/stderr (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:363)
at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.j

Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Use checkpoint. It removes the dependencies :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for broadcast
cleaning. Maybe a broadcast could be removed automatically once nothing
depends on it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5369.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Cheng Lian
Have you tried Broadcast.unpersist()?
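
A minimal sketch of how the two suggestions in this thread (RDD.checkpoint and Broadcast.unpersist()) might be combined in an iterative job. It assumes a Spark version in which Broadcast.unpersist() is available; the checkpoint directory, the local master, and the `weights` array are placeholders for illustration, not details from the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLoopSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-loop").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed path

    var data = sc.parallelize(1 to 1000).map(_.toDouble)
    var weights = Array.fill(10)(1.0)             // illustrative per-iteration state

    for (_ <- 1 to 5) {
      val bc = sc.broadcast(weights)
      val next = data.map(x => x * bc.value.sum / 10.0).persist()
      next.checkpoint() // mark for checkpointing before the first action
      next.count()      // action: materializes the RDD and writes the checkpoint
      bc.unpersist()    // drop executor copies of this iteration's broadcast
      data.unpersist()  // the previous iteration's cached RDD is no longer needed
      data = next
      weights = weights.map(_ * 0.9)
    }
    sc.stop()
  }
}
```

Because each iteration's RDD is checkpointed, its lineage no longer reaches back to the broadcasts of earlier iterations, so releasing them explicitly should not trigger the FileNotFoundException seen with spark.cleaner.ttl.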


On Mon, May 5, 2014 at 6:34 PM, Earthson earthson...@gmail.com wrote:

 RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for
 broadcast cleaning. Maybe a broadcast could be removed automatically once
 nothing depends on it.






Spark Streaming and JMS

2014-05-05 Thread Patrick McGloin
Hi all,

Is there a best practice for subscribing to JMS with Spark Streaming?  I
have searched but not found anything conclusive.

In the absence of a standard practice the solution I was thinking of was to
use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark
Streaming Custom Receiver.  So the actor would look something like this:

class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
  //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
  def endpointUri = jmsURI
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
  }

  def receive = {
    case msg: CamelMessage => { blockGenerator += msg.body }
    case _ => { /* ... */ }
  }

  protected override def onStop() {
    blockGenerator.stop
  }
}

And then in the main application create receivers like this:

val ssc = new StreamingContext(...)
object tascQueue extends NetworkInputDStream[String](ssc) {
  override def getReceiver(): NetworkReceiver[String] = {
    new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
  }
}
ssc.registerInputStream(tascQueue)

Is this the best way to go?

Best regards,
Patrick


Re: master attempted to re-register the worker and then took all workers as unregistered

2014-05-05 Thread Nan Zhu
Ah, I think this should be fixed in 0.9.1?  

Did you see the exception thrown on the worker side?

Best, 

-- 
Nan Zhu


On Sunday, May 4, 2014 at 10:15 PM, Cheney Sun wrote:

 Hi Nan, 
 
 Have you found a way to fix the issue? Now I run into the same problem with
 version 0.9.1.
 
 Thanks,
 Cheney
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/master-attempted-to-re-register-the-worker-and-then-took-all-workers-as-unregistered-tp553p5341.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 (http://Nabble.com).
 
 




Re: Shark on cloudera CDH5 error

2014-05-05 Thread manas Kar
No replies yet. I guess everyone who had this problem knew the obvious reason
why the error occurred. 
It took me some time to figure out the workaround, though. 

It seems Shark depends on
/var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core/hadoop-core.jar
for client-server communication.

CDH5 should rely on hadoop-core-2.3.0-mr1-cdh5.0.0.jar. 

1) Grab this jar from the library of another CDH module (I chose hadoop). 
2) Remove the jar in
/var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core
3) Place the jar from step 1 in the hadoop-core folder from step 2.

Hope this saves some time for someone who has a similar problem.

..Manas




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shark-on-cloudera-CDH5-error-tp5226p5374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: configure spark history server for running on Yarn

2014-05-05 Thread Tom Graves
Since 1.0 is still in development you can pick up the latest docs in git: 
https://github.com/apache/spark/tree/branch-1.0/docs

I didn't see anywhere that you said you started the Spark history server.

There are multiple things that need to happen for the Spark history server to 
work.

1) Configure your application to save the history logs - see the eventLog 
settings here: 
https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md

2) On YARN - know the host/port where you are going to start the Spark history 
server and configure spark.yarn.historyServer.address to point to it. Note 
that this purely makes the link from the ResourceManager UI properly point to 
the Spark History Server daemon.

3) Start the Spark history server pointing to the same directory as specified 
in your application (spark.eventLog.dir).

4) Run your application. Once it finishes, you can either go to the RM UI 
and follow the link to the Spark history UI, or go directly to the Spark 
history server UI.
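
The application-side settings from steps 1 and 2 can be sketched in Scala as follows. The HDFS path is a placeholder, and master:18080 is simply the address used in the quoted message below; neither is a recommended value. The same keys could presumably be set in spark-defaults.conf instead.

```scala
import org.apache.spark.SparkConf

object HistoryConfSketch {
  // Application-side configuration only; the history server itself must be
  // started separately and pointed at the same event log directory (step 3).
  val conf: SparkConf = new SparkConf()
    .setAppName("SparkPi")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "hdfs:///spark-events")        // placeholder path
    .set("spark.yarn.historyServer.address", "master:18080")  // host:port of the history server
}
```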

Tom
On Thursday, May 1, 2014 7:09 PM, Jenny Zhao linlin200...@gmail.com wrote:
 
Hi,

I have installed spark 1.0 from the branch-1.0, build went fine, and I have 
tried running the example on Yarn client mode, here is my command: 

/home/hadoop/spark-branch-1.0/bin/spark-submit 
/home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar
 --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 
--driver-memory 3g --name SparkPi --num-executors 2 --class 
org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from YARN's web UI, 
even though I tried to specify the history server in spark-env.sh: 

export SPARK_DAEMON_JAVA_OPTS=-Dspark.yarn.historyServer.address=master:18080


I also tried to specify it in spark-defaults.conf, which doesn't work either. I 
would appreciate it if someone could tell me the right way to specify it in 
either spark-env.sh or spark-defaults.conf, so that this option is applied to 
any Spark application. 


Another thing I found is that the usage output for spark-submit is incomplete 
and out of sync with the online documentation; I hope this is addressed in the 
formal release. 

and is this the latest documentation for spark 1.0? 
http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you! 

Spark GCE Script

2014-05-05 Thread Akhil Das
Hi Sparkers,

We have created a quick spark_gce script which can launch a spark cluster
in the Google Cloud. I'm sharing it because it might be helpful for someone
using the Google Cloud for deployment rather than AWS.

Here's the link to the script

https://github.com/sigmoidanalytics/spark_gce

Feel free to use it and send any feedback.

In short here's what it does:

Just like the spark_ec2 script, this one reads certain command-line
arguments, such as the cluster name (see the GitHub page
https://github.com/sigmoidanalytics/spark_gce for more details). It then
starts the machines in the Google Cloud, sets up the network, adds a 500GB
empty disk to all machines, generates the SSH keys on the master and
transfers them to all slaves, installs Java, and downloads and configures
Spark/Shark/Hadoop. It also starts the Shark server automatically. Currently
the version is 0.9.1, but I'm happy to add/support more versions if anyone is
interested.


Cheers.


Thanks
Best Regards


Re: Using google cloud storage for spark big data

2014-05-05 Thread Akhil Das
Hi Aureliano,

You might want to check this script out,
https://github.com/sigmoidanalytics/spark_gce
Let me know if you need any help around that.

Thanks
Best Regards


On Tue, Apr 22, 2014 at 7:12 PM, Aureliano Buendia buendia...@gmail.com wrote:




 On Tue, Apr 22, 2014 at 10:50 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:

 We don't have anything fancy. It's basically some very thin layer of
 google specifics on top of a stand alone cluster. We basically created two
 disk snapshots, one for the master and one for the workers. The snapshots
 contain initialization scripts so that the master/worker daemons are
 started on boot. So if I want a cluster I just create a new instance (with
 a fixed name) using the master snapshot for the master. When it is up I
 start as many slave instances as I need using the slave snapshot. By the
 time the machines are up the cluster is ready to be used.


 This sounds a lot simpler than the existing spark-ec2 script. Does the
 Google Compute Engine API make this simpler than the EC2 API does? Does
 your script do everything spark-ec2 does?

 Also, any plans to make this open source?


 Andras



 On Mon, Apr 21, 2014 at 10:04 PM, Mayur Rustagi 
 mayur.rust...@gmail.com wrote:

 Okay just commented on another thread :)
 I have one that I use internally. Can give it out but will need some
 support from you to fix bugs etc. Let me know if you are interested.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Apr 18, 2014 at 9:08 PM, Aureliano Buendia buendia...@gmail.com
  wrote:

 Thanks, Andras. What approach did you use to setup a spark cluster on
 google compute engine? Currently, there is no production-ready official
 support for an equivalent of spark-ec2 on gce. Did you roll your own?


 On Thu, Apr 17, 2014 at 10:24 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:

 Hello!

 On Wed, Apr 16, 2014 at 7:59 PM, Aureliano Buendia 
 buendia...@gmail.com wrote:

 Hi,

 Google has published a new connector for Hadoop: Google Cloud
 Storage, which is an equivalent of Amazon S3:


 googlecloudplatform.blogspot.com/2014/04/google-bigquery-and-datastore-connectors-for-hadoop.html

 This is actually about Cloud Datastore and not Cloud Storage (yeah,
 quite confusing naming ;) ). But they have had a Cloud Storage connector
 for a while, also linked from your article:
 https://developers.google.com/hadoop/google-cloud-storage-connector




 How can spark be configured to use this connector?

 Yes, it can, but in a somewhat hacky way. The problem is that for some
 reason Google does not officially publish the library jar alone; you get it
 installed as part of a Hadoop on Google Cloud installation. So, the
 official way would be (we did not try that) to have a Hadoop on Google
 Cloud installation and run spark on top of that.

 The other option - that we did try and which works fine for us - is to
 snatch the jar:
 https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-1.2.4.jar,
 make sure it's shipped to your workers (e.g. with setJars on SparkConf when
 you create your SparkContext). Then create a core-site.xml file which you
 make sure is on the classpath both in your driver and your cluster (e.g.
 you can make sure it ends up in one of the jars you send with setJars
 above) with this content (with YOUR_* replaced):
 <configuration>
   <property><name>fs.gs.impl</name><value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value></property>
   <property><name>fs.gs.project.id</name><value>YOUR_PROJECT_ID</value></property>
   <property><name>fs.gs.system.bucket</name><value>YOUR_FAVORITE_BUCKET</value></property>
 </configuration>

 From this point on you can simply use gs://... filenames to read/write
 data on Cloud Storage.

 Note that you should run your cluster and driver program on Google
 Compute Engine for this to work as is. Probably it's possible to configure
 access from the outside too but we didn't do that.

 Hope this helps,
 Andras











Caused by: java.lang.OutOfMemoryError: unable to create new native thread

2014-05-05 Thread Soumya Simanta
I just upgraded my Spark version to 1.0.0_SNAPSHOT.


commit f25ebed9f4552bc2c88a96aef06729d9fc2ee5b3

Author: witgo wi...@qq.com

Date:   Fri May 2 12:40:27 2014 -0700


I'm running a standalone cluster with 3 workers.

   - *Workers:* 3
   - *Cores:* 48 Total, 0 Used
   - *Memory:* 469.8 GB Total, 0.0 B Used

However, when I try to run bin/spark-shell, I get the following error after
some time, even if I don't perform any operations in the Spark shell.




14/05/05 10:20:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:256)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:54)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
*Caused by: java.lang.OutOfMemoryError: unable to create new native thread*
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:679)
at java.lang.UNIXProcess$1.run(UNIXProcess.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:119)
at java.lang.ProcessImpl.start(ProcessImpl.java:81)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:470)
at java.lang.Runtime.exec(Runtime.java:612)
at java.lang.Runtime.exec(Runtime.java:485)
at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:178)
at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:168)
at scala.tools.jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:163)
at scala.tools.jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:67)
at scala.tools.jline.internal.TerminalLineSettings.getProperty(TerminalLineSettings.java:87)
at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:127)
at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
at org.apache.spark.repl.SparkJLineReader$JLineConsoleReader.readOneKey(SparkJLineReader.scala:54)
at org.apache.spark.repl.SparkJLineReader.readOneKey(SparkJLineReader.scala:81)
at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:29)
at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:579)
at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:566)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply$mcZL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:983)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
... 7 more


RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
I’ve encountered this issue again and am able to reproduce it about 10% of the 
time.

1. Here is the input:
RDD[ (a, 126232566, 1), (a, 126232566, 2) ]
RDD[ (a, 126232566, 1), (a, 126232566, 3) ]
RDD[ (a, 126232566, 3) ]
RDD[ (a, 126232566, 4) ]
RDD[ (a, 126232566, 2) ]
RDD[ (a, 126232566, 5), (a, 126232566, 5) ]

2. Here are the actual results (printed DStream – each line is a new RDD with 
RDD Id being the last number on each line):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(10,5,ArrayBuffer())),26)   <- empty elements Seq[V]
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
(((a,126232566),StateClass(26,9,ArrayBuffer())),53)  <- empty elements Seq[V]
(((a,126232566),StateClass(26,9,ArrayBuffer())),59)  <- empty elements Seq[V]

3. Here are the expected results: (all tuples from #2 except those with empty 
Seq[V] )
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:
case class StateClass(sum:Integer, count:Integer, elements:Seq[Double])

val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
//  if (values.isEmpty) {
//    // if RDD cannot find values for this key (which is from prev RDD,
//    // the tuple will not be shown in this RDD w values of 0
//    None
//  } else {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentCount = values.size + previousState.count
    var currentSum=0
    for (newValue <- values) yield ({
      currentSum = currentSum + newValue._3
    })
    currentSum= currentSum +previousState.sum
    val elements = for (newValues <- values) yield ({
      newValues._3.toDouble
    })
    Some(StateClass(currentSum, currentCount, elements))
//  }
}

val partialResultSums= inputStream.map((x:(String, Long, Int)) => ((x._1), (x._1, x._2, x._3)))  //re map
.updateStateByKey[StateClass](updateSumFunc)  //update state
.transform(rdd => rdd.map(t => (t,rdd.id)))   //add RDD ID to RDD tuples

partialResultSums.print()

Now this is how I generate the RDDs and I suspect the delay is why the issue 
surfaces:

rddQueue += ssc.sparkContext.makeRDD(smallWindow1)  // smallWindow1 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 2))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow2)  // smallWindow2 = List[(String, Long, Int)](("a", 126232566, 1), ("a", 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow3)  // smallWindow3 = List[(String, Long, Int)](("a", 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow4)  // smallWindow4 = List[(String, Long, Int)](("a", 126232566, 4))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow5)  // smallWindow5 = List[(String, Long, Int)](("a", 126232566, 2))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow6)  // smallWindow6 = List[(String, Long, Int)](("a", 126232566, 5), ("a", 126232566, 5))

Thread.sleep(3100)
//ssc.awaitTermination()
ssc.stop()

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I 
return None so I can filter the tuples out. However, the fact that Spark calls 
the update function with an empty Seq[V] when it should not messes up my 
logic.
I wonder how to bypass this bug/feature of Spark.
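
One possible way around this, as a minimal sketch rather than a confirmed fix: as far as the Spark Streaming documentation describes it, the update function is applied in every batch to every key that already has state, whether or not the batch holds new values for that key, and returning None drops the state for that key. The sketch below keeps the running totals and records in the state whether the current batch contributed anything, so stale tuples can be filtered out afterwards. FlaggedState, its updated field, and updateFlagged are hypothetical names that are not part of the code above, and the batches are simulated in plain Scala so the example runs without Spark.

```scala
// State as in the thread, plus a hypothetical `updated` flag (an assumption,
// not part of the original StateClass).
case class FlaggedState(sum: Int, count: Int, elements: Seq[Double], updated: Boolean)

object FlaggedStateSketch {
  // Same shape as a function passed to updateStateByKey:
  // (values for this key in the current batch, previous state) => new state.
  val updateFlagged: (Seq[(String, Long, Int)], Option[FlaggedState]) => Option[FlaggedState] =
    (values, state) => {
      val previous = state.getOrElse(FlaggedState(0, 0, Seq.empty, updated = false))
      if (values.isEmpty) {
        // No new data for this key: keep the totals, clear the flag.
        Some(previous.copy(elements = Seq.empty, updated = false))
      } else {
        val newValues = values.map(_._3)
        Some(FlaggedState(
          previous.sum + newValues.sum,
          previous.count + newValues.size,
          newValues.map(_.toDouble),
          updated = true))
      }
    }

  def main(args: Array[String]): Unit = {
    // Simulate three batches for one key: data, no data, data.
    val batches = Seq(
      Seq(("a", 126232566L, 1), ("a", 126232566L, 2)),
      Seq.empty[(String, Long, Int)],
      Seq(("a", 126232566L, 3)))

    // Thread the state through the batches the way the streaming job would.
    val states = batches
      .scanLeft(Option.empty[FlaggedState])((st, vs) => updateFlagged(vs, st))
      .tail
      .flatten

    // Only states that received values in their batch are printed.
    states.filter(_.updated).foreach(println)
  }
}
```

On a DStream, the same function could presumably be passed to updateStateByKey[FlaggedState] and followed by something like .filter(_._2.updated), which keeps the accumulated sum and count while hiding batches that had no input for the key.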

Thanks
-Adrian
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question


Could be a bug. Can you share a code with data that I can use to reproduce this?

TD
On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote:
Has anyone else noticed that sometimes the same tuple calls update state 
function twice?
I have 2 tuples with the same key in 1 RDD part of DStream: RDD[ (a,1), (a,2) ]
When the update function is called the first time Seq[V] has data: 1, 2 which 
is correct: StateClass(3,2, ArrayBuffer(1, 2))
Then right away (in my output I see this) the same key is used and the function 
is called again but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

In the update function I also save Seq[V] to state so I can see it in the RDD. 
I also 

RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
Forgot to mention my batch interval is 1 second:
val ssc = new StreamingContext(conf, Seconds(1))
hence the Thread.sleep(1100)


Comprehensive Port Configuration reference?

2014-05-05 Thread Scott Clasen
Is it documented anywhere how to configure every port that a Spark
application opens?

This seems like one of the main things that makes running Spark hard in
places like EC2 when you aren't using the canned Spark scripts.

When starting an app, it looks like ports are opened for:

BlockManager
OutputTracker
FileServer
WebUI
Local port to get callbacks from mesos master..

What else?

How do I configure all of these?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tp5384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Yes, I've tried.

The problem is that every step generates a new broadcast object, until they
eat up all of the memory.

I solved it by using RDD.checkpoint to remove the dependencies on old
broadcast objects, and by using spark.cleaner.ttl to clean up these broadcast
objects automatically.
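
The workaround described above could look roughly like the following. This is a minimal sketch under assumptions, not the poster's actual code: the iteration logic, the local[2] master, the temporary checkpoint directory, the 600-second TTL, and the names BroadcastIterationSketch and run are all made up for illustration, and spark.cleaner.ttl is the setting as it exists in the Spark 0.9/1.0 line.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastIterationSketch {
  // Runs `steps` iterations. Each one creates a new broadcast variable and a
  // new RDD that depends on it. Every `checkpointEvery` steps the RDD is
  // checkpointed, which truncates its lineage so that earlier RDDs (and the
  // broadcasts they refer to) no longer have to be retained for recomputation.
  def run(sc: SparkContext, steps: Int, checkpointEvery: Int): Long = {
    var data: RDD[Double] = sc.parallelize(1 to 1000).map(_.toDouble)
    for (step <- 1 to steps) {
      val factor = sc.broadcast(1.0 + 1.0 / step)
      data = data.map(_ * factor.value)
      if (step % checkpointEvery == 0) {
        data.checkpoint() // must be called before the first action on this RDD
        data.count()      // an action is needed to actually write the checkpoint
      }
    }
    data.count()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("broadcast-iteration-sketch")
      .set("spark.cleaner.ttl", "600") // seconds; old metadata is cleaned periodically
    val sc = new SparkContext(conf)
    sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)
    try println(run(sc, steps = 20, checkpointEvery = 5))
    finally sc.stop()
  }
}
```

Checkpointing every few steps rather than every step is a trade-off: each checkpoint costs an extra job and a write to the checkpoint directory, but it bounds how long the lineage can grow.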

If there's a more elegant way to solve this problem, please tell me :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Cache-issue-for-iteration-with-broadcast-tp5350p5385.html


Re: Spark GCE Script

2014-05-05 Thread Matei Zaharia
Very cool! Have you thought about sending this as a pull request? We’d be happy 
to maintain it inside Spark, though it might be interesting to find a single 
Python package that can manage clusters across both EC2 and GCE.

Matei

On May 5, 2014, at 7:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 Hi Sparkers,
 
 We have created a quick spark_gce script which can launch a spark cluster in 
 the Google Cloud. I'm sharing it because it might be helpful for someone 
 using the Google Cloud for deployment rather than AWS.
 
 Here's the link to the script
 
 https://github.com/sigmoidanalytics/spark_gce
 
 Feel free to use it and suggest any feedback around it.
 
 In short here's what it does:
 
 Just like the spark_ec2 script, this one also reads certain command-line 
 arguments (see the github page for more details) like the cluster name, 
 then starts the machines in the Google Cloud, sets up the network, adds 
 a 500GB empty disk to all machines, generates the ssh keys on the master and 
 transfers them to all slaves, installs Java, and downloads and configures 
 Spark/Shark/Hadoop. It also starts the Shark server automatically. Currently 
 the version is 0.9.1, but I'm happy to add/support more versions if anyone is 
 interested.
 
 
 Cheers.
 
 
 Thanks
 Best Regards



Problem with sharing class across worker nodes using spark-shell on Spark 1.0.0

2014-05-05 Thread Soumya Simanta
Hi,

I'm trying to run a simple Spark job that uses a 3rd party class (in this
case twitter4j.Status) in the spark-shell using spark-1.0.0_SNAPSHOT

I'm starting my bin/spark-shell with the following command.

./spark-shell \
  --driver-class-path $LIBPATH/jodatime2.3/joda-convert-1.2.jar:$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar:$LIBPATH/twitter4j-core-3.0.5.jar \
  --jars $LIBPATH/jodatime2.3/joda-convert-1.2.jar,$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar,$LIBPATH/twitter4j-core-3.0.5.jar


My code was working fine in 0.9.1 when I used the following options that
were pointing to the same jar above.

export SPARK_CLASSPATH

export ADD_JAR


Now I'm getting a NoClassDefFoundError on each of my worker nodes

14/05/05 14:03:30 INFO TaskSetManager: Loss was due to
java.lang.NoClassDefFoundError: twitter4j/Status [duplicate 40]

14/05/05 14:03:30 INFO TaskSetManager: Starting task 0.0:26 as TID 73 on
executor 2: *worker1.xxx..* (NODE_LOCAL)


What am I missing here?


Thanks

-Soumya


Re: Spark GCE Script

2014-05-05 Thread François Le lay
Has anyone considered using jclouds tooling to support multiple cloud 
providers? Maybe using Pallet?

François

 On May 5, 2014, at 3:22 PM, Nicholas Chammas nicholas.cham...@gmail.com 
 wrote:
 
 I second this motion. :)
 
 A unified cloud deployment tool would be absolutely great.
 
 


Re: performance improvement on second operation...without caching?

2014-05-05 Thread Diana Carroll
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. Your answer explains the performance jump
in my code, but shows I've missed something key in my understanding of
Spark!

I was not aware until just now that map output was saved to disk (other
than if explicitly told to do so using persist).  It raises almost as many
questions as it answers.

Where are the shuffle files saved?  Locally on the mapper nodes?  Is it the
same location that disk-spilled cache is saved to?  Doesn't the necessity
of saving to disk result in increased i/o that would slow the job down?  I
thought part of the goal of Spark was to do everything in memory unless the
user specifically chose to persist...thereby making a choice to incur
time/disk space expense up front in return for fast failure recovery?

Not that I'm complaining, mind you, but I do think it should be made
clear that this not only affects performance, but also, for instance, whether
the data is fresh/out of date.  I had assumed if I did not set caching,
that each time I performed an operation on an RDD, it would re-compute
based on lineage, including re-reading the files...so I didn't have to
worry about the possibility of my file content changing.  But if it's
auto-caching shuffle files, my base files won't get re-read even if the
content has changed. (Or does it check timestamps?)

Thanks,
Diana








On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett esjew...@gmail.com wrote:

 Thanks Patrick and Matei for the clarification. I actually have to update
 some code now, as I was apparently relying on the fact that the output
 files are being re-used. Explains some edge-case behavior that I've seen.

 For me, at least, I read the guide, did some tests on fairly extensive RDD
 dependency graphs, saw that tasks earlier in the dependency graphs were not
 being regenerated and assumed (very much incorrectly I just found out!)
 that it was because the RDDs themselves were being cached. I wonder if
 there is a way to explain this distinction concisely in the programming
 guide. Or maybe I'm the only one that went down this incorrect learning
 path :-)

 Ethan


 On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Yes, this happens as long as you use the same RDD. For example say you do
 the following:

 data1 = sc.textFile(…).map(…).reduceByKey(…)
 data1.count()
 data1.filter(…).count()

 The first count() causes outputs of the map/reduce pair in there to be
 written out to shuffle files. Next time you do a count, on either this RDD
 or a child (e.g. after the filter), we notice that output files were
 already generated for this shuffle so we don’t rerun the map stage. Note
 that the output does get read again over the network, which is kind of
 wasteful (if you really wanted to reuse this as quickly as possible you’d
 use cache()).

 Matei

 On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 Hey Matei,
 Not sure i understand that. These are 2 separate jobs. So the second job
 takes advantage of the fact that there is map output left somewhere on disk
 from the first job, and re-uses that?


 On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Diana,

 Apart from these reasons, in a multi-stage job, Spark saves the map
 output files from map stages to the filesystem, so it only needs to rerun
 the last reduce stage. This is why you only saw one stage executing. These
 files are saved for fault recovery but they speed up subsequent runs.

 Matei

 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,

 What you said is actually not true, Spark won't cache RDD's unless you
 ask it to.

 The observation here - that running the same job can speed up
 substantially even without caching - is common. This is because other
 components in the stack are performing caching and optimizations. Two that
 can make a huge difference are:

 1. The OS buffer cache. Which will keep recently read disk blocks in
 memory.
 2. The Java just-in-time compiler (JIT) which will use runtime profiling
 to significantly speed up execution speed.

 These can make a huge difference if you are running the same job
 over and over. And there are other things, like the OS network stack
 increasing TCP windows and so forth. These will all improve response time
 as a Spark program executes.


 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:

 I believe Spark caches RDDs it has memory for regardless of whether you
 actually call the 'cache' method on the RDD. The 'cache' method just tips
 off Spark that the RDD should have higher priority. At least, that is my
 experience and it seems to correspond with your experience and with my
 recollection of other discussions on this topic on the list. However, going
 back and looking at the programming guide, this is not the way the
 cache/persist behavior is described. Does the guide need to be 

Re: spark streaming question

2014-05-05 Thread Tathagata Das
One main reason why Spark Streaming can achieve higher throughput than
Storm is that Spark Streaming operates on coarser-grained batches
(second-scale, massive batches), which reduces per-tuple overheads in
shuffles and other kinds of data movement.

Note that this increased throughput does not come for free: larger batches
mean larger end-to-end latency. Storm may give a lower end-to-end latency
than Spark Streaming (which has second-scale latency with second-scale
batches). However, we have observed that for a large variety of streaming
use cases, people are often okay with second-scale latencies but find it
much harder to work around the at-least-once semantics (double-counting,
etc.) and the lack of built-in state management (state kept locally in a
worker can get lost if the worker dies). Plus, Spark Streaming has the major
advantage of a simpler, higher-level API than Storm, and of the whole Spark
ecosystem (Spark SQL, MLlib, etc.) around it, which it can use for writing
streaming analytics applications very easily.

Regarding Trident, we have heard from many developers that Trident gives
lower throughput than Storm due to its transactional guarantees. It's hard
to say the reasons behind the performance penalty without doing a very
detailed head-to-head analysis.

TD


On Sun, May 4, 2014 at 5:11 PM, Chris Fregly ch...@fregly.com wrote:

 great questions, weide.  in addition, i'd also like to hear more about how
 to horizontally scale a spark-streaming cluster.

 i've gone through the samples (standalone mode) and read the
 documentation, but it's still not clear to me how to scale this puppy out
 under high load.  i assume i add more receivers (kinesis, flume, etc), but
 physically how does this work?

 @TD:  can you comment?

 thanks!

 -chris


 On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote:

 Hi ,

 It might be a very general question to ask here but I'm curious to know
 why spark streaming can achieve better throughput than storm as claimed in
 the spark streaming paper. Does it depend on certain use cases and/or data
 source ? What drives better performance in spark streaming case or in other
 ways, what makes storm not as performant as spark streaming ?

 Also, in order to guarantee exact-once semantics when node failure
 happens,  spark makes replicas of RDDs and checkpoints so that data can be
 recomputed on the fly while on Trident case, they use transactional object
 to persist the state and result but it's not obvious to me which approach
 is more costly and why ? Any one can provide some experience here ?

 Thanks a lot,

 Weide





Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-05 Thread Gerard Maas
Hi all,

I'm currently working on creating a set of docker images to facilitate
local development with Spark/streaming on Mesos (+zk, hdfs, kafka)

After solving the initial hurdles to get things working together in docker
containers, now everything seems to start-up correctly and the mesos UI
shows slaves as they are started.

I'm trying to submit a job from IntelliJ and the job submissions seem to
get lost in Mesos translation. The logs are not helping me figure out
what's wrong, so I'm posting them here in the hope that they ring a
bell and somebody can give me a hint about what's wrong/missing in my
setup.


 DRIVER (IntelliJ running a Job.scala main) 
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
SHUFFLE_BLOCK_MANAGER
14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than
1399319251962
14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older
than 1399319251962
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BROADCAST_VARS
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BLOCK_MANAGER
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
HTTP_BROADCAST
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
MAP_OUTPUT_TRACKER
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
SPARK_CONTEXT


 MESOS MASTER 
I0505 19:52:39.718080   388 master.cpp:690] Registering framework
201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
I0505 19:52:39.718261   388 master.cpp:493] Framework
201405051517-67113388-5050-383-6995 disconnected
I0505 19:52:39.718277   389 hierarchical_allocator_process.hpp:332] Added
framework 201405051517-67113388-5050-383-6995
I0505 19:52:39.718312   388 master.cpp:520] Giving framework
201405051517-67113388-5050-383-6995 0ns to failover
I0505 19:52:39.718431   389 hierarchical_allocator_process.hpp:408]
Deactivated framework 201405051517-67113388-5050-383-6995
W0505 19:52:39.718459   388 master.cpp:1388] Master returning resources
offered to framework 201405051517-67113388-5050-383-6995 because the
framework has terminated or is inactive
I0505 19:52:39.718567   388 master.cpp:1376] Framework failover timeout,
removing framework 201405051517-67113388-5050-383-6995



 MESOS SLAVE 
I0505 19:49:27.662019    20 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6803 by master@172.17.0.4:5050
W0505 19:49:27.662072    20 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6803
I0505 19:49:28.662153    18 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6804 by master@172.17.0.4:5050
W0505 19:49:28.662212    18 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6804
I0505 19:49:29.662199    13 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6805 by master@172.17.0.4:5050
W0505 19:49:29.662256    13 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6805
I0505 19:49:30.662443    16 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6806 by master@172.17.0.4:5050
W0505 19:49:30.662489    16 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6806


Thanks in advance,

Gerard.


Re: Spark Streaming and JMS

2014-05-05 Thread Tathagata Das
A few high-level suggestions.

1. I recommend using the new Receiver API in the almost-released Spark 1.0
(see branch-1.0 / master branch on github). It's a slightly better version of
the earlier NetworkReceiver, as it hides away the block generator (which
needed to be unnecessarily manually started and stopped) and adds other
lifecycle management methods like stop, restart, and reportError to deal with
errors in receiving data. It also adds the ability to write a custom receiver
from Java. Take a look at this example of writing a custom receiver in the
new API:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
I am updating the custom receiver guide right now
(https://github.com/apache/spark/pull/652).

2. Once you create a JMSReceiver class by extending
NetworkReceiver/Receiver, you can create DStream out of the receiver by

val jmsStream = ssc.networkStream(new JMSReceiver())

3. As far as I understand from the docs of akka.camel.Consumer
(http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer),
it is essentially a specialized Akka actor. For Akka actors, there is
ssc.actorStream, where you can specify your own actor class. You get actor
supervision (and therefore error handling, etc.) with that. See the example
AkkaWordCount, either old style using NetworkReceiver:
https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
or new style using Receiver:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
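
To make suggestions 1 and 2 more concrete, here is a rough sketch of what a JMS receiver on the new Receiver API might look like. It is only an illustration under assumptions, not a tested integration: JmsReceiverSketch, the connectionFactory function, and queueName are hypothetical names, the receiver uses the plain javax.jms API rather than Akka/Camel, and it only handles text messages.

```scala
import javax.jms.{Connection, ConnectionFactory, Message, MessageListener, Session, TextMessage}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// `connectionFactory` and `queueName` are hypothetical parameters. The factory
// function has to be serializable, because the receiver object is shipped to
// a worker before onStart() is called there.
class JmsReceiverSketch(connectionFactory: () => ConnectionFactory, queueName: String)
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  @transient private var connection: Connection = _

  // Called by Spark when the receiver starts. It must not block, so the
  // messages are handled asynchronously by a JMS MessageListener.
  override def onStart(): Unit = {
    try {
      val conn = connectionFactory().createConnection()
      val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
      val consumer = session.createConsumer(session.createQueue(queueName))
      consumer.setMessageListener(new MessageListener {
        override def onMessage(message: Message): Unit = message match {
          case text: TextMessage => store(text.getText) // hand the record to Spark
          case _                 => // non-text messages are ignored in this sketch
        }
      })
      conn.start()
      connection = conn
    } catch {
      // Ask Spark to restart the receiver if the broker cannot be reached.
      case e: Exception => restart("Could not connect to the JMS broker", e)
    }
  }

  // Called by Spark when the receiver is stopped or restarted.
  override def onStop(): Unit = {
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}
```

In Spark 1.0 such a receiver would be plugged in with ssc.receiverStream(new JmsReceiverSketch(...)). Note that with AUTO_ACKNOWLEDGE a message counts as delivered as soon as the listener returns, so whether that gives acceptable delivery guarantees depends on the application.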

I haven't personally played around with JMS before, so I can't comment much on
JMS-specific intricacies.

TD



On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin mcgloin.patr...@gmail.com wrote:

 Hi all,

 Is there a best practice for subscribing to JMS with Spark Streaming?  I
 have searched but not found anything conclusive.

 In the absence of a standard practice the solution I was thinking of was
 to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
 Spark Streaming Custom Receiver.  So the actor would look something like
 this:

 class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
   //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
   def endpointUri = jmsURI
   lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

   protected override def onStart() {
     blockGenerator.start
   }

   def receive = {
     case msg: CamelMessage => { blockGenerator += msg.body }
     case _ => { /* ... */ }
   }

   protected override def onStop() {
     blockGenerator.stop
   }
 }

 And then in the main application create receivers like this:

 val ssc = new StreamingContext(...)
 object tascQueue extends JmsReceiver[String](ssc) {
   override def getReceiver():JmsReceiver[String] = {
     new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
   }
 }
 ssc.registerInputStream(tascQueue)

 Is this the best way to go?

 Best regards,
 Patrick



Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-05 Thread Gerard Maas
Hi Benjamin,

Yes, we initially used a modified version of the AmpLabs docker scripts
[1]. The amplab docker images are a good starting point.
One of the biggest hurdles has been HDFS, which requires reverse-DNS and I
didn't want to go the dnsmasq route to keep the containers relatively
simple to use without the need of external scripts. Ended up running a
1-node setup nnode+dnode. I'm still looking for a better solution for HDFS
[2]

Our use case for docker is to easily create local dev environments both
for development and for automated functional testing (using cucumber). My
aim is to strongly reduce the time of the develop-deploy-test cycle.
That also means that we run the minimum number of instances required to
have a functionally working setup, e.g. 1 Zookeeper, 1 Kafka broker, ...

For the actual cluster deployment we have a Chef-based devops toolchain that
puts things in place on public cloud providers.
Personally, I think Docker rocks and would like to replace those complex
cookbooks with Dockerfiles once the technology is mature enough.

-greetz, Gerard.

[1] https://github.com/amplab/docker-scripts
[2]
http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns


On Mon, May 5, 2014 at 11:00 PM, Benjamin bboui...@gmail.com wrote:

 Hi,

 Before considering running on Mesos, did you try to submit the application
 on Spark deployed without Mesos on Docker containers ?

 Currently investigating this idea to deploy quickly a complete set of
 clusters with Docker, I'm interested by your findings on sharing the
 settings of Kafka and Zookeeper across nodes. How many broker and zookeeper
 do you use ?

 Regards,



 On Mon, May 5, 2014 at 10:11 PM, Gerard Maas gerard.m...@gmail.comwrote:

 Hi all,

 I'm currently working on creating a set of docker images to facilitate
 local development with Spark/streaming on Mesos (+zk, hdfs, kafka)

 After solving the initial hurdles to get things working together in
 docker containers, now everything seems to start-up correctly and the mesos
 UI shows slaves as they are started.

 I'm trying to submit a job from IntelliJ and the jobs submissions seem to
 get lost in Mesos translation. The logs are not helping me to figure out
 what's wrong, so I'm posting them here in the hope that they can ring a
 bell and somebdoy could provide me a hint on what's wrong/missing with my
 setup.


  DRIVER (IntelliJ running a Job.scala main) 
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 SHUFFLE_BLOCK_MANAGER
 14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than
 1399319251962
 14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older
 than 1399319251962
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 BROADCAST_VARS
 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
 BLOCK_MANAGER
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 HTTP_BROADCAST
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 MAP_OUTPUT_TRACKER
 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
 SPARK_CONTEXT


  MESOS MASTER 
 I0505 19:52:39.718080   388 master.cpp:690] Registering framework
 201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
 I0505 19:52:39.718261   388 master.cpp:493] Framework
 201405051517-67113388-5050-383-6995 disconnected
 I0505 19:52:39.718277   389 hierarchical_allocator_process.hpp:332] Added
 framework 201405051517-67113388-5050-383-6995
 I0505 19:52:39.718312   388 master.cpp:520] Giving framework
 201405051517-67113388-5050-383-6995 0ns to failover
 I0505 19:52:39.718431   389 hierarchical_allocator_process.hpp:408]
 Deactivated framework 201405051517-67113388-5050-383-6995
 W0505 19:52:39.718459   388 master.cpp:1388] Master returning resources
 offered to framework 201405051517-67113388-5050-383-6995 because the
 framework has terminated or is inactive
 I0505 19:52:39.718567   388 master.cpp:1376] Framework failover timeout,
 removing framework 201405051517-67113388-5050-383-6995



  MESOS SLAVE 
 I0505 19:49:27.662019    20 slave.cpp:1191] Asked to shut down framework
 201405051517-67113388-5050-383-6803 by master@172.17.0.4:5050
 W0505 19:49:27.662072    20 slave.cpp:1206] Cannot shut down unknown
 framework 201405051517-67113388-5050-383-6803
 I0505 19:49:28.662153    18 slave.cpp:1191] Asked to shut down framework
 201405051517-67113388-5050-383-6804 by master@172.17.0.4:5050
 W0505 19:49:28.662212    18 slave.cpp:1206] Cannot shut down unknown
 framework 201405051517-67113388-5050-383-6804
 I0505 19:49:29.662199    13 slave.cpp:1191] Asked to shut down framework
 201405051517-67113388-5050-383-6805 by master@172.17.0.4:5050
 W0505 19:49:29.662256    13 slave.cpp:1206] Cannot shut down unknown
 framework 201405051517-67113388-5050-383-6805
 I0505 19:49:30.662443    16 slave.cpp:1191] Asked to shut down framework
 201405051517-67113388-5050-383-6806 by master@172.17.0.4:5050
 

Spark 0.9.1 - saveAsSequenceFile and large RDD

2014-05-05 Thread Allen Lee
Hi,

Fairly new to Spark.  I'm using Spark's saveAsSequenceFile() to write large
Sequence Files to HDFS.  The Sequence Files need to be large to be
efficiently accessed in HDFS, preferably larger than Hadoop's block size,
64MB.  The task works for files smaller than 64 MiB (with a warning for
sequence files close to 64 MiB).  For files larger than 64 MiB, the task
fails with a libprotobuf error. Here is the full log:

14/05/05 18:18:00 INFO MesosSchedulerBackend: Registered as framework ID
201404231353-1315739402-5050-26649-0091
14/05/05 18:18:12 INFO SequenceFileRDDFunctions: Saving as sequence file of
type (LongWritable,BytesWritable)
14/05/05 18:18:14 INFO SparkContext: Starting job: saveAsSequenceFile at
X.scala:171
14/05/05 18:18:14 INFO DAGScheduler: Got job 0 (saveAsSequenceFile at
X.scala:171) with 1 output partitions (allowLocal=false)
14/05/05 18:18:14 INFO DAGScheduler: Final stage: Stage 0
(saveAsSequenceFile at X.scala:171)
14/05/05 18:18:14 INFO DAGScheduler: Parents of final stage: List()
14/05/05 18:18:14 INFO DAGScheduler: Missing parents: List()
14/05/05 18:18:14 INFO DAGScheduler: Submitting Stage 0
(ParallelCollectionRDD[0] at makeRDD at X.scala:170), which has no
missing parents
14/05/05 18:18:19 INFO DAGScheduler: Submitting 1 missing tasks from Stage
0 (ParallelCollectionRDD[0] at makeRDD at X.scala:170)
14/05/05 18:18:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/05/05 18:18:19 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 201404231353-1315739402-5050-26649-3: dn-04 (PROCESS_LOCAL)
14/05/05 18:18:23 INFO TaskSetManager: Serialized task 0.0:0 as 113006452
bytes in 3890 ms
[libprotobuf ERROR google/protobuf/io/coded_stream.cc:171] A protocol
message was rejected because it was too big (more than 67108864 bytes).  To
increase the limit (or to disable these warnings), see
CodedInputStream::SetTotalBytesLimit() in google/protobuf/io/coded_stream.h.
F0505 18:18:24.616025 27889 construct.cpp:48] Check failed: parsed
Unexpected failure while parsing protobuf
*** Check failure stack trace: ***
@ 0x7fc8d49ba96d  google::LogMessage::Fail()
@ 0x7fc8d49be987  google::LogMessage::SendToLog()
@ 0x7fc8d49bc809  google::LogMessage::Flush()
@ 0x7fc8d49bcb0d  google::LogMessageFatal::~LogMessageFatal()



The code is fairly simple:

val kv = ... // a large Seq of key/value pairs

// set parallelism to 1 to keep the file from being partitioned
sc.makeRDD(kv, 1)
   .saveAsSequenceFile(path)


Does anyone have any pointers on how to get past this?

Thanks,

-- 
*Allen Lee*
Software Engineer
MediaCrossing Inc.
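
A note on the numbers in the log above: the driver reports a serialized task of 113006452 bytes, while libprotobuf rejects messages larger than 67108864 bytes (64 MiB). One plausible reading, which this thread does not confirm, is that makeRDD(kv, 1) ships the whole local collection inside the single task message. The sketch below only does the arithmetic; minSlices is a hypothetical helper, not a Spark API, and it ignores any per-task overhead.

```scala
object TaskSizeCheck {
  // Values copied from the log above.
  val serializedTaskBytes: Long = 113006452L
  val protobufLimitBytes: Long = 67108864L // 64 MiB

  // Hypothetical helper: smallest number of equal slices such that each
  // slice's share of the payload is at most limitBytes (ceiling division).
  def minSlices(totalBytes: Long, limitBytes: Long): Int = {
    require(totalBytes >= 0 && limitBytes > 0, "sizes must be non-negative and the limit positive")
    math.max(1L, (totalBytes + limitBytes - 1) / limitBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    val over = serializedTaskBytes - protobufLimitBytes
    println(s"Task exceeds the limit by $over bytes")
    println(s"Minimum slices: ${minSlices(serializedTaskBytes, protobufLimitBytes)}")
  }
}
```

If that reading is right, one thing to try would be passing a larger slice count to makeRDD and, if a single output file is still required, merging afterwards with a shuffle (for example coalesce(1, shuffle = true)), so that the data travels through the shuffle rather than inside the task message. This is an untested suggestion.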


Re: Incredible slow iterative computation

2014-05-05 Thread Andrea Esposito
Update: the checkpoint is not actually being performed. I checked with the
isCheckpointed method, and it always returns false.


2014-05-05 23:14 GMT+02:00 Andrea Esposito and1...@gmail.com:

 Checkpointing doesn't seem to help. I do it at each iteration/superstep.

 Looking more closely, the RDDs are recomputed only a few times during the
 initial 'phase'; after that they aren't recomputed anymore. I attach
 screenshots: bootstrap phase, recompute section, and after. This is still
 unexpected because I persist all the intermediate results.

 Anyway, the time per iteration keeps degrading; for instance, the first
 superstep takes 3 sec and superstep 70 takes 8 sec.

 An iteration, looking at the screenshot, is from row 528 to 122.

 Any idea where to investigate?


 2014-05-02 22:28 GMT+02:00 Andrew Ash and...@andrewash.com:

 If you end up with a really long dependency tree between RDDs (like 100+),
 people have reported success with using the .checkpoint() method.  This
 computes the RDD and then saves it, flattening the dependency tree.  It
 turns out that having a really long RDD dependency graph causes the
 serialized size of tasks to go up, and any failure requires a long
 sequence of operations to regenerate the missing partition.

 Maybe give that a shot and see if it helps?


 On Fri, May 2, 2014 at 3:29 AM, Andrea Esposito and1...@gmail.com wrote:

 Sorry for the very late answer.

 I carefully followed what you pointed out and figured out that the
 structure used for each record was too big, with many small objects.
 After changing it, the memory usage decreased drastically.

 Despite that, I'm still struggling with performance decreasing
 across supersteps. Now the memory footprint is much smaller than
 before and GC time is not noticeable anymore.
 I suspected that some RDDs are recomputed, and watching the stages
 carefully there is evidence of that, but I don't understand why it's happening.

 Recalling my usage pattern:

 newRdd = oldRdd.map(myFun).persist(myStorageLevel)

 newRdd.foreach(x => {}) // Force evaluation

 oldRdd.unpersist(true)


 Following my usage pattern, I also tried not unpersisting the
 intermediate RDDs (i.e. oldRdd), but nothing changed.

 Any hints? How could I debug this?



 2014-04-14 12:55 GMT+02:00 Andrew Ash and...@andrewash.com:

 A lot of your time is being spent in garbage collection (second image).
  Maybe your dataset doesn't easily fit into memory?  Can you reduce the
 number of new objects created in myFun?

 How big are your heap sizes?

 Another observation is that in the 4th image some of your RDDs are
 massive and some are tiny.


 On Mon, Apr 14, 2014 at 11:45 AM, Andrea Esposito and1...@gmail.com wrote:

 Hi all,

 I'm developing an iterative computation over graphs, but I'm struggling
 with embarrassingly low performance.

 The computation is heavily iterative and I'm following this RDD usage
 pattern:

 newRdd = oldRdd.map(myFun).persist(myStorageLevel)

 newRdd.foreach(x => {}) // Force evaluation
 oldRdd.unpersist(true)


 I'm using a machine equipped with 30 cores and 120 GB of RAM.
 As an example, I ran with a small graph of 4000 verts and 80
 thousand edges; the first iterations take 10+ minutes each,
 and later ones take much longer.
 I attach the Spark UI screenshots of just the first 2 iterations.

 I tried MEMORY_ONLY_SER and MEMORY_AND_DISK_SER, and I also
 changed spark.shuffle.memoryFraction to 0.3, but nothing changed (with
 so much RAM for 4E10 verts these settings are quite pointless, I guess).

 How should I continue to investigate?

 Any advice is very welcome, thanks.

 Best,
 EA








Re: Incredible slow iterative computation

2014-05-05 Thread Earthson
checkpoint seems to just add a checkpoint mark. You need to run an action
after marking the RDD. I have tried it with success :)

newRdd = oldRdd.map(myFun).persist(myStorageLevel)
newRdd.checkpoint // only marks the RDD for checkpointing
newRdd.isCheckpointed // false here
newRdd.foreach(x => {}) // Force evaluation; the checkpoint is written now
newRdd.isCheckpointed // true here
oldRdd.unpersist(true)




If you create a new broadcast object at each step of the iteration, the
broadcasts will eat up all of the memory. You may need to set
spark.cleaner.ttl to a small enough value.
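
The mark-then-action order described above can be put into a complete loop. What follows is a minimal sketch, not the original poster's program: myFun, the input data, the local master, the temporary checkpoint directory, and the checkpoint interval are all placeholder assumptions. It assumes Spark is on the classpath and that a checkpoint directory has been set with setCheckpointDir before checkpoint() is called.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CheckpointLoop {
  // Placeholder per-record update standing in for myFun from the thread.
  def myFun(x: Long): Long = x + 1

  def run(sc: SparkContext, supersteps: Int, checkpointEvery: Int): RDD[Long] = {
    var current: RDD[Long] = sc.parallelize(1L to 1000L).persist(StorageLevel.MEMORY_ONLY)
    current.count() // materialize the initial RDD
    for (step <- 1 to supersteps) {
      val next = current.map(myFun).persist(StorageLevel.MEMORY_ONLY)
      // checkpoint() only marks the RDD; it must be called before any action on it.
      if (step % checkpointEvery == 0) next.checkpoint()
      next.foreach(_ => ()) // action: fills the cache and writes the checkpoint if marked
      current.unpersist(true) // safe now that next is materialized
      current = next
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)
    try {
      val result = run(sc, supersteps = 20, checkpointEvery = 10)
      println(s"isCheckpointed = ${result.isCheckpointed}, first = ${result.first()}")
    } finally {
      sc.stop()
    }
  }
}
```

Checkpointing every few supersteps rather than every superstep is a design choice in this sketch: it keeps the lineage short while writing to the checkpoint directory less often.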



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incredible-slow-iterative-computation-tp4204p5407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Francis . Hu
The file in fact does not exist, and there is no permission issue.

 

francis@ubuntu-4:/test/spark-0.9.1$ ll work/app-20140505053550-/
total 24
drwxrwxr-x  6 francis francis 4096 May  5 05:35 ./
drwxrwxr-x 11 francis francis 4096 May  5 06:18 ../
drwxrwxr-x  2 francis francis 4096 May  5 05:35 2/
drwxrwxr-x  2 francis francis 4096 May  5 05:35 4/
drwxrwxr-x  2 francis francis 4096 May  5 05:35 7/
drwxrwxr-x  2 francis francis 4096 May  5 05:35 9/

 

Francis

 

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, May 06, 2014 3:45
To: user@spark.apache.org
Subject: Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or directory)

 

Do those files actually exist? The stdout/stderr files should contain the output
of the Spark executors running on the workers, and it's weird that they don't exist.
It could be a permission issue: maybe the directories/files are not being
generated because the worker is not allowed to create them?

 

TD

 

On Mon, May 5, 2014 at 3:06 AM, Francis.Hu francis...@reachjunction.com wrote:

Hi all,

We run a Spark cluster with three workers. We created a Spark Streaming
application, then ran the Spark project using the command below:

shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo

We looked at the web UI of the workers: the jobs failed without any error or
info, but a FileNotFoundException occurred in the workers' log file, as shown below.

Is this a known issue in Spark?

 

 

-in workers' 
logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out

 

14/05/05 02:39:39 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-executorId=2logType=stdout
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
    at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
    at org.eclipse.jetty.server.Server.handle(Server.java:363)
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
    at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
    at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
    at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Thread.java:722)

14/05/05 02:39:41 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-executorId=9logType=stderr
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)

at 

details about event log

2014-05-05 Thread wxhsdp
Hi,

I'm looking at the event log, and I'm a little confused about some metrics.

Here's the info for one task:

Launch Time:1399336904603
Finish Time:1399336906465
Executor Run Time:1781
Shuffle Read Metrics:Shuffle Finish Time:1399336906027, Fetch Wait
Time:0
Shuffle Write Metrics:{Shuffle Bytes Written:12865587,Shuffle Write
Time:11804679}

Is (Shuffle Finish Time - Launch Time) the time taken to fetch the blocks
written by the previous stage, and (Finish Time - Shuffle Finish Time) the
time taken to do the task's own work? Is that right?

And what does Fetch Wait Time mean? I'm also confused about Shuffle
Write Time: why is it so big, and what is its unit of measurement?

thank you :)
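
The intervals asked about above can be worked out directly from the posted values. A minimal sketch: the timestamps are copied from the task info, the field names are illustrative only, and treating Shuffle Write Time as nanoseconds is an assumption that nobody in this thread confirms.

```scala
object TaskTimings {
  // Values copied from the task info above (epoch milliseconds).
  val launchTime: Long = 1399336904603L
  val finishTime: Long = 1399336906465L
  val shuffleFinishTime: Long = 1399336906027L
  val executorRunTimeMs: Long = 1781L
  val shuffleWriteTimeRaw: Long = 11804679L

  // Intervals derived from the timestamps.
  val wallClockMs: Long = finishTime - launchTime
  val launchToShuffleFinishMs: Long = shuffleFinishTime - launchTime
  val shuffleFinishToEndMs: Long = finishTime - shuffleFinishTime

  // Assumption: the raw Shuffle Write Time is in nanoseconds.
  val shuffleWriteMs: Double = shuffleWriteTimeRaw / 1e6

  def main(args: Array[String]): Unit = {
    println(s"Launch to finish:         $wallClockMs ms")
    println(s"Launch to shuffle finish: $launchToShuffleFinishMs ms")
    println(s"Shuffle finish to finish: $shuffleFinishToEndMs ms")
    println(s"Shuffle write time:       $shuffleWriteMs ms (if the raw value is ns)")
  }
}
```

Under that assumption the shuffle write would account for about 12 ms of the 1862 ms between launch and finish, so the raw number looks large only because of its unit.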




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/details-about-event-log-tp5411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: sbt/sbt run command returns a JVM problem

2014-05-05 Thread Carter
Hi, I still have over 1 GB left for my program.

Date: Sun, 4 May 2014 19:14:30 -0700
From: ml-node+s1001560n5340...@n3.nabble.com
To: gyz...@hotmail.com
Subject: Re: sbt/sbt run command returns a JVM problem



The total memory of your machine is 2 GB, right? Then how much memory is
left free? Wouldn't Ubuntu take up quite a big portion of the 2 GB?
Just a guess!


On Sat, May 3, 2014 at 8:15 PM, Carter [hidden email] wrote:

Hi, thanks for all your help.

I tried your setting in the sbt file, but the problem is still there.

The Java setting in my sbt file is:

java \
  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
  -jar ${JAR} \
  "$@"

I have tried setting these 3 parameters both bigger and smaller, but nothing
works. Did I change the right thing?

Thank you very much.

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.














  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5412.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use spark-submit

2014-05-05 Thread Soumya Simanta


Yes, I'm struggling with a similar problem where my classes are not found on the
worker nodes. I'm using 1.0.0_SNAPSHOT. I would really appreciate it if someone
could provide some documentation on the usage of spark-submit.

Thanks 

 On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote:
 
 
 I have a Spark Streaming application that uses the external streaming modules
 (e.g. kafka, mqtt, ...) as well. It is not clear how to properly invoke the
 spark-submit script: which --driver-class-path and/or
 -Dspark.executor.extraClassPath parameters are required?
 
  For reference, the following error is proving difficult to resolve:
 
 java.lang.ClassNotFoundException: 
 org.apache.spark.streaming.examples.StreamingExamples
 


Can I share RDD between a pyspark and spark API

2014-05-05 Thread manas Kar
Hi experts.
 I have some pre-built Python parsers that I am planning to use, just
because I don't want to write them again in Scala. However, after the data is
parsed I would like to take the RDD and use it in a Scala program. (Yes, I
like Scala more than Python and am more comfortable in Scala :)

In doing so, I don't want to write the parsed data to disk and then read it
back in from the Scala side. Is there an efficient way to achieve this?

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-I-share-RDD-between-a-pyspark-and-spark-API-tp5415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


about broadcast

2014-05-05 Thread randylu
  In my code, there are two broadcast variables. Sometimes reading the small
one takes more time than reading the big one, which is strange.
  The log on the slave node is as follows:
Block broadcast_2 stored as values to memory (estimated size *4.0 KB*, free
17.2 GB)
Reading broadcast variable 2 took *9.998537123* s
Block broadcast_1 stored as values to memory (estimated size *705.9 MB*,
free 16.5 GB)
Reading broadcast variable 1 took *2.596005629* s
  Reading the small one normally takes about 0.004 s, but occasionally it
takes more than 9 s.
  




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about broadcast

2014-05-05 Thread randylu
In addition, reading the big broadcast variable always took about 2 s.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416p5417.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.