Re: [Spark Streaming] Dynamic Broadcast Variable Update
One, I think you should take this to the Spark developer list.

Two, I suspect broadcast variables aren't the best solution for the use case you describe. Maybe an in-memory data/object/file store like Tachyon is a better fit.

Thanks,
Tim

On Tue, May 2, 2017 at 11:56 AM, Nipun Arora wrote:
> Hi All,
>
> To support our Spark Streaming based anomaly detection tool, we have made a patch in Spark 1.6.2 to dynamically update broadcast variables.
>
> I'll first explain our use case, which I believe should be common to several people using Spark Streaming applications. Broadcast variables are often used to store values such as "machine learning models", which can then be used on streaming data to "test" and get the desired results (in our case, anomalies). Unfortunately, in current Spark, broadcast variables are final and can only be initialized once, before the initialization of the streaming context. Hence, if a new model is learned, the streaming system cannot be updated without shutting down the application, broadcasting again, and restarting the application. Our goal was to re-broadcast variables without requiring any downtime of the streaming service.
>
> The key to this implementation is a live re-broadcastVariable() interface, which can be triggered in between micro-batch executions without any reboot of the streaming application. At a high level, the task is done by re-fetching broadcast variable information from the Spark driver and then redistributing it to the workers. The micro-batch execution is blocked while the update is made, by taking a lock on the execution. We have already tested this in our prototype deployment of our anomaly detection service and can successfully re-broadcast the broadcast variables with no downtime.
>
> We would like to integrate these changes into Spark. Can anyone please let me know the process for submitting patches/new features to Spark? Also, I understand that the current version of Spark is 2.1. However, our changes have been done and tested on Spark 1.6.2; will this be a problem?
>
> Thanks
> Nipun
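For readers who cannot patch Spark itself, a workaround often used with stock Spark is to keep the broadcast handle in a driver-side holder and swap it between micro-batches. The following is only a sketch of that idea, not the patch described above: `RefreshableBroadcast` and the `load` function are hypothetical names, and it assumes the handle is read inside `foreachRDD` or `transform`, whose bodies run on the driver once per batch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import scala.reflect.ClassTag

// Hypothetical helper: holds the current broadcast handle on the driver.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, load: () => T) {
  @volatile private var current: Broadcast[T] = sc.broadcast(load())

  // Call this on the driver (e.g. at the top of foreachRDD) to get the latest handle.
  def get: Broadcast[T] = current

  // Broadcast a freshly loaded value and release the old copies on the executors.
  def refresh(): Unit = synchronized {
    val old = current
    current = sc.broadcast(load())
    old.unpersist(blocking = false)
  }
}

// Usage sketch (names are placeholders):
//   val model = new RefreshableBroadcast(ssc.sparkContext, () => loadModel())
//   stream.foreachRDD { rdd =>
//     val m = model.get                      // evaluated on the driver for every batch
//     rdd.map(x => score(m.value, x)).count()
//   }
//   // elsewhere on the driver, when a new model has been trained: model.refresh()
```

Tasks that already captured the old handle keep using the old value until their batch finishes, so the swap takes effect from the next batch onward.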
Re: Initialize Gaussian Mixture Model using Spark ML dataframe API
Yes, I noticed these open issues, both with KMeans and GMM: https://issues.apache.org/jira/browse/SPARK-13025

Thanks,
Tim

On Mon, May 1, 2017 at 9:01 PM, Yanbo Liang <yblia...@gmail.com> wrote:
> Hi Tim,
>
> The Spark ML API doesn't currently support setting an initial model for GMM. I hope we can get this feature into Spark 2.3.
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith <secs...@gmail.com> wrote:
>> Hi,
>>
>> I am trying to figure out the API to initialize a Gaussian mixture model using either centroids created by K-means or a previously calculated GMM model (I am aware that you can "save" a model and "load" it later, but I am not interested in saving a model to a filesystem).
>>
>> The Spark MLlib API lets you do this using setInitialModel: https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture
>>
>> However, I cannot figure out how to do this using the Spark ML API. Can anyone please point me in the right direction? I've tried reading the Spark ML code and was wondering if the "set" call lets you do that?
>>
>> --
>> Thanks,
>>
>> Tim
Initialize Gaussian Mixture Model using Spark ML dataframe API
Hi,

I am trying to figure out the API to initialize a Gaussian mixture model using either centroids created by K-means or a previously calculated GMM model (I am aware that you can "save" a model and "load" it later, but I am not interested in saving a model to a filesystem).

The Spark MLlib API lets you do this using setInitialModel: https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture

However, I cannot figure out how to do this using the Spark ML API. Can anyone please point me in the right direction? I've tried reading the Spark ML code and was wondering if the "set" call lets you do that?

--
Thanks,
Tim
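For reference, the RDD-based MLlib route mentioned in the question can be sketched roughly as follows. This is an illustration under assumptions, not an answer for the DataFrame-based ML API: the helper name `gmmFromKMeans`, the identity covariances, the equal starting weights and the 20 K-means iterations are all arbitrary choices made for the example.

```scala
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel, KMeans}
import org.apache.spark.mllib.linalg.{Matrices, Vector}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD

// Hypothetical helper: seed a GMM with K-means centroids, entirely in memory.
def gmmFromKMeans(data: RDD[Vector], k: Int): GaussianMixtureModel = {
  // K-means may return fewer than k centers, so use the actual count below.
  val centers = KMeans.train(data, k, 20).clusterCenters
  val n = centers.length
  val dim = centers.head.size

  // One Gaussian per centroid, with an identity covariance as a neutral starting point.
  val gaussians = centers.map(c => new MultivariateGaussian(c, Matrices.eye(dim)))
  val weights = Array.fill(n)(1.0 / n)
  val initial = new GaussianMixtureModel(weights, gaussians)

  // setK must match the initial model's k, so it is set first.
  new GaussianMixture().setK(n).setInitialModel(initial).run(data)
}
```

A previously fitted `GaussianMixtureModel` can be passed to `setInitialModel` in the same way, which avoids the save/load round trip through a filesystem.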
Re: Assigning a unique row ID
http://stackoverflow.com/questions/37231616/add-a-new-column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator

On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson wrote:
> Hi,
>
> What's the best way to assign a truly unique row ID (rather than a hash) to a DataFrame/Dataset?
>
> I originally thought that functions.monotonically_increasing_id would do this, but it seems to have a rather unfortunate property: if you add it as a column to table A and then derive tables X, Y, Z and save those, the row ID values in X, Y, and Z may end up different. I assume this is because it delays the actual computation to the point where each of those tables is computed.

--
Thanks,
Tim
Consuming AWS Cloudwatch logs from Kinesis into Spark
I am sharing this code snippet since I spent quite some time figuring it out and couldn't find any examples online. Between the Kinesis documentation, the tutorial on the AWS site and other code snippets on the Internet, I was confused about the structure/format of the messages that Spark fetches from Kinesis (base64-encoded, JSON, gzipped): which comes first, and in what order.

I tested this on EMR-5.4.0, Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it helps others googling for similar info. I tried using Structured Streaming but (1) it's in alpha and (2) despite including what I thought were all the dependencies, it complained of not finding DataSource.Kinesis.

You probably do not need all the libs but I am just too lazy to remove the ones you don't require for the snippet below :)

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import java.util.Base64
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
import org.apache.commons.math3.stat.descriptive._
import java.io.File
import java.net.InetAddress
import scala.util.control.NonFatal
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SaveMode
import java.util.Properties
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import scala.util.Try

//sc.setLogLevel("INFO")

val ssc = new StreamingContext(sc, Seconds(30))

// Two receivers reading from the same Kinesis stream
val kinesisStreams = (0 until 2).map { i =>
  KinesisUtils.createStream(ssc, "myApp", "cloudwatchlogs",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2, "myId", "mySecret")
}

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  if (rdd.count() > 0) {
    // Each record is gzip-compressed JSON: gunzip first, then parse
    val json = rdd.map(input => {
      val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
      val record = scala.io.Source.fromInputStream(inputStream).mkString
      compact(render(parse(record)))
    })

    val df = spark.sqlContext.read.json(json)
    val preDF = df.select($"logGroup", explode($"logEvents").as("events_flat"))
    val penDF = preDF.select($"logGroup", $"events_flat.extractedFields")
    val finalDF = penDF.select($"logGroup".as("cluster"), $"extractedFields.*")
    finalDF.printSchema()
    finalDF.show()
  }
})

ssc.start

--
Thanks,
Tim
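The decoding order used in the snippet above (the receiver hands over raw bytes, which are gunzipped and then parsed as JSON, with no explicit base64 step in user code) can be tried outside Spark. The following is a small standalone sketch; `gunzip` and `gzip` are hypothetical helper names, and the sample payload is made up rather than a real CloudWatch Logs record.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Decompress one record payload into its JSON text.
def gunzip(bytes: Array[Byte]): String = {
  val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
  try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
}

// Inverse helper, only used here to fabricate a test payload.
def gzip(text: String): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  val out = new GZIPOutputStream(buf)
  try out.write(text.getBytes(StandardCharsets.UTF_8)) finally out.close()
  buf.toByteArray
}

// Made-up payload shaped loosely like the fields selected in the snippet above.
val sample = """{"logGroup":"demo","logEvents":[{"extractedFields":{"a":"1"}}]}"""
val roundTripped = gunzip(gzip(sample))
```

Closing the stream in a `finally` block also avoids leaking the inflater when a record turns out not to be valid gzip.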
Re: Spark REST Job server feedback?
I am curious too: is there any comparison between the two? Looks like one is Datastax sponsored and the other is Cloudera. Other than that, any major/core differences in design/approach?

Thanks,
Tim

On Mon, Sep 28, 2015 at 8:32 AM, Ramirez Quetzal wrote:
> Anyone has feedback on using Hue / Spark Job Server REST servers?
>
> http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark/
>
> https://github.com/spark-jobserver/spark-jobserver
>
> Many thanks,
>
> Rami
Alter table fails to find table
Spark 1.3.0 (CDH 5.4.4)

scala> sqlContext.sql("SHOW TABLES").collect
res18: Array[org.apache.spark.sql.Row] = Array([allactivitydata,true], [sample_07,false], [sample_08,false])

scala> sqlContext.sql("SELECT COUNT(*) from allactivitydata").collect
res19: Array[org.apache.spark.sql.Row] = Array([1227230])

scala> sqlContext.sql("ALTER TABLE allactivitydata ADD COLUMNS (test STRING)");
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 INFO PerfLogger:
15/09/03 04:23:16 ERROR Driver: FAILED: SemanticException [Error 10001]: Table not found default.allactivitydata
org.apache.hadoop.hive.ql.parse.SemanticException: Table not found default.allactivitydata
 at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1332)
 at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1315)
 at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1387)
 at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1374)
 at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeAlterTableModifyCols(DDLSemanticAnalyzer.java:2611)
 at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:255)
 at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:222)
 at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:423)
 at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:307)
 at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1112)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1160)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1039)
 at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:308)
 at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:280)
 at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
 at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
 at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
 at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
 at $line83.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
 at $line83.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
 at $line83.$read$$iwC$$iwC.<init>(<console>:63)
 at $line83.$read$$iwC.<init>(<console>:65)
 at $line83.$read.<init>(<console>:67)
 at $line83.$read$.<init>(<console>:71)
 at $line83.$read$.<clinit>(<console>)
 at $line83.$eval$.<init>(<console>:7)
 at $line83.$eval$.<clinit>(<console>)
 at $line83.$eval.$print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
 at
Controlling output fileSize in SparkSQL
Hi,

I am using Spark 1.3 (CDH 5.4.4). What's the recipe for setting a minimum output file size when writing out from SparkSQL? So far, I have tried:

--x-
import sqlContext.implicits._
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
sc.hadoopConfiguration.setLong("fs.local.block.size", 1073741824L)
sc.hadoopConfiguration.setLong("dfs.blocksize", 1073741824L)
sqlContext.sql("SET spark.sql.shuffle.partitions=2")
val df = sqlContext.jsonFile("hdfs://nameservice1/user/joe/samplejson/*")
df.saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")
--x-

But my output still isn't aggregated into 1+GB files.

Thanks,
- Siddhartha
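One thing worth noting about the attempt above: Spark writes one output file per partition of the DataFrame being saved, the block-size settings control HDFS block layout rather than the number of files, and spark.sql.shuffle.partitions only applies when a query actually shuffles, which a plain read-then-save does not. A common approach is therefore to pick the partition count from the data size before saving. The arithmetic can be sketched as below; `partitionsFor` is a hypothetical helper, and the total size has to be estimated by the caller (Parquet output is usually smaller than the JSON input, so this is only approximate).

```scala
// Number of partitions such that each output file is at least `targetFileBytes`,
// assuming rows are spread evenly. Always returns at least 1.
def partitionsFor(totalBytes: Long, targetFileBytes: Long): Int = {
  require(totalBytes >= 0 && targetFileBytes > 0, "sizes must be non-negative / positive")
  math.max(1L, totalBytes / targetFileBytes).toInt
}

val oneGiB = 1073741824L

// Usage sketch against the snippet above (estimatedOutputBytes is a placeholder):
//   val n = partitionsFor(estimatedOutputBytes, oneGiB)
//   df.repartition(n).saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")
```

Rounding down rather than up is deliberate here: it errs toward fewer, larger files, which matches the stated goal of a minimum file size.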
Re: createDirectStream and Stats
Essentially, I went from:

k = createStream ...
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))

To:

kIn = createDirectStream ...
k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))

With the new API, the app starts up and works fine for a while but, I guess, starts to deteriorate after a while. With the existing API, createStream, the app does deteriorate but over a much longer period (days rather than hours).

On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das <t...@databricks.com> wrote:
> Yes, please tell us what operation you are using.
>
> TD
>
> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> Is there any more info you can provide / relevant code?
Re: createDirectStream and Stats
I did try without repartition, initially, but that was even more horrible because, instead of the allocated 100 executors, only 30 (the number of Kafka partitions) would do the work. MyFunc is a CPU-bound task, so adding more memory per executor wouldn't help, and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved, and I don't think in-app threading outside of what the framework does is really desirable.

With repartition, there is a shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30.

On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger <c...@koeninger.org> wrote:
> If that's the case, you're still only using as many read executors as there are Kafka partitions.
>
> I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable.
Re: createDirectStream and Stats
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das <t...@databricks.com> wrote:
> Also, can you find from the Spark UI the breakup of the stages in each batch's jobs, and find which stage is taking more time after a while?

Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change its behaviour?

On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
> when you say your old version was
>
> k = createStream ...
>
> were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor...

Yes, sorry, the earlier/stable version was more like:

val kInStreams = (1 to n).map { _ => KafkaUtils.createStream(...) } // n being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))

Thanks,
Tim

> If that's the case I'd try direct stream without the repartitioning.
Re: createDirectStream and Stats
Update on performance of the new API: the new code using the createDirectStream API ran overnight and, when I checked the app state in the morning, there were massive scheduling delays :(

Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3.

On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith <secs...@gmail.com> wrote:
> Thanks for the super-fast response, TD :)
>
> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D
>
> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>> Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :)
>>
>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com> wrote:
>>> Hi,
>>>
>>> I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :(
>>>
>>> Those stats were very handy for keeping an eye on health of the app.
>>>
>>> What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design.
>>>
>>> Thanks,
>>> Tim
Re: createDirectStream and Stats
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D

On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :)
>
> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com> wrote:
>> Hi,
>>
>> I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :(
>>
>> Those stats were very handy for keeping an eye on health of the app.
>>
>> What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design.
>>
>> Thanks,
>> Tim
createDirectStream and Stats
Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
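Besides accumulators, one way to get per-batch numbers without relying on the UI is a StreamingListener registered on the streaming context. The sketch below is an assumption-laden illustration, not a confirmed fix for this thread: `BatchStatsListener` is a hypothetical name, it logs to stdout rather than to the Spark UI, and it assumes a Spark version in which `BatchInfo` exposes `numRecords` (I believe 1.4 and later).

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener: prints record count and delays for every completed batch.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // The delay fields are Options because they are unknown until the batch has progressed.
    println(
      s"batch=${info.batchTime} records=${info.numRecords} " +
        s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)} " +
        s"processingDelayMs=${info.processingDelay.getOrElse(-1L)}")
  }
}

// Usage sketch, on the driver before ssc.start():
//   ssc.addStreamingListener(new BatchStatsListener)
```

The same callback could push the numbers to a metrics system instead of stdout; growing scheduling delay is usually the first visible sign of the kind of deterioration described elsewhere in this thread.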
Re: Accumulator in SparkUI for streaming
So somehow Spark Streaming doesn't support display of named accumulators in the web UI?

On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic <petar.zece...@gmail.com> wrote:
> Interesting. Accumulators are shown on the Web UI if you are using the ordinary SparkContext (Spark 1.2). It just has to be named (and that's what you did).
>
> scala> val acc = sc.accumulator(0, "test accumulator")
> acc: org.apache.spark.Accumulator[Int] = 0
> scala> val rdd = sc.parallelize(1 to 1000)
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
> scala> rdd.foreach(x => acc += 1)
> scala> acc.value
> res1: Int = 1000
>
> The Stage details page shows:
>
> On 20.2.2015. 9:25, Tim Smith wrote:
>> On Spark 1.2:
>>
>> I am trying to capture # records read from a kafka topic:
>>
>> val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
>> ..
>> kInStreams.foreach(k => {
>>   k.foreachRDD(rdd => inRecords += rdd.count().toInt)
>>   inRecords.value
>> })
>>
>> Question is how do I get the accumulator to show up in the UI? I tried "inRecords.value" but that didn't help. Pretty sure it isn't showing up in Stage metrics.
>>
>> What's the trick here? collect?
>>
>> Thanks,
>> Tim
Accumulator in SparkUI for streaming
On Spark 1.2:

I am trying to capture # records read from a kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
..
kInStreams.foreach(k => {
  k.foreachRDD(rdd => inRecords += rdd.count().toInt)
  inRecords.value
})

Question is how do I get the accumulator to show up in the UI? I tried "inRecords.value" but that didn't help. Pretty sure it isn't showing up in Stage metrics.

What's the trick here? collect?

Thanks,
Tim
How to diagnose could not compute split errors and failed jobs?
My streaming app runs fine for a few hours and then starts spewing "Could not compute split, block input-xx-xxx not found" errors. After this, jobs start to fail and batches start to pile up.

My question isn't so much about why this error occurs but rather, how do I trace what leads to this error? I am using disk+memory for storage so it shouldn't be a case of data loss resulting from memory overrun.

15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job 142429705 ms.28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com): java.lang.Exception: Could not compute split, block input-28-1424297042500 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Tim
Re: Spark Streaming output cannot be used as input?
+1 for writing the Spark output to Kafka. You can then hang off multiple compute/storage framework from kafka. I am using a similar pipeline to feed ElasticSearch and HDFS in parallel. Allows modularity, you can take down ElasticSearch or HDFS for maintenance without losing (except for some edge cases) data. You can even pipeline other Spark streaming apps off kafka to modularize your processing pipeline so you don't have one single big Spark app doing all the processing. On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez jfernan...@sdl.com wrote: Thanks for the advice folks, it is much appreciated. This seems like a pretty unfortunate design flaw. My team was surprised by it. I’m going to drop the two-step process and do it all in a single step until we get Kafka online. *From:* Sean Owen [mailto:so...@cloudera.com] *Sent:* Wednesday, February 18, 2015 1:53 AM *To:* Emre Sevinc *Cc:* Jose Fernandez; user@spark.apache.org *Subject:* Re: Spark Streaming output cannot be used as input? To clarify, sometimes in the world of Hadoop people freely refer to an output 'file' when it's really a directory containing 'part-*' files which are pieces of the file. It's imprecise but that's the meaning. I think the scaladoc may be referring to 'the path to the file, which includes this parent dir, is generated ...' In an inherently distributed system, you want to distributed writes and reads, so big files are really made of logical files within a directory. There is a JIRA open to support nested dirs which has been languishing: https://issues.apache.org/jira/browse/SPARK-3586 I'm hoping to pursue that again with help from tdas after 1.3. That's probably the best solution. An alternative is to not use the file system as a sort of message queue, and instead use something like Kafka. It has a lot of other benefits but maybe it's not feasible to add this to your architecture. You can merge the files with HDFS APIs without much trouble. 
The dirs will be named consistently according to time and are something you can also query for. Making 1 partition has implications for parallelism of your job. Emre, I think I see what you're getting at but you have the map + materialize pattern which i think doesn't have the right guarantees about re-execution. Why not foreachRDD? Yes you can also consider collecting the whole RDD in foreachRDD and doing what you like, including writing to one file. But that would only work if the data is always small in each RDD. On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Jose, We've hit the same issue a couple of months ago. It is possible to write directly to files instead of creating directories, but it is not straightforward, and I haven't seen any clear demonstration in books, tutorials, etc.
We do something like:

    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
    JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
    MyModuleApp.process(stream);

And then in the process method:

    @Override
    public void process(JavaDStream<String> inStream) {
        JavaDStream<String> json = inStream.map(new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
        forceOutput(json);
    }

This, in turn, calls the following (I've removed the irrelevant lines to focus on writing):

    public class MyModuleWorker implements Function<String, String> {
        public String call(String json) {
            // process the data and then write it
            writeJSON(json, validatedJSONoutputDir_);
        }
    }

And the writeJSON method is:

    public static final void writeJSON(String json, String jsonDirPath) throws IOException {
        String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
        URI uri = URI.create(jsonFileName);
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(uri, conf);
        FSDataOutputStream out = fileSystem.create(new Path(uri));
        out.write(json.getBytes(StandardCharsets.UTF_8));
        out.close();
        fileSystem.rename(new Path(uri), new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
    }

Using a similar technique you might be able to achieve your objective. Kind regards, Emre Sevinç http://www.bigindustries.be/ On Wed, Feb 18, 2015 at 4:32 AM, Jose
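The write-to-temp-then-rename idea in writeJSON can be tried without a Hadoop cluster. The following is only a sketch of the same pattern on the local file system using java.nio.file; the class name TmpThenRename and the method writeJson are made up for illustration, and it is not the Hadoop FileSystem code from the message above. Unlike the original, it reuses the same UUID for the temporary and the final name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class TmpThenRename {
    /** Writes json to dir/UUID.json.tmp, then renames it to dir/UUID.json. */
    public static Path writeJson(String json, Path dir) throws IOException {
        Files.createDirectories(dir);
        String base = UUID.randomUUID().toString();
        Path tmp = dir.resolve(base + ".json.tmp");
        Path done = dir.resolve(base + ".json");
        Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
        // A reader that only picks up *.json never sees a half-written file.
        return Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("json-out");
        Path written = writeJson("{\"ok\":true}", dir);
        String content = new String(Files.readAllBytes(written), StandardCharsets.UTF_8);
        System.out.println(written.getFileName() + " -> " + content);
    }
}
```

The point of the rename is that a downstream job watching the directory for ".json" files only ever sees complete files. Whether a rename is atomic on a given distributed file system depends on that file system.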
Re: Streaming scheduling delay
Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why but setting # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going but with not so great delays. Bump it up to 30 and I start winning the war where processing time is consistently below batch time window (20 seconds) except for a batch every few batches where the compute time spikes 10x the usual. Following your guide, I took out some logInfo statements I had in the app but didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. Wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. Thanks On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, From this: There are 5 kafka receivers and each incoming stream is split into 40 partitions I suspect that you're creating too many tasks for Spark to process on time. Could you try some of the 'knobs' I describe here to see if that would help? http://www.virdata.com/tuning-spark/ -kr, Gerard. On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote: Just read the thread Are these numbers abnormal for spark streaming? and I think I am seeing similar results - that is - increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials). On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote: On Spark 1.2 (have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. 
While execution time is usually within the window size (in seconds), the total delay ranges from minutes to hours (and keeps going up). For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue and something tells me the driver is the bottleneck again:
1) From my app, I took out the entire write-out-to-kafka piece. Sure enough, execution, scheduling delay and hence total delay fell to sub second. This assured me that whatever processing I do before writing back to kafka isn't the bottleneck.
2) In my app, I had RDD persistence set at different points but my code wasn't really re-using any RDDs so I took out all explicit persist() statements and set spar...unpersist to true in the context. After this, it doesn't seem to matter how much memory I give my executor, the total delay seems to be in the same range. I tried per executor memory from 2G to 12G with no change in total delay so executors aren't memory starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per executor memory is set to 2GB, for example.
3) Input rate in the kafka consumer restricts spikes in incoming data.
4) Tried FIFO and FAIR but it didn't make any difference.
5) Adding executors beyond a certain point seems useless (I guess excess ones just sit idle).
At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs. Right now, I have 100 executors (I have tried setting executors from 50-150), each with 2GB and 4 cores and the driver running with 16GB. There are 5 kafka receivers and each incoming stream is split into 40 partitions. Per receiver, input rate is restricted to 2 messages per second.
Can anyone help me with clues or areas to look into, for troubleshooting the issue? One nugget I found buried in the code says: "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)." https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver? Thanks, Tim
Re: Streaming scheduling delay
TaskSetManager: Lost task 54.56 in stage 16291.0 (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 56]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0 (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 57]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0 (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 58]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0 (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 59]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0 (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 60]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0 (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 61]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0 (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 62]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0 (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 63]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block
input-4-1423758372200 not found Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found Thanks for looking into it. On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das t...@databricks.com wrote: Hey Tim, Let me get the key points. 1. If you are not writing back to Kafka, the delay is stable? That is, instead of foreachRDD { // write to kafka } if you do dstream.count, then the delay is stable. Right? 2. If so, then Kafka is the bottleneck. Is the number of partitions, that you spoke of the in the second mail, that determines the parallelism in writes? Is it stable with 30 partitions? Regarding the block exception, could you give me a trace of info level logging that leads to this error? Basically I want trace the lifecycle of the block. TD On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote: Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why but setting # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going but with not so great delays. Bump it up to 30 and I start winning the war where processing time is consistently below batch time window (20 seconds) except for a batch every few batches where the compute time spikes 10x the usual. Following your guide, I took out some logInfo statements I had in the app but didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. Wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. 
Re: Streaming scheduling delay
I replaced the writeToKafka statements with a rdd.count() and sure enough, I have a stable app with total delay well within the batch window (20 seconds). Here are the total delay lines from the driver log:

15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time 142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time 142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time 142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time 142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time 142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time 142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time 142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time 142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time 142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time 142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time 142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time 142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time 142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time 142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time 142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time 142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time 142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time 142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time 142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time 142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time 142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time 142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time 142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time 142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time 142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time 142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time 142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time 142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time 142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time 142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time 142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time 142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time 142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time 142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time 142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time 142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time 142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time 142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time 142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time 142380884 ms (execution: 4.026 s)

On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith secs...@gmail.com wrote: TD - I will try count() and report back. Meanwhile, attached is the entire driver log that includes the error logs about missing blocks. Cody - Let me research a bit about how to do connection pooling. Sorry, I am not really a programmer. I did see the connection pooling advice in the Spark Streaming Programming guide as an optimization but wasn't sure how to implement it. But do you think it will have a significant impact on performance? Saisai - I think, ideally, I'd rather not do any
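On the question of how connection pooling might be implemented: one common approach is to create the producer once per executor JVM and reuse it from every partition, instead of constructing one inside each foreachPartition call. The sketch below shows only that idea in plain Java. The Producer interface, newProducer and writePartition are hypothetical stand-ins invented for illustration; they are not the Kafka client API, and the real KafkaOutputService from this thread is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerPoolSketch {
    /** Hypothetical stand-in for a Kafka producer; not a real client API. */
    interface Producer { void send(String message); }

    static final AtomicInteger created = new AtomicInteger();
    static final AtomicInteger sent = new AtomicInteger();

    /** Expensive to build in real life (sockets, metadata fetch); here it only counts. */
    static Producer newProducer() {
        created.incrementAndGet();
        return message -> sent.incrementAndGet();
    }

    /** Lazy holder: the JVM initializes INSTANCE once, on first use, in a thread-safe way. */
    static final class Holder {
        static final Producer INSTANCE = newProducer();
    }

    /** What the body of a per-partition write would do: reuse the shared producer. */
    static void writePartition(List<String> records) {
        Producer producer = Holder.INSTANCE; // no new connection per partition
        for (String record : records) {
            producer.send(record);
        }
    }

    public static void main(String[] args) {
        for (int partition = 0; partition < 150; partition++) {
            writePartition(Arrays.asList("a", "b", "c"));
        }
        System.out.println("producers created: " + created.get() + ", messages sent: " + sent.get());
    }
}
```

With 150 partitions per batch, as in the flow described in this thread, the per-partition approach would build 150 producers per batch, while the holder builds one per JVM. How much that helps in practice depends on how expensive producer construction is in the actual client.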
Re: Streaming scheduling delay
Hi Saisai, If I understand correctly, you are suggesting that I control parallelism by having the number of consumers/executors at least 1:1 with the number of kafka partitions. For example, if I have 50 partitions for a kafka topic then either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer thread per topic
Actually, both executors and total consumers can be more than the number of kafka partitions (some will probably sit idle). But do away with dStream partitioning altogether. Right? Thanks, - Tim

On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Tim, I think maybe you can try this way: create a Receiver per executor and specify a thread count for each topic larger than 1. The total number of consumer threads will be: total consumers = (receiver number) * (thread number); make sure this total is less than or equal to the Kafka partition number. In this case, I think the parallelism is enough, and received blocks are distributed to each executor, so you don't need to repartition to increase the parallelism. Besides, with Kafka's high-level API, Kafka partitions may not be equally distributed to all the receivers, so some tasks may process more data than other tasks. Another way: you can try DirectKafkaInputDStream in Spark 1.3, which will be more balanced because each Kafka partition maps to a Spark partition. Besides, does "set partition count to 1 for each dStream" mean dstream.repartition(1)? If so I think it will still introduce shuffle and move all the data into one partition. Thanks Saisai

2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com: TD - I will try count() and report back. Meanwhile, attached is the entire driver log that includes the error logs about missing blocks. Cody - Let me research a bit about how to do connection pooling. Sorry, I am not really a programmer.
I did see the connection pooling advice in the Spark Streaming Programming guide as an optimization but wasn't sure how to implement it. But do you think it will have a significant impact on performance? Saisai - I think, ideally, I'd rather not do any dStream partitioning. Instead have 1 receiver for each kafka partition (so in this case 23 receivers for 23 kafka partitions) and then have as many or more executors to handle processing of the dStreams. Right? Trouble is, I tried this approach and it didn't work. Even if I set 23 receivers, and set partition count to 1 for each dStream (effectively, no stream splitting), my performance is extremely poor/laggy. Should I modify my code to remove dStream partitioning altogether and then try setting as many receivers as kafka partitions?

On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Tim, I think this code will still introduce shuffle even when you call repartition on each input stream. Actually this style of implementation will generate more jobs (one job per input stream) than a union into one stream via DStream.union(), and union normally has no special overhead as I understand it. Also as Cody said, creating a Producer per partition could be a potential overhead; a producer pool or sharing the Producer for one executor might be better :).

    // Process stream from each receiver separately
    // (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job)
    for (k <- kInStreams) {
      // Re-partition stream from each receiver across all compute nodes to spread out processing load and allows per partition processing
      // and, set persistence level to spill to disk along with serialization
      val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).
2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

    outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
      val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
      writer.output(rec)
    }) )

So this is creating a new kafka producer for every new output partition, right? Have you tried pooling the producers?

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote: 1) Yes, if I disable writing out to kafka and replace it with some very lightweight action such as rdd.take(1), the app is stable. 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do processing and writing out, per partition, each dStream partition ends up getting written to a kafka partition. Flow is, broadly: 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150 partitions) -> Apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number
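Saisai's rule of thumb in this message (total consumers = receiver number * thread number, kept at or below the Kafka partition count) is simple arithmetic, so a candidate layout can be checked before deploying it. This is a small sketch only; the class ConsumerPlan and its method names are invented for illustration, and the numbers are the ones Tim proposes for a 50-partition topic.

```java
public class ConsumerPlan {
    /** Total Kafka consumer threads started across all receivers. */
    static int totalConsumers(int receivers, int threadsPerReceiver) {
        return receivers * threadsPerReceiver;
    }

    /** Consumer threads that would sit idle because consumers outnumber partitions. */
    static int idleConsumers(int receivers, int threadsPerReceiver, int kafkaPartitions) {
        return Math.max(0, totalConsumers(receivers, threadsPerReceiver) - kafkaPartitions);
    }

    public static void main(String[] args) {
        int partitions = 50;
        int[][] plans = {{25, 2}, {50, 1}, {50, 2}}; // {receivers, threads per receiver}
        for (int[] plan : plans) {
            System.out.println(plan[0] + " receivers x " + plan[1] + " threads = "
                    + totalConsumers(plan[0], plan[1]) + " consumers, "
                    + idleConsumers(plan[0], plan[1], partitions) + " idle");
        }
    }
}
```

The first two plans give exactly 50 consumers for 50 partitions; the third gives 100 consumers, of which 50 have no partition to read.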
Re: Streaming scheduling delay
Just read the thread Are these numbers abnormal for spark streaming? and I think I am seeing similar results - that is - increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials).
Streaming scheduling delay
On Spark 1.2 (have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually within the window size (in seconds), the total delay ranges from minutes to hours (and keeps going up). For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue and something tells me the driver is the bottleneck again:
1) From my app, I took out the entire write-out-to-kafka piece. Sure enough, execution, scheduling delay and hence total delay fell to sub second. This assured me that whatever processing I do before writing back to kafka isn't the bottleneck.
2) In my app, I had RDD persistence set at different points but my code wasn't really re-using any RDDs so I took out all explicit persist() statements and set spar...unpersist to true in the context. After this, it doesn't seem to matter how much memory I give my executor, the total delay seems to be in the same range. I tried per executor memory from 2G to 12G with no change in total delay so executors aren't memory starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per executor memory is set to 2GB, for example.
3) Input rate in the kafka consumer restricts spikes in incoming data.
4) Tried FIFO and FAIR but it didn't make any difference.
5) Adding executors beyond a certain point seems useless (I guess excess ones just sit idle).
At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs.
Right now, I have 100 executors (I have tried setting executors from 50-150), each with 2GB and 4 cores and the driver running with 16GB. There are 5 kafka receivers and each incoming stream is split into 40 partitions. Per receiver, input rate is restricted to 2 messages per second. Can anyone help me with clues or areas to look into, for troubleshooting the issue? One nugget I found buried in the code says: "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)." https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver? Thanks, Tim
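One way to see how total delay can stay high long after execution time has dropped back under the batch window is to model batches as a single queue: a batch cannot start until the previous one finishes, so one or two slow batches push every later batch back. The sketch below is a simplified model, not Spark's scheduler. It assumes a 20-second batch interval and strictly sequential batches, and feeds in the execution times of the first ten JobScheduler log lines (06:14:26 to 06:17:26) from the follow-up in this thread. The class and method names are made up.

```java
public class BatchDelaySim {
    /**
     * Returns the total delay (queueing + execution) of each batch, assuming batches
     * become ready every batchIntervalSec and are processed one at a time, in order.
     */
    static double[] totalDelays(double batchIntervalSec, double[] executionSec) {
        double[] delays = new double[executionSec.length];
        double previousFinish = 0.0;
        for (int i = 0; i < executionSec.length; i++) {
            double ready = i * batchIntervalSec;             // when batch i is due
            double start = Math.max(ready, previousFinish);  // waits for the previous batch
            previousFinish = start + executionSec[i];
            delays[i] = previousFinish - ready;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Execution times (seconds) of the first ten batches in the driver log.
        double[] execution = {6.404, 42.338, 59.483, 18.363, 10.100, 6.209, 9.854, 7.038, 8.039, 5.213};
        double[] delays = totalDelays(20.0, execution);
        for (int i = 0; i < delays.length; i++) {
            System.out.printf("batch %d: execution %.3f s, total delay %.3f s%n", i, execution[i], delays[i]);
        }
    }
}
```

Under these assumptions the modelled totals come out within roughly 0.1 s of the logged "Total delay" values: the two slow batches (42 s and 59 s) build up about 80 s of backlog, and it takes seven more batches, each well under 20 s, to drain it. In this model the backlog only shrinks by the difference between the batch interval and the execution time, so if execution time sits at or above the interval, the delay never recovers.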
Re: Multiple Kafka Receivers and Union
Good to know it worked out and thanks for the update. I didn't realize you need to provision for receiver workers + processing workers. One would think a worker would process multiple stages of an app/job and receive is just a stage of the job. On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell matt.narr...@gmail.com wrote: Additionally, If I dial up/down the number of executor cores, this does what I want. Thanks for the extra eyes! mn On Sep 25, 2014, at 12:34 PM, Matt Narrell matt.narr...@gmail.com wrote: Tim, I think I understand this now. I had a five node Spark cluster and a five partition topic, and I created five receivers. I found this: http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming Indicating that if I use all my workers as receivers, there are none left to do the processing. If I drop the number of partitions/receivers down while still having multiple unioned receivers, I see messages. mn On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote: I suppose I have other problems as I can’t get the Scala example to work either. Puzzling, as I have literally coded like the examples (that are purported to work), but no luck. mn On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote: Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream? On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote: The part that works is the commented out, single receiver stream below the loop. It seems that when I call KafkaUtils.createStream more than once, I don’t receive any messages. I’ll dig through the logs, but at first glance yesterday I didn’t see anything suspect. I’ll have to look closer. mn On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote: Maybe post the before-code as in what was the code before you did the loop (that worked)? I had similar situations where reviewing code before (worked) and after (does not work) helped. 
Also, what helped is the Scala REPL because I can see what object types are being returned by each statement. Other than code, in the driver logs, you should see events that say "Registered receiver for stream 0 from akka.tcp://sp...@node5.acme.net:53135". Now, if you go to node5 and look at Spark or YarnContainer logs (depending on who's doing RM), you should be able to see if the receiver has any errors when trying to talk to kafka.

On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com wrote: To my eyes, these are functionally equivalent. I’ll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java). Thanks for looking at this. If anyone else can see a glaring issue in the Java approach that would be appreciated. Thanks, Matt

On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote: Sorry, I am almost Java illiterate but here's my Scala code to do the equivalent (that I have tested to work):

    // Create 10 receivers across the cluster, one for each partition, potentially,
    // but active receivers are only as many as the kafka partitions you have
    val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER) }
    val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote: So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note, that if I substitute the commented section for the loop, I receive messages from the topic.
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.streaming.unpersist", "true");
    sparkConf.set("spark.logConf", "true");
    Map<String, String> kafkaProps = new HashMap<>();
    kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
    kafkaProps.put("group.id", groupId);
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
    jsc.checkpoint("hdfs://some_location");
    List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
    for (int i = 0; i < 5; i++) {
        streamList.add(KafkaUtils.createStream(jsc,
            String.class, ProtobufModel.class,
            StringDecoder.class, ProtobufModelDecoder.class,
            kafkaProps,
            Collections.singletonMap(topic, 1),
            StorageLevel.MEMORY_ONLY_SER()));
    }
    final JavaPairDStream<String, ProtobufModel> stream = jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
    // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
    //     KafkaUtils.createStream(jsc,
    //         String.class, ProtobufModel.class,
    //         StringDecoder.class
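The resolution reported at the top of this thread (using every worker as a receiver leaves none to do the processing) can be reproduced in miniature with an ordinary thread pool. This is an analogy only, not Spark code: each pool thread stands in for a core, each blocked task for a long-running receiver, and the names ReceiverStarvationDemo and processingRuns are invented for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReceiverStarvationDemo {
    /**
     * Starts `receivers` long-running tasks on a pool of `cores` threads, then submits
     * one processing task. Returns true if that task finished within 300 ms.
     */
    static boolean processingRuns(int cores, int receivers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        CountDownLatch stopReceivers = new CountDownLatch(1);
        try {
            for (int i = 0; i < receivers; i++) {
                // A receiver holds its thread for as long as the stream is running.
                pool.submit(() -> { stopReceivers.await(); return null; });
            }
            Future<String> batch = pool.submit(() -> "processed");
            try {
                batch.get(300, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException starved) {
                return false; // every thread is busy receiving; the batch never starts
            }
        } finally {
            stopReceivers.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("5 cores, 5 receivers -> processing runs: " + processingRuns(5, 5));
        System.out.println("6 cores, 5 receivers -> processing runs: " + processingRuns(6, 5));
    }
}
```

With five threads and five receivers the processing task stays queued, which matches the symptom of no messages appearing; with one spare thread it completes. The same reasoning is why dialing the executor cores up made the unioned receivers work.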
Re: Multiple Kafka Receivers and Union
Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream? On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote: The part that works is the commented out, single receiver stream below the loop. It seems that when I call KafkaUtils.createStream more than once, I don’t receive any messages. I’ll dig through the logs, but at first glance yesterday I didn’t see anything suspect. I’ll have to look closer. mn On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote: Maybe post the before-code as in what was the code before you did the loop (that worked)? I had similar situations where reviewing code before (worked) and after (does not work) helped. Also, what helped is the Scala REPL because I can see what are the object types being returned by each statement. Other than code, in the driver logs, you should see events that say Registered receiver for stream 0 from akka.tcp://sp...@node5.acme.net:53135 Now, if you goto node5 and look at Spark or YarnContainer logs (depending on who's doing RM), you should be able to see if the receiver has any errors when trying to talk to kafka. On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com wrote: To my eyes, these are functionally equivalent. I’ll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java) Thanks for looking at this. If anyone else can see a glaring issue in the Java approach that would be appreciated. 
Thanks, Matt On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote: Sorry, I am almost Java illiterate but here's my Scala code to do the equivalent (that I have tested to work): val kInStreams = (1 to 10).map{_ = KafkaUtils.createStream(ssc,zkhost.acme.net:2182,myGrp,Map(myTopic - 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers across the cluster, one for each partition, potentially but active receivers are only as many kafka partitions you have val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote: So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note, that if I substitute the commented section for the loop, I receive messages from the topic. SparkConf sparkConf = new SparkConf(); sparkConf.set(spark.streaming.unpersist, true); sparkConf.set(spark.logConf, true); MapString, String kafkaProps = new HashMap(); kafkaProps.put(zookeeper.connect, Constants.ZK_ENSEMBLE + /kafka); kafkaProps.put(group.id, groupId); JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1)); jsc.checkpoint(hdfs://some_location); ListJavaPairDStreamString, ProtobufModel streamList = new ArrayList(5); for (int i = 0; i 5; i++) { streamList.add(KafkaUtils.createStream(jsc, String.class, ProtobufModel.class, StringDecoder.class, ProtobufModelDecoder.class, kafkaProps, Collections.singletonMap(topic, 1), StorageLevel.MEMORY_ONLY_SER())); } final JavaPairDStreamString, ProtobufModel stream = jsc.union(streamList.get(0), streamList.subList(1, streamList.size())); // final JavaPairReceiverInputDStreamString, ProtobufModel stream = // KafkaUtils.createStream(jsc, // String.class, ProtobufModel.class, // StringDecoder.class, ProtobufModelDecoder.class, // kafkaProps, // Collections.singletonMap(topic, 5), // StorageLevel.MEMORY_ONLY_SER()); final JavaPairDStreamString, Integer tuples = 
stream.mapToPair( new PairFunctionTuple2String, ProtobufModel, String, Integer() { @Override public Tuple2String, Integer call(Tuple2String, ProtobufModel tuple) throws Exception { return new Tuple2(tuple._2().getDeviceId(), 1); } }); … and futher Spark functions ... On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote: Posting your code would be really helpful in figuring out gotchas. On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote: Hey, Spark 1.1.0 Kafka 0.8.1.1 Hadoop (YARN/HDFS) 2.5.1 I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that its possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather
Re: Multiple Kafka Receivers and Union
Posting your code would be really helpful in figuring out gotchas.

On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote:

Hey,

Spark 1.1.0
Kafka 0.8.1.1
Hadoop (YARN/HDFS) 2.5.1

I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that it’s possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather than localized to the driver. I’m following something like this:

https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132

But for Kafka, obviously. From the Streaming Programming Guide: “Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s).”

However, I’m not able to consume any messages from Kafka after I perform the union operation. Again, if I create a single, multi-threaded receiver I can consume messages fine. If I create 5 receivers in a loop and call jssc.union(…), I get:

    INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks

Do I need to do anything to the unioned DStream? Am I going about this incorrectly?

Thanks in advance.

Matt

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
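One thing worth ruling out with several receivers (offered as a general check, not a diagnosis of this thread): the Streaming Programming Guide notes that each receiver occupies a core, so the application needs more cores than receivers or nothing is left to process the received data. A minimal sketch of that check follows; `ReceiverCoreCheck` and its methods are hypothetical helpers, not part of Spark.

```scala
// Hypothetical helper, not a Spark API: each receiver-based input stream
// pins one core for as long as the streaming application runs.
object ReceiverCoreCheck {
  /** Cores left for batch processing once every receiver has taken one. */
  def coresLeftForProcessing(totalCores: Int, numReceivers: Int): Int =
    totalCores - numReceivers

  /** True when at least one core remains to process received blocks. */
  def canProcess(totalCores: Int, numReceivers: Int): Boolean =
    coresLeftForProcessing(totalCores, numReceivers) > 0

  def main(args: Array[String]): Unit = {
    val numReceivers = 5 // one receiver per Kafka partition, as in the case described
    for (totalCores <- Seq(2, 5, 6)) {
      println(s"cores=$totalCores receivers=$numReceivers " +
        s"canProcess=${canProcess(totalCores, numReceivers)}")
    }
  }
}
```

With five receivers, the check only passes once the application has six or more cores in total across its executors.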
Re: Multiple Kafka Receivers and Union
Sorry, I am almost Java illiterate but here's my Scala code to do the equivalent (that I have tested to work):

    val kInStreams = (1 to 10).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
    } // Create 10 receivers across the cluster, one for each partition, potentially, but active receivers are only as many as the kafka partitions you have
    val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote:

So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note that if I substitute the commented section for the loop, I receive messages from the topic.

    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.streaming.unpersist", "true");
    sparkConf.set("spark.logConf", "true");

    Map<String, String> kafkaProps = new HashMap<>();
    kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
    kafkaProps.put("group.id", groupId);

    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
    jsc.checkpoint("hdfs://some_location");

    // One receiver per loop iteration, each reading the topic with a single thread
    List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
    for (int i = 0; i < 5; i++) {
        streamList.add(KafkaUtils.createStream(jsc,
            String.class, ProtobufModel.class,
            StringDecoder.class, ProtobufModelDecoder.class,
            kafkaProps,
            Collections.singletonMap(topic, 1),
            StorageLevel.MEMORY_ONLY_SER()));
    }

    final JavaPairDStream<String, ProtobufModel> stream =
        jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

    // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
    //     KafkaUtils.createStream(jsc,
    //         String.class, ProtobufModel.class,
    //         StringDecoder.class, ProtobufModelDecoder.class,
    //         kafkaProps,
    //         Collections.singletonMap(topic, 5),
    //         StorageLevel.MEMORY_ONLY_SER());

    final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
                return new Tuple2<>(tuple._2().getDeviceId(), 1);
            }
        });

    … and further Spark functions ...

On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote:

Posting your code would be really helpful in figuring out gotchas.

On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote:

Hey,

Spark 1.1.0
Kafka 0.8.1.1
Hadoop (YARN/HDFS) 2.5.1

I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that it’s possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather than localized to the driver. I’m following something like this:

https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132

But for Kafka, obviously. From the Streaming Programming Guide: “Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s).”

However, I’m not able to consume any messages from Kafka after I perform the union operation. Again, if I create a single, multi-threaded receiver I can consume messages fine. If I create 5 receivers in a loop and call jssc.union(…), I get:

    INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
    INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks

Do I need to do anything to the unioned DStream? Am I going about this incorrectly?

Thanks in advance.
Matt
Re: Stable spark streaming app
Dibyendu - I am using the Kafka consumer built into Spark streaming. Pulled the jar from here: http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar Thanks for the sbt-assembly link, Soumitra. On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Tim Just curious to know ; Which Kafka Consumer you have used ? Dib On Sep 18, 2014 4:40 AM, Tim Smith secs...@gmail.com wrote: Thanks :) On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote: Thanks Tim, this is super helpful! Question about jars and spark-submit: why do you provide myawesomeapp.jar as the program jar but then include other jars via the --jars argument? Have you tried building one uber jar with all dependencies and just sending that to Spark as your app jar? I guess that is mostly because I am Scala/sbt noob :) How do I create the uber jar? My .sbt file says: name := My Awesome App version := 1.025 scalaVersion := 2.10.4 resolvers += Apache repo at https://repository.apache.org/content/repositories/releases; libraryDependencies += org.apache.spark %% spark-core % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming % 1.0.0 libraryDependencies += org.apache.spark %% spark-streaming-kafka % 1.0.0 libraryDependencies += org.apache.kafka %% kafka % 0.8.1.1 Then I run sbt package to generate myawesomeapp.jar. Also, have you ever seen any issues with Spark caching your app jar between runs even if it changes? Not that I can tell but then maybe because I use Yarn, I might be shielded from some jar distribution bugs in Spark? On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith secs...@gmail.com wrote: I don't have anything in production yet but I now at least have a stable (running for more than 24 hours) streaming app. Earlier, the app would crash for all sorts of reasons. 
Caveats/setup: - Spark 1.0.0 (I have no input flow control unlike Spark 1.1) - Yarn for RM - Input and Output to Kafka - CDH 5.1 - 11 node cluster with 32-cores and 48G max container size for each node (Yarn managed) - 5 partition Kafka topic - both in and out - Roughly, an average of 25k messages per second - App written in Scala (warning: I am a Scala noob) Few things I had to add/tweak to get the app to be stable: - The executor JVMs did not have any GC options set, by default. This might be more of a CDH issue. I noticed that while the Yarn container and other Spark ancillary tasks had GC options set at launch but none for the executors. So I played with different GC options and this worked best: SPARK_JAVA_OPTS=-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails I tried G1GC but for some reason it just didn't work. I am not a Java programmer or expert so my conclusion is purely trial and error based. The GC logs, with these flags, go to the stdout file in the Yarn container logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will respect these. On CDH/CM specifically, even though you don't run Spark as a service (since you are using Yarn for RM), you can goto Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and set SPARK_JAVA_OPTS there. - Set these two params - spark.yarn.executor.memoryOverhead spark.yarn.driver.memoryOverhead. Earlier, my app would get killed because the executors running the kafka receivers would get killed by Yarn for over utilization of memory. 
Now, these are my memory settings (I will paste the entire app launch params later in the email): --driver-memory 2G \ --executor-memory 16G \ --spark.yarn.executor.memoryOverhead 4096 \ --spark.yarn.driver.memoryOverhead 1024 \ Your total executor JVM will consume executor-memory minus spark.yarn.executor.memoryOverhead so you should see each executor JVM consuming no more than 12G, in this case. Here is how I launch my app: run=`date +%m-%d-%YT%T`; \ nohup spark-submit --class myAwesomeApp \ --master yarn myawesomeapp.jar \ --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \ --driver-memory 2G \ --executor-memory 16G \ --executor-cores 16 \ --num-executors 10 \ --spark.serializer org.apache.spark.serializer.KryoSerializer \ --spark.rdd.compress true \ --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \ --spark.akka.threads 64 \ --spark.akka.frameSize 500 \ --spark.task.maxFailures 64 \ --spark.scheduler.mode FAIR \ --spark.yarn.executor.memoryOverhead 4096 \ --spark.yarn.driver.memoryOverhead 1024 \ --spark.shuffle.consolidateFiles true \ --spark.default.parallelism 528
Re: Kafka Spark Streaming on Spark 1.1
What kafka receiver are you using? Did you build a new jar for your app with the latest streaming-kafka code for 1.1? On Thu, Sep 18, 2014 at 11:47 AM, JiajiaJing jj.jing0...@gmail.com wrote: Hi Spark Users, We just upgrade our spark version from 1.0 to 1.1. And we are trying to re-run all the written and tested projects we implemented on Spark 1.0. However, when we try to execute the spark streaming project that stream data from Kafka topics, it yields the following error message. I have no idea about why this occurs because the same project runs successfully with Spark 1.0. May I get some help on this please? Thank you very much! 2014-09-18 11:06:08,841 ERROR [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logError(75)) - Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AbstractMethodError at org.apache.spark.Logging$class.log(Logging.scala:52) at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66) at org.apache.spark.Logging$class.logInfo(Logging.scala:59) at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Best Regards, Jiajia -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-on-Spark-1-1-tp14597.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Stable spark streaming app
at 8:30 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Hmm, no response to this thread! Adding to it, please share experiences of building an enterprise grade product based on Spark Streaming. I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it. I see huge potential to improve debuggability of Spark. - Original Message - From: Tim Smith secs...@gmail.com To: spark users user@spark.apache.org Sent: Friday, September 12, 2014 10:09:53 AM Subject: Stable spark streaming app Hi, Anyone have a stable streaming app running in production? Can you share some overview of the app and setup like number of nodes, events per second, broad stream processing workflow, config highlights etc? Thanks, Tim
Re: Stable spark streaming app
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote:

Thanks Tim, this is super helpful! Question about jars and spark-submit: why do you provide myawesomeapp.jar as the program jar but then include other jars via the --jars argument? Have you tried building one uber jar with all dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am a Scala/sbt noob :) How do I create the uber jar? My .sbt file says:

    name := "My Awesome App"
    version := "1.025"
    scalaVersion := "2.10.4"
    resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run "sbt package" to generate myawesomeapp.jar.

Also, have you ever seen any issues with Spark caching your app jar between runs even if it changes?

Not that I can tell, but then maybe because I use Yarn, I might be shielded from some jar distribution bugs in Spark?

On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith secs...@gmail.com wrote:

I don't have anything in production yet but I now at least have a stable (running for more than 24 hours) streaming app. Earlier, the app would crash for all sorts of reasons.

Caveats/setup:
- Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
- Yarn for RM
- Input and Output to Kafka
- CDH 5.1
- 11 node cluster with 32-cores and 48G max container size for each node (Yarn managed)
- 5 partition Kafka topic - both in and out
- Roughly, an average of 25k messages per second
- App written in Scala (warning: I am a Scala noob)

Few things I had to add/tweak to get the app to be stable:

- The executor JVMs did not have any GC options set, by default. This might be more of a CDH issue. I noticed that the Yarn container and other Spark ancillary tasks had GC options set at launch, but the executors had none. So I played with different GC options and this worked best:

    SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"

  I tried G1GC but for some reason it just didn't work. I am not a Java programmer or expert so my conclusion is purely trial and error based. The GC logs, with these flags, go to the stdout file in the Yarn container logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will respect these. On CDH/CM specifically, even though you don't run Spark as a service (since you are using Yarn for RM), you can go to "Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.

- Set these two params: spark.yarn.executor.memoryOverhead and spark.yarn.driver.memoryOverhead. Earlier, my app would get killed because the executors running the kafka receivers would get killed by Yarn for over-utilization of memory. Now, these are my memory settings (I will paste the entire app launch params later in the email):

    --driver-memory 2G \
    --executor-memory 16G \
    --spark.yarn.executor.memoryOverhead 4096 \
    --spark.yarn.driver.memoryOverhead 1024 \

  Your total executor JVM will consume executor-memory minus spark.yarn.executor.memoryOverhead so you should see each executor JVM consuming no more than 12G, in this case.
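A caveat on that arithmetic, offered as my understanding rather than something confirmed in this thread: Spark on YARN appears to request a container of executor-memory plus the memoryOverhead value, and to start the JVM with a heap of executor-memory. If that holds, these settings give a 16G heap inside a roughly 20G container, not a 12G JVM. The sketch below just does the sum; `YarnExecutorMemory` is a hypothetical helper and the formula is an assumption to verify against your Spark version.

```scala
// Hypothetical helper, not a Spark API. Assumed formula:
// YARN container request = JVM heap (executor-memory) + memoryOverhead.
object YarnExecutorMemory {
  /** Container size requested from YARN, in MB, under the assumption above. */
  def containerRequestMb(heapMb: Int, memoryOverheadMb: Int): Int =
    heapMb + memoryOverheadMb

  def main(args: Array[String]): Unit = {
    val executorHeapMb = 16 * 1024 // --executor-memory 16G
    val executorOverheadMb = 4096  // spark.yarn.executor.memoryOverhead
    val driverHeapMb = 2 * 1024    // --driver-memory 2G
    val driverOverheadMb = 1024    // spark.yarn.driver.memoryOverhead
    val maxContainerMb = 48 * 1024 // 48G max container size from the setup list

    val executorRequest = containerRequestMb(executorHeapMb, executorOverheadMb)
    val driverRequest = containerRequestMb(driverHeapMb, driverOverheadMb)
    println(s"executor: heap=${executorHeapMb}MB container=${executorRequest}MB")
    println(s"driver:   heap=${driverHeapMb}MB container=${driverRequest}MB")
    println(s"executor fits max container: ${executorRequest <= maxContainerMb}")
  }
}
```

YARN may also round the request up to its allocation increment, so the container reported by the ResourceManager can be slightly larger than this sum.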
Here is how I launch my app:

    run=`date +%m-%d-%YT%T`; \
    nohup spark-submit --class myAwesomeApp \
    --master yarn myawesomeapp.jar \
    --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
    --driver-memory 2G \
    --executor-memory 16G \
    --executor-cores 16 \
    --num-executors 10 \
    --spark.serializer org.apache.spark.serializer.KryoSerializer \
    --spark.rdd.compress true \
    --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
    --spark.akka.threads 64 \
    --spark.akka.frameSize 500 \
    --spark.task.maxFailures 64 \
    --spark.scheduler.mode FAIR \
    --spark.yarn.executor.memoryOverhead 4096 \
    --spark.yarn.driver.memoryOverhead 1024 \
    --spark.shuffle.consolidateFiles true \
    --spark.default.parallelism 528 \
    > logs/normRunLog-$run.log \
    2> logs/normRunLogError-$run.log & \
    echo $! > logs/current-run.pid

Some code optimizations (or, goof ups that I fixed). I did not scientifically measure the impact of each but I think they helped:

- Made all my classes and objects serializable and then use Kryo (as you see above)
- I map one receive task for each kafka partition
- Instead of doing a union on all the incoming streams and then repartition() I now repartition() each incoming stream and process them separately. I believe this reduces shuffle.
- Reduced number of repartitions. I was doing 128 after
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, I am a little confused about the need for rate limiting input from kafka. If the stream coming in from kafka has higher message/second rate than what a Spark job can process then it should simply build a backlog in Spark if the RDDs are cached on disk using persist(). Right? Thanks, Tim On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Alon, No this will not be guarantee that same set of messages will come in same RDD. This fix just re-play the messages from last processed offset in same order. Again this is just a interim fix we needed to solve our use case . If you do not need this message re-play feature, just do not perform the ack ( Acknowledgement) call in the Driver code. Then the processed messages will not be written to ZK and hence replay will not happen. Regards, Dibyendu On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com wrote: Hi Dibyendu, Thanks for your great work! I'm new to Spark Streaming, so I just want to make sure I understand Driver failure issue correctly. In my use case, I want to make sure that messages coming in from Kafka are always broken into the same set of RDDs, meaning that if a set of messages are assigned to one RDD, and the Driver dies before this RDD is processed, then once the Driver recovers, the same set of messages are assigned to a single RDD, instead of arbitrarily repartitioning the messages across different RDDs. Does your Receiver guarantee this behavior, until the problem is fixed in Spark 1.2? Regards, Alon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
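To put rough numbers on the backlog question: if input outpaces processing, the backlog grows linearly and whatever is holding it (memory, then disk) eventually fills, which is one argument for throttling at the receiver. The sketch below is plain arithmetic, not Spark code; `BacklogSketch` is a hypothetical name and all rates, message sizes and capacities are made-up illustrative values.

```scala
// Hypothetical back-of-the-envelope model, not a Spark API.
object BacklogSketch {
  /** Messages left unprocessed after `seconds`; never negative. */
  def backlog(inPerSec: Long, processedPerSec: Long, seconds: Long): Long =
    math.max(0L, (inPerSec - processedPerSec) * seconds)

  /** Whole seconds until the backlog fills `capacityBytes`, or None if it never grows. */
  def secondsUntilFull(inPerSec: Long, processedPerSec: Long,
                       bytesPerMsg: Long, capacityBytes: Long): Option[Long] = {
    val growthBytesPerSec = (inPerSec - processedPerSec) * bytesPerMsg
    if (growthBytesPerSec <= 0) None else Some(capacityBytes / growthBytesPerSec)
  }

  def main(args: Array[String]): Unit = {
    val inRate = 25000L                       // messages/second arriving (illustrative)
    val procRate = 20000L                     // messages/second processed (illustrative)
    val msgBytes = 1024L                      // assumed average message size
    val capacity = 100L * 1024 * 1024 * 1024  // assumed 100 GiB available for blocks

    println(s"backlog after 60s: ${backlog(inRate, procRate, 60L)} messages")
    println(s"seconds until full: ${secondsUntilFull(inRate, procRate, msgBytes, capacity)}")
  }
}
```

When the processing rate keeps up with the input rate, `secondsUntilFull` returns None and persisting to disk only has to absorb short bursts.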
Stable spark streaming app
Hi, Anyone have a stable streaming app running in production? Can you share some overview of the app and setup like number of nodes, events per second, broad stream processing workflow, config highlights etc? Thanks, Tim
Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..
Similar issue (Spark 1.0.0). Streaming app runs for a few seconds before these errors start to pop all over the driver logs: 14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I am using MEMORY_AND_DISK_SER for all my RDDs so I should not be losing any blocks unless I run out of disk space, right? On Fri, Sep 12, 2014 at 5:24 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: I agree, Even the Low Level Kafka Consumer which I have written has tunable IO throttling which help me solve this issue ... But question remains , even if there are large backlog, why Spark drop the unprocessed memory blocks ? 
Dib

On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim jeoffr...@gmail.com wrote:

Our issue could be related to this problem as described in: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html which the DStream is processed for every 1 hour batch duration. I have implemented IO throttling in the Receiver as well in our Kafka consumer, and our backlog is not that large.

    INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
    INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
    INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
    INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)

The question that I have now is: how to prevent the MemoryStore/BlockManager of dropping the block inputs? And should they be logged in the level WARN/ERROR? Thanks.

On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark User List] [hidden email] wrote:

Dear all, I am sorry. This was a false alarm There was some issue in the RDD processing logic which leads to large backlog. Once I fixed the issues in my processing logic, I can see all messages being pulled nicely without any Block Removed error. I need to tune certain configurations in my Kafka Consumer to modify the data rate and also the batch size. Sorry again.
Regards, Dibyendu On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu [hidden email] wrote: This is my case about broadcast variable: 14/07/21 19:49:13 INFO Executor: Running task ID 4 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2) 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106) 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally 14/07/21 19:49:13 INFO Executor: Finished task ID 3 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms 14/07/21 19:49:13 INFO Executor: Running task ID 5 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3) 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106) 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally 14/07/21 19:49:13 INFO
Where do logs go in StandAlone mode
Spark 1.0.0

I write logs out from my app using this object:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.Logging

    object LogService extends Logging {

      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.
          logInfo("Setting log level to [WARN] for streaming example." +
            " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }

Later, I call LogService.setStreamingLogLevels() and then use logInfo etc. This works well when I run the app under Yarn: all the logs show up under the container logs. But when I run the app in Standalone mode, I can't find these logs in the master, worker or driver logs. So where do they go?

Thanks, Tim
Executor garbage collection
Hi, Is anyone setting explicit GC options for the executor JVM? If so, which ones, and how did you arrive at them? Thanks, - Tim
Re: Announcing Spark 1.1.0!
Thanks for all the good work. Very excited about seeing more features and better stability in the framework. On Thu, Sep 11, 2014 at 5:12 PM, Patrick Wendell pwend...@gmail.com wrote: I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is the second release on the API-compatible 1.X line. It is Spark's largest release ever, with contributions from 171 developers! This release brings operational and performance improvements in Spark core including a new implementation of the Spark shuffle designed for very large scale workloads. Spark 1.1 adds significant extensions to the newest Spark modules, MLlib and Spark SQL. Spark SQL introduces a JDBC server, byte code generation for fast expression evaluation, a public types API, JSON support, and other features and optimizations. MLlib introduces a new statistics library along with several new algorithms and optimizations. Spark 1.1 also builds out Spark's Python support and adds new components to the Spark Streaming module. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html [2] http://spark.eu.apache.org/downloads.html NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE FOR SEVERAL HOURS. Please e-mail me directly for any type-o's in the release notes or name listing. Thanks, and congratulations! - Patrick - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to scale more consumer to Kafka stream
How are you creating your Kafka streams in Spark? If you have 10 partitions for a topic, you can call createStream ten times to create 10 parallel receivers/executors and then use union to combine all the DStreams.

On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

Hi (my previous post has been used by someone else). I'm building an application that reads events from a Kafka stream. In production we have 5 consumers that share 10 partitions. But with Spark Streaming Kafka, only 1 worker acts as a consumer and then distributes the tasks to the workers, so I can have only 1 machine acting as a consumer. I need more, because only 1 consumer means lag. Do you have any idea what I can do? Another interesting point: the master is not loaded at all; it never gets above 10% CPU. I've tried to increase queued.max.message.chunks on the Kafka client to read more records, thinking it would speed up the reads, but I only get:

ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576) java.lang.OutOfMemoryError: Java heap space

Does anyone have ideas?
Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
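A minimal sketch of the multi-receiver approach suggested in the reply above, assuming the Spark 1.x receiver-based KafkaUtils.createStream API. The ZooKeeper address, group id, topic name, and receiver count are placeholders, not values from the thread.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MultiReceiverSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder connection details (assumptions for illustration only).
    val zkQuorum = "zk-host:2181"
    val groupId = "example-group"
    val topic = "example-topic"
    val numReceivers = 10 // e.g. one receiver per Kafka partition

    // Each createStream call starts its own receiver; all receivers share the
    // same consumer group, so Kafka splits the partitions among them.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))
    }

    // Combine the per-receiver streams into a single DStream for processing.
    val unified = ssc.union(streams)
    unified.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each receiver occupies a core for as long as the application runs, so the application needs more cores than receivers, otherwise nothing is left to process the received data.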
Re: spark.cleaner.ttl and spark.streaming.unpersist
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so I don't think stale RDDs are the issue here. I ran jmap -histo on a couple of running receiver processes and, in a heap of 30G, roughly 16G is taken by [B, which is byte arrays. Still investigating and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it.

On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote:

I somehow missed that parameter when I was reviewing the documentation; that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

Hi Luis, The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both be used to remove stale streaming data. The difference is that “spark.cleaner.ttl” is a time-based cleaner: it cleans not only streaming input data but also Spark’s stale metadata. “spark.streaming.unpersist” is a reference-based cleaning mechanism: streaming data is removed once it falls out of the slide duration. Both parameters can reduce the memory footprint of Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation with Kafka, these two parameters cannot mitigate the problem well. You actually need to control the input rate so that data is not injected so fast; you can try “spark.streaming.receiver.maxRate” to control the injection rate. Thanks Jerry

*From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] *Sent:* Wednesday, September 10, 2014 5:21 AM *To:* user@spark.apache.org *Subject:* spark.cleaner.ttl and spark.streaming.unpersist

The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDDs are being batched while an RDD is being processed. Regards, Luis
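For reference, a minimal sketch of how the rate limit discussed in this thread could be set programmatically through SparkConf. The application name and the limit of 10000 are placeholders chosen for illustration; a suitable value depends on the workload.

```scala
import org.apache.spark.SparkConf

object RateLimitedConf {
  // Builds a SparkConf that caps receiver ingestion and enables unpersisting.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("RateLimitedStreaming") // placeholder name
      // Maximum records per second that EACH receiver will accept (placeholder value).
      .set("spark.streaming.receiver.maxRate", "10000")
      // Let Spark Streaming unpersist RDDs it no longer needs.
      .set("spark.streaming.unpersist", "true")
}
```

Because the limit applies per receiver, the total ingestion rate is roughly the limit multiplied by the number of receivers.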
Re: how to choose right DStream batch interval
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617 Slide 39 covers it. On Tue, Sep 9, 2014 at 9:23 PM, qihong qc...@pivotal.io wrote: Hi Mayur, Thanks for your response. I did write a simple test that set up a DStream with 5 batches; The batch duration is 1 second, and the 3rd batch will take extra 2 seconds, the output of the test shows that the 3rd batch causes backlog, and spark streaming does catch up on 4th and 5th batch (DStream.print was modified to output system time) --- Time: 1409959708000 ms, system time: 1409959708269 --- 1155 --- Time: 1409959709000 ms, system time: 1409959709033 --- 2255 delay 2000 ms --- Time: 140995971 ms, system time: 1409959712036 --- 3355 --- Time: 1409959711000 ms, system time: 1409959712059 --- 4455 --- Time: 1409959712000 ms, system time: 1409959712083 --- Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
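The backlog-and-catch-up behaviour seen in the quoted test can be reproduced with a small model. This is a sketch in plain Scala, not Spark's scheduler: it assumes batches are processed strictly one at a time, that a batch becomes ready at the end of its interval, and the processing times below are made up for illustration.

```scala
object BacklogModel {
  /** Scheduling delay (ms) of each batch: how long it waits after becoming ready. */
  def schedulingDelays(batchIntervalMs: Long, processingMs: Seq[Long]): Seq[Long] = {
    val (_, delays) =
      processingMs.zipWithIndex.foldLeft((0L, Vector.empty[Long])) {
        case ((prevEnd, acc), (p, i)) =>
          val due = (i + 1) * batchIntervalMs // batch i is ready at the end of its interval
          val start = math.max(due, prevEnd)  // it cannot start before the previous batch ends
          (start + p, acc :+ (start - due))
      }
    delays
  }

  def main(args: Array[String]): Unit = {
    // 1-second batches; the third batch takes an extra 2 seconds.
    val delays = schedulingDelays(1000L, Seq(30L, 30L, 2030L, 30L, 30L))
    println(delays) // Vector(0, 0, 0, 1030, 60)
  }
}
```

In this model the slow third batch delays the fourth by about a second, and the fifth is almost back on schedule. The system catches up only because the typical processing time is well below the batch interval; if processing time stays above the interval, the delay grows without bound.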
Re: spark-streaming Could not compute split exception
I had a similar issue and many others - all were basically symptoms for yarn killing the container for high memory usage. Haven't gotten to root cause yet. On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin van...@cloudera.com wrote: Your executor is exiting or crashing unexpectedly: On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza pesp...@societyconsulting.com wrote: org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1410224367331_0006_01_03 is : 1 2014-09-09 21:47:26,345 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1410224367331_0006_01_03 and exit code: 1 You can check the app logs (yarn logs --applicationId [id]) and see why the container is exiting. There's probably an exception happening somewhere. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.cleaner.ttl and spark.streaming.unpersist
I switched from Yarn to StandAlone mode and haven't had OOM issue yet. However, now I have Akka issues killing the executor: 2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Before I switched from Yarn to Standalone, I tried looking at heaps of running executors. What I found odd was that while both - jmap histo:live and jmap histo showed heap usage in few hundreds of MBytes, Yarn kept showing that memory utilization is in several Gigabytes - eventually leading to the container being killed. I would appreciate if someone can duplicate what I am seeing. Basically: 1. Tail your yarn container logs and see what it is reporting as memory used by the JVM 2. In parallel, run jmap -histo:live pid or jmap histo pid on the executor process. They should be about the same, right? Also, in the heap dump, 99% of the heap seems to be occupied with unreachable objects (and most of it is byte arrays). On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote: Actually, I am not doing any explicit shuffle/updateByKey or other transform functions. In my program flow, I take in data from Kafka, match each message against a list of regex and then if a msg matches a regex then extract groups, stuff them in json and push out back to kafka (different topic). So there is really no dependency between two messages in terms of processing. Here's my container histogram: http://pastebin.com/s3nAT3cY Essentially, my app is a cluster grep on steroids. 
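The per-message step described above (match a message against a list of regexes, extract the groups, emit JSON) might look roughly like the following. This is a hypothetical sketch in plain Scala, not the poster's code: the Rule type, the field names, and the hand-rolled JSON rendering are all assumptions made for illustration.

```scala
import scala.util.matching.Regex

object RegexExtractSketch {
  // Hypothetical rule: a name, a regex, and one field name per capture group.
  final case class Rule(name: String, pattern: Regex, fields: Seq[String])

  // Minimal string quoting; only backslashes and double quotes are escaped.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  private def render(rule: Rule, m: Regex.Match): String = {
    val pairs = ("rule" -> rule.name) +: rule.fields.zipWithIndex.map {
      // A group that did not participate in the match is null; use "" instead.
      case (f, i) => f -> Option(m.group(i + 1)).getOrElse("")
    }
    pairs.map { case (k, v) => quote(k) + ":" + quote(v) }.mkString("{", ",", "}")
  }

  /** JSON for the first rule that matches, or None if no rule matches. */
  def toJson(msg: String, rules: Seq[Rule]): Option[String] = {
    val hits = for {
      rule <- rules.iterator
      m <- rule.pattern.findFirstMatchIn(msg).iterator
    } yield render(rule, m)
    if (hits.hasNext) Some(hits.next()) else None
  }
}
```

Since each message is handled independently, a function like this can be applied inside a plain map over the stream, which matches the poster's point that there is no dependency between messages.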
On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heapdump only contains a very large byte array consuming about 66%(the second link contains a picture of my heap -- I ran with a small heap to be able to get the failure quickly) I don't have solutions but wanted to affirm that I've observed a similar situation... On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote: I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I don't think stale RDDs are an issue here. I did a jmap -histo on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by [B which is byte arrays. Still investigating more and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it. On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you! 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com: Hi Luis, The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be used to remove useless timeout streaming data, the difference is that “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming input data, but also Spark’s useless metadata; while “spark.streaming.unpersist” is reference-based cleaning mechanism, streaming data will be removed when out of slide duration. 
Both parameters can reduce the memory footprint of Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation with Kafka, these two parameters cannot mitigate the problem well. You actually need to control the input rate so that data is not injected so fast; you can try “spark.streaming.receiver.maxRate” to control the injection rate. Thanks Jerry From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] Sent: Wednesday, September 10, 2014 5:21 AM To: user@spark.apache.org Subject: spark.cleaner.ttl and spark.streaming.unpersist The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per sec. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. And I also wonder if new RDDs are being batched while an RDD is being
Re: Low Level Kafka Consumer for Spark
Thanks TD. Someone already pointed out to me that /repartition(...)/ isn't the right way. You have to /val partedStream = repartition(...)/. Would be nice to have it fixed in the docs.

On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote:

Some thoughts on this thread to clarify the doubts.

1. Driver recovery: The current version (1.1, to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die and so does the raw data that was stored in them. Only for HDFS is the data not lost on driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics in all transformations. To guarantee this for all kinds of streaming computations, stateful and non-stateful, it is required that the data be replayed through Kafka in exactly the same order, and that the underlying blocks of data in Spark be regenerated in exactly the way they would have been if there had been no driver failure. This is quite tricky to implement and requires manipulation of zookeeper offsets, etc., which is hard to do with the high level consumer that KafkaUtils uses. Dibyendu's low level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very soon.

3. Repartitioning: I am trying to understand the repartition issue. One common mistake I have seen is that developers repartition a stream but do not use the repartitioned stream.

WRONG:
inputDStream.repartition(100)
inputDStream.map(...).count().print()

RIGHT:
val repartitionedDStream = inputDStream.repartition(100)
repartitionedDStream.map(...).count().print()

Not sure if this helps solve the problem that you are all facing.
I am going to add this to the stremaing programming guide to make sure this common mistake is avoided. TD On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, Sorry for little delay . As discussed in this thread, I have modified the Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer) code to have dedicated Receiver for every Topic Partition. You can see the example howto create Union of these receivers in consumer.kafka.client.Consumer.java . Thanks to Chris for suggesting this change. Regards, Dibyendu On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Just a comment on the recovery part. Is it correct to say that currently Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK-1647 Just to illustrate a real use case (mine): - We have object states which have a Duration field per state which is incremented on every batch interval. Also this object state is reset to 0 upon incoming state changing events. Let's supposed there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: The Duration field will get incremented from the data checkpoint version until the recovery moment, but the state change event will never be re-processed...so in the end we have the old state with the wrong Duration value. To make things worst, let's imagine we're dumping the Duration increases somewhere...which means we're spreading the problem across our system. Re-computation awareness is something I've commented on another thread and rather treat it separately. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205 Re-computations do occur, but the only RDD's that are recovered are the ones from the data checkpoint. 
This is what we've seen. Is not enough by itself to ensure recovery of computed data and this partial recovery leads to inconsistency in some cases. Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed to Receiver node on let's say a second bases then having it persisted to HDFS every second could be a big challenge for keeping JVM performance - maybe that could be reason why it's not really implemented...assuming it isn't. Dibyendu had a great effort with the offset controlling code but the general state consistent recovery feels to me like another big issue to address. I plan on having a dive into the Streaming code and try to at least contribute with some ideas. Some more insight from anyone on the dev team will be very appreciated. tnks, Rod -- View this message in context:
Re: Publishing a transformed DStream to Kafka
I'd be interested in finding the answer too. Right now, I do:

val kafkaOutMsgs = kafkInMessages.map(x => myFunc(x._2, someParam))
kafkaOutMsgs.foreachRDD((rdd, time) => { rdd.foreach(rec => { writer.output(rec) }) })
// where writer.output is a method that takes a string and writer is an instance of a producer class.

On Tue, Sep 2, 2014 at 10:12 AM, Massimiliano Tomassi max.toma...@gmail.com wrote:

Hello all, after having applied several transformations to a DStream I'd like to publish all the elements in all the resulting RDDs to Kafka. What would be the best way to do that? Just using DStream.foreach and then RDD.foreach? Is there any other built-in utility for this use case? Thanks a lot, Max -- Massimiliano Tomassi e-mail: max.toma...@gmail.com
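One commonly used variation on the approach above is to create the producer inside foreachPartition, so that it is constructed on the executor instead of being serialized from the driver. The following is a sketch only, assuming the Kafka 0.8 Scala producer API (kafka.producer.Producer); the object name and the parameters topic and brokers are hypothetical.

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

object KafkaSinkSketch {
  /** Publishes every record of every RDD in `out` to the given Kafka topic. */
  def publish(out: DStream[String], topic: String, brokers: String): Unit = {
    out.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Runs on the executor: one producer per partition per batch.
        val props = new Properties()
        props.put("metadata.broker.list", brokers)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          records.foreach { rec =>
            producer.send(new KeyedMessage[String, String](topic, rec))
          }
        } finally {
          producer.close() // release the connection even if a send fails
        }
      }
    }
  }
}
```

Creating a producer per partition costs a connection setup on every batch; a per-executor singleton or connection pool can reduce that cost, at the price of more bookkeeping.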
Re: Low Level Kafka Consumer for Spark
I'd be interested to understand this mechanism as well. But this is the error recovery part of the equation. Consuming from Kafka has two aspects - parallelism and error recovery and I am not sure how either works. For error recovery, I would like to understand how: - A failed receiver gets re-spawned. In 1.0.0, despite settings failed tasks threshold to 64, my job aborts after 4 receiver task failures. - Data loss recovery due to a failed receiver task/executor. For parallelism, I would expect a single createStream() to intelligently map a receiver thread somewhere, one for each kafka partition, but in different JVMs. Also, repartition() does not seem to work as advertised. A repartition(512) should get nodes other than the receiver nodes to get some RDDs to process. No? On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover roger.hoo...@gmail.com wrote: I have this same question. Isn't there somewhere that the Kafka range metadata can be saved? From my naive perspective, it seems like it should be very similar to HDFS lineage. The original HDFS blocks are kept somewhere (in the driver?) so that if an RDD partition is lost, it can be recomputed. In this case, all we need is the Kafka topic, partition, and offset range. Can someone enlighten us on why two copies of the RDD are needed (or some other mechanism like a WAL) for fault tolerance when using Kafka but not when reading from say HDFS? On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote: 'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.' Can you comment a little on how this will be addressed, will there be a durable WAL? Is there a JIRA for tracking this effort? I am curious without WAL if you can avoid this data loss with explicit management of Kafka offsets e.g. 
don't commit offset unless data is replicated to multiple nodes or maybe not until processed. The incoming data will always be durably stored to disk in Kafka so can be replayed in failure scenarios to avoid data loss if the offsets are managed properly. On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote: @bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately. there's actually 2 dimensions to scaling here: receiving and processing. *Receiving* receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale. a side note here is that each receiver running in the cluster will immediately replicates to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed. this in mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving *Processing* once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark, but it is worth highlighting. 
Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch = rddBatch.reduceByKey()) in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDD's explicitly which is not ideal. the Kinesis example referenced earlier in the thread uses the DStream implicits. side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified. shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area
Re: Failed to run runJob at ReceiverTracker.scala
I upped the ulimit to 128k files on all nodes. Job crashed again with DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275. Couldn't get the logs because I killed the job and looks like yarn wipe the container logs (not sure why it wipes the logs under /var/log/hadoop-yarn/container). Next time, I will grab the logs while the job is still active/zombie. So is there a limit on how many times a receiver is re-spawned? Thanks, Tim On Thu, Aug 28, 2014 at 10:06 PM, Tathagata Das tathagata.das1...@gmail.com wrote: It did. It got failed and respawned 4 times. In this case, the too many open files is a sign that you need increase the system-wide limit of open files. Try adding ulimit -n 16000 to your conf/spark-env.sh. TD On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith secs...@gmail.com wrote: Appeared after running for a while. I re-ran the job and this time, it crashed with: 14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for stream 0: Error in block pushing thread - java.net.SocketException: Too many open files Shouldn't the failed receiver get re-spawned on a different worker? On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you see this error right in the beginning or after running for sometime? The root cause seems to be that somehow your Spark executors got killed, which killed receivers and caused further errors. Please try to take a look at the executor logs of the lost executor to find what is the root cause that caused the executor to fail. 
TD On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote: Hi, Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died with: 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275 Exception in thread Thread-59 14/08/28 22:28:15 INFO YarnClientClusterScheduler: Cancelling stage 2 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4) 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor 5 from BlockManagerMaster. 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6481 on host node-dn1-1.ops.sfdc.net failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at 
akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Any insights into this error? Thanks, Tim - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DStream repartitioning, performance tuning processing
I set partitions to 64: // kInMsg.repartition(64) val outdata = kInMsg.map(x=normalizeLog(x._2,configMap)) // Still see all activity only on the two nodes that seem to be receiving from Kafka. On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote: TD - Apologies, didn't realize I was replying to you instead of the list. What does numPartitions refer to when calling createStream? I read an earlier thread that seemed to suggest that numPartitions translates to partitions created on the Spark side? http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E Actually, I re-tried with 64 numPartitions in createStream and that didn't work. I will manually set repartition to 64/128 and see how that goes. Thanks. On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Having 16 partitions in KafkaUtils.createStream does not translate to the RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the best way to distribute the received data between all the nodes, as long as there are sufficient number of partitions (try setting it to 2x the number cores given to the application). Yeah, in 1.0.0, ttl should be unnecessary. On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote: On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das tathagata.das1...@gmail.com wrote: If you are repartitioning to 8 partitions, and your node happen to have at least 4 cores each, its possible that all 8 partitions are assigned to only 2 nodes. Try increasing the number of partitions. Also make sure you have executors (allocated by YARN) running on more than two nodes if you want to use all 11 nodes in your yarn cluster. If you look at the code, I commented out the manual re-partitioning to 8. Instead, I am created 16 partitions when I call createStream. But I will increase the partitions to, say, 64 and see if I get better parallelism. 
If you are using Spark 1.x, then you dont need to set the ttl for running Spark Streaming. In case you are using older version, why do you want to reduce it? You could reduce it, but it does increase the risk of the premature cleaning, if once in a while things get delayed by 20 seconds. I dont see much harm in keeping the ttl at 60 seconds (a bit of extra garbage shouldnt hurt performance). I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right, unless I have memory issues, more aggressive pruning won't help. Thanks, Tim TD On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote: Hi, In my streaming app, I receive from kafka where I have tried setting the partitions when calling createStream or later, by calling repartition - in both cases, the number of nodes running the tasks seems to be stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more nodes. I am starting the job as: nohup spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8 --num-executors 8 normRunLog-6.log 2normRunLogError-6.log echo $! 
run-6.pid My main code is:

val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16))
val propsMap = Map(
  "metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092",
  "serializer.class" -> "kafka.serializer.StringEncoder",
  "producer.type" -> "async",
  "request.required.acks" -> "1")
val to_topic = "normStruct"
val writer = new KafkaOutputService(to_topic, propsMap)
if (!configMap.keySet.isEmpty) {
  //kInMsg.repartition(8)
  val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
  outdata.foreachRDD((rdd, time) => { rdd.foreach(rec => { writer.output(rec) }) })
}
ssc.start()
ssc.awaitTermination()

In terms of total delay, with a 5 second batch, the delays usually stay under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning question, does this mean I can reduce my cleaner ttl from 60 to, say, 25 (still more than double the peak delay)? Thanks Tim
Re: DStream repartitioning, performance tuning processing
I wrote a long post about how I arrived here, but in a nutshell: I don't see evidence of repartitioning and workload distribution across the cluster.

My newfangled way of starting the job is:

    run=`date +%m-%d-%YT%T`; \
    nohup spark-submit --class logStreamNormalizer \
    --master yarn log-stream-normalizer_2.10-1.0.jar \
    --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
    --driver-memory 8G \
    --executor-memory 30G \
    --executor-cores 16 \
    --num-executors 8 \
    --spark.serializer org.apache.spark.serializer.KryoSerializer \
    --spark.rdd.compress true \
    --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
    --spark.akka.threads 16 \
    --spark.task.maxFailures 64 \
    --spark.scheduler.mode FAIR \
    >logs/normRunLog-$run.log \
    2>logs/normRunLogError-$run.log & \
    echo $! >logs/run-$run.pid

Since the job spits out lots of logs, here is how I am trying to determine whether any tasks got assigned to non-local executors:

    $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

This yields no lines. If I look at resource pool usage in YARN, this app is assigned 252.5GB of memory, 128 VCores and 9 containers. Am I missing something here?

Thanks,
Tim

On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote:
> I set partitions to 64:
>
>     //
>     kInMsg.repartition(64)
>     val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
>     //
>
> Still see all activity only on the two nodes that seem to be receiving from Kafka.
Re: DStream repartitioning, performance tuning processing
Crash again. On the driver, the logs say:

    14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in removeExecutor
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6383 on host node-dn1-2-acme.com failed for unknown reason
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I went to look at the OS on node-dn1-2 and at the container logs for TID 6383 but found nothing:

    # grep 6383 stderr
    14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
    14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message on the container is timestamped 19:04:51, which tells me the executor was killed for some reason right before the driver noticed the executor/task failure.

How come my task was given up on after only 4 failures, although my config says the failure threshold is 64?
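On the question of why the job was aborted after 4 failures: 4 is the default value of spark.task.maxFailures, which suggests the value of 64 never reached the application. One possible explanation (an assumption, not verified on this cluster) is that `--spark.task.maxFailures 64` is not an option spark-submit understands, so, sitting after the application jar, it is handed to the application as a plain argument instead of being applied as configuration. A minimal sketch of setting the same properties in code through SparkConf; the property names and the app name are the ones used earlier in this thread, and the object name is made up:

```scala
import org.apache.spark.SparkConf

// Hypothetical helper object: builds the configuration in code so the values
// do not depend on how spark-submit parses the command line.
object LogStreamNormalizerConf {
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("SparkKafkaTest")
      .set("spark.task.maxFailures", "64") // the default is 4
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.scheduler.mode", "FAIR")

  def main(args: Array[String]): Unit = {
    val conf = buildConf()
    // Print the value the driver will actually use, to confirm the override is in place.
    println(conf.get("spark.task.maxFailures"))
  }
}
```

The Environment tab of the application web UI lists the Spark properties the driver actually received, which is a quick way to check whether a setting took effect.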
Re: Low Level Kafka Consumer for Spark
Good to see I am not the only one who cannot get incoming DStreams to repartition. I tried repartition(512) but still no luck: the app stubbornly runs on only two nodes. Now, this is 1.0.0, but looking at the release notes for 1.0.1 and 1.0.2, I don't see anything that says this was an issue and has been fixed.

How do I debug the repartition() statement to see what the flow is after the job hits that statement?

On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:
> Chris,
>
> I did the DStream.repartition mentioned in the document on parallelism in receiving, as well as set spark.default.parallelism, and it still uses only 2 nodes in my cluster. I notice there is another email thread on the same topic:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>
> My code is in Java and here is what I have:
>
>     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);
>     JavaPairDStream<String, String> newMessages = messages.repartition(partitionSize); // partitionSize=30
>     JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
>         ...
>         public String call(Tuple2<String, String> tuple2) {
>             return tuple2._2();
>         }
>     });
>     JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>         new PairFunction<String, String, Integer>() {
>             ...
>         }
>     );
>     wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});
>
> Thanks,
> Bharat
Re: Low Level Kafka Consumer for Spark
I create my DStream very simply as:

    val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
    .
    .

Eventually, before I operate on the DStream, I repartition it:

    kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my DStream into multiple smaller streams? Should I manually create multiple DStreams like this?

    val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:

    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where normalizeLog takes a String and a Map of regexes and returns a String

In my case, I think I have traced the issue to the receiver executor being killed by YARN:

    14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?

http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121

On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:
> Are you using multiple DStreams? Repartitioning does not affect how many receivers you have. It's on 2 nodes for each receiver. You need multiple partitions in the queue, each consumed by a DStream, if you mean to parallelize consuming the queue.
Re: Low Level Kafka Consumer for Spark
Ok, so I did this:

    val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
    val kInMsg = ssc.union(kInStreams)
    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I have Streams [0-9]. Of course, since the Kafka topic has only three partitions, only three of those streams are active, but I am seeing more blocks being pulled across the three streams in total than what the single stream was doing earlier.

Also, four nodes are now actively processing tasks (vs. only two earlier), which actually has me confused. If streams are active on only 3 nodes, then how/why did a 4th node get work? And if a 4th node got work, why aren't more nodes getting work?
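Putting Sean's point about receivers together with TD's advice on repartitioning gives roughly the following. This is a minimal sketch, assuming the receiver-based spark-streaming-kafka API already used in this thread; the ZooKeeper address, group id and topic name are the ones from the snippets in this thread, while numReceivers, numPartitions and the object name are illustrative. One detail worth checking in the real code: repartition returns a new DStream rather than changing the one it is called on. The earlier snippets call kInMsg.repartition(...) and then keep mapping over kInMsg; if the real code does the same, the repartitioning would have no effect.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical driver program; names and numbers are for illustration only.
object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("testApp"), Seconds(5))

    val numReceivers = 3    // assumption: one receiver per Kafka partition of the topic
    val numPartitions = 256 // assumption: roughly 2x the cores given to the application

    // One receiver-backed input stream per iteration; each receiver occupies one core.
    val kInStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
    }

    // Merge the receiver streams, then spread every batch over more partitions.
    // repartition returns a NEW DStream, so keep working with its result.
    val repartitioned = ssc.union(kInStreams).repartition(numPartitions)

    // Stand-in for normalizeLog(x._2, configMap) from the thread.
    val outdata = repartitioned.map { case (_, message) => message.trim }

    outdata.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Creating more receivers than the topic has partitions leaves the extra ones idle, as observed above with 10 streams on a 3-partition topic, so the receiver count is tied to the Kafka side while the partition count after repartition is tied to the cores available for processing.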
Re: Failed to run runJob at ReceiverTracker.scala
Appeared after running for a while. I re-ran the job and this time it crashed with:

    14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for stream 0: Error in block pushing thread - java.net.SocketException: Too many open files

Shouldn't the failed receiver get re-spawned on a different worker?

On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
> Do you see this error right at the beginning or after running for some time?
>
> The root cause seems to be that somehow your Spark executors got killed, which killed the receivers and caused further errors. Please take a look at the executor logs of the lost executor to find the root cause of the executor failure.
>
> TD
>
> On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote:
> > Hi,
> >
> > I have a Spark 1.0.0 (CDH5) streaming job reading from Kafka that died with:
> >
> >     14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275
> >     Exception in thread "Thread-59" 14/08/28 22:28:15 INFO YarnClientClusterScheduler: Cancelling stage 2
> >     14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
> >     14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor 5 from BlockManagerMaster.
> >     14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
> >     org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6481 on host node-dn1-1.ops.sfdc.net failed for unknown reason
> >     Driver stacktrace:
> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >         at scala.Option.foreach(Option.scala:236)
> >         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> > Any insights into this error?
> >
> > Thanks,
> > Tim
Re: DStream repartitioning, performance tuning processing
TD - Apologies, I didn't realize I was replying to you instead of the list.

What does numPartitions refer to when calling createStream? I read an earlier thread that seemed to suggest that numPartitions translates to partitions created on the Spark side?

http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

Actually, I re-tried with 64 numPartitions in createStream and that didn't work. I will manually set repartition to 64/128 and see how that goes.

Thanks.

On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
> Having 16 partitions in KafkaUtils.createStream does not translate to the RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the best way to distribute the received data between all the nodes, as long as there is a sufficient number of partitions (try setting it to 2x the number of cores given to the application).
>
> Yeah, in 1.0.0, ttl should be unnecessary.
>
> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:
> > On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
> > > If you are repartitioning to 8 partitions, and your nodes happen to have at least 4 cores each, it's possible that all 8 partitions are assigned to only 2 nodes. Try increasing the number of partitions. Also make sure you have executors (allocated by YARN) running on more than two nodes if you want to use all 11 nodes in your YARN cluster.
> >
> > If you look at the code, I commented out the manual repartitioning to 8. Instead, I created 16 partitions when I called createStream. But I will increase the partitions to, say, 64 and see if I get better parallelism.
> >
> > > If you are using Spark 1.x, then you don't need to set the ttl for running Spark Streaming. In case you are using an older version, why do you want to reduce it? You could reduce it, but it does increase the risk of premature cleaning if once in a while things get delayed by 20 seconds. I don't see much harm in keeping the ttl at 60 seconds (a bit of extra garbage shouldn't hurt performance).
> >
> > I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are right, unless I have memory issues, more aggressive pruning won't help.
> >
> > Thanks,
> > Tim
> >
> > > TD
> > >
> > > On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:
> > > > Hi,
> > > >
> > > > In my streaming app, I receive from Kafka, where I have tried setting the partitions when calling createStream or later, by calling repartition. In both cases, the number of nodes running the tasks seems to be stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more nodes.
> > > >
> > > > I am starting the job as:
> > > >
> > > >     nohup spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8 --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log &
> > > >     echo $! >run-6.pid
> > > >
> > > > My main code is:
> > > >
> > > >     val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
> > > >     val ssc = new StreamingContext(sparkConf, Seconds(5))
> > > >     val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16))
> > > >     val propsMap = Map("metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" -> "kafka.serializer.StringEncoder", "producer.type" -> "async", "request.required.acks" -> "1")
> > > >     val to_topic = "normStruct"
> > > >     val writer = new KafkaOutputService(to_topic, propsMap)
> > > >     if (!configMap.keySet.isEmpty) {
> > > >       //kInMsg.repartition(8)
> > > >       val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
> > > >       outdata.foreachRDD((rdd, time) => { rdd.foreach(rec => { writer.output(rec) }) })
> > > >     }
> > > >     ssc.start()
> > > >     ssc.awaitTermination()
> > > >
> > > > In terms of total delay, with a 5 second batch, the delays usually stay under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning question, does this mean I can reduce my cleaner ttl from 60 to, say, 25 (still more than double the peak delay)?
> > > >
> > > > Thanks
> > > > Tim
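TD's rule of thumb quoted above (about twice the number of cores given to the application) is easy to turn into a quick calculation. A minimal sketch in plain Scala; the object and method names are made up for illustration, and the factor of 2 is only the starting point suggested in this thread, not a constant defined by Spark:

```scala
// Hypothetical helper, not part of Spark: turns the "2x the number of cores"
// rule of thumb from this thread into a partition count.
object PartitionSizing {
  /** Total cores the application was given across all executors. */
  def totalCores(numExecutors: Int, coresPerExecutor: Int): Int = {
    require(numExecutors > 0 && coresPerExecutor > 0, "counts must be positive")
    numExecutors * coresPerExecutor
  }

  /** Suggested argument for DStream.repartition; `factor` defaults to the 2x from the thread. */
  def recommendedPartitions(numExecutors: Int, coresPerExecutor: Int, factor: Int = 2): Int =
    factor * totalCores(numExecutors, coresPerExecutor)
}

object PartitionSizingDemo extends App {
  // 8 executors x 8 cores, as in the spark-submit command quoted above
  println(PartitionSizing.recommendedPartitions(8, 8)) // prints 128
}
```

For the command quoted above (8 executors with 8 cores each) this gives 128 partitions, far more than the 8 in the commented-out repartition call.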
Kafka stream receiver stops input
Hi,

I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.

I have a streaming job that reads from a Kafka topic and writes output to another Kafka topic. The job starts fine, but after a while the input stream stops getting any data.

I think these messages show no incoming data on the stream:

    14/08/28 00:42:15 INFO ReceiverTracker: Stream 0 received 0 blocks

I run the job as:

    spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.2.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 6G --spark.cleaner.ttl 60 --executor-cores 4

As soon as I start the job, I see an error like:

    14/08/28 00:50:59 INFO BlockManagerInfo: Added input-0-1409187056800 in memory on node6-acme.com:39418 (size: 83.3 MB, free: 3.1 GB)
    Exception in thread "pool-1-thread-7" java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
        at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
        at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
        at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
        at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
        at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:42)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
        at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
        at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

But I am not sure that is the cause, because even after that OOM message I see data coming in:

    14/08/28 00:51:00 INFO ReceiverTracker: Stream 0 received 6 blocks

I would appreciate any pointers or suggestions for troubleshooting the issue.

Thanks
Updating shared data structure between executors
Hi,

I am writing some Scala code to normalize a stream of logs using an input configuration file (multiple regex patterns). To avoid restarting the job, I can read in a new config file using fileStream and then turn the config file into a map. But I am unsure how to update a shared map, since broadcast variables cannot be updated.

Any help or pointers will be appreciated.

Thanks!
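One commonly used workaround is to leave the broadcast value itself immutable and instead replace the whole broadcast variable on the driver whenever a new config arrives. A minimal sketch, assuming the config has already been parsed into a Map of regex to replacement string; ConfigHolder, NormalizeWithReloadableConfig and the body of normalizeLog are hypothetical stand-ins, not Spark APIs and not the poster's code, and this has not been tested against 1.0.0:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical driver-side holder for the currently active broadcast.
object ConfigHolder {
  @volatile private var current: Broadcast[Map[String, String]] = _

  // Call on the driver whenever a new config has been parsed
  // (and once before the streaming context is started).
  def update(sc: SparkContext, newConfig: Map[String, String]): Unit = synchronized {
    val previous = current
    current = sc.broadcast(newConfig)
    if (previous != null) previous.unpersist() // drop the executors' cached copies of the old value
  }

  def get: Broadcast[Map[String, String]] = current
}

object NormalizeWithReloadableConfig {
  // Placeholder for normalizeLog(String, Map): applies each regex -> replacement pair in turn.
  def normalizeLog(line: String, config: Map[String, String]): String =
    config.foldLeft(line) { case (acc, (pattern, replacement)) =>
      acc.replaceAll(pattern, replacement)
    }

  def normalize(lines: DStream[String]): DStream[String] =
    // The function passed to transform is evaluated on the driver for every batch,
    // so each batch picks up whichever broadcast is current at that moment.
    lines.transform { rdd =>
      val config = ConfigHolder.get
      rdd.map(line => normalizeLog(line, config.value))
    }
}
```

The update call could be triggered from a foreachRDD on the fileStream that watches the config directory, since the function passed to foreachRDD also runs on the driver. Whether replacing the broadcast per config change is cheap enough depends on how large the map is and how often it changes.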