How RDD lineage works
Hi, I don't have a good understanding of how RDD lineage works, so I would like to ask whether Spark provides a unit test in the code base that illustrates how RDD lineage works. If there is, what is the class name? Thanks! bit1...@163.com
Re: Re: How RDD lineage works
Thanks TD and Zhihong for the guidance. I will check it. bit1...@163.com From: Tathagata Das Date: 2015-07-31 12:27 To: Ted Yu CC: bit1...@163.com; user Subject: Re: How RDD lineage works You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't have a good understanding of how RDD lineage works, so I would like to ask whether Spark provides a unit test in the code base that illustrates how RDD lineage works. If there is, what is the class name? Thanks! bit1...@163.com
Re: Re: How RDD lineage works
The following is copied from the paper and relates to RDD lineage. Is there a unit test that covers this scenario (an RDD partition being lost and recovered)? Thanks. "If a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to recompute just that partition. Thus, lost data can be recovered, often quite quickly, without requiring costly replication." bit1...@163.com From: bit1...@163.com Date: 2015-07-31 13:11 To: Tathagata Das; yuzhihong CC: user Subject: Re: Re: How RDD lineage works Thanks TD and Zhihong for the guidance. I will check it. bit1...@163.com From: Tathagata Das Date: 2015-07-31 12:27 To: Ted Yu CC: bit1...@163.com; user Subject: Re: How RDD lineage works You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't have a good understanding of how RDD lineage works, so I would like to ask whether Spark provides a unit test in the code base that illustrates how RDD lineage works. If there is, what is the class name? Thanks! bit1...@163.com
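For illustration, a minimal sketch (not from the thread) of inspecting lineage with RDD.toDebugString, which prints the chain of parent RDDs that Spark would use to recompute a lost partition:

val base = sc.parallelize(1 to 100)               // ParallelCollectionRDD
val derived = base.map(_ * 2).filter(_ % 3 == 0)  // two narrow transformations
// Prints the lineage, e.g. MapPartitionsRDD <- MapPartitionsRDD <- ParallelCollectionRDD;
// losing a partition of `derived` only requires re-running these steps on that partition.
println(derived.toDebugString)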
A question about spark checkpoint
Hi, I have the following code that uses checkpoint to checkpoint a heavy operation, and it works well in that the last heavyOpRDD.foreach(println) does not recompute from the beginning. But when I re-run the program, the RDD computing chain is recomputed from the beginning; I thought it would read from the checkpoint directory, since the data is there from the last run. Do I misunderstand how checkpoint works, or is there some configuration to make it work? Thanks.

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointTest {
  def squareWithHeavyOp(x: Int) = {
    Thread.sleep(2000)
    println(s"squareWithHeavyOp $x")
    x * x
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CheckpointTest")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("file:///d:/checkpointDir")
    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
    val heavyOpRDD = rdd.map(squareWithHeavyOp)
    heavyOpRDD.checkpoint()
    heavyOpRDD.foreach(println)
    println("Job 0 has been finished, press ENTER to do job 1")
    readLine()
    heavyOpRDD.foreach(println)
  }
}

bit1...@163.com
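A hedged note on the question above: RDD checkpoint files are keyed to the RDD ids of the run that wrote them, and a fresh application run does not pick them up automatically through any public API. One workaround, as a minimal sketch reusing squareWithHeavyOp from the post above (the result path and existence check are placeholder assumptions), is to persist the heavy result explicitly and reload it when present:

import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def heavyResult(sc: SparkContext): RDD[Int] = {
  val resultPath = "file:///d:/heavyOpResult"      // placeholder path
  if (new File("d:/heavyOpResult").exists())
    sc.objectFile[Int](resultPath)                 // reload the result from the previous run
  else {
    val computed = sc.parallelize(List(1, 2, 3, 4, 5)).map(squareWithHeavyOp)
    computed.saveAsObjectFile(resultPath)          // make the result reusable across runs
    computed
  }
}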
What if request cores are not satisfied
Hi, Assume the following scenario: the Spark standalone cluster has 10 cores in total, and I have an application that will request 12 cores. Will the application run with fewer cores than requested, or will it simply wait forever since there are only 10 cores available? I would guess it will run with fewer cores, but I didn't get a chance to try/test it. Thanks. bit1...@163.com
Re: Re: Application jar file not found exception when submitting application
Thanks Shixiong for the reply. Yes, I confirm that the file exists there; I simply checked with ls -l /data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar bit1...@163.com From: Shixiong Zhu Date: 2015-07-06 18:41 To: bit1...@163.com CC: user Subject: Re: Application jar file not found exception when submitting application Before running your script, could you confirm that /data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar exists? You might have forgotten to build this jar. Best Regards, Shixiong Zhu 2015-07-06 18:14 GMT+08:00 bit1...@163.com bit1...@163.com: Hi, I have the following shell script that submits the application to the cluster. But whenever I start the application, I encounter a FileNotFoundException; after retrying several times, I can successfully submit it!

SPARK=/data/software/spark-1.3.1-bin-2.4.0
APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications
$SPARK/bin/spark-submit --deploy-mode cluster --name PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G --executor-memory 4G --total-executor-cores 10 --class com.app.PssAmStreamingApplicationDriver $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar

[root@com-app2 applications]# ./submitApplicationStreaming.sh
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to launch an application in spark://com-app1:7077.
Warning: Master endpoint spark://com-app1:7077 was not a REST server. Falling back to legacy submission gateway instead.
15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Sending launch command to spark://com-app1:7077
Driver successfully submitted as driver-20150706180538-0008
... waiting before polling master for driver state
... polling master for driver state
State of driver-20150706180538-0008 is ERROR
Exception from cluster was: java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist
java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464)
at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146)
at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72)

bit1...@163.com
Application jar file not found exception when submitting application
Hi, I have the following shell script that submits the application to the cluster. But whenever I start the application, I encounter a FileNotFoundException; after retrying several times, I can successfully submit it!

SPARK=/data/software/spark-1.3.1-bin-2.4.0
APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications
$SPARK/bin/spark-submit --deploy-mode cluster --name PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G --executor-memory 4G --total-executor-cores 10 --class com.app.PssAmStreamingApplicationDriver $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar

[root@com-app2 applications]# ./submitApplicationStreaming.sh
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to launch an application in spark://com-app1:7077.
Warning: Master endpoint spark://com-app1:7077 was not a REST server. Falling back to legacy submission gateway instead.
15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Sending launch command to spark://com-app1:7077
Driver successfully submitted as driver-20150706180538-0008
... waiting before polling master for driver state
... polling master for driver state
State of driver-20150706180538-0008 is ERROR
Exception from cluster was: java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist
java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464)
at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146)
at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72)

bit1...@163.com
Explanation of the numbers on Spark Streaming UI
Hi, Spark users, The following images are copied from the Spark Streaming UI. I observed for about 30 minutes and saw that the processed records (438,768 at the moment I captured the image) always lag behind the received records (480,783) by about 40k records. Since the waiting batches count is 1 and the received records far exceed the processed records, I can't understand why the total delay or scheduling delay is not obvious (5 secs) here. Can someone help explain what clues this UI gives? Thanks. bit1...@163.com
When a cached RDD will unpersist its data
I am kind of confused about when a cached RDD will unpersist its data. I know we can explicitly unpersist it with RDD.unpersist, but can it be unpersisted automatically by the Spark framework? Thanks. bit1...@163.com
How to figure out how many records are received by each individual receiver
Hi, I am using Spark 1.3.1 and have 2 receivers. On the web UI, I can only see the total records received by both receivers; I can't figure out the records received by each individual receiver. I am not sure whether this information is shown on the UI in Spark 1.4. bit1...@163.com
What does [Stage 0: (0 + 2) / 2] mean on the console
Hi, I have a Spark Streaming application that runs locally with two receivers; a code snippet is as follows:

conf.setMaster("local[4]")
//RPC Log Streaming
val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY)
RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast)
//HTTP Log Streaming
val httpStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY)
HttpLogStreamProcessor.process(httpStream, taskConfBroadcast)

There is a log line showing on the console in red: [Stage 0: (0 + 2) / 2] It appears, then disappears, then appears again, and so on. For the above code, if I only keep the RPC stream and comment out the httpStream, it disappears. I don't know why it occurs or how to suppress it. bit1...@163.com
Re: Re: What does [Stage 0: (0 + 2) / 2] mean on the console
Hi, Akhil, Thank you for the explanation! bit1...@163.com From: Akhil Das Date: 2015-06-23 16:29 To: bit1...@163.com CC: user Subject: Re: What does [Stage 0: (0 + 2) / 2] mean on the console Well, you could say that (stage information) is an ASCII representation of the Web UI (running on port 4040). Since you set local[4], you will have 4 threads for your computation, and since you have 2 receivers, you are left with 2 threads to process ((0 + 2): this 2 is your 2 threads). And the other /2 means you have 2 tasks in that stage (with id 0). Thanks Best Regards On Tue, Jun 23, 2015 at 1:21 PM, bit1...@163.com bit1...@163.com wrote: Hi, I have a Spark Streaming application that runs locally with two receivers; a code snippet is as follows:

conf.setMaster("local[4]")
//RPC Log Streaming
val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY)
RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast)
//HTTP Log Streaming
val httpStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY)
HttpLogStreamProcessor.process(httpStream, taskConfBroadcast)

There is a log line showing on the console in red: [Stage 0: (0 + 2) / 2] It appears, then disappears, then appears again, and so on. For the above code, if I only keep the RPC stream and comment out the httpStream, it disappears. I don't know why it occurs or how to suppress it. bit1...@163.com
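A hedged follow-up note: the red line is Spark's console progress bar, and assuming Spark 1.2.1 or later it can be switched off with the spark.ui.showConsoleProgress property, for example:

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("StreamingApp")                       // placeholder app name
  .set("spark.ui.showConsoleProgress", "false")     // suppress the [Stage 0: (0 + 2) / 2] bar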
Re: RE: Build spark application into uber jar
Thanks. I guess what you mean by maven build target is a Maven profile. I added two profiles, one LocalRun and the other ClusterRun, for the scope of the Spark-related artifacts. That way, I don't have to change the pom file but just select a profile.

<profile>
  <id>LocalRun</id>
  <properties>
    <spark.scope>compile</spark.scope>
  </properties>
</profile>
<profile>
  <id>ClusterRun</id>
  <properties>
    <spark.scope>provided</spark.scope>
  </properties>
</profile>

bit1...@163.com From: prajod.vettiyat...@wipro.com Date: 2015-06-19 15:22 To: bit1...@163.com; ak...@sigmoidanalytics.com CC: user@spark.apache.org Subject: RE: Re: Build spark application into uber jar Hi, When running inside the Eclipse IDE, I use another Maven target to build: the default Maven target. For building the uber jar, I use the assembly jar target. So I use two Maven build targets in the same pom file to solve this issue. In Maven you can have multiple build targets, and each target can have its own command line options. prajod From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 12:36 To: Akhil Das; Prajod S Vettiyattil (WT01 - BAS) Cc: user Subject: Re: Re: Build spark application into uber jar Thank you Akhil. Hmm.. but I am using Maven as the build tool. bit1...@163.com From: Akhil Das Date: 2015-06-19 15:31 To: Prajod S Vettiyattil (WT01 - BAS) CC: user@spark.apache.org Subject: Re: Build spark application into uber jar This is how I used to build an assembly jar with sbt. Your build.sbt file would look like this:

import AssemblyKeys._
assemblySettings
name := "FirstScala"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1"

Also create a file inside the project directory named plugins.sbt and add this line inside it:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

And then you will be able to do sbt assembly. Thanks Best Regards On Fri, Jun 19, 2015 at 12:09 PM, prajod.vettiyat...@wipro.com wrote: "but when I run the application locally, it complains that spark related stuff is missing" I use the uber jar option. What do you mean by "locally"? In the Spark scala shell? In the From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 08:11 To: user Subject: Build spark application into uber jar Hi, Spark users, I have a Spark Streaming application that is a Maven project, and I would like to build it into an uber jar and run it in the cluster. I have found two options to build the uber jar; each of them has its shortcomings, so I would like to ask how you guys do it. Thanks. 1. Use the Maven Shade plugin, and mark the Spark-related stuff as provided in the pom.xml, like:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

With this, it looks like it can build the uber jar, but when I run the application locally, it complains that the Spark-related stuff is missing, which is not surprising because the Spark-related things are marked as provided and will not be included at runtime. 2. Instead of marking the Spark things as provided, I configure the Maven Shade plugin to exclude the Spark things as follows, but there are still many things included.

<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <artifactSet>
        <excludes>
          <exclude>junit:junit</exclude>
          <exclude>log4j:log4j:jar:</exclude>
          <exclude>org.scala-lang:scala-library:jar:</exclude>
          <exclude>org.apache.spark:spark-core_2.10</exclude>
          <exclude>org.apache.spark:spark-sql_2.10</exclude>
          <exclude>org.apache.spark:spark-streaming_2.10</exclude>
        </excludes>
      </artifactSet>
    </configuration>
  </execution>
</executions>

Has someone built an uber jar for a Spark application? I would like to see how you do it, thanks! bit1...@163.com
Re: RE: Build spark application into uber jar
Sure, thanks Prajod for the detailed steps! bit1...@163.com From: prajod.vettiyat...@wipro.com Date: 2015-06-19 16:56 To: bit1...@163.com; ak...@sigmoidanalytics.com CC: user@spark.apache.org Subject: RE: RE: Build spark application into uber jar Multiple Maven profiles may be the ideal way. You can also do this with: 1. The default build command "mvn compile" for local builds (use this to build with Eclipse's "Run As > Maven build" option when you right-click on the pom.xml file). 2. Add Maven build options to the same build command as above for the uber jar build: "mvn compile assembly:single" (use this to build with Eclipse's "Run As > Maven build…" option when you right-click on the pom.xml file). Note the extra dots (…) after "Maven build" in this option. Regards, Prajod From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 13:01 To: Prajod S Vettiyattil (WT01 - BAS); Akhil Das Cc: user Subject: Re: RE: Build spark application into uber jar Thanks. I guess what you mean by maven build target is a Maven profile. I added two profiles, one LocalRun and the other ClusterRun, for the scope of the Spark-related artifacts. That way, I don't have to change the pom file but just select a profile.

<profile>
  <id>LocalRun</id>
  <properties>
    <spark.scope>compile</spark.scope>
  </properties>
</profile>
<profile>
  <id>ClusterRun</id>
  <properties>
    <spark.scope>provided</spark.scope>
  </properties>
</profile>

bit1...@163.com From: prajod.vettiyat...@wipro.com Date: 2015-06-19 15:22 To: bit1...@163.com; ak...@sigmoidanalytics.com CC: user@spark.apache.org Subject: RE: Re: Build spark application into uber jar Hi, When running inside the Eclipse IDE, I use another Maven target to build: the default Maven target. For building the uber jar, I use the assembly jar target. So I use two Maven build targets in the same pom file to solve this issue. In Maven you can have multiple build targets, and each target can have its own command line options. prajod From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 12:36 To: Akhil Das; Prajod S Vettiyattil (WT01 - BAS) Cc: user Subject: Re: Re: Build spark application into uber jar Thank you Akhil. Hmm.. but I am using Maven as the build tool. bit1...@163.com From: Akhil Das Date: 2015-06-19 15:31 To: Prajod S Vettiyattil (WT01 - BAS) CC: user@spark.apache.org Subject: Re: Build spark application into uber jar This is how I used to build an assembly jar with sbt. Your build.sbt file would look like this:

import AssemblyKeys._
assemblySettings
name := "FirstScala"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1"

Also create a file inside the project directory named plugins.sbt and add this line inside it:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

And then you will be able to do sbt assembly. Thanks Best Regards On Fri, Jun 19, 2015 at 12:09 PM, prajod.vettiyat...@wipro.com wrote: "but when I run the application locally, it complains that spark related stuff is missing" I use the uber jar option. What do you mean by "locally"? In the Spark scala shell? In the From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 08:11 To: user Subject: Build spark application into uber jar Hi, Spark users, I have a Spark Streaming application that is a Maven project, and I would like to build it into an uber jar and run it in the cluster. I have found two options to build the uber jar; each of them has its shortcomings, so I would like to ask how you guys do it. Thanks. 1. Use the Maven Shade plugin, and mark the Spark-related stuff as provided in the pom.xml, like:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

With this, it looks like it can build the uber jar, but when I run the application locally, it complains that the Spark-related stuff is missing, which is not surprising because the Spark-related things are marked as provided and will not be included at runtime. 2. Instead of marking the Spark things as provided, I configure the Maven Shade plugin to exclude the Spark things as follows, but there are still many things included.

<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <artifactSet>
        <excludes>
          <exclude>junit:junit</exclude>
          <exclude>log4j:log4j:jar:</exclude>
          <exclude>org.scala-lang:scala-library:jar:</exclude>
          <exclude>org.apache.spark:spark-core_2.10</exclude>
          <exclude>org.apache.spark:spark-sql_2.10</exclude>
          <exclude>org.apache.spark:spark-streaming_2.10</exclude>
        </excludes>
      </artifactSet>
    </configuration>
  </execution>
</executions>

Has someone built an uber jar for a Spark application? I would like to see how you do it, thanks! bit1...@163.com
Re: RE: Spark or Storm
I think your observation is correct: you have to take care of the replayed data on your end, e.g., give each message a unique id or something similar. I say "I think" in the above sentence because I am not sure, and I also have a related question: I am wondering how direct stream + Kafka behaves when the driver goes down and is restarted. Will it always first replay the checkpointed failed batch, or will it honor Kafka's offset reset policy (auto.offset.reset)? If it honors the reset policy and it is set to smallest, then it is at-least-once semantics; if it is set to largest, then it will be at-most-once semantics? bit1...@163.com From: Haopu Wang Date: 2015-06-19 18:47 To: Enno Shioji; Tathagata Das CC: prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan Subject: RE: RE: Spark or Storm My question is not directly related: about the exactly-once semantics, the document (copied below) says Spark Streaming gives exactly-once semantics, but actually from my test result, with checkpointing enabled, the application always re-processes the files in the last batch after a graceful restart. == Semantics of Received Data Different input sources provide different guarantees, ranging from at-least once to exactly once. Read for more details. With Files If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all the data. This gives exactly-once semantics, that all the data will be processed exactly once no matter what fails. From: Enno Shioji [mailto:eshi...@gmail.com] Sent: Friday, June 19, 2015 5:29 PM To: Tathagata Das Cc: prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan Subject: Re: RE: Spark or Storm Fair enough; on second thought, just saying that it should be idempotent is indeed more confusing. I guess the crux of the confusion comes from the fact that people tend to assume the work you described (store batch id, skip, etc.) is handled by the framework, perhaps partly because Storm Trident does handle it (you just need to let Storm know if the output operation has succeeded or not, and it handles the batch id storing and skipping business). Whenever I explain to people that one needs to do this additional work you described to get end-to-end exactly-once semantics, it usually takes a while to convince them. In my limited experience, they tend to interpret "transactional" in that sentence to mean that you just have to write to a transactional storage like an ACID RDB. Pointing them to "Semantics of output operations" is usually sufficient though. Maybe others like @Ashish can weigh in on this; did you interpret it in this way? What if we change the statement to: "end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional). To learn how to make your updates idempotent or transactional, see the 'Semantics of output operations' section in this chapter." That way, it's clear that it's not sufficient to merely write to transactional storage like an ACID store. On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das t...@databricks.com wrote: If the current documentation is confusing, we can definitely improve the documentation. However, I do not understand why the term "transactional" is confusing. If your output operation has to add 5, then the user has to implement the following mechanism: 1. If the unique id of the batch of data is already present in the store, then skip the update. 2. Otherwise, atomically do both the update operation and storing the unique id of the batch. This is pretty much the definition of a transaction. The user has to be aware of the transactional semantics of the data store while implementing this functionality. You CAN argue that this effectively makes the whole updating sort-of idempotent, as even if you try doing it multiple times, it will update only once. But that is not what is generally considered idempotent. Writing a fixed count, not an increment, is usually what is called idempotent. And so just mentioning that the output operation must be idempotent is, in my opinion, more confusing. To take a page out of the Storm / Trident guide, even they call this exact conditional updating of Trident State a "transactional" operation. See "transactional spout" in the Trident State guide - https://storm.apache.org/documentation/Trident-state In the end, I am totally open to suggestions and PRs on how to make the programming guide easier to understand. :) TD On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji eshi...@gmail.com wrote: Tbh I find the doc around this a bit confusing. If it says end-to-end exactly-once
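A sketch of the two steps TD describes above, with a hypothetical transactional KeyValueStore client (connect, contains, inTransaction, increment, and put are all assumptions, not a real library); `dstream` is assumed to be an existing DStream[String]:

dstream.foreachRDD { (rdd, batchTime) =>
  rdd.foreachPartition { partition =>
    val store = KeyValueStore.connect()                         // hypothetical client
    val batchId = s"$batchTime-${org.apache.spark.TaskContext.get.partitionId}"
    if (!store.contains(batchId)) {                             // step 1: skip batches already applied
      store.inTransaction {                                     // step 2: update and record the id atomically
        partition.foreach(record => store.increment(record))
        store.put(batchId, "done")
      }
    }
  }
}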
Re: RE: Build spark application into uber jar
Thank you for the reply. Running the application locally means that I run the application in my IDE with the master set to local[*]. When the Spark stuff is marked as provided, I can't run it because the Spark stuff is missing. So, how do you work around this? Thanks! bit1...@163.com From: prajod.vettiyat...@wipro.com Date: 2015-06-19 14:39 To: user@spark.apache.org Subject: RE: Build spark application into uber jar "but when I run the application locally, it complains that spark related stuff is missing" I use the uber jar option. What do you mean by "locally"? In the Spark scala shell? In the From: bit1...@163.com [mailto:bit1...@163.com] Sent: 19 June 2015 08:11 To: user Subject: Build spark application into uber jar Hi, Spark users, I have a Spark Streaming application that is a Maven project, and I would like to build it into an uber jar and run it in the cluster. I have found two options to build the uber jar; each of them has its shortcomings, so I would like to ask how you guys do it. Thanks. 1. Use the Maven Shade plugin, and mark the Spark-related stuff as provided in the pom.xml, like:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

With this, it looks like it can build the uber jar, but when I run the application locally, it complains that the Spark-related stuff is missing, which is not surprising because the Spark-related things are marked as provided and will not be included at runtime. 2. Instead of marking the Spark things as provided, I configure the Maven Shade plugin to exclude the Spark things as follows, but there are still many things included.

<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <artifactSet>
        <excludes>
          <exclude>junit:junit</exclude>
          <exclude>log4j:log4j:jar:</exclude>
          <exclude>org.scala-lang:scala-library:jar:</exclude>
          <exclude>org.apache.spark:spark-core_2.10</exclude>
          <exclude>org.apache.spark:spark-sql_2.10</exclude>
          <exclude>org.apache.spark:spark-streaming_2.10</exclude>
        </excludes>
      </artifactSet>
    </configuration>
  </execution>
</executions>

Has someone built an uber jar for a Spark application? I would like to see how you do it, thanks! bit1...@163.com
Re: RE: Spark or Storm
I am wondering how the direct stream API ensures end-to-end exactly-once semantics. I think there are two things involved: 1. On the Spark Streaming end, the driver will replay the offset range when it goes down and is restarted, which means the new tasks may process some already-processed data. 2. On the user end, since tasks may process already-processed data, the user end should detect that some data has already been processed, e.g., using some unique ID. Not sure if I have understood correctly. bit1...@163.com From: prajod.vettiyat...@wipro.com Date: 2015-06-18 16:56 To: jrpi...@gmail.com; eshi...@gmail.com CC: wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com Subject: RE: Spark or Storm "not being able to read from Kafka using multiple nodes" Kafka is plenty capable of doing this.. I faced the same issue before Spark 1.3 was released. The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to re-partition to distribute the processing. From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See "Approach 2: Direct Approach" on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration. Prajod From: Jordan Pilat [mailto:jrpi...@gmail.com] Sent: 18 June 2015 03:57 To: Enno Shioji Cc: Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan Subject: Re: Spark or Storm "not being able to read from Kafka using multiple nodes" Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group. If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion. If it isn't, you still have the fault tolerance associated with clustering the consumers. OK JRP On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote: We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm. Some of the important drawbacks are: Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal). There are also no exactly-once semantics. (updateStateByKey can achieve these semantics, but it is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpointing.) There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data loss hazard with Kafka. It's also not possible to attain very low latency in Spark, if that's what you need. The pro for Spark is the concise and IMO more intuitive syntax, especially if you compare it with Storm's Java API. I admit I might be a bit biased towards Storm though, as I'm more familiar with it. Also, you can do some processing with Kinesis. If all you need to do is a straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis. On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Whatever you write in bolts would be the logic you want to apply on your events. In Spark, that logic would be coded in map() or similar such transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does. Regards Sab Probably overloading the question a bit. In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming? During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing. How can this be achieved using Spark? On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: I have a use-case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less). The pipeline is: Upstream services --(send events)--> KAFKA --(enrich event)--> Event Stream Processor --> Complex Event Processor --> Elastic Search. From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident? Sridhar Chellappa On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote: I
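For reference, a minimal sketch of the direct approach mentioned above (Spark 1.3+, spark-streaming-kafka artifact); the broker list and topic name are placeholders, and `ssc` is assumed to be an existing StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Partitions are read in parallel by the workers; the driver tracks offset ranges per batch.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))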
Build spark application into uber jar
Hi, Spark users, I have a Spark Streaming application that is a Maven project, and I would like to build it into an uber jar and run it in the cluster. I have found two options to build the uber jar; each of them has its shortcomings, so I would like to ask how you guys do it. Thanks. 1. Use the Maven Shade plugin, and mark the Spark-related stuff as provided in the pom.xml, like:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

With this, it looks like it can build the uber jar, but when I run the application locally, it complains that the Spark-related stuff is missing, which is not surprising because the Spark-related things are marked as provided and will not be included at runtime. 2. Instead of marking the Spark things as provided, I configure the Maven Shade plugin to exclude the Spark things as follows, but there are still many things included.

<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <artifactSet>
        <excludes>
          <exclude>junit:junit</exclude>
          <exclude>log4j:log4j:jar:</exclude>
          <exclude>org.scala-lang:scala-library:jar:</exclude>
          <exclude>org.apache.spark:spark-core_2.10</exclude>
          <exclude>org.apache.spark:spark-sql_2.10</exclude>
          <exclude>org.apache.spark:spark-streaming_2.10</exclude>
        </excludes>
      </artifactSet>
    </configuration>
  </execution>
</executions>

Has someone built an uber jar for a Spark application? I would like to see how you do it, thanks! bit1...@163.com
Re: Weird Problem: Task not serializable [Spark Streaming]
Could someone help explain what happens that leads to the Task not serializable issue? Thanks. bit1...@163.com From: bit1...@163.com Date: 2015-06-08 19:08 To: user Subject: Weird Problem: Task not serializable [Spark Streaming] Hi, With the following simple code, I got an exception complaining "Task not serializable". The root cause is that I use return in the map foreach loop. Why does return in a map foreach loop cause the Task not serializable problem? Can someone please explain this to me?

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import scala.collection.mutable

object NetCatStreamingWordCount3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", )
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionIterable => {
        val map = mutable.Map[String, String]()
        while (partitionIterable.hasNext) {
          val v = partitionIterable.next()
          map += v -> v
        }
        map.foreach(entry => {
          if (entry._1.equals("abc")) {
            return // This is the root cause of the Task not serializable problem.
          }
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

bit1...@163.com
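A hedged explanation and a return-free sketch: in Scala, a `return` inside a closure is a non-local return, compiled as an exception-based jump back to the enclosing method, so the closure has to capture context from that enclosing scope, which drags a non-serializable enclosing object into the task closure. Rewriting without `return` avoids the capture; the sketch below reuses the imports and the surrounding foreachRDD from the post, and the handling logic is a placeholder:

rdd.foreachPartition { partitionIterable =>
  val map = mutable.Map[String, String]()
  partitionIterable.foreach(v => map += v -> v)
  // A local predicate instead of `return`, so the closure captures nothing
  // from the enclosing method.
  val found = map.exists { case (key, _) => key == "abc" }
  if (found) {
    // placeholder: handle the "abc" case here
  }
}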
Which class takes the place of BlockManagerWorker in Spark 1.3.1
Hi, I remember that there was a class called BlockManagerWorker in previous Spark releases. In the 1.3.1 code, I can see that some method comments still refer to BlockManagerWorker, which doesn't exist at all. I would like to ask which class takes the place of BlockManagerWorker in Spark 1.3.1? Thanks. BTW, BlockManagerMaster is still there; it makes no sense that only BlockManagerWorker is gone. bit1...@163.com
Don't understand the numbers on the Storage UI (/storage/rdd/?id=4)
Hi, I ran a word count application with a 600MB text file and gave the RDD the storage level StorageLevel.MEMORY_AND_DISK_2. I have two questions that I can't explain: 1. The storage level shown on the UI is "Disk Serialized 2x Replicated", but I am using StorageLevel.MEMORY_AND_DISK_2; where is the memory info?

Storage Level: Disk Serialized 2x Replicated
Cached Partitions: 20
Total Partitions: 20
Memory Size: 107.6 MB
Disk Size: 277.1 MB

2. My text file is 600MB, but the memory and disk sizes shown above total about 400MB (107.6 MB + 277.1 MB), and I am using 2 replications, so in my opinion it should be about 600MB * 2. It looks like some compression happens under the scenes, or something else? Thanks! bit1...@163.com
Articles related to how Spark handles component (Driver, Worker, Executor, Task) failures
Hi, I am looking for articles/blogs on the topic of how Spark handles various failures, such as Driver, Worker, Executor, Task, etc. Are there any articles/blogs on this topic? Details down to the source code would be best. Thanks very much! bit1...@163.com
Don't understand scheduling jobs within an application
Hi, Spark users, The following is copied from the Spark online document http://spark.apache.org/docs/latest/job-scheduling.html. Basically, I have two questions about it: 1. If two jobs in an application have dependencies, that is, one job depends on the result of the other, then I think they will have to run sequentially. 2. Since job scheduling happens within one application, I don't think job scheduling will benefit multiple users as the last sentence says; in my opinion, multiple users can benefit only from cross-application scheduling. Maybe I don't have a good understanding of job scheduling; could someone elaborate on this? Thanks very much. "By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into 'stages' (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don't need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly. Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a 'round robin' fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings." bit1...@163.com
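On question 2, a minimal sketch of how fair scheduling applies within a single application: independent jobs are submitted from separate threads (for example, one per user of a shared server), so "multi-user" here means multiple users of one SparkContext. The pool names below are assumptions:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("FairSchedulingDemo").set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Two independent jobs submitted from separate threads share executors
// round-robin instead of queueing FIFO.
new Thread(new Runnable {
  def run(): Unit = {
    sc.setLocalProperty("spark.scheduler.pool", "longJobs")
    sc.parallelize(1 to 10000000).map(_ * 2).count()   // a long job
  }
}).start()

new Thread(new Runnable {
  def run(): Unit = {
    sc.setLocalProperty("spark.scheduler.pool", "shortJobs")
    sc.parallelize(1 to 100).count()                   // a short job, served right away
  }
}).start()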
Re: How Broadcast variable works
Can someone help take a look at my questions? Thanks. bit1...@163.com From: bit1...@163.com Date: 2015-05-29 18:57 To: user Subject: How Broadcast variable works Hi, I have a Spark Streaming application. SparkContext uses broadcast variables to broadcast configuration information that each task will use. I have the following two questions: 1. Will the broadcast variable be broadcast every time the driver sends tasks to workers in each batch interval? 2. If the answer to the above question is yes, then if the broadcast variable is modified between batch intervals (the configuration information is updated over time) and SparkContext broadcasts it again, will tasks see the updated variable? Thanks. bit1...@163.com
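A hedged note on question 2: broadcast values are read-only after creation, and tasks will not see in-place modifications; the usual workaround is to re-create the broadcast when the configuration changes and reference the new one from subsequent batches. A sketch, where loadConfig, configChanged, and process are hypothetical helpers:

var configBroadcast = ssc.sparkContext.broadcast(loadConfig())

dstream.foreachRDD { rdd =>
  if (configChanged()) {
    configBroadcast.unpersist()                        // drop the stale copies on executors
    configBroadcast = ssc.sparkContext.broadcast(loadConfig())
  }
  val current = configBroadcast                        // capture the current broadcast
  rdd.foreach(record => process(record, current.value))
}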
Re: How to use zookeeper in Spark Streaming
Can someone please help me with this? bit1...@163.com From: bit1...@163.com Date: 2015-05-24 13:53 To: user Subject: How to use zookeeper in Spark Streaming Hi, In my Spark Streaming application, once the application starts and gets running, the tasks running on the worker nodes need to be notified that some configurations have changed from time to time; these configurations reside in ZooKeeper. My question is, where should I put the code that works with ZooKeeper for the configuration changes: in the driver code or in the task code? Is there a guide on this? Thanks. bit1...@163.com
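A hedged sketch of one common arrangement: ZooKeeper clients are not serializable, so rather than creating one in the driver and shipping it, create one lazily per executor JVM and read the configuration task-side. The connect string and znode path are placeholders:

import org.apache.zookeeper.ZooKeeper

// One lazily created client per executor JVM.
object ZkClientHolder {
  lazy val client = new ZooKeeper("zk1:2181,zk2:2181", 30000, null)
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Runs on the executor, so the client is created task-side, not serialized from the driver.
    val raw = ZkClientHolder.client.getData("/app/config", false, null)
    val config = new String(raw, "UTF-8")
    partition.foreach(record => /* use `config` while processing `record` */ ())
  }
}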
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Correcting myself: for SparkContext#wholeTextFiles, the RDD's elements are kv pairs: the key is the file path and the value is the file content. So for SparkContext#wholeTextFiles, the RDD already carries the file information. bit1...@163.com From: Saisai Shao Date: 2015-04-29 15:50 To: Akhil Das CC: bit1...@163.com; Vadim Bichutskiy; lokeshkumar; user Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name Yes, looks like a solution, but quite tricky. You have to parse the debug string to get the file name, and it also relies on HadoopRDD to get the file name :) 2015-04-29 14:52 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com: It is possible to access the filename; it's a bit tricky though.

val fstream = ssc.fileStream[LongWritable, IntWritable, SequenceFileInputFormat[LongWritable, IntWritable]]("/home/akhld/input/")
fstream.foreach(x => {
  // You can get it with this object.
  println(x.values.toDebugString)
})

Thanks Best Regards On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com bit1...@163.com wrote: For SparkContext#textFile, if a directory is given as the path parameter, then it will pick up the files in the directory, so the same thing will occur. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:54 To: Vadim Bichutskiy CC: bit1...@163.com; lokeshkumar; user Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name I think it might be useful in Spark Streaming's file input stream, but I am not sure it is useful in SparkContext#textFile, since we specify the file on our own, so why would we still need to know the file name? I will open up a JIRA to mention this feature. Thanks Jerry 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: I was wondering about the same thing. Vadim On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me like the same thing also applies to SparkContext.textFile or SparkContext.wholeTextFiles: there is no way in the RDD to figure out the file information for the data in the RDD. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:10 To: lokeshkumar CC: spark users Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA with the wishes you want the community to support, so anyone who is interested can take a crack at this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using Spark Streaming and listening to files in HDFS using the textFileStream/fileStream methods, how do we get the file names read by these methods? I used textFileStream, which has the file contents in a JavaDStream, and I got no success with fileStream as it throws a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
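A minimal sketch of the point above: wholeTextFiles keeps the originating path alongside the content (the input path is a placeholder):

val files = sc.wholeTextFiles("hdfs:///input")   // RDD[(path, content)]
files.foreach { case (path, content) =>
  println(s"$path -> ${content.length} characters")
}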
Re: Re: Question about Memory Used and VCores Used
Thanks Sandy, that is very useful! bit1...@163.com From: Sandy Ryza Date: 2015-04-29 15:24 To: bit1...@163.com CC: user Subject: Re: Question about Memory Used and VCores Used Hi, Good question. The extra memory comes from spark.yarn.executor.memoryOverhead, the space used for the application master, and the way YARN rounds requests up. This explains it in a little more detail: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ -Sandy On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com bit1...@163.com wrote: Hi, guys, I ran the following computation with 3 workers: spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 1g -e 'select count(*) from table' The resources used are shown below on the UI. I don't understand why the memory used is 15GB and the vcores used is 5. I think the memory used should be executor-memory * numOfWorkers = 3G * 3 = 9G, and the vcores used should be executor-cores * numOfWorkers = 6. Can you please explain the result? Thanks. bit1...@163.com
Question about Memory Used and VCores Used
Hi, guys, I ran the following computation with 3 workers: spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 1g -e 'select count(*) from table' The resources used are shown below on the UI. I don't understand why the memory used is 15GB and the vcores used is 5. I think the memory used should be executor-memory * numOfWorkers = 3G * 3 = 9G, and the vcores used should be executor-cores * numOfWorkers = 6. Can you please explain the result? Thanks. bit1...@163.com
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Looks to me like the same thing also applies to SparkContext.textFile or SparkContext.wholeTextFiles: there is no way in the RDD to figure out the file information for the data in the RDD. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:10 To: lokeshkumar CC: spark users Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA with the wishes you want the community to support, so anyone who is interested can take a crack at this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using Spark Streaming and listening to files in HDFS using the textFileStream/fileStream methods, how do we get the file names read by these methods? I used textFileStream, which has the file contents in a JavaDStream, and I got no success with fileStream as it throws a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
For SparkContext#textFile, if a directory is given as the path parameter, then it will pick up the files in the directory, so the same thing will occur. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:54 To: Vadim Bichutskiy CC: bit1...@163.com; lokeshkumar; user Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name I think it might be useful in Spark Streaming's file input stream, but I am not sure it is useful in SparkContext#textFile, since we specify the file on our own, so why would we still need to know the file name? I will open up a JIRA to mention this feature. Thanks Jerry 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: I was wondering about the same thing. Vadim On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote: Looks to me like the same thing also applies to SparkContext.textFile or SparkContext.wholeTextFiles: there is no way in the RDD to figure out the file information for the data in the RDD. bit1...@163.com From: Saisai Shao Date: 2015-04-29 10:10 To: lokeshkumar CC: spark users Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA with the wishes you want the community to support, so anyone who is interested can take a crack at this. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using Spark Streaming and listening to files in HDFS using the textFileStream/fileStream methods, how do we get the file names read by these methods? I used textFileStream, which has the file contents in a JavaDStream, and I got no success with fileStream as it throws a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Why Spark is much faster than Hadoop MapReduce even on disk
Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of the memory cache). I have no convincing answer to this question; could you guys elaborate on this? Thanks!
Re: Re: Why Spark is much faster than Hadoop MapReduce even on disk
Is it? I learned somewhere else that Spark's speed is 5~10 times faster than Hadoop MapReduce. bit1...@163.com From: Ilya Ganelin Date: 2015-04-28 10:55 To: bit1...@163.com; user Subject: Re: Why Spark is much faster than Hadoop MapReduce even on disk I believe the typical answer is that Spark is actually a bit slower. On Mon, Apr 27, 2015 at 7:34 PM bit1...@163.com bit1...@163.com wrote: Hi, I am frequently asked why Spark is also much faster than Hadoop MapReduce on disk (without the use of the memory cache). I have no convincing answer to this question; could you guys elaborate on this? Thanks!
Re: Re: spark streaming printing no output
Looks like the message is consumed by the other console? (You can see messages typed on this port from another console.) bit1...@163.com From: Shushant Arora Date: 2015-04-15 17:11 To: Akhil Das CC: user@spark.apache.org Subject: Re: spark streaming printing no output When I launched spark-shell using spark-shell --master local[2]: same behaviour, no output on the console, only timestamps. When I did lines.saveAsTextFiles(hdfslocation, suffix); I get empty files of 0 bytes on HDFS. On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have at least 2 cores available for processing. You can try launching it in local[2] and make sure it's working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I am running a spark streaming application, but nothing is getting printed on the console. I am doing: 1. bin/spark-shell --master clusterMgrUrl 2.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream(hostname, )
lines.print()
ssc.start()
ssc.awaitTermination()

Jobs are getting created when I look at the web UI, but nothing gets printed on the console. I have started an nc script on the hostname port and can see messages typed on this port from another console. Please let me know if I am doing something wrong.
Re: Re: About Waiting batches on the spark streaming UI
Thanks Tathagata for the explanation! bit1...@163.com From: Tathagata Das Date: 2015-04-04 01:28 To: Ted Yu CC: bit1129; user Subject: Re: About Waiting batches on the spark streaming UI Maybe that should be marked as waiting as well. We plan to update the UI soon, so will keep that in mind. On Apr 3, 2015 10:12 AM, Ted Yu yuzhih...@gmail.com wrote: Maybe add another stat for batches waiting in the job queue? Cheers On Fri, Apr 3, 2015 at 10:01 AM, Tathagata Das t...@databricks.com wrote: Very good question! This is because the current code is written such that the UI considers a batch as waiting only when it has actually started being processed. That is, batches waiting in the job queue are not considered in the calculation. It is arguable that it may be more intuitive to count those as waiting as well. On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote: I copied the following from the Spark Streaming UI. I don't know why the waiting batches count is 1; my understanding is that it should be 72. The following is my reasoning: 1. The total time is 1 minute 35 seconds = 95 seconds. 2. The batch interval is 1 second, so 95 batches are generated in 95 seconds. 3. The processed batches are 23 (correct, because my processing code does nothing but sleep 4 seconds). 4. Then the waiting batches should be 95 - 23 = 72.

Started at: Fri Apr 03 15:17:47 CST 2015
Time since start: 1 minute 35 seconds
Network receivers: 1
Batch interval: 1 second
Processed batches: 23
Waiting batches: 1
Received records: 0
Processed records: 0

bit1...@163.com
About Waiting batches on the spark streaming UI
I copied the following from the Spark Streaming UI. I don't know why the waiting batches count is 1; my understanding is that it should be 72. The following is my reasoning: 1. The total time is 1 minute 35 seconds = 95 seconds. 2. The batch interval is 1 second, so 95 batches are generated in 95 seconds. 3. The processed batches are 23 (correct, because my processing code does nothing but sleep 4 seconds). 4. Then the waiting batches should be 95 - 23 = 72.

Started at: Fri Apr 03 15:17:47 CST 2015
Time since start: 1 minute 35 seconds
Network receivers: 1
Batch interval: 1 second
Processed batches: 23
Waiting batches: 1
Received records: 0
Processed records: 0

bit1...@163.com
Re: Spark + Kafka
Please make sure that you have given more cores than the number of Receivers. From: James King Date: 2015-04-01 15:21 To: user Subject: Spark + Kafka I have a simple setup/runtime of Kafka and Spark. I have a command line consumer displaying arrivals to the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic I get no messages; here are some logs below. I'm thinking there aren't enough threads. How do I check that? Thank you. 2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 142787141 ms.0 from job set of time 142787141 ms 2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 142787141 ms.0 from job set of time 142787141 ms 2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 142787141 ms (execution: 0.000 s) 2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(142787141 ms) 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 142787141 ms 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: [] 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs: 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms: 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms - 8] 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8 2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n] 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o] 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n] 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8 2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, response is 0 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o] 2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 142787141 ms 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older than 1427871405000 ms: 1427871405000 ms 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 142787141 ms 2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches ArrayBuffer(142787140 ms) 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to Actor[akka://sparkDriver/temp/$o] 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119] 2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.499181 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119] 2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119] 2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.886121 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119] 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298] 2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/0 is now EXITED (Command exited with code 1) 2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor app-20150401065621-0007/0 removed: Command exited with code 1 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p] 2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove non-existent executor 0 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p] 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message
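A minimal sketch of the check suggested above (give the application at least one more core than it has receivers); the ZooKeeper quorum, consumer group and topic here are placeholder assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaConsoleCount {
  def main(args: Array[String]) {
    // One Kafka receiver occupies one core for its whole lifetime, so the app
    // needs at least receivers + 1 cores before any batch can be processed.
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaConsoleCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", Map("test-topic" -> 1)).map(_._2)
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}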
Re: Re: Explanation on the Hive in the Spark assembly
Thanks Cheng for the great explanation! bit1...@163.com From: Cheng Lian Date: 2015-03-16 00:53 To: bit1...@163.com; Wang, Daoyuan; user Subject: Re: Explanation on the Hive in the Spark assembly Spark SQL supports the most commonly used features of HiveQL. However, different HiveQL statements are executed in different manners:
1. DDL statements (e.g. CREATE TABLE, DROP TABLE, etc.) and commands (e.g. SET key = value, ADD FILE, ADD JAR, etc.): in most cases, Spark SQL simply delegates these statements to Hive, as they don't need to issue any distributed jobs and don't rely on the computation engine (Spark, MR, or Tez).
2. SELECT queries, CREATE TABLE ... AS SELECT ... statements, and insertions: these statements are executed using Spark as the execution engine.
The Hive classes packaged in the assembly jar are used to provide entry points to Hive features, for example: the HiveQL parser, talking to the Hive metastore to execute DDL statements, and accessing UDF/UDAF/UDTF. As for the differences between Hive on Spark and Spark SQL's Hive support, please refer to this article by Reynold: https://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html Cheng On 3/14/15 10:53 AM, bit1...@163.com wrote: Thanks Daoyuan. What do you mean by running some native commands? I never thought that Hive would run without a computing engine like Hadoop MR or Spark. Thanks. bit1...@163.com From: Wang, Daoyuan Date: 2015-03-13 16:39 To: bit1...@163.com; user Subject: RE: Explanation on the Hive in the Spark assembly Hi bit1129, 1. Hive in the Spark assembly removed most dependencies of Hive on Hadoop to avoid conflicts. 2. This Hive is used to run some native commands, which do not rely on Spark or MapReduce. Thanks, Daoyuan From: bit1...@163.com [mailto:bit1...@163.com] Sent: Friday, March 13, 2015 4:24 PM To: user Subject: Explanation on the Hive in the Spark assembly Hi, sparkers, I am kind of confused about Hive in the Spark assembly. I think Hive in the Spark assembly is not the same thing as Hive on Spark (Hive on Spark is meant to run Hive using the Spark execution engine). So, my questions are: 1. What is the difference between Hive in the Spark assembly and Hive on Hadoop? 2. Does Hive in the Spark assembly use the Spark execution engine or the Hadoop MR engine? Thanks. bit1...@163.com
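To make the split concrete, a small sketch assuming a HiveContext built on the assembly's bundled Hive classes; the table name and query are illustrative, not taken from the thread:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveDelegationSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("HiveDelegationSketch"))
    val hiveContext = new HiveContext(sc)
    // DDL statement: delegated to the bundled Hive classes; no Spark job runs.
    hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    // SELECT query: parsed by the HiveQL parser, then executed as Spark jobs.
    hiveContext.sql("SELECT key, count(*) FROM src GROUP BY key").collect().foreach(println)
  }
}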
Re: RE: Explanation on the Hive in the Spark assembly
Thanks Daoyuan. What do you mean by running some native commands? I never thought that Hive would run without a computing engine like Hadoop MR or Spark. Thanks. bit1...@163.com From: Wang, Daoyuan Date: 2015-03-13 16:39 To: bit1...@163.com; user Subject: RE: Explanation on the Hive in the Spark assembly Hi bit1129, 1. Hive in the Spark assembly removed most dependencies of Hive on Hadoop to avoid conflicts. 2. This Hive is used to run some native commands, which do not rely on Spark or MapReduce. Thanks, Daoyuan From: bit1...@163.com [mailto:bit1...@163.com] Sent: Friday, March 13, 2015 4:24 PM To: user Subject: Explanation on the Hive in the Spark assembly Hi, sparkers, I am kind of confused about Hive in the Spark assembly. I think Hive in the Spark assembly is not the same thing as Hive on Spark (Hive on Spark is meant to run Hive using the Spark execution engine). So, my questions are: 1. What is the difference between Hive in the Spark assembly and Hive on Hadoop? 2. Does Hive in the Spark assembly use the Spark execution engine or the Hadoop MR engine? Thanks. bit1...@163.com
How does Spark honor data locality when allocating computing resources for an application
Hi, sparkers, When I read the code about computing resource allocation for a newly submitted application in the Master#schedule method, I got a question about data locality:
// Pack each app into as few nodes as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
  for (app <- waitingApps if app.coresLeft > 0) {
    if (canUse(app, worker)) {
      val coresToUse = math.min(worker.coresFree, app.coresLeft)
      if (coresToUse > 0) {
        val exec = app.addExecutor(worker, coresToUse)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
  }
}
It looks like the resource allocation policy here is that the Master will assign as few workers as possible, so long as those few workers have enough resources for the application. My question is: assume that the data the application will process is spread across all the worker nodes; then isn't data locality lost under the above policy? Not sure whether I have understood correctly or have missed something. bit1...@163.com
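For context, the quoted loop is only one branch of Master#schedule in Spark 1.x. When spark.deploy.spreadOut is true (the default), the Master instead round-robins cores across workers, which tends to preserve locality for data spread over the cluster. Below is a simplified paraphrase of that branch, not the verbatim source:

// Spread-out branch of Master#schedule (simplified paraphrase, Spark 1.x).
if (spreadOutApps) {
  for (app <- waitingApps if app.coresLeft > 0) {
    val usableWorkers = workers.toArray
      .filter(w => w.state == WorkerState.ALIVE && canUse(app, w))
      .sortBy(_.coresFree).reverse
    val numUsable = usableWorkers.length
    val assigned = new Array[Int](numUsable) // cores granted per worker
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      // Hand out one core at a time, cycling over the usable workers.
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }
    // Executors are then launched with the per-worker core counts in assigned.
  }
}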
Explanation on the Hive in the Spark assembly
Hi, sparkers, I am kind of confused about Hive in the Spark assembly. I think Hive in the Spark assembly is not the same thing as Hive on Spark (Hive on Spark is meant to run Hive using the Spark execution engine). So, my questions are: 1. What is the difference between Hive in the Spark assembly and Hive on Hadoop? 2. Does Hive in the Spark assembly use the Spark execution engine or the Hadoop MR engine? Thanks. bit1...@163.com
Re: Explanation on the Hive in the Spark assembly
Can anyone take a look at this question? Thanks. bit1...@163.com From: bit1...@163.com Date: 2015-03-13 16:24 To: user Subject: Explanation on the Hive in the Spark assembly Hi, sparkers, I am kind of confused about Hive in the Spark assembly. I think Hive in the Spark assembly is not the same thing as Hive on Spark (Hive on Spark is meant to run Hive using the Spark execution engine). So, my questions are: 1. What is the difference between Hive in the Spark assembly and Hive on Hadoop? 2. Does Hive in the Spark assembly use the Spark execution engine or the Hadoop MR engine? Thanks. bit1...@163.com
Number of cores per executor on Spark Standalone
Hi, I know that Spark on YARN has a configuration parameter (--executor-cores NUM) to specify the number of cores per executor. How about Spark standalone? I can specify the total cores, but how can I know how many cores each executor will take (presuming one executor per node)? bit1...@163.com
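No authoritative answer appears in the thread, but here is a sketch of how this tends to work, under the assumption of a Spark 1.x standalone cluster: there is no per-executor core flag; an application takes free cores on each worker up to a total cap, so with one executor per node the per-executor count falls out of the division:

import org.apache.spark.SparkConf

// Standalone mode (Spark 1.x sketch): cap the application's total cores.
val conf = new SparkConf()
  .setMaster("spark://master:7077")   // placeholder master URL
  .set("spark.cores.max", "6")        // equivalent to --total-executor-cores 6
// With 3 workers and one executor per worker, each executor would get about
// 6 / 3 = 2 cores, bounded by SPARK_WORKER_CORES on each worker.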
Re: Re: Many Receiver vs. Many threads per Receiver
Sure, Thanks Tathagata! bit1...@163.com From: Tathagata Das Date: 2015-02-26 14:47 To: bit1...@163.com CC: Akhil Das; user Subject: Re: Re: Many Receiver vs. Many threads per Receiver Spark Streaming has a new Kafka direct stream, to be released as an experimental feature with 1.3. That uses a low-level consumer. Not sure if it satisfies your purpose. If you want more control, it's best to create your own Receiver with the low-level Kafka API. TD On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com bit1...@163.com wrote: Thanks Akhil. Not sure whether the low-level consumer will be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the Spark Streaming programming guide. bit1...@163.com From: Akhil Das Date: 2015-02-24 16:21 To: bit1...@163.com CC: user Subject: Re: Many Receiver vs. Many threads per Receiver I believe when you go with 1, it will distribute the consumers across your cluster (possibly on 6 machines), but still I don't see a way to tell from which partition it will consume etc. If you are looking to have a consumer where you can specify the partition details and all, then you are better off with the low-level consumer. Thanks Best Regards On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, basically there are two ways: 1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream). 2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads. My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one Receiver, one core), but I learned from somewhere else that choice 1 is better, so I would ask and see how you guys elaborate on this. Thanks bit1...@163.com
Re: Re: Many Receiver vs. Many threads per Receiver
Thanks Akhil. Not sure whether the low-level consumer will be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the Spark Streaming programming guide. bit1...@163.com From: Akhil Das Date: 2015-02-24 16:21 To: bit1...@163.com CC: user Subject: Re: Many Receiver vs. Many threads per Receiver I believe when you go with 1, it will distribute the consumers across your cluster (possibly on 6 machines), but still I don't see a way to tell from which partition it will consume etc. If you are looking to have a consumer where you can specify the partition details and all, then you are better off with the low-level consumer. Thanks Best Regards On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, basically there are two ways: 1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream). 2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads. My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one Receiver, one core), but I learned from somewhere else that choice 1 is better, so I would ask and see how you guys elaborate on this. Thanks bit1...@163.com
Re_ Re_ Does Spark Streaming depend on Hadoop_(4)
I am going crazy from frequent mail rejections, so I created a new thread. SMTP error, DOT: 552 spam score (5.7) exceeded threshold (FREEMAIL_ENVFROM_END_DIGIT,FREEMAIL_REPLY,HTML_FONT_FACE_BAD,HTML_MESSAGE,RCVD_IN_BL_SPAMCOP_NET,SPF_PASS Hi Silvio and Ted, I know there is a configuration parameter to control writing the log to HDFS, but I didn't enable it. From the stack trace, it looks like accessing HDFS is triggered in my code, but I didn't use HDFS. Following is my code:
object MyKafkaWordCount {
  def main(args: Array[String]) {
    println("Start to run MyKafkaWordCount")
    val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topicMap = Map("topic-p6-r2" -> 1)
    val zkQuorum = "localhost:2181"
    val group = "topic-p6-r2-consumer-group"
    // Kafka has 6 partitions, so create 6 Receivers here
    val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
    // repartition to 18, 3 times the number of receivers
    val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)
    partitions.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
bit1...@163.com
Re: Re: About FlumeUtils.createStream
Thanks both of you guys on this! bit1...@163.com From: Akhil Das Date: 2015-02-24 12:58 To: Tathagata Das CC: user; bit1129 Subject: Re: About FlumeUtils.createStream I see, thanks for the clarification TD. On 24 Feb 2015 09:56, Tathagata Das t...@databricks.com wrote: Akhil, that is incorrect. Spark will listen on the given port for Flume to push data into it. When in local mode, it will listen on localhost at that port. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and listen on the port for receiving data from Flume. So only the configured machine will listen on the port. I suggest trying the other stream, FlumeUtils.createPollingStream. More details here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Spark won't listen on that port, mate. It basically means you have a flume source running at that port of your localhost. And when you submit your application in standalone mode, workers will consume data from that port. Thanks Best Regards On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote: Hi, In the spark streaming application, I write the code FlumeUtils.createStream(ssc, "localhost", port), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data?
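A minimal sketch of the polling variant TD recommends, assuming a Flume agent already configured with the Spark sink; the host and port are placeholder assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Polling mode: Flume buffers events in its Spark sink and Spark pulls them,
    // so nothing depends on which cluster node happens to run the receiver.
    val events = FlumeUtils.createPollingStream(ssc, "flume-agent-host", 9988)
    events.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}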
Re: Re: About FlumeUtils.createStream
The behavior is exactly what I expected. Thanks Akhil and Tathagata! bit1...@163.com From: Akhil Das Date: 2015-02-24 13:32 To: bit1129 CC: Tathagata Das; user Subject: Re: Re: About FlumeUtils.createStream That depends on how many machines you have in your cluster. Say you have 6 workers; most likely it will be distributed across all workers (assuming your topic has 6 partitions). Now when you have more than 6 partitions, say 12, then these 6 receivers will start to consume from 2 partitions at a time. And when you have fewer partitions, say 3, then 3 of the receivers will be idle. On 24 Feb 2015 10:16, bit1...@163.com bit1...@163.com wrote: Hi Akhil, Tathagata, This leads me to another question: for the Spark Streaming and Kafka integration, if there is more than one Receiver in the cluster, such as val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)), will these Receivers stay on one cluster node, or will they be distributed among the cluster nodes? bit1...@163.com From: Akhil Das Date: 2015-02-24 12:58 To: Tathagata Das CC: user; bit1129 Subject: Re: About FlumeUtils.createStream I see, thanks for the clarification TD. On 24 Feb 2015 09:56, Tathagata Das t...@databricks.com wrote: Akhil, that is incorrect. Spark will listen on the given port for Flume to push data into it. When in local mode, it will listen on localhost at that port. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and listen on the port for receiving data from Flume. So only the configured machine will listen on the port. I suggest trying the other stream, FlumeUtils.createPollingStream. More details here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Spark won't listen on that port, mate. It basically means you have a flume source running at that port of your localhost. And when you submit your application in standalone mode, workers will consume data from that port. Thanks Best Regards On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote: Hi, In the spark streaming application, I write the code FlumeUtils.createStream(ssc, "localhost", port), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data?
Re: Re: About FlumeUtils.createStream
Hi Akhil, Tathagata, This leads me to another question: for the Spark Streaming and Kafka integration, if there is more than one Receiver in the cluster, such as val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)), will these Receivers stay on one cluster node, or will they be distributed among the cluster nodes? bit1...@163.com From: Akhil Das Date: 2015-02-24 12:58 To: Tathagata Das CC: user; bit1129 Subject: Re: About FlumeUtils.createStream I see, thanks for the clarification TD. On 24 Feb 2015 09:56, Tathagata Das t...@databricks.com wrote: Akhil, that is incorrect. Spark will listen on the given port for Flume to push data into it. When in local mode, it will listen on localhost at that port. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and listen on the port for receiving data from Flume. So only the configured machine will listen on the port. I suggest trying the other stream, FlumeUtils.createPollingStream. More details here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Spark won't listen on that port, mate. It basically means you have a flume source running at that port of your localhost. And when you submit your application in standalone mode, workers will consume data from that port. Thanks Best Regards On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote: Hi, In the spark streaming application, I write the code FlumeUtils.createStream(ssc, "localhost", port), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data?
Re: Re: Does Spark Streaming depend on Hadoop?
[hadoop@hadoop bin]$ sh submit.log.streaming.kafka.complicated.sh Spark assembly has been built with Hive, including Datanucleus jars on classpath Start to run MyKafkaWordCount Exception in thread main java.net.ConnectException: Call From hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783) at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730) at org.apache.hadoop.ipc.Client.call(Client.java:1414) at org.apache.hadoop.ipc.Client.call(Client.java:1363) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398) at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:353) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:571) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:74) at spark.examples.streaming.MyKafkaWordCount$.main(MyKafkaWordCount.scala:14) at spark.examples.streaming.MyKafkaWordCount.main(MyKafkaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529) at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493) at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604) at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699) at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367) at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462) at org.apache.hadoop.ipc.Client.call(Client.java:1381) ... 32 more bit1...@163.com From: Ted Yu Date: 2015-02-24 10:24 To: bit1...@163.com CC: user Subject: Re: Does Spark Streaming depend on Hadoop? Can you pastebin the whole stack trace? Thanks On Feb 23, 2015, at 6:14 PM, bit1...@163.com bit1...@163.com wrote: Hi, When I submit a spark streaming application with the following script, ./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.MyKafkaWordCount my.kakfa.wordcountjar an exception occurs: Exception in thread main java.net.ConnectException: Call From hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection exception. From the exception, it tries to connect to 9000, which is for Hadoop/HDFS, but I don't use Hadoop at all in my code (such as saving to HDFS).
Does Spark Streaming depend on Hadoop?
Hi, When I submit a spark streaming application with the following script, ./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.MyKafkaWordCount my.kakfa.wordcountjar an exception occurs: Exception in thread main java.net.ConnectException: Call From hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection exception. From the exception, it tries to connect to 9000, which is for Hadoop/HDFS, and I don't use Hadoop at all in my code (such as saving to HDFS). bit1...@163.com
Re: Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4)
Thanks Tathagata! You are right. I have packaged the contents of the Spark-shipped example jar into my jar, which contains several HDFS configuration files like hdfs-default.xml etc. Thanks! bit1...@163.com From: Tathagata Das Date: 2015-02-24 12:04 To: bit1...@163.com CC: yuzhihong; silvio.fiorito; user Subject: Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4) You could have HDFS configuration files in the classpath of the program. The HDFS libraries that Spark uses automatically pick those up and start using them. TD On Mon, Feb 23, 2015 at 7:47 PM, bit1...@163.com bit1...@163.com wrote: I am going crazy from frequent mail rejections, so I created a new thread. SMTP error, DOT: 552 spam score (5.7) exceeded threshold (FREEMAIL_ENVFROM_END_DIGIT,FREEMAIL_REPLY,HTML_FONT_FACE_BAD,HTML_MESSAGE,RCVD_IN_BL_SPAMCOP_NET,SPF_PASS Hi Silvio and Ted, I know there is a configuration parameter to control writing the log to HDFS, but I didn't enable it. From the stack trace, it looks like accessing HDFS is triggered in my code, but I didn't use HDFS. Following is my code:
object MyKafkaWordCount {
  def main(args: Array[String]) {
    println("Start to run MyKafkaWordCount")
    val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topicMap = Map("topic-p6-r2" -> 1)
    val zkQuorum = "localhost:2181"
    val group = "topic-p6-r2-consumer-group"
    // Kafka has 6 partitions, so create 6 Receivers here
    val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
    // repartition to 18, 3 times the number of receivers
    val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)
    partitions.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
bit1...@163.com
Many Receiver vs. Many threads per Receiver
Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, basically there are two ways: 1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream). 2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads. My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one Receiver, one core), but I learned from somewhere else that choice 1 is better, so I would ask and see how you guys elaborate on this. Thanks bit1...@163.com
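For concreteness, a sketch of the two options side by side; the ZooKeeper quorum, group and topic are placeholder assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverOptions {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("ReceiverOptions"), Seconds(5))
    val (zkQuorum, group) = ("localhost:2181", "my-group") // placeholder assumptions

    // Option 1: six receivers with one reading thread each.
    // Parallel network input, but each receiver permanently occupies one core.
    val streams = (1 to 6).map(_ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 1)).map(_._2))
    val option1 = ssc.union(streams)

    // Option 2: one receiver with six reading threads.
    // Saves cores, but all data flows through a single receiver (and its node).
    val option2 = KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 6)).map(_._2)

    option1.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}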
Re: Re: Spark streaming doesn't print output when working with standalone master
Thanks Akhil. From: Akhil Das Date: 2015-02-20 16:29 To: bit1...@163.com CC: user Subject: Re: Re: Spark streaming doesn't print output when working with standalone master local[3] spawns 3 threads on 1 core :) Thanks Best Regards On Fri, Feb 20, 2015 at 12:50 PM, bit1...@163.com bit1...@163.com wrote: Thanks Akhil, you are right. I checked and found that I have only 1 core allocated to the program. I am running on a virtual machine and only allocated one processor to it (1 core per processor), so even though I have specified --total-executor-cores 3 in the submit script, the application will still only be allocated one processor. This leads me to another question: although I have only one core, I have specified the master and executor as --master local[3] --executor-memory 512M --total-executor-cores 3. Since I have only one core, why does this work? bit1...@163.com From: Akhil Das Date: 2015-02-20 15:13 To: bit1...@163.com CC: user Subject: Re: Spark streaming doesn't print output when working with standalone master While running the program go to your cluster's webUI (that runs on 8080, probably at hadoop.master:8080) and see how many cores are allocated to the program; it should be >= 2 for the stream to get processed. Thanks Best Regards On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs: ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks! ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar
object LogAnalyzerStreaming {
  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.socketTextStream("localhost", port)
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
About FlumeUtils.createStream
Hi, In the spark streaming application, I write the code FlumeUtils.createStream(ssc, "localhost", port), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data?
Re: Re: Spark streaming doesn't print output when working with standalone master
Thanks Akhil, you are right. I checked and found that I have only 1 core allocated to the program. I am running on a virtual machine and only allocated one processor to it (1 core per processor), so even though I have specified --total-executor-cores 3 in the submit script, the application will still only be allocated one processor. This leads me to another question: although I have only one core, I have specified the master and executor as --master local[3] --executor-memory 512M --total-executor-cores 3. Since I have only one core, why does this work? bit1...@163.com From: Akhil Das Date: 2015-02-20 15:13 To: bit1...@163.com CC: user Subject: Re: Spark streaming doesn't print output when working with standalone master While running the program go to your cluster's webUI (that runs on 8080, probably at hadoop.master:8080) and see how many cores are allocated to the program; it should be >= 2 for the stream to get processed. Thanks Best Regards On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs: ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks! ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar
object LogAnalyzerStreaming {
  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.socketTextStream("localhost", port)
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Spark streaming doesn't print output when working with standalone master
Hi, I am trying the spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs: ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks! ./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar
object LogAnalyzerStreaming {
  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.socketTextStream("localhost", port)
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
java.lang.StackOverflowError when doing spark sql
I am using Spark 1.2.0 (prebuilt with Hadoop 2.4) on Windows 7. I found the same bug here: https://issues.apache.org/jira/browse/SPARK-4208, but it is still open. Is there a workaround for this? Thanks! The stack trace: StackOverflow Exception occurs Exception in thread main java.lang.StackOverflowError at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
Re: Problem with 1 master + 2 slaves cluster
But I am able to run the SparkPi example: ./run-example SparkPi 1000 --master spark://192.168.26.131:7077 Result: Pi is roughly 3.14173708 bit1...@163.com From: bit1...@163.com Date: 2015-02-18 16:29 To: user Subject: Problem with 1 master + 2 slaves cluster Hi sparkers, I set up a Spark (1.2.1) cluster with 1 master and 2 slaves, then started them up, and everything looks to be running normally. On the master node, I run the spark-shell with the following steps: bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
scala> rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("file:///home/hadoop/output")
After the application finishes running, there is no word-count-related output; an output directory does appear on each slave node, but it contains only a _temporary subdirectory. Any ideas? Thanks!
Re: Re: Problem with 1 master + 2 slaves cluster
Sure, thanks Akhil. A further question: is the local file system (file:///) not supported in a standalone cluster? bit1...@163.com From: Akhil Das Date: 2015-02-18 17:35 To: bit1...@163.com CC: user Subject: Re: Problem with 1 master + 2 slaves cluster Since the cluster is standalone, you are better off reading/writing to HDFS instead of the local filesystem. Thanks Best Regards On Wed, Feb 18, 2015 at 2:32 PM, bit1...@163.com bit1...@163.com wrote: But I am able to run the SparkPi example: ./run-example SparkPi 1000 --master spark://192.168.26.131:7077 Result: Pi is roughly 3.14173708 bit1...@163.com From: bit1...@163.com Date: 2015-02-18 16:29 To: user Subject: Problem with 1 master + 2 slaves cluster Hi sparkers, I set up a Spark (1.2.1) cluster with 1 master and 2 slaves, then started them up, and everything looks to be running normally. On the master node, I run the spark-shell with the following steps: bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
scala> rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("file:///home/hadoop/output")
After the application finishes running, there is no word-count-related output; an output directory does appear on each slave node, but it contains only a _temporary subdirectory. Any ideas? Thanks!
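A sketch of the HDFS-based variant Akhil suggests, to be run in the same spark-shell; the namenode URL and paths are placeholder assumptions. With a file:/// path, each executor writes its partitions to its own local disk, which is likely why each slave ends up holding only a _temporary directory:

// Read from and write to HDFS so all nodes share one filesystem.
val rdd = sc.textFile("hdfs://namenode:9000/user/hadoop/history.txt", 7)
rdd.flatMap(_.split(" "))
   .map((_, 1))
   .reduceByKey(_ + _, 5)
   .map(x => (x._2, x._1))
   .sortByKey(false)
   .map(x => (x._2, x._1))
   .saveAsTextFile("hdfs://namenode:9000/user/hadoop/output")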
Re: Re: Question about spark streaming+Flume
Hi Arush, With your code, I still didn't see the output Received X flume events. bit1...@163.com From: bit1...@163.com Date: 2015-02-17 14:08 To: Arush Kharbanda CC: user Subject: Re: Re: Question about spark streaming+Flume Ok, you are missing a letter in foreachRDD.. let me proceed.. bit1...@163.com From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi, Can you try this:
val lines = FlumeUtils.createStream(ssc, "localhost", port)
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis())
lines.forechRDD(_.foreach(println))
Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code:
object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", port)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to flume, I only notice the following console information showing that input is added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: Received X flume events. I have no idea where the problem is. Any idea? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Question about spark streaming+Flume
Hi, I am trying the Spark Streaming + Flume example: 1. Code:
object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", port)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to flume, I only notice the following console information showing that input is added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: Received X flume events. I have no idea where the problem is. Any idea? Thanks
Re: Re: Question about spark streaming+Flume
Ok, you are missing a letter in foreachRDD.. let me proceed.. bit1...@163.com From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi, Can you try this:
val lines = FlumeUtils.createStream(ssc, "localhost", port)
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis())
lines.forechRDD(_.foreach(println))
Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code:
object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", port)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to flume, I only notice the following console information showing that input is added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: Received X flume events. I have no idea where the problem is. Any idea? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Re: Question about spark streaming+Flume
Thanks Arush.. With your code, a compile error occurs: Error:(19, 11) value forechRDD is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent] lines.forechRDD(_.foreach(println)) ^ From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi, Can you try this:
val lines = FlumeUtils.createStream(ssc, "localhost", port)
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis())
lines.forechRDD(_.foreach(println))
Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code:
object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", port)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to flume, I only notice the following console information showing that input is added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: Received X flume events. I have no idea where the problem is. Any idea? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
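For completeness, the corrected form of the suggestion with foreachRDD spelled out, assuming lines is the DStream from FlumeUtils.createStream above:

// Count events per batch, and also print every event of every batch.
lines.count()
  .map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis())
  .print()
// Note: rdd.foreach(println) runs on the executors, so on a cluster the
// output appears in the executor logs rather than the driver console.
lines.foreachRDD(rdd => rdd.foreach(println))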
Re: Hi: hadoop 2.5 for spark
You can use the prebuilt version that is built against Hadoop 2.4. From: Siddharth Ubale Date: 2015-01-30 15:50 To: user@spark.apache.org Subject: Hi: hadoop 2.5 for spark Hi, I am a beginner with Apache Spark. Can anyone let me know if it is mandatory to build Spark with the Hadoop version I am using, or can I use a pre-built package and use it with my existing HDFS root folder? I am using Hadoop 2.5.0 and want to use Apache Spark 1.2.0 with it. I could see a pre-built version for 2.4 and above in the downloads section of the Spark homepage. Siddharth Ubale, Synchronized Communications #43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100 Tel : +91 80 3202 4060 Web: www.syncoms.com London|Bangalore|Orlando we innovate, plan, execute, and transform the business
Re: RE: Shuffle to HDFS
I had also thought that Hadoop mapper output is saved on HDFS, at least if the job only has a mapper but no reducer. If there is a reducer, then will the map output be saved on local disk? From: Shao, Saisai Date: 2015-01-26 15:23 To: Larry Liu CC: u...@spark.incubator.apache.org Subject: RE: Shuffle to HDFS Hey Larry, I don't think Hadoop will put shuffle output in HDFS; instead its behavior is the same as what Spark does, storing mapper output (shuffle) data on local disks. You might have misunderstood something :) Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Monday, January 26, 2015 3:03 PM To: Shao, Saisai Cc: u...@spark.incubator.apache.org Subject: Re: Shuffle to HDFS Hi, Jerry Thanks for your reply. The reason I have this question is that in Hadoop, mapper intermediate output (shuffle) will be stored in HDFS. I think the default location for Spark is /tmp. Larry On Sun, Jan 25, 2015 at 9:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Larry, I don't think current Spark's shuffle can support HDFS as a shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This will severely increase the shuffle time. Thanks Jerry From: Larry Liu [mailto:larryli...@gmail.com] Sent: Sunday, January 25, 2015 4:45 PM To: u...@spark.incubator.apache.org Subject: Shuffle to HDFS How to change shuffle output to HDFS or NFS?
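A sketch of the corresponding Spark setting; the directory path is an illustrative assumption:

import org.apache.spark.SparkConf

// Shuffle (map output) files go to spark.local.dir on each node's local disk,
// /tmp by default; they are never written to HDFS.
val conf = new SparkConf()
  .setAppName("ShuffleDirExample")
  .set("spark.local.dir", "/mnt/spark-local") // comma-separate multiple disks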
Error occurs when running Spark SQL example
When I run the following Spark SQL example within IDEA, I got a StackOverflowError; it looks like the scala.util.parsing.combinator.Parsers are being called recursively and infinitely. Has anyone encountered this?
package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql._

case class Person(name: String, age: Int)

object SparkSQLExample {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2")
    val conf = new SparkConf()
    conf.setAppName("SparkSQLExample")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    val people = sc.textFile("file:///D:/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    people.registerTempTable("people")
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  }
}
The error is: 15/01/17 19:42:14.489 main INFO SparkContext: Created broadcast 0 from textFile at SparkSQLExample.scala:24 15/01/17 19:42:15.464 main DEBUG DDLParser: Not recognized as DDL: [1.1] failure: ``CREATE'' expected but identifier SELECT found SELECT name FROM people WHERE age >= 13 AND age <= 19 ^ Exception in thread main java.lang.StackOverflowError at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
EventBatch and SparkFlumeProtocol not found in spark codebase?
Hi, When I fetch the Spark code base and import it into IntelliJ IDEA as an SBT project, then build it with SBT, there are compile errors in the examples module complaining that EventBatch and SparkFlumeProtocol cannot be found; it looks like they should be in the org.apache.spark.streaming.flume.sink package. Not sure what happens. Thanks.
Re: Re: I think I am almost lost in the internals of Spark
Thanks Eric. Yes, I am Chinese. :-) I will read through the articles, thank you! bit1...@163.com From: eric wong Date: 2015-01-07 10:46 To: bit1...@163.com CC: user Subject: Re: Re: I think I am almost lost in the internals of Spark A good beginning if you are Chinese. https://github.com/JerryLead/SparkInternals/tree/master/markdown 2015-01-07 10:13 GMT+08:00 bit1...@163.com bit1...@163.com: Thank you, Tobias. I will look into the Spark paper. But it looks like the paper has been moved: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf. A web page is returned (Resource not found) when I access it. bit1...@163.com From: Tobias Pfeiffer Date: 2015-01-07 09:24 To: Todd CC: user Subject: Re: I think I am almost lost in the internals of Spark Hi, On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote: I am a bit new to Spark, except that I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now, I am investigating the internals of Spark, but I think I am almost lost, because I could not grasp a whole picture of what Spark does when it executes the word count. I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful to understand the whole of Spark. Tobias -- 王海华
Re: Unable to build spark from source
The error hints that the Maven module scala-compiler can't be fetched from repo1.maven.org. Should some repository URLs be added to Maven's settings file? bit1...@163.com From: Manoj Kumar Date: 2015-01-03 18:46 To: user Subject: Unable to build spark from source Hello, I tried to build Spark from source using this command (all dependencies installed) but it fails with this error. Any help would be appreciated. mvn -DskipTests clean package [INFO] Spark Project Parent POM .. FAILURE [28:14.408s] [INFO] Spark Project Networking .. SKIPPED [INFO] Spark Project Shuffle Streaming Service ... SKIPPED [INFO] Spark Project Core SKIPPED [INFO] BUILD FAILURE [INFO] [INFO] Total time: 28:15.136s [INFO] Finished at: Sat Jan 03 15:26:31 IST 2015 [INFO] Final Memory: 20M/143M [INFO] [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-parent: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile failed: Plugin net.alchim31.maven:scala-maven-plugin:3.2.0 or one of its dependencies could not be resolved: The following artifacts could not be resolved: org.scala-lang:scala-compiler:jar:2.10.3, org.scala-lang:scala-reflect:jar:2.10.3: Could not transfer artifact org.scala-lang:scala-compiler:jar:2.10.3 from/to central (https://repo1.maven.org/maven2): GET request of: org/scala-lang/scala-compiler/2.10.3/scala-compiler-2.10.3.jar from central failed: Connection reset - [Help 1] -- Godspeed, Manoj Kumar, Intern, Telecom ParisTech Mech Undergrad http://manojbits.wordpress.com
Re: sqlContext is undefined in the Spark Shell
This is noise, please ignore. I figured out what happened... bit1...@163.com From: bit1...@163.com Date: 2015-01-03 19:03 To: user Subject: sqlContext is undefined in the Spark Shell Hi, In the spark shell, I do the following two things: 1. scala> val cxt = new org.apache.spark.sql.SQLContext(sc) 2. scala> import sqlContext._ The 1st one succeeds while the 2nd one fails with the following error: console:10: error: not found: value sqlContext import sqlContext._ Is there something missing? I am using Spark 1.2.0. Thanks. bit1...@163.com
sqlContext is undefined in the Spark Shell
Hi, In the spark shell, I do the following two things: 1. scala> val cxt = new org.apache.spark.sql.SQLContext(sc) 2. scala> import sqlContext._ The 1st one succeeds while the 2nd one fails with the following error: console:10: error: not found: value sqlContext import sqlContext._ Is there something missing? I am using Spark 1.2.0. Thanks. bit1...@163.com
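For readers hitting the same error, a sketch of the likely resolution (the poster did not spell it out): the import must refer to the name of the val that was actually defined.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext._   // works: the name now matches the val

// or keep the original name and import from it
scala> val cxt = new org.apache.spark.sql.SQLContext(sc)
scala> import cxt._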