spark streaming kafka output
Hi, Is there any code that implements a Kafka output for Spark Streaming? In my use case, all output needs to be written back to the Kafka cluster after the data is processed. What would be the guideline for implementing such a function? I heard foreachRDD will create one producer instance per batch. If so, will that hurt performance? Thanks, Weide
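One common way to approach the question above is to create the producer once per partition rather than once per record. The function passed to foreachRDD runs on the driver, while closures passed to rdd.foreachPartition run on the executors, so a producer created inside foreachPartition is never serialized. The sketch below shows only that shape, without any Spark or Kafka dependency: RecordSink and PartitionWriter are hypothetical names standing in for a real Kafka producer, and the wiring into a DStream is given in comments as an assumption about how it would be used.

```scala
// Hypothetical stand-in for a Kafka producer; not part of any Kafka or Spark API.
trait RecordSink {
  def send(topic: String, value: String): Unit
  def close(): Unit
}

object PartitionWriter {
  // Writes all records of one partition through a single sink.
  // The sink is created lazily, so empty partitions never open a connection,
  // and it is always closed, even if a send fails.
  // Returns the number of records sent.
  def writePartition(records: Iterator[String],
                     topic: String,
                     newSink: () => RecordSink): Int =
    if (!records.hasNext) 0
    else {
      val sink = newSink()
      try {
        var sent = 0
        records.foreach { r =>
          sink.send(topic, r)
          sent += 1
        }
        sent
      } finally sink.close()
    }
}

// Assumed wiring inside a streaming job (kafkaBackedSink is hypothetical):
//   dstream.foreachRDD { rdd =>
//     rdd.foreachPartition { records =>
//       PartitionWriter.writePartition(records, "output-topic", () => kafkaBackedSink())
//     }
//   }
```

With this shape the cost is one producer per partition per batch. If that is still too expensive, a lazily initialized producer held in a singleton object on each executor is a further option, at the price of managing its lifecycle yourself.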
Re: NoSuchMethodError: breeze.linalg.DenseMatrix
Since the breeze jar is brought into Spark by the mllib package, you may want to add mllib as a dependency in Spark 1.0. To bring it in from your application yourself, you can either use sbt assembly in your build project to generate a flat myApp-assembly.jar that contains the breeze jar, or use the Spark addJar API as Yadid said.
Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
On Sun, May 4, 2014 at 10:24 PM, wxhsdp wxh...@gmail.com wrote:
Hi DB, I think it's something related to sbt publishLocal. If I remove the breeze dependency from my sbt file, breeze cannot be found:
    [error] /home/wxhsdp/spark/example/test/src/main/scala/test.scala:5: not found: object breeze
    [error] import breeze.linalg._
    [error]^
Here's my sbt file:
    name := "Build Project"
    version := "1.0"
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT"
    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
I ran sbt publishLocal on the Spark tree. But if I manually put spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar in the /lib directory, sbt package is OK and I can run my app on the workers without addJar. What's the difference between adding the dependency in sbt after sbt publishLocal and manually putting spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar in the /lib directory? Why can I run my app on the workers without addJar this time?
DB Tsai-2 wrote:
If you add the breeze dependency in your build.sbt project, it will not be available to all the workers. There are a couple of options:
1) Use sbt assembly to package breeze into your application jar.
2) Manually copy the breeze jar to all the nodes, and have it on the classpath.
3) Spark 1.0 has the breeze jar in the Spark flat assembly jar, so you don't need to add the breeze dependency yourself.
Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
On Sun, May 4, 2014 at 4:07 AM, wxhsdp wrote:
Hi, I'm trying to use the breeze linalg library for matrix operations in my Spark code. I already added a dependency on breeze in my build.sbt and packaged my code successfully. When I run in local mode (sbt run local...), everything is OK, but when I switch to standalone mode (sbt run spark://127.0.0.1:7077 ...), an error occurs:
    14/05/04 18:56:29 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
    java.lang.NoSuchMethodError: breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$;
In my opinion, everything needed is packaged into the jar file, isn't it? And has anyone used breeze before? Is it good for matrix operations?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-breeze-linalg-DenseMatrix-tp5310.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-breeze-linalg-DenseMatrix-tp5310p5355.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
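The options listed in this thread could look roughly like the following build.sbt fragment. It is only a sketch: the Spark release version, the "provided" scope and the breeze version are assumptions that are not stated in the thread, and the sbt-assembly plugin is assumed to be configured separately in project/plugins.sbt.

```scala
// build.sbt (sketch; all version numbers below are assumptions)
name := "Build Project"

version := "1.0"

scalaVersion := "2.10.4"

// Option 3 from the thread: depend on mllib, which pulls in breeze transitively.
// "provided" keeps Spark itself out of the application jar, on the assumption
// that the cluster's Spark assembly already contains it.
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0" % "provided"

// Option 1 from the thread: declare breeze directly and bundle it into a flat
// jar with `sbt assembly`. Uncomment only if breeze is not on the workers.
// libraryDependencies += "org.scalanlp" %% "breeze" % "0.7"
```

A NoSuchMethodError at run time generally means the class was found but in a different version from the one used at compile time, so whichever option is chosen, the breeze version on the workers should match the one the application was compiled against.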
Check your cluster UI to ensure that workers are registered and have sufficient memory
I executed the following commands to launch a Spark app in yarn-client mode. I have Hadoop 2.3.0, Spark 0.8.1 and Scala 2.9.3.
    SPARK_HADOOP_VERSION=2.3.0 SPARK_YARN=true sbt/sbt assembly
    SPARK_YARN_MODE=true \
    SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
    SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar MASTER=yarn-client ./spark-shell
The Spark context in the interactive shell is set up properly, but when I then submit jobs, it reports that the application has not received any resources.
LOGS:
    DAGScheduler: Submitting 4 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
    YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
    WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
What have I missed? I did start the Spark master and worker and have configured SPARK_MEM. Any help would be great!
Re: Cache issue for iteration with broadcast
.set("spark.cleaner.ttl", "120") drops broadcast_0, which causes the exception below. This is strange, because broadcast_0 is no longer needed: I have broadcast_3 instead, and the most recent RDD is persisted, so nothing should need recomputing. What is the problem? I need help.
    14/05/05 17:03:12 INFO storage.MemoryStore: ensureFreeSpace(52474562) called with curMem=145126640, maxMem=1145359564
    14/05/05 17:03:12 INFO storage.MemoryStore: Block broadcast_3 stored as values to memory (estimated size 50.0 MB, free 903.9 MB)
    14/05/05 17:03:12 INFO scheduler.DAGScheduler: shuffleToMapStage 0 -- 0
    14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToStage 0 -- 0
    14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToJobIds 0 -- 0
    Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
    Caused by: org.apache.spark.SparkException: Job aborted: Task 9.0:48 failed 4 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://192.168.7.41:3503/broadcast_0)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: Is any idea on architecture based on Spark + Spray + Akka
Hi Yi, Your project sounds interesting to me. I'm also working in the 3G/4G communication domain. Besides that, I've done a tiny project based on Hadoop that analyzes execution logs, and recently I've been planning to pick it up again. So, if you don't mind, may I ask for an introduction to your log analyzing project? Regards, Yuding
On 2014-5-5, at 11:37, ZhangYi yizh...@thoughtworks.com wrote:
Hi all, Currently, our project is planning to adopt Spark as its big data platform. For the client side, we have decided to expose a REST API based on Spray. Our domain is the communication field: data analysis and statistics for 3G and 4G users. Spark + Spray is brand new for us, and we can't find any best practices via Google. In our opinion, an event-driven architecture may be a good choice for our project. However, more ideas are welcome. Thanks. -- ZhangYi (张逸) Developer tel: 15023157626 blog: agiledon.github.com weibo: tw张逸
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)
Hi all, We run a Spark cluster with three workers. We created a Spark Streaming application, then ran the project using the command below:
    shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo
We looked at the web UI of the workers: jobs failed without any error or info, but a FileNotFoundException occurred in the workers' log file, as shown below. Is this a known issue in Spark?
In the workers' logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out:
    14/05/05 02:39:39 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-0000&executorId=2&logType=stdout
    java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
        at org.eclipse.jetty.server.Server.handle(Server.java:363)
        at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
        at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
        at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
        at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:722)
    14/05/05 02:39:41 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-0000&executorId=9&logType=stderr
    java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/9/stderr (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
        at org.eclipse.jetty.server.Server.handle(Server.java:363)
        at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
        at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
        at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
        at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.j
Re: Cache issue for iteration with broadcast
Use checkpoint. It removes the dependencies :)
Re: Cache issue for iteration with broadcast
RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for broadcast cleaning. Maybe a broadcast could be removed automatically when nothing depends on it any more.
Re: Cache issue for iteration with broadcast
Have you tried Broadcast.unpersist()?
On Mon, May 5, 2014 at 6:34 PM, Earthson earthson...@gmail.com wrote:
RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for broadcast cleaning. Maybe a broadcast could be removed automatically when nothing depends on it any more.
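The two suggestions in this thread (RDD.checkpoint to cut the lineage and Broadcast.unpersist() to release old broadcasts explicitly instead of relying on spark.cleaner.ttl) can be combined in an iterative job roughly as sketched below. This is an illustration under assumptions, not the original poster's code: it assumes a Spark version that provides Broadcast.unpersist, runs in local mode, and the names BroadcastIteration, scale and the checkpoint interval of 2 are made up for the example.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastIteration {
  // Runs `iterations` rounds; each round multiplies every element by a freshly
  // broadcast factor (1.0 in the first round, 0.5 afterwards) and returns the final sum.
  def run(iterations: Int): Double = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-iteration-sketch")
    val sc = new SparkContext(conf)
    try {
      // checkpoint() needs a checkpoint directory; a temp dir is enough in local mode.
      sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

      var data = sc.parallelize(1 to 1000).map(_.toDouble).cache()
      var scale = 1.0
      for (i <- 1 to iterations) {
        val bc = sc.broadcast(scale)
        val next = data.map(_ * bc.value).cache()
        // Mark for checkpointing before the first action so the lineage,
        // including the reference to older broadcasts, is truncated.
        if (i % 2 == 0) next.checkpoint()
        next.count() // materialize (and checkpoint, if marked)

        // Release this round's broadcast on the executors; it would be re-sent
        // from the driver if a lost partition ever had to be recomputed.
        bc.unpersist(blocking = true)
        data.unpersist()
        data = next
        scale = 0.5
      }
      data.sum()
    } finally {
      sc.stop()
    }
  }
}
```

Using unpersist rather than a time-based cleaner means a broadcast is only dropped at a point where the program knows the dependent RDD has been materialized, which avoids the FileNotFoundException for broadcast_0 style of failure described earlier in the thread.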
Spark Streaming and JMS
Hi all, Is there a best practice for subscribing to JMS with Spark Streaming? I have searched but not found anything conclusive. In the absence of a standard practice, the solution I was thinking of was to use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark Streaming custom receiver. So the actor would look something like this:
    class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
      // e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
      def endpointUri = jmsURI
      lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
      protected override def onStart() { blockGenerator.start }
      def receive = {
        case msg: CamelMessage => { blockGenerator += msg.body }
        case _ => { /* ... */ }
      }
      protected override def onStop() { blockGenerator.stop }
    }
And then in the main application create receivers like this:
    val ssc = new StreamingContext(...)
    object tascQueue extends JmsReceiver[String](ssc) {
      override def getReceiver(): JmsReceiver[String] = {
        new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
      }
    }
    ssc.registerInputStream(tascQueue)
Is this the best way to go? Best regards, Patrick
Re: master attempted to re-register the worker and then took all workers as unregistered
Ah, I think this should be fixed in 0.9.1? Did you see the exception thrown on the worker side? Best, -- Nan Zhu
On Sunday, May 4, 2014 at 10:15 PM, Cheney Sun wrote:
Hi Nan, Have you found a way to fix the issue? I have now run into the same problem with version 0.9.1. Thanks, Cheney
Re: Shark on cloudera CDH5 error
No replies yet. I guess everyone who had this problem already knew the obvious reason why the error occurred. It took me some time to figure out the workaround, though. It seems Shark depends on /var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core/hadoop-core.jar for client-server communication, whereas CDH5 should rely on hadoop-core-2.3.0-mr1-cdh5.0.0.jar.
1) Grab hadoop-core-2.3.0-mr1-cdh5.0.0.jar from the library of another CDH module (I chose hadoop).
2) Remove the jar in /var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core.
3) Place the jar from step 1 in the hadoop-core folder from step 2.
Hope this saves some time for anyone who has a similar problem. ..Manas
Re: configure spark history server for running on Yarn
Since 1.0 is still in development you can pick up the latest docs in git: https://github.com/apache/spark/tree/branch-1.0/docs
I didn't see anywhere that you said you started the Spark history server. Multiple things need to happen for the Spark history server to work:
1) Configure your application to save the history logs; see the eventLog settings here: https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md
2) On YARN, know the host/port where you are going to start the Spark history server and configure spark.yarn.historyServer.address to point to it. Note that this purely makes the link from the ResourceManager UI properly point to the Spark History Server daemon.
3) Start the Spark history server, pointing it to the same directory as specified in your application (spark.eventLog.dir).
4) Run your application. Once it finishes, you can either go to the RM UI to link to the Spark history UI or go directly to the Spark history server UI.
Tom
On Thursday, May 1, 2014 7:09 PM, Jenny Zhao linlin200...@gmail.com wrote:
Hi, I have installed Spark 1.0 from branch-1.0. The build went fine, and I have tried running the example in YARN client mode. Here is my command:
    /home/hadoop/spark-branch-1.0/bin/spark-submit /home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 --driver-memory 3g --name SparkPi --num-executors 2 --class org.apache.spark.examples.SparkPi yarn-client 5
After the run, I was not able to retrieve the log from YARN's web UI, although I tried to specify the history server in spark-env.sh:
    export SPARK_DAEMON_JAVA_OPTS=-Dspark.yarn.historyServer.address=master:18080
I also tried to specify it in spark-defaults.conf, which didn't work either. I would appreciate it if someone could tell me how to specify it, in either spark-env.sh or spark-defaults.conf, so that this option is applied to any Spark application.
Another thing I found is that the usage output for spark-submit is incomplete and not in sync with the online documentation; I hope this is addressed in the formal release. And is this the latest documentation for Spark 1.0? http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html
Thank you!
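Steps 1 and 2 of the reply above come down to three application-side settings. The sketch below sets them programmatically on a SparkConf; the same keys can go into spark-defaults.conf. The helper name HistoryServerConf and the log directory used in the usage note are hypothetical, while master:18080 is the address from the original question.

```scala
import org.apache.spark.SparkConf

object HistoryServerConf {
  // Builds a SparkConf with event logging enabled and the YARN
  // ResourceManager link pointed at the history server.
  def build(eventLogDir: String, historyServerAddress: String): SparkConf =
    new SparkConf(false) // false: do not load spark.* system properties
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", eventLogDir)
      .set("spark.yarn.historyServer.address", historyServerAddress)
}
```

For example, `HistoryServerConf.build("hdfs:///spark-events", "master:18080")` would match a history server started on host master, port 18080, reading from the hypothetical directory hdfs:///spark-events. The history server daemon itself still has to be started separately and pointed at that same directory, as step 3 says.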
Spark GCE Script
Hi Sparkers, We have created a quick spark_gce script which can launch a Spark cluster in the Google Cloud. I'm sharing it because it might be helpful for someone using the Google Cloud for deployment rather than AWS. Here's the link to the script: https://github.com/sigmoidanalytics/spark_gce Feel free to use it and send any feedback. In short, here's what it does: just like the spark_ec2 script, it reads certain command-line arguments such as the cluster name (see the GitHub page https://github.com/sigmoidanalytics/spark_gce for more details), then starts the machines in the Google Cloud, sets up the network, adds a 500GB empty disk to all machines, generates the SSH keys on the master and transfers them to all slaves, installs Java, and downloads and configures Spark/Shark/Hadoop. It also starts the Shark server automatically. Currently the version is 0.9.1, but I'm happy to add/support more versions if anyone is interested. Cheers. Thanks Best Regards
Re: Using google cloud storage for spark big data
Hi Aureliano, You might want to check this script out, https://github.com/sigmoidanalytics/spark_gce Let me know if you need any help around that. Thanks Best Regards On Tue, Apr 22, 2014 at 7:12 PM, Aureliano Buendia buendia...@gmail.comwrote: On Tue, Apr 22, 2014 at 10:50 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: We don't have anything fancy. It's basically some very thin layer of google specifics on top of a stand alone cluster. We basically created two disk snapshots, one for the master and one for the workers. The snapshots contain initialization scripts so that the master/worker daemons are started on boot. So if I want a cluster I just create a new instance (with a fixed name) using the master snapshot for the master. When it is up I start as many slave instances as I need using the slave snapshot. By the time the machines are up the cluster is ready to be used. This sounds like being a lot simpler than the existing spark-ec2 script. Does google compute engine api makes this happen in a simple way, when compared to ec2 api? Does your script do everything spark-ec2 does? Also, any plans to make this open source? Andras On Mon, Apr 21, 2014 at 10:04 PM, Mayur Rustagi mayur.rust...@gmail.comwrote: Okay just commented on another thread :) I have one that I use internally. Can give it out but will need some support from you to fix bugs etc. Let me know if you are interested. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Apr 18, 2014 at 9:08 PM, Aureliano Buendia buendia...@gmail.com wrote: Thanks, Andras. What approach did you use to setup a spark cluster on google compute engine? Currently, there is no production-ready official support for an equivalent of spark-ec2 on gce. Did you roll your own? On Thu, Apr 17, 2014 at 10:24 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: Hello! 
On Wed, Apr 16, 2014 at 7:59 PM, Aureliano Buendia buendia...@gmail.com wrote:
Hi, Google has published a new connector for Hadoop: Google Cloud Storage, which is an equivalent of Amazon S3: googlecloudplatform.blogspot.com/2014/04/google-bigquery-and-datastore-connectors-for-hadoop.html
This is actually about Cloud Datastore and not Cloud Storage (yeah, quite confusing naming ;) ). But they have already had a Cloud Storage connector for a while, also linked from your article: https://developers.google.com/hadoop/google-cloud-storage-connector
How can spark be configured to use this connector?
Yes, it can, but in a somewhat hacky way. The problem is that for some reason Google does not officially publish the library jar alone; you get it installed as part of a Hadoop on Google Cloud installation. So the official way would be (we did not try that) to have a Hadoop on Google Cloud installation and run Spark on top of that.
The other option, which we did try and which works fine for us, is to snatch the jar: https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-1.2.4.jar, and make sure it's shipped to your workers (e.g. with setJars on SparkConf when you create your SparkContext). Then create a core-site.xml file, which you make sure is on the classpath both in your driver and your cluster (e.g. you can make sure it ends up in one of the jars you send with setJars above), with this content (with YOUR_* replaced):
    <configuration>
      <property><name>fs.gs.impl</name><value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value></property>
      <property><name>fs.gs.project.id</name><value>YOUR_PROJECT_ID</value></property>
      <property><name>fs.gs.system.bucket</name><value>YOUR_FAVORITE_BUCKET</value></property>
    </configuration>
From this point on you can simply use gs://... filenames to read/write data on Cloud Storage. Note that you should run your cluster and driver program on Google Compute Engine for this to work as is. It's probably possible to configure access from the outside too, but we didn't do that.
Hope this helps, Andras
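As an alternative to shipping a core-site.xml, the same three properties could be set on the Hadoop configuration from code. This is a sketch under assumptions: the helper name GcsConfig is made up, the property names are the ones from the message above, and it has not been checked against any particular connector version. The connector jar still has to be on the driver and worker classpaths as described.

```scala
import org.apache.hadoop.conf.Configuration

object GcsConfig {
  // Applies the connector settings from the thread to a Hadoop Configuration
  // and returns the same instance for convenience.
  def configure(hadoopConf: Configuration, projectId: String, bucket: String): Configuration = {
    hadoopConf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    hadoopConf.set("fs.gs.project.id", projectId)
    hadoopConf.set("fs.gs.system.bucket", bucket)
    hadoopConf
  }
}
```

In a Spark driver this would typically be called as `GcsConfig.configure(sc.hadoopConfiguration, "YOUR_PROJECT_ID", "YOUR_FAVORITE_BUCKET")` before the first gs://... path is read.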
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
I just upgraded my Spark version to 1.0.0_SNAPSHOT.
    commit f25ebed9f4552bc2c88a96aef06729d9fc2ee5b3
    Author: witgo wi...@qq.com
    Date: Fri May 2 12:40:27 2014 -0700
I'm running a standalone cluster with 3 workers.
- Workers: 3
- Cores: 48 Total, 0 Used
- Memory: 469.8 GB Total, 0.0 B Used
However, when I try to run bin/spark-shell I get the following error after some time, even if I don't perform any operations in the Spark shell.
    14/05/05 10:20:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:622)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:256)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:54)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:679)
        at java.lang.UNIXProcess$1.run(UNIXProcess.java:157)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.lang.UNIXProcess.<init>(UNIXProcess.java:119)
        at java.lang.ProcessImpl.start(ProcessImpl.java:81)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:470)
        at java.lang.Runtime.exec(Runtime.java:612)
        at java.lang.Runtime.exec(Runtime.java:485)
        at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:178)
        at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:168)
        at scala.tools.jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:163)
        at scala.tools.jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:67)
        at scala.tools.jline.internal.TerminalLineSettings.getProperty(TerminalLineSettings.java:87)
        at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:127)
        at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
        at org.apache.spark.repl.SparkJLineReader$JLineConsoleReader.readOneKey(SparkJLineReader.scala:54)
        at org.apache.spark.repl.SparkJLineReader.readOneKey(SparkJLineReader.scala:81)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:29)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:579)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:566)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply$mcZL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:983)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        ... 7 more
RE: another updateStateByKey question - updated w possible Spark bug
I’ve encountered this issue again and am able to reproduce it about 10% of the time. 1. Here is the input: RDD[ (a, 126232566, 1), (a, 126232566, 2) ] RDD[ (a, 126232566, 1), (a, 126232566, 3) ] RDD[ (a, 126232566, 3) ] RDD[ (a, 126232566, 4) ] RDD[ (a, 126232566, 2) ] RDD[ (a, 126232566, 5), (a, 126232566, 5) ] 2. Here are the actual results (printed DStream – each line is a new RDD with RDD Id being the last number on each line): (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6) (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13) (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20) (((a,126232566),StateClass(10,5,ArrayBuffer())),26) -empty elements Seq[V] (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33) (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40) (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47) (((a,126232566),StateClass(26,9,ArrayBuffer())),53) -empty elements Seq[V] (((a,126232566),StateClass(26,9,ArrayBuffer())),59) -empty elements Seq[V] 3. Here are the expected results: (all tuples from #2 except those with empty Seq[V] ) (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6) (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13) (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20) (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33) (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40) (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47) 4. 
Here is the code: case class StateClass(sum:Integer, count:Integer, elements:Seq[Double]) val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) = { // if (values.isEmpty) { //// if RDD cannot find values for this key (which is from prev RDD, //// the tuple will not be shown in this RDD w values of 0 //None // } else { val previousState = state.getOrElse(StateClass(0, 0, Seq())) val currentCount = values.size + previousState.count var currentSum=0 for (newValue - values) yield ({ currentSum = currentSum + newValue._3 }) currentSum= currentSum +previousState.sum val elements = for (newValues - values) yield ({ newValues._3.toDouble }) Some(StateClass(currentSum, currentCount, elements)) // } } val partialResultSums= inputStream.map((x:(String, Long, Int)) =((x._1), (x._1, x._2, x._3))) //re map .updateStateByKey[StateClass](updateSumFunc) //update state .transform(rdd=rdd.map(t=(t,rdd.id))) //add RDD ID to RDD tuples partialResultSums.print() Now this is how I generate the RDDs and I suspect the delay is why the issue surfaces: rddQueue += ssc.sparkContext.makeRDD(smallWindow1) // smallWindow1 = List[(String, Long, Int)]( (a, 126232566, 1), (a, 126232566, 2) ) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow2) // smallWindow2= List[(String, Long, Int)]((a, 126232566, 1), (a, 126232566, 3)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow3) // smallWindow3= List[(String, Long, Int)]((a, 126232566, 3)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow4) // smallWindow4= List[(String, Long, Int)]((a, 126232566, 4)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow5) // smallWindow5= List[(String, Long, Int)]((a, 126232566, 2)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow6) // smallWindow6= List[(String, Long, Int)]((a, 126232566, 5), (a, 126232566, 5)) Thread.sleep(3100) //ssc.awaitTermination() ssc.stop() In my use case when I detect an empty 
Seq[V] in the updateStateByKey function I return None so I can filter the tuples out. However, Spark calls the updateStateByKey function with an empty Seq[V] when it should not, which messes up my logic. I wonder how to bypass this bug/feature of Spark.

Thanks
-Adrian

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question

Could be a bug. Can you share code with data that I can use to reproduce this?

TD

On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

Has anyone else noticed that sometimes the same tuple calls the update state function twice? I have 2 tuples with the same key in 1 RDD that is part of a DStream: RDD[ (a,1), (a,2) ]. When the update function is called the first time, Seq[V] has data: 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2)). Then right away (I see this in my output) the same key is used and the function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( )). In the update function I also save Seq[V] to state so I can see it in the RDD. I also
RE: another updateStateByKey question - updated w possible Spark bug
Forgot to mention my batch interval is 1 second:

    val ssc = new StreamingContext(conf, Seconds(1))

hence the Thread.sleep(1100)
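For what it's worth, the calls with an empty Seq[V] may be expected behaviour rather than a bug: as far as I understand updateStateByKey, the update function is applied in every batch to every key that already has state, whether or not new values arrived for that key, and returning None drops the key's state. With a 1 second batch interval and 1100 ms sleeps, some batches probably contain no RDD from the queue. Below is a minimal sketch of one way around it, assuming it is acceptable to keep the state and filter afterwards. The names updateSum and hadNewData are illustrative only and are not Spark API.

```scala
// State kept per key: running sum, running count, and the values seen in the latest batch.
case class StateClass(sum: Int, count: Int, elements: Seq[Double])

// Same shape as the function passed to updateStateByKey.
// It never returns None, so the running sum and count survive batches without data.
def updateSum(values: Seq[(String, Long, Int)], state: Option[StateClass]): Option[StateClass] = {
  val previous = state.getOrElse(StateClass(0, 0, Seq.empty))
  val newInts = values.map(_._3)
  Some(StateClass(
    previous.sum + newInts.sum,
    previous.count + newInts.size,
    newInts.map(_.toDouble)))
}

// Downstream predicate: keep only the keys that received data in the current batch.
def hadNewData(entry: (String, StateClass)): Boolean = entry._2.elements.nonEmpty
```

On the stream this would be used roughly as updateStateByKey[StateClass](updateSum _) followed by filter(hadNewData _), so the tuples with an empty elements Seq are dropped from the output while the state itself is kept.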
Comprehensive Port Configuration reference?
Is it documented anywhere how to configure every port that a Spark application opens? This seems to be one of the main things that makes running Spark hard in places like EC2 when you aren't using the canned Spark scripts. When starting an app, it looks like you'll see ports open for:

BlockManager
OutputTracker
FileServer
WebUI
Local port to get callbacks from the Mesos master

What else? How do I configure all of these?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tp5384.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Cache issue for iteration with broadcast
Yes, I've tried. The problem is that a new broadcast object is generated at every step until they eat up all of the memory. I solved it by using RDD.checkpoint to remove the dependencies on old broadcast objects, and by using cleaner.ttl to clean up these broadcast objects automatically. If there's a more elegant way to solve this problem, please tell me :)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Cache-issue-for-iteration-with-broadcast-tp5350p5385.html
Re: Spark GCE Script
Very cool! Have you thought about sending this as a pull request? We’d be happy to maintain it inside Spark, though it might be interesting to find a single Python package that can manage clusters across both EC2 and GCE.

Matei

On May 5, 2014, at 7:18 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Hi Sparkers,

We have created a quick spark_gce script which can launch a Spark cluster in the Google Cloud. I'm sharing it because it might be helpful for someone using the Google Cloud for deployment rather than AWS. Here's the link to the script: https://github.com/sigmoidanalytics/spark_gce

Feel free to use it and suggest any feedback around it.

In short, here's what it does: just like the spark_ec2 script, this one reads certain command-line arguments (see the github page for more details) like the cluster name, then starts the machines in the Google Cloud, sets up the network, adds a 500GB empty disk to all machines, generates the ssh keys on the master and transfers them to all slaves, installs Java, and downloads and configures Spark/Shark/Hadoop. It also starts the Shark server automatically. Currently the version is 0.9.1, but I'm happy to add/support more versions if anyone is interested.

Cheers.

Thanks
Best Regards
Problem with sharing class across worker nodes using spark-shell on Spark 1.0.0
Hi,

I'm trying to run a simple Spark job that uses a 3rd party class (in this case twitter4j.Status) in the spark-shell using spark-1.0.0_SNAPSHOT.

I'm starting my bin/spark-shell with the following command:

    ./spark-shell --driver-class-path $LIBPATH/jodatime2.3/joda-convert-1.2.jar:$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar:$LIBPATH/twitter4j-core-3.0.5.jar --jars $LIBPATH/jodatime2.3/joda-convert-1.2.jar,$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar,$LIBPATH/twitter4j-core-3.0.5.jar

My code was working fine in 0.9.1 when I used the following options, which pointed to the same jars as above:

    export SPARK_CLASSPATH
    export ADD_JAR

Now I'm getting a NoClassDefFoundError on each of my worker nodes:

    14/05/05 14:03:30 INFO TaskSetManager: Loss was due to java.lang.NoClassDefFoundError: twitter4j/Status [duplicate 40]
    14/05/05 14:03:30 INFO TaskSetManager: Starting task 0.0:26 as TID 73 on executor 2: worker1.xxx.. (NODE_LOCAL)

What am I missing here?

Thanks
-Soumya
Re: Spark GCE Script
Has anyone considered using jclouds tooling to support multiple cloud providers? Maybe using Pallet?

François

On May 5, 2014, at 3:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

I second this motion. :) A unified cloud deployment tool would be absolutely great.
Re: performance improvement on second operation...without caching?
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. Your answer explains the performance jump in my code, but shows I've missed something key in my understanding of Spark! I was not aware until just now that map output was saved to disk (other than if explicitly told to do so using persist). It raises almost as many questions as it answers.

Where are the shuffle files saved? Locally on the mapper nodes? Is it the same location that disk-spilled cache is saved to?

Doesn't the necessity of saving to disk result in increased I/O that would slow the job down? I thought part of the goal of Spark was to do everything in memory unless the user specifically chose to persist, thereby making a choice to incur time/disk space expense up front in return for fast failure recovery?

Not that I'm complaining, mind you, but I do think this should be made clear to people: it not only affects performance but also, for instance, whether the data is fresh or out of date. I had assumed that if I did not set caching, then each time I performed an operation on an RDD it would re-compute based on lineage, including re-reading the files, so I didn't have to worry about the possibility of my file content changing. But if it's auto-caching shuffle files, my base files won't get re-read even if the content has changed. (Or does it check timestamps?)

Thanks,
Diana

On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett esjew...@gmail.com wrote:

Thanks Patrick and Matei for the clarification. I actually have to update some code now, as I was apparently relying on the fact that the output files are being re-used. Explains some edge-case behavior that I've seen. For me, at least, I read the guide, did some tests on fairly extensive RDD dependency graphs, saw that tasks earlier in the dependency graphs were not being regenerated and assumed (very much incorrectly I just found out!) that it was because the RDDs themselves were being cached.
I wonder if there is a way to explain this distinction concisely in the programming guide. Or maybe I'm the only one that went down this incorrect learning path :-)

Ethan

On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

Yes, this happens as long as you use the same RDD. For example say you do the following:

    data1 = sc.textFile(…).map(…).reduceByKey(…)
    data1.count()
    data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written out to shuffle files. Next time you do a count, on either this RDD or a child (e.g. after the filter), we notice that output files were already generated for this shuffle so we don’t rerun the map stage. Note that the output does get read again over the network, which is kind of wasteful (if you really wanted to reuse this as quickly as possible you’d use cache()).

Matei

On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

Hey Matei,

Not sure I understand that. These are 2 separate jobs. So the second job takes advantage of the fact that there is map output left somewhere on disk from the first job, and re-uses that?

On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output files from map stages to the filesystem, so it only needs to rerun the last reduce stage. This is why you only saw one stage executing. These files are saved for fault recovery but they speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

Ethan,

What you said is actually not true: Spark won't cache RDDs unless you ask it to. The observation here, that running the same job can speed up substantially even without caching, is common. This is because other components in the stack are performing caching and optimizations. Two that can make a huge difference are:

1. The OS buffer cache, which will keep recently read disk blocks in memory.
2. The Java just-in-time compiler (JIT), which will use runtime profiling to significantly speed up execution.

These can make a huge difference if you are running the same job over and over. And there are other things like the OS network stack increasing TCP windows and so forth. These will all improve response time as a Spark program executes.

On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:

I believe Spark caches RDDs it has memory for regardless of whether you actually call the 'cache' method on the RDD. The 'cache' method just tips off Spark that the RDD should have higher priority. At least, that is my experience and it seems to correspond with your experience and with my recollection of other discussions on this topic on the list. However, going back and looking at the programming guide, this is not the way the cache/persist behavior is described. Does the guide need to be
Re: spark streaming question
One main reason why Spark Streaming can achieve higher throughput than Storm is that Spark Streaming operates in coarser-grained batches (second-scale, massive batches), which reduces per-tuple overheads in shuffles and other kinds of data movement.

Note that this increased throughput does not come for free: larger batches mean larger end-to-end latency. Storm may give a lower end-to-end latency than Spark Streaming (second-scale latency with second-scale batches). However, we have observed that for a large variety of streaming use cases, people are often okay with second-scale latencies but find it much harder to work around the at-least-once semantics (double-counting, etc.) and the lack of built-in state management (state kept locally in a worker can get lost if the worker dies). Plus, Spark Streaming has the major advantage of a simpler, higher-level API than Storm, and the whole Spark ecosystem (Spark SQL, MLlib, etc.) around it, which it can use for writing streaming analytics applications very easily.

Regarding Trident, we have heard from many developers that Trident gives lower throughput than Storm due to its transactional guarantees. It's hard to say what the reasons behind the performance penalty are without doing a very detailed head-to-head analysis.

TD

On Sun, May 4, 2014 at 5:11 PM, Chris Fregly ch...@fregly.com wrote:

great questions, weide. in addition, i'd also like to hear more about how to horizontally scale a spark-streaming cluster. i've gone through the samples (standalone mode) and read the documentation, but it's still not clear to me how to scale this puppy out under high load. i assume i add more receivers (kinesis, flume, etc), but physically how does this work? @TD: can you comment? thanks!
-chris On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote: Hi , It might be a very general question to ask here but I'm curious to know why spark streaming can achieve better throughput than storm as claimed in the spark streaming paper. Does it depend on certain use cases and/or data source ? What drives better performance in spark streaming case or in other ways, what makes storm not as performant as spark streaming ? Also, in order to guarantee exact-once semantics when node failure happens, spark makes replicas of RDDs and checkpoints so that data can be recomputed on the fly while on Trident case, they use transactional object to persist the state and result but it's not obvious to me which approach is more costly and why ? Any one can provide some experience here ? Thanks a lot, Weide
Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.
Hi all,

I'm currently working on creating a set of docker images to facilitate local development with Spark/streaming on Mesos (+zk, hdfs, kafka).

After solving the initial hurdles to get things working together in docker containers, everything now seems to start up correctly and the mesos UI shows slaves as they are started.

I'm trying to submit a job from IntelliJ and the job submissions seem to get lost in Mesos translation. The logs are not helping me to figure out what's wrong, so I'm posting them here in the hope that they ring a bell and somebody can give me a hint about what's wrong or missing in my setup.

DRIVER (IntelliJ running a Job.scala main)

    14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER
    14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than 1399319251962
    14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older than 1399319251962
    14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
    14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
    14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for HTTP_BROADCAST
    14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER
    14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT

MESOS MASTER

    I0505 19:52:39.718080 388 master.cpp:690] Registering framework 201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
    I0505 19:52:39.718261 388 master.cpp:493] Framework 201405051517-67113388-5050-383-6995 disconnected
    I0505 19:52:39.718277 389 hierarchical_allocator_process.hpp:332] Added framework 201405051517-67113388-5050-383-6995
    I0505 19:52:39.718312 388 master.cpp:520] Giving framework 201405051517-67113388-5050-383-6995 0ns to failover
    I0505 19:52:39.718431 389 hierarchical_allocator_process.hpp:408] Deactivated framework 201405051517-67113388-5050-383-6995
    W0505 19:52:39.718459 388 master.cpp:1388] Master returning resources offered to framework 201405051517-67113388-5050-383-6995 because the framework has terminated or is inactive
    I0505 19:52:39.718567 388 master.cpp:1376] Framework failover timeout, removing framework 201405051517-67113388-5050-383-6995

MESOS SLAVE

    I0505 19:49:27.662019 20 slave.cpp:1191] Asked to shut down framework 201405051517-67113388-5050-383-6803 by master@172.17.0.4:5050
    W0505 19:49:27.662072 20 slave.cpp:1206] Cannot shut down unknown framework 201405051517-67113388-5050-383-6803
    I0505 19:49:28.662153 18 slave.cpp:1191] Asked to shut down framework 201405051517-67113388-5050-383-6804 by master@172.17.0.4:5050
    W0505 19:49:28.662212 18 slave.cpp:1206] Cannot shut down unknown framework 201405051517-67113388-5050-383-6804
    I0505 19:49:29.662199 13 slave.cpp:1191] Asked to shut down framework 201405051517-67113388-5050-383-6805 by master@172.17.0.4:5050
    W0505 19:49:29.662256 13 slave.cpp:1206] Cannot shut down unknown framework 201405051517-67113388-5050-383-6805
    I0505 19:49:30.662443 16 slave.cpp:1191] Asked to shut down framework 201405051517-67113388-5050-383-6806 by master@172.17.0.4:5050
    W0505 19:49:30.662489 16 slave.cpp:1206] Cannot shut down unknown framework 201405051517-67113388-5050-383-6806

Thanks in advance,
Gerard.
Re: Spark Streaming and JMS
A few high-level suggestions.

1. I recommend using the new Receiver API in the almost-released Spark 1.0 (see branch-1.0 / master branch on github). It's a slightly better version of the earlier NetworkReceiver, as it hides away the block generator (which needed to be unnecessarily manually started and stopped) and adds other lifecycle management methods like stop, restart and reportError to deal with errors in receiving data. It also adds the ability to write a custom receiver in Java. Take a look at this example of writing a custom receiver in the new API: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
I am updating the custom receiver guide right now (https://github.com/apache/spark/pull/652).

2. Once you create a JMSReceiver class by extending NetworkReceiver/Receiver, you can create a DStream out of the receiver by

    val jmsStream = ssc.networkStream(new JMSReceiver())

3. As far as I understand from seeing the docs of akka.camel.Consumer (http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer), it is essentially a specialized Akka actor. For Akka actors, there is ssc.actorStream, where you can specify your own actor class. You get actor supervision (and therefore error handling, etc.) with that. See the example ActorWordCount:
old style using NetworkReceiver: https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
new style using Receiver: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala

I haven't personally played around with JMS before, so I can't comment much on JMS-specific intricacies.

TD

On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin mcgloin.patr...@gmail.com wrote:

Hi all,

Is there a best practice for subscribing to JMS with Spark Streaming? I have searched but not found anything conclusive.
In the absence of a standard practice, the solution I was thinking of was to use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark Streaming Custom Receiver. So the actor would look something like this:

    class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
      // e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
      def endpointUri = jmsURI

      lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

      protected override def onStart() {
        blockGenerator.start
      }

      def receive = {
        case msg: CamelMessage => { blockGenerator += msg.body }
        case _ => { /* ... */ }
      }

      protected override def onStop() {
        blockGenerator.stop
      }
    }

And then in the main application create receivers like this:

    val ssc = new StreamingContext(...)
    object tascQueue extends JmsReceiver[String](ssc) {
      override def getReceiver(): JmsReceiver[String] = {
        new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
      }
    }
    ssc.registerInputStream(tascQueue)

Is this the best way to go?

Best regards,
Patrick
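Following TD's first suggestion, a receiver on the new Receiver API might look roughly like the sketch below. The MessageSource trait is a hypothetical stand-in for the actual JMS or Camel consumer (it is not part of Spark, JMS, or Akka), and the 50 ms polling delay is an arbitrary choice. Only Receiver, store, isStopped and restart are Spark calls, as I understand the 1.0 API.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.util.control.NonFatal

// Hypothetical abstraction over the message broker; replace with real JMS/Camel code.
trait MessageSource extends Serializable {
  def open(): Unit
  def next(): Option[String] // None when no message is currently available
  def close(): Unit
}

class JmsReceiver(source: MessageSource)
  extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  // onStart must return quickly, so the receiving loop runs on its own thread.
  def onStart(): Unit = {
    new Thread("jms-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to do here: the receiving thread polls isStopped() and exits by itself.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      source.open()
      while (!isStopped()) {
        source.next() match {
          case Some(body) => store(body)        // hand the message to Spark
          case None       => Thread.sleep(50)   // arbitrary back-off when idle
        }
      }
    } catch {
      // Ask Spark to restart the receiver instead of dying silently.
      case NonFatal(e) => restart("Error receiving from message source", e)
    } finally {
      source.close()
    }
  }
}
```

With the 1.0 API the stream would then be created with ssc.receiverStream(new JmsReceiver(source)), where source is your own MessageSource implementation. The block generator from the old NetworkReceiver version is no longer managed by hand.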
Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.
Hi Benjamin, Yes, we initially used a modified version of the AmpLabs docker scripts [1]. The amplab docker images are a good starting point. One of the biggest hurdles has been HDFS, which requires reverse-DNS and I didn't want to go the dnsmasq route to keep the containers relatively simple to use without the need of external scripts. Ended up running a 1-node setup nnode+dnode. I'm still looking for a better solution for HDFS [2] Our usecase using docker is to easily create local dev environments both for development and for automated functional testing (using cucumber). My aim is to strongly reduce the time of the develop-deploy-test cycle. That also means that we run the minimum number of instances required to have a functionally working setup. E.g. 1 Zookeeper, 1 Kafka broker, ... For the actual cluster deployment we have Chef-based devops toolchain that put things in place on public cloud providers. Personally, I think Docker rocks and would like to replace those complex cookbooks with Dockerfiles once the technology is mature enough. -greetz, Gerard. [1] https://github.com/amplab/docker-scripts [2] http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns On Mon, May 5, 2014 at 11:00 PM, Benjamin bboui...@gmail.com wrote: Hi, Before considering running on Mesos, did you try to submit the application on Spark deployed without Mesos on Docker containers ? Currently investigating this idea to deploy quickly a complete set of clusters with Docker, I'm interested by your findings on sharing the settings of Kafka and Zookeeper across nodes. How many broker and zookeeper do you use ? 
Regards,
Spark 0.9.1 - saveAsSequenceFile and large RDD
Hi,

Fairly new to Spark. I'm using Spark's saveAsSequenceFile() to write large Sequence Files to HDFS. The Sequence Files need to be large to be efficiently accessed in HDFS, preferably larger than Hadoop's block size, 64MB. The task works for files smaller than 64 MiB (with a warning for sequence files close to 64 MiB). For files larger than 64 MiB, the task fails with a libprotobuf error. Here is the full log:

    14/05/05 18:18:00 INFO MesosSchedulerBackend: Registered as framework ID 201404231353-1315739402-5050-26649-0091
    14/05/05 18:18:12 INFO SequenceFileRDDFunctions: Saving as sequence file of type (LongWritable,BytesWritable)
    14/05/05 18:18:14 INFO SparkContext: Starting job: saveAsSequenceFile at X.scala:171
    14/05/05 18:18:14 INFO DAGScheduler: Got job 0 (saveAsSequenceFile at X.scala:171) with 1 output partitions (allowLocal=false)
    14/05/05 18:18:14 INFO DAGScheduler: Final stage: Stage 0 (saveAsSequenceFile at X.scala:171)
    14/05/05 18:18:14 INFO DAGScheduler: Parents of final stage: List()
    14/05/05 18:18:14 INFO DAGScheduler: Missing parents: List()
    14/05/05 18:18:14 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at X.scala:170), which has no missing parents
    14/05/05 18:18:19 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at X.scala:170)
    14/05/05 18:18:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    14/05/05 18:18:19 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 201404231353-1315739402-5050-26649-3: dn-04 (PROCESS_LOCAL)
    14/05/05 18:18:23 INFO TaskSetManager: Serialized task 0.0:0 as 113006452 bytes in 3890 ms
    [libprotobuf ERROR google/protobuf/io/coded_stream.cc:171] A protocol message was rejected because it was too big (more than 67108864 bytes). To increase the limit (or to disable these warnings), see CodedInputStream::SetTotalBytesLimit() in google/protobuf/io/coded_stream.h.
    F0505 18:18:24.616025 27889 construct.cpp:48] Check failed: parsed Unexpected failure while parsing protobuf
    *** Check failure stack trace: ***
    @ 0x7fc8d49ba96d google::LogMessage::Fail()
    @ 0x7fc8d49be987 google::LogMessage::SendToLog()
    @ 0x7fc8d49bc809 google::LogMessage::Flush()
    @ 0x7fc8d49bcb0d google::LogMessageFatal::~LogMessageFatal()

The code is fairly simple:

    val kv = ... // large Seq of key-value pairs
    // set parallelism to 1 to keep the file from being partitioned
    sc.makeRDD(kv, 1)
      .saveAsSequenceFile(path)

Does anyone have any pointers on how to get past this?

Thanks,
--
Allen Lee
Software Engineer
MediaCrossing Inc.
Re: Incredible slow iterative computation
Update: the checkpoint is not actually being performed. I checked with the isCheckpointed method, but it always returns false. ???

2014-05-05 23:14 GMT+02:00 Andrea Esposito and1...@gmail.com:

Checkpointing doesn't seem to help. I do it at each iteration/superstep. Looking more closely, the RDDs are recomputed only a few times during the initial 'phase'; after that they aren't recomputed anymore. I attach screenshots: bootstrap phase, recompute section and after. This is still unexpected because I persist all the intermediate results. Anyway, the time of each iteration degrades steadily; for instance, the first superstep takes 3 sec and superstep 70 takes 8 sec. An iteration, looking at the screenshot, is from row 528 to 122. Any idea where to investigate?

2014-05-02 22:28 GMT+02:00 Andrew Ash and...@andrewash.com:

If you end up with a really long dependency tree between RDDs (like 100+), people have reported success with using the .checkpoint() method. This computes the RDD and then saves it, flattening the dependency tree. It turns out that having a really long RDD dependency graph causes the serialized size of tasks to go up, plus any failure causes a long sequence of operations to regenerate the missing partition. Maybe give that a shot and see if it helps?

On Fri, May 2, 2014 at 3:29 AM, Andrea Esposito and1...@gmail.com wrote:

Sorry for the very late answer. I carefully followed what you pointed out and figured out that the structure used for each record was too big, with many small objects. After changing it, memory usage decreased drastically. Despite that, I'm still struggling with performance decreasing over the supersteps. Now the memory footprint is much smaller than before and GC time is not noticeable anymore. I suspected that some RDDs are recomputed, and watching the stages carefully there is evidence of that, but I don't understand why it's happening.

Recalling my usage pattern:

    newRdd = oldRdd.map(myFun).persist(myStorageLevel)
    newRdd.foreach(x => {}) // Force evaluation
    oldRdd.unpersist(true)

Following that pattern, I also tried not unpersisting the intermediate RDDs (i.e. oldRdd), but nothing changed. Any hints? How could I debug this?

2014-04-14 12:55 GMT+02:00 Andrew Ash and...@andrewash.com:

A lot of your time is being spent in garbage collection (second image). Maybe your dataset doesn't easily fit into memory? Can you reduce the number of new objects created in myFun? How big are your heap sizes? Another observation is that in the 4th image some of your RDDs are massive and some are tiny.

On Mon, Apr 14, 2014 at 11:45 AM, Andrea Esposito and1...@gmail.com wrote:

Hi all, I'm developing an iterative computation over graphs, but I'm struggling with some embarrassingly low performance. The computation is heavily iterative and I'm following this RDD usage pattern:

    newRdd = oldRdd.map(myFun).persist(myStorageLevel)
    newRdd.foreach(x => {}) // Force evaluation
    oldRdd.unpersist(true)

I'm using a machine equipped with 30 cores and 120 GB of RAM. As an example, I've run with a small graph of 4000 verts and 80 thousand edges; the first iterations take 10+ minutes each and later ones take much longer. I attach the Spark UI screenshots of just the first 2 iterations. I tried MEMORY_ONLY_SER and MEMORY_AND_DISK_SER, and I also changed spark.shuffle.memoryFraction to 0.3, but nothing changed (with so much RAM for 4E10 verts these settings are quite pointless, I guess). How should I continue to investigate? Any advice is very welcome, thanks.

Best, EA
Re: Incredible slow iterative computation
checkpoint seems to just add a checkpoint mark; you need to run an action after marking the RDD. I have tried it with success :)

    newRdd = oldRdd.map(myFun).persist(myStorageLevel)
    newRdd.checkpoint // checkpoint here
    newRdd.isCheckpointed // false here
    newRdd.foreach(x => {}) // Force evaluation
    newRdd.isCheckpointed // true here
    oldRdd.unpersist(true)

If you create a new broadcast object at each step of the iteration, the broadcasts will eat up all of the memory. You may need to set spark.cleaner.ttl to a small enough value.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incredible-slow-iterative-computation-tp4204p5407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
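The mark-then-act order described above can be sketched as a small standalone program. This is only an illustration under assumptions: the local[2] master, the temporary checkpoint directory, the Int data, the `_ + 1` stand-in for myFun, and the choice to checkpoint every fifth step are all hypothetical, and a checkpoint directory must be set before checkpoint() is called.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CheckpointSketch {
  /** Runs `steps` map iterations, marking every `every`-th RDD for checkpointing.
    * Returns the last RDD so the caller can inspect isCheckpointed. */
  def iterate(sc: SparkContext, steps: Int, every: Int): RDD[Int] = {
    var oldRdd: RDD[Int] = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
    for (i <- 1 to steps) {
      val newRdd = oldRdd.map(_ + 1).persist(StorageLevel.MEMORY_ONLY)
      if (i % every == 0) newRdd.checkpoint() // only marks the RDD; nothing is written yet
      newRdd.foreach(_ => ())                 // action: computes the RDD and writes the checkpoint if marked
      oldRdd.unpersist(blocking = true)
      oldRdd = newRdd
    }
    oldRdd
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical location; on a cluster this would normally be an HDFS path.
      sc.setCheckpointDir(Files.createTempDirectory("checkpoint-sketch").toString)
      val last = iterate(sc, steps = 10, every = 5)
      println(s"isCheckpointed = ${last.isCheckpointed}")
    } finally sc.stop()
  }
}
```

Checkpointing only every few steps is a design choice made here to limit the extra writes; the message above checkpoints at every step, which is equally valid.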
Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)
The files indeed do not exist, and there is no permission issue.

francis@ubuntu-4:/test/spark-0.9.1$ ll work/app-20140505053550-/
total 24
drwxrwxr-x 6 francis francis 4096 May 5 05:35 ./
drwxrwxr-x 11 francis francis 4096 May 5 06:18 ../
drwxrwxr-x 2 francis francis 4096 May 5 05:35 2/
drwxrwxr-x 2 francis francis 4096 May 5 05:35 4/
drwxrwxr-x 2 francis francis 4096 May 5 05:35 7/
drwxrwxr-x 2 francis francis 4096 May 5 05:35 9/

Francis

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, May 06, 2014 3:45
To: user@spark.apache.org
Subject: Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or directory)

Do those files actually exist? The stdout/stderr files should contain the output of Spark's executors running in the workers, and it's weird that they don't exist. It could be a permission issue: maybe the directories/files are not being generated because they cannot be created?

TD

On Mon, May 5, 2014 at 3:06 AM, Francis.Hu francis...@reachjunction.com wrote:

Hi all,

We run a Spark cluster with three workers. We created a Spark Streaming application, then ran the project using the command below:

shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo

We looked at the web UI of the workers; the jobs failed without any error or info, but a FileNotFoundException occurred in the workers' log file, as shown below. Is this a known issue in Spark?
-in workers' logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out

14/05/05 02:39:39 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-executorId=2logType=stdout
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
    at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
    at org.eclipse.jetty.server.Server.handle(Server.java:363)
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
    at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
    at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
    at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Thread.java:722)
14/05/05 02:39:41 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-executorId=9logType=stderr
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
    at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
    at
details about event log
Hi, I'm looking at the event log and I'm a little confused about some metrics. Here's the info for one task:

Launch Time: 1399336904603
Finish Time: 1399336906465
Executor Run Time: 1781
Shuffle Read Metrics: Shuffle Finish Time: 1399336906027, Fetch Wait Time: 0
Shuffle Write Metrics: {Shuffle Bytes Written: 12865587, Shuffle Write Time: 11804679}

Is it right that (Shuffle Finish Time - Launch Time) is the time to fetch the blocks written by the previous stage, and that (Finish Time - Shuffle Finish Time) is the time to do the task's own work? And what does Fetch Wait Time mean? I'm also confused about Shuffle Write Time: why is it so big, and what is its unit of measurement? Thank you :)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/details-about-event-log-tp5411.html
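The intervals asked about above can be worked out directly from the quoted numbers. This is a small sketch, assuming the launch, finish and shuffle finish times are epoch milliseconds and that Shuffle Write Time is reported in nanoseconds (which, as far as I know, is how Spark's shuffle write metrics document it; treat the unit as an assumption here). The object and value names are hypothetical.

```scala
object TaskTimings {
  // Values copied from the task record quoted above.
  val launchTimeMs: Long        = 1399336904603L
  val finishTimeMs: Long        = 1399336906465L
  val shuffleFinishTimeMs: Long = 1399336906027L
  val executorRunTimeMs: Long   = 1781L
  val shuffleWriteTimeNs: Long  = 11804679L // assumed to be nanoseconds

  val launchToFinishMs: Long        = finishTimeMs - launchTimeMs
  val launchToShuffleFinishMs: Long = shuffleFinishTimeMs - launchTimeMs
  val shuffleFinishToFinishMs: Long = finishTimeMs - shuffleFinishTimeMs
  val shuffleWriteMs: Double        = shuffleWriteTimeNs / 1e6

  def main(args: Array[String]): Unit = {
    println(s"launch -> finish:         $launchToFinishMs ms")
    println(s"launch -> shuffle finish: $launchToShuffleFinishMs ms")
    println(s"shuffle finish -> finish: $shuffleFinishToFinishMs ms")
    println(s"executor run time:        $executorRunTimeMs ms")
    println(s"shuffle write time:       $shuffleWriteMs ms")
  }
}
```

Under that unit assumption the write time is a small fraction of the task's run time rather than a large one. The arithmetic alone does not settle whether the first interval is purely fetch time, since it also covers whatever happens between launch and the start of fetching.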
RE: sbt/sbt run command returns a JVM problem
Hi, I still have over 1 GB left for my program.

Date: Sun, 4 May 2014 19:14:30 -0700
From: ml-node+s1001560n5340...@n3.nabble.com
To: gyz...@hotmail.com
Subject: Re: sbt/sbt run command returns a JVM problem

The total memory of your machine is 2 GB, right? Then how much memory is left free? Wouldn't Ubuntu take up quite a big portion of the 2 GB? Just a guess!

On Sat, May 3, 2014 at 8:15 PM, Carter [hidden email] wrote:

Hi, thanks for all your help. I tried your setting in the sbt file, but the problem is still there. The Java setting in my sbt file is:

    java \
      -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
      -jar ${JAR} \
      $@

I have tried making these 3 parameters bigger and smaller, but nothing works. Did I change the right thing? Thank you very much.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5412.html
Re: How to use spark-submit
Yes, I'm struggling with a similar problem where my classes are not found on the worker nodes. I'm using 1.0.0_SNAPSHOT. I would really appreciate it if someone could provide some documentation on the usage of spark-submit. Thanks

On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote:

I have a Spark Streaming application that uses the external streaming modules (e.g. kafka, mqtt, ...) as well. It is not clear how to properly invoke the spark-submit script: which --driver-class-path and/or -Dspark.executor.extraClassPath parameters are required? For reference, the following error is proving difficult to resolve:

java.lang.ClassNotFoundException: org.apache.spark.streaming.examples.StreamingExamples
Can I share RDD between a pyspark and spark API
Hi experts. I have some pre-built Python parsers that I am planning to use, just because I don't want to write them again in Scala. However, after the data is parsed I would like to take the RDD and use it in a Scala program. (Yes, I like Scala more than Python and am more comfortable in Scala :) In doing so, I don't want to write the parsed data to disk and then read it back from the Scala class. Is there an efficient way to achieve this?

..Manas

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-I-share-RDD-between-a-pyspark-and-spark-API-tp5415.html
about broadcast
In my code, there are two broadcast variables. Sometimes reading the small one takes more time than reading the big one, which is strange! The log on the slave node is as follows:

Block broadcast_2 stored as values to memory (estimated size *4.0 KB*, free 17.2 GB)
Reading broadcast variable 2 took *9.998537123* s
Block broadcast_1 stored as values to memory (estimated size *705.9 MB*, free 16.5 GB)
Reading broadcast variable 1 took *2.596005629* s

Reading the small one normally takes about 0.004 s, but occasionally takes more than 9 s.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416.html
Re: about broadcast
In addition, reading the big broadcast variable always takes about 2 s.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416p5417.html