Re: [Spark Streaming] Dynamic Broadcast Variable Update

2017-05-02 Thread Tim Smith
One, I think you should take this to the Spark developer list.

Two, I suspect broadcast variables aren't the best solution for the use
case you describe. Maybe an in-memory data/object/file store like Tachyon
is a better fit.
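
(For reference, a rough sketch of the wrapper pattern that is often used to
approximate an updatable broadcast without patching Spark: keep the handle
behind a small driver-side wrapper, unpersist the old broadcast and
re-broadcast between micro-batches. All names below are illustrative; this
is not Nipun's patch, just the commonly cited workaround.)

import java.io.{ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

// Wraps a broadcast handle so the driver can swap in a new value between
// micro-batches; task closures serialize only the current (small) handle.
class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    initial: T) extends Serializable {

  @transient @volatile private var bc: Broadcast[T] =
    ssc.sparkContext.broadcast(initial)

  def value: T = bc.value

  // Driver-side only: drop executor copies of the old value, re-broadcast.
  def update(newValue: T, blocking: Boolean = false): Unit = {
    bc.unpersist(blocking)
    bc = ssc.sparkContext.broadcast(newValue)
  }

  // Ship just the broadcast handle with task closures.
  private def writeObject(out: ObjectOutputStream): Unit = out.writeObject(bc)
  private def readObject(in: ObjectInputStream): Unit = {
    bc = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Task code reads wrapper.value inside the closure, and driver code calls
wrapper.update(newModel) between batches (for example at the top of a
foreachRDD), so the next batch's tasks ship the new handle.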

Thanks,

Tim


On Tue, May 2, 2017 at 11:56 AM, Nipun Arora 
wrote:

> Hi All,
>
> To support our Spark Streaming based anomaly detection tool, we have made
> a patch in Spark 1.6.2 to dynamically update broadcast variables.
>
> I'll first explain our use-case, which I believe should be common to
> several people using Spark Streaming applications. Broadcast variables are
> often used to store values such as machine learning models, which can then
> be applied to streaming data to "test" it and get the desired results (in
> our case, anomalies). Unfortunately, in current Spark, broadcast variables
> are final and can only be initialized once, before the initialization of
> the streaming context. Hence, if a new model is learned, the streaming
> system cannot be updated without shutting down the application,
> broadcasting again, and restarting the application. Our goal was to
> re-broadcast variables without requiring any downtime of the streaming
> service.
>
> The key to this implementation is a live re-broadcastVariable() interface,
> which can be triggered between micro-batch executions, without any reboot
> required for the streaming application. At a high level, this is done by
> re-fetching the broadcast variable information from the Spark driver and
> then re-distributing it to the workers. Micro-batch execution is blocked
> while the update is made, by taking a lock on the execution. We have
> already tested this in our prototype deployment of our anomaly detection
> service and can successfully re-broadcast the broadcast variables with no
> downtime.
>
> We would like to integrate these changes into Spark. Can anyone please let
> me know the process for submitting patches/new features to Spark? Also, I
> understand that the current version of Spark is 2.1; however, our changes
> have been made and tested on Spark 1.6.2. Will this be a problem?
>
> Thanks
> Nipun
>



-- 

--
Thanks,

Tim


Re: Initialize Gaussian Mixture Model using Spark ML dataframe API

2017-05-02 Thread Tim Smith
Yes, I noticed these open issues, both with KMeans and GMM:
https://issues.apache.org/jira/browse/SPARK-13025

Thanks,

Tim


On Mon, May 1, 2017 at 9:01 PM, Yanbo Liang <yblia...@gmail.com> wrote:

> Hi Tim,
>
> Spark ML API doesn't support setting an initial model for GMM currently. I
> hope we can get this feature in Spark 2.3.
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith <secs...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to figure out the API to initialize a gaussian mixture model
>> using either centroids created by K-means or a previously calculated GMM
>> model (I am aware that you can "save" a model and "load" it later, but I am
>> not interested in saving a model to a filesystem).
>>
>> The Spark MLlib API lets you do this using SetInitialModel
>> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org
>> .apache.spark.mllib.clustering.GaussianMixture
>>
>> However, I cannot figure out how to do this using Spark ML API. Can
>> anyone please point me in the right direction? I've tried reading the Spark
>> ML code and was wondering if the "set" call lets you do that?
>>
>> --
>> Thanks,
>>
>> Tim
>>
>
>


-- 

--
Thanks,

Tim


Initialize Gaussian Mixture Model using Spark ML dataframe API

2017-04-27 Thread Tim Smith
Hi,

I am trying to figure out the API to initialize a gaussian mixture model
using either centroids created by K-means or a previously calculated GMM
model (I am aware that you can "save" a model and "load" it later, but I am
not interested in saving a model to a filesystem).

The Spark MLlib API lets you do this using SetInitialModel
https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture

However, I cannot figure out how to do this using Spark ML API. Can anyone
please point me in the right direction? I've tried reading the Spark ML
code and was wondering if the "set" call lets you do that?
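
(For reference, a rough sketch of the MLlib/RDD-based route via
setInitialModel, which is the API linked above; how to do the same through
the DataFrame-based spark.ml API is exactly the open question. Seeding the
GMM from K-means centroids with uniform weights and identity covariances is
an illustrative choice, not a recommendation.)

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel, KMeans}
import org.apache.spark.mllib.linalg.{DenseMatrix, Vector}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD

def gmmSeededByKMeans(data: RDD[Vector], k: Int): GaussianMixtureModel = {
  // Run K-means first and use its centroids as the initial component means.
  val kmeansModel = KMeans.train(data, k, 20)
  val dim = kmeansModel.clusterCenters.head.size

  // Illustrative seeding: uniform weights, identity covariance per component.
  val weights = Array.fill(k)(1.0 / k)
  val gaussians = kmeansModel.clusterCenters.map { center =>
    new MultivariateGaussian(center, DenseMatrix.eye(dim))
  }

  new GaussianMixture()
    .setK(k)
    .setInitialModel(new GaussianMixtureModel(weights, gaussians))
    .run(data)
}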

--
Thanks,

Tim


Re: Assigning a unique row ID

2017-04-07 Thread Tim Smith
http://stackoverflow.com/questions/37231616/add-a-new-column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
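
(In short, the approach in that answer is a UUID-generating UDF; a minimal
sketch, assuming df is the DataFrame in question and "row_id" is an
arbitrary column name. The caveat Everett raises below for
monotonically_increasing_id applies here too: the UDF is re-evaluated if the
lineage is recomputed, so persist or write out the result if the IDs must
stay stable across derived tables.)

import java.util.UUID
import org.apache.spark.sql.functions.udf

// Generates a random UUID per row.
val generateUUID = udf(() => UUID.randomUUID().toString)

val withId = df.withColumn("row_id", generateUUID())

// Pin the values down before deriving other tables from this one.
withId.cache()   // or write it out and read it back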


On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson 
wrote:

> Hi,
>
> What's the best way to assign a truly unique row ID (rather than a hash)
> to a DataFrame/Dataset?
>
> I originally thought that functions.monotonically_increasing_id would do
> this, but it seems to have a rather unfortunate property that if you add it
> as a column to table A and then derive tables X, Y, Z and save those, the
> row ID values in X, Y, and Z may end up different. I assume this is because
> it delays the actual computation to the point where each of those tables is
> computed.
>
>


-- 

--
Thanks,

Tim


Consuming AWS Cloudwatch logs from Kinesis into Spark

2017-04-05 Thread Tim Smith
I am sharing this code snippet since I spent quite some time figuring it
out and I couldn't find any examples online. Between the Kinesis
documentation, the tutorial on the AWS site, and other code snippets on the
Internet, I was confused about the structure/format of the messages that
Spark fetches from Kinesis - base64 encoded, JSON, gzipped - which comes
first and in what order.

I tested this on EMR-5.4.0, Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it
helps others googling for similar info. I tried using Structured Streaming
but (1) it's in Alpha and (2) despite including what I thought were all the
dependencies, it complained of not finding DataSource.Kinesis. You probably
do not need all the libs, but I am just too lazy to redact the ones you
don't require for the snippet below :)

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import java.util.Base64
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
import org.apache.commons.math3.stat.descriptive._
import java.io.File
import java.net.InetAddress
import scala.util.control.NonFatal
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SaveMode
import java.util.Properties;
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import scala.util.Try


//sc.setLogLevel("INFO")

val ssc = new StreamingContext(sc, Seconds(30))

val kinesisStreams = (0 until 2).map { i => KinesisUtils.createStream(ssc,
"myApp", "cloudwatchlogs",
"https://kinesis.us-east-1.amazonaws.com", "us-east-1",
InitialPositionInStream.LATEST, Seconds(30),
StorageLevel.MEMORY_AND_DISK_2, "myId", "mySecret") }

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  if(rdd.count() > 0) {
  val json = rdd.map(input => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
  val record = scala.io.Source.fromInputStream(inputStream).mkString
  compact(render(parse(record)))
  })

  val df = spark.sqlContext.read.json(json)
  val preDF =
df.select($"logGroup",explode($"logEvents").as("events_flat"))
  val penDF = preDF.select($"logGroup",$"events_flat.extractedFields")
  val finalDF =
penDF.select($"logGroup".as("cluster"),$"extractedFields.*")
  finalDF.printSchema()
  finalDF.show()
 }
})

ssc.start



--
Thanks,

Tim


Re: Spark REST Job server feedback?

2015-10-08 Thread Tim Smith
I am curious too - is there any comparison between the two? Looks like one
is DataStax-sponsored and the other is Cloudera's. Other than that, are
there any major/core differences in design/approach?

Thanks,

Tim


On Mon, Sep 28, 2015 at 8:32 AM, Ramirez Quetzal  wrote:

> Anyone has feedback on using Hue / Spark Job Server REST servers?
>
>
> http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark/
>
> https://github.com/spark-jobserver/spark-jobserver
>
> Many thanks,
>
> Rami
>



-- 

--
Thanks,

Tim


Alter table fails to find table

2015-09-02 Thread Tim Smith
Spark 1.3.0 (CDH 5.4.4)

scala> sqlContext.sql("SHOW TABLES").collect
res18: Array[org.apache.spark.sql.Row] = Array([allactivitydata,true],
[sample_07,false], [sample_08,false])

sqlContext.sql("SELECT COUNT(*) from allactivitydata").collect
res19: Array[org.apache.spark.sql.Row] = Array([1227230])

scala> sqlContext.sql("ALTER TABLE allactivitydata ADD COLUMNS (test
STRING)");
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 ERROR Driver: FAILED: SemanticException [Error 10001]:
Table not found default.allactivitydata
org.apache.hadoop.hive.ql.parse.SemanticException: Table not found
default.allactivitydata
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1332)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1315)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1387)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1374)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeAlterTableModifyCols(DDLSemanticAnalyzer.java:2611)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:255)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:222)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:423)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:307)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1112)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1160)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1039)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:308)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:280)
at
org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at $line83.$read$$iwC$$iwC$$iwC$$iwC.(:59)
at $line83.$read$$iwC$$iwC$$iwC.(:61)
at $line83.$read$$iwC$$iwC.(:63)
at $line83.$read$$iwC.(:65)
at $line83.$read.(:67)
at $line83.$read$.(:71)
at $line83.$read$.()
at $line83.$eval$.(:7)
at $line83.$eval$.()
at $line83.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at

Controlling output fileSize in SparkSQL

2015-07-27 Thread Tim Smith
Hi,

I am using Spark 1.3 (CDH 5.4.4). What's the recipe for setting a minimum
output file size when writing out from SparkSQL? So far, I have tried:
--x-
import sqlContext.implicits._
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
sc.hadoopConfiguration.setLong("fs.local.block.size", 1073741824)
sc.hadoopConfiguration.setLong("dfs.blocksize", 1073741824)
sqlContext.sql("SET spark.sql.shuffle.partitions=2")
val df = sqlContext.jsonFile("hdfs://nameservice1/user/joe/samplejson/*")
df.saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")
--x-

But my output still isn't aggregated into 1+GB files.
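
(For what it's worth, the knob that usually matters here is the number of
partitions at write time rather than the HDFS block size settings above; a
minimal sketch, where the partition count of 4 is illustrative and
DataFrame.repartition is used because it is available on Spark 1.3.)

// Fewer output partitions => fewer, larger Parquet files.
// Pick roughly (total data size / desired file size) partitions.
val fewerParts = df.repartition(4)
fewerParts.saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")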

Thanks,

- Siddhartha


Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Essentially, I went from:
k = createStream .
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }) )

To:
kIn = createDirectStream .
k = kIn.repartition(numberOfExecutors) //since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }) )

With the new API, the app starts up and works fine for a while, but then it
seems to deteriorate. With the existing createStream API, the app does
deteriorate too, but over a much longer period - hours vs. days.






On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed
 in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I 
 noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim










Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
I did try without repartition initially, but that was even more horrible
because instead of the allocated 100 executors, only 30 (which is the
number of kafka partitions) would have to do the work. MyFunc is a
CPU-bound task, so adding more memory per executor wouldn't help, and I saw
that each of the 30 executors was only using one thread/core on each Spark
box. I could go and play with threading in MyFunc, but I don't want to mess
with threading on top of all the parallelism already involved, and I don't
think in-app threading outside of what the framework does is really
desirable.

With repartition there is a shuffle involved, but at least the computation
load spreads across all 100 executors instead of just 30.




On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote:

 If that's the case, you're still only using as many read executors as
 there are kafka partitions.

 I'd remove the repartition. If you weren't doing any shuffles in the old
 job, and are doing a shuffle in the new job, it's not really comparable.

 On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change it's
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise
 you're only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{_ => KafkaUtils.createStream  // n
 being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }) )

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream .
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }) )

 To:
 kIn = createDirectStream .
 k = kIn.repartition(numberOfExecutors) //since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }) )

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com
 wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state 
 in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now,
 switched back to the createStream API build of my app. Yes, for the 
 record,
 this is with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with 
 more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first 
 thing I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim















Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


Sure, will try to debug/troubleshoot. Are there enhancements to this
specific API between 1.3 and 1.4 that can substantially change it's
behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...


Yes, sorry, the earlier/stable version was more like:
kInStreams = (1 to n).map{_ => KafkaUtils.createStream  // n
being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }) )

Thanks,

Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream .
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }) )

 To:
 kIn = createDirectStream .
 k = kIn.repartition(numberOfExecutors) //since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }) )

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this 
 is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first thing I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim













Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Update on performance of the new API: the new code using the
createDirectStream API ran overnight and when I checked the app state in
the morning, there were massive scheduling delays :(

Not sure why and haven't investigated a whole lot. For now, switched back
to the createStream API build of my app. Yes, for the record, this is with
CDH 5.4.1 and Spark 1.3.



On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
 are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim







Re: createDirectStream and Stats

2015-06-18 Thread Tim Smith
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
are you listening? :D





On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim






createDirectStream and Stats

2015-06-18 Thread Tim Smith
Hi,

I just switched from createStream to the createDirectStream API for
kafka and while things otherwise seem happy, the first thing I noticed is
that stream/receiver stats are gone from the Spark UI :( Those stats were
very handy for keeping an eye on health of the app.

What's the best way to re-create those in the Spark UI? Maintain
Accumulators? Would be really nice to get back receiver-like stats even
though I understand that createDirectStream is a receiver-less design.
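
(One receiver-less way to get similar numbers back, sketched here under the
assumption that BatchInfo exposes numRecords in the Spark version at hand:
register a StreamingListener and log per-batch counts and delays, since
batch-level info survives the move to createDirectStream.)

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs per-batch record counts and delays as a stand-in for receiver stats.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch=${info.batchTime} records=${info.numRecords} " +
      s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)} " +
      s"processingDelayMs=${info.processingDelay.getOrElse(-1L)} " +
      s"totalDelayMs=${info.totalDelay.getOrElse(-1L)}")
  }
}

// Register on the StreamingContext before ssc.start():
ssc.addStreamingListener(new BatchStatsListener)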

Thanks,

Tim


Re: Accumulator in SparkUI for streaming

2015-02-28 Thread Tim Smith
So somehow Spark Streaming doesn't support display of named accumulators in
the WebUI?


On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic petar.zece...@gmail.com
wrote:


 Interesting. Accumulators are shown on Web UI if you are using the
 ordinary SparkContext (Spark 1.2). It just has to be named (and that's what
 you did).

 scala> val acc = sc.accumulator(0, "test accumulator")
 acc: org.apache.spark.Accumulator[Int] = 0
 scala> val rdd = sc.parallelize(1 to 1000)
 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
 parallelize at <console>:12
 scala> rdd.foreach(x => acc += 1)
 scala> acc.value
 res1: Int = 1000

 The Stage details page shows:




 On 20.2.2015. 9:25, Tim Smith wrote:

  On Spark 1.2:

  I am trying to capture # records read from a kafka topic:

   val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

   ..

   kInStreams.foreach( k =>
  {

    k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
    inRecords.value


  Question is how do I get the accumulator to show up in the UI? I tried
 inRecords.value but that didn't help. Pretty sure it isn't showing up in
 Stage metrics.

  What's the trick here? collect?

  Thanks,

  Tim





Accumulator in SparkUI for streaming

2015-02-20 Thread Tim Smith
On Spark 1.2:

I am trying to capture # records read from a kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

..

kInStreams.foreach( k =>
{

 k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
 inRecords.value


Question is how do I get the accumulator to show up in the UI? I tried
inRecords.value but that didn't help. Pretty sure it isn't showing up in
Stage metrics.

What's the trick here? collect?
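
(One observation about the snippet above, offered as a sketch rather than a
confirmed fix: inRecords += rdd.count().toInt runs on the driver after
count() returns, so no task ever updates the accumulator and the per-stage
accumulator table has nothing to show. Updating it inside the tasks, at some
per-record cost, gives the stages of each batch's job something to display.)

// Update the named accumulator inside the tasks so the stage detail page
// for each batch's job has per-task accumulator updates to display.
kInStreams.foreach { k =>
  k.foreachRDD { rdd =>
    rdd.foreach(_ => inRecords += 1)
  }
}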

Thanks,

Tim


How to diagnose could not compute split errors and failed jobs?

2015-02-19 Thread Tim Smith
My streaming app runs fine for a few hours and then starts spewing "Could
not compute split, block input-xx-xxx not found" errors. After this,
jobs start to fail and batches start to pile up.

My question isn't so much about why this error occurs but rather, how do I
trace what leads to this error? I am using disk+memory for storage, so it
shouldn't be a case of data loss resulting from memory overrun.

15/02/18 22:04:49 ERROR JobScheduler: Error running job streaming job
142429705 ms.28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 247644.0 failed 64 times, most recent failure: Lost task 3.63 in
stage 247644.0 (TID 3705290, node-dn1-16-test.abcdefg.com):
java.lang.Exception: Could not compute split, block input-28-1424297042500
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,

Tim


Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Tim Smith
+1 for writing the Spark output to Kafka. You can then hang multiple
compute/storage frameworks off Kafka. I am using a similar pipeline to feed
ElasticSearch and HDFS in parallel. This allows modularity: you can take
down ElasticSearch or HDFS for maintenance without losing data (except for
some edge cases).

You can even pipeline other Spark Streaming apps off Kafka to modularize
your processing pipeline, so you don't have one single big Spark app doing
all the processing.



On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez jfernan...@sdl.com wrote:

  Thanks for the advice folks, it is much appreciated. This seems like a
 pretty unfortunate design flaw. My team was surprised by it.



 I’m going to drop the two-step process and do it all in a single step
 until we get Kafka online.



 *From:* Sean Owen [mailto:so...@cloudera.com]
 *Sent:* Wednesday, February 18, 2015 1:53 AM
 *To:* Emre Sevinc
 *Cc:* Jose Fernandez; user@spark.apache.org
 *Subject:* Re: Spark Streaming output cannot be used as input?



 To clarify, sometimes in the world of Hadoop people freely refer to an
 output 'file' when it's really a directory containing 'part-*' files which
 are pieces of the file. It's imprecise but that's the meaning. I think the
 scaladoc may be referring to 'the path to the file, which includes this
 parent dir, is generated ...' In an inherently distributed system, you want
 to distributed writes and reads, so big files are really made of logical
 files within a directory.



 There is a JIRA open to support nested dirs which has been languishing:
 https://issues.apache.org/jira/browse/SPARK-3586

 I'm hoping to pursue that again with help from tdas after 1.3.

 That's probably the best solution.



 An alternative is to not use the file system as a sort of message queue,
 and instead use something like Kafka. It has a lot of other benefits but
 maybe it's not feasible to add this to your architecture.



 You can merge the files with HDFS APIs without much trouble. The dirs will
 be named consistently according to time and are something you can also
 query for.



 Making 1 partition has implications for parallelism of your job.



 Emre, I think I see what you're getting at but you have the map +
 materialize pattern which i think doesn't have the right guarantees about
 re-execution. Why not foreachRDD?



 Yes you can also consider collecting the whole RDD in foreachRDD and doing
 what you like, including writing to one file. But that would only work if
 the data is always small in each RDD.





 On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello Jose,

 We've hit the same issue a couple of months ago. It is possible to write
 directly to files instead of creating directories, but it is not
 straightforward, and I haven't seen any clear demonstration in books,
 tutorials, etc.

 We do something like:

 SparkConf sparkConf = new SparkConf().setAppName(appName);
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
 Duration(batchInterval));
  JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
 MyModuleApp.process(stream);



 And then in the process method:

  @Override public void process(JavaDStream<String> inStream) {



  JavaDStream<String> json = inStream.map(new 
 MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, 
 rejectedJSONoutputDir));

 forceOutput(json);

   }

  This, in turn, calls the following (I've removed the irrelevant lines to 
 focus on writing):


  public class MyModuleWorker implements Function<String,String> {

   public String call(String json) {


 // process the data and then write it

 writeJSON(json, validatedJSONoutputDir_);

   }



 }

 And the writeJSON method is:

 public static final void writeJSON(String json, String jsonDirPath) throws 
 IOException {

  String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + 
  ".json.tmp";

 URI uri = URI.create(jsonFileName);

 Configuration conf = new Configuration();

 FileSystem fileSystem = FileSystem.get(uri, conf);

 FSDataOutputStream out = fileSystem.create(new Path(uri));

 out.write(json.getBytes(StandardCharsets.UTF_8));

 out.close();



 fileSystem.rename(new Path(uri),

  new Path(URI.create(jsonDirPath + "/" + 
  UUID.randomUUID().toString() + ".json")));



   }



 Using a similar technique you might be able to achieve your objective.

 Kind regards,

 Emre Sevinç
 http://www.bigindustries.be/





 On Wed, Feb 18, 2015 at 4:32 AM, Jose 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Gerard,

Great write-up and really good guidance in there.

I have to be honest, I don't know why, but setting the # of partitions for
each dStream to a low number (5-10) just causes the app to choke/crash.
Setting it to 20 gets the app going but with not-so-great delays. Bump it up
to 30 and I start winning the war, where processing time is consistently
below the batch time window (20 seconds), except for one batch every few
batches where the compute time spikes to 10x the usual.

Following your guide, I took out some logInfo statements I had in the app,
but it didn't seem to make much difference :(

With a higher time window (20 seconds), I got the app to run stably for a
few hours but then ran into the dreaded "java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found". I wonder if I need to
add RDD persistence back?

Also, I am reaching out to Virdata with some ProServ inquiries.

Thanks





On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
 split into 40 partitions  I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually < window size (in seconds), the total delay ranges from
 minutes to hour(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for troubleshooting
 the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

 Could this be an issue with the driver being a bottlneck? All the
 executors posting their logs/stats to the driver?

 Thanks,

 Tim




















Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
 TaskSetManager: Lost task 54.56 in stage 16291.0
(TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 56]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0
(TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 57]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0
(TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 58]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0
(TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 59]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0
(TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 60]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0
(TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 61]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0
(TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 62]
15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0
(TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 63]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 54
in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in
stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception:
Could not compute split, block input-4-1423758372200 not found
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent
failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com):
java.lang.Exception: Could not compute split, block input-4-1423758372200
not found


Thanks for looking into it.





On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das t...@databricks.com wrote:

 Hey Tim,

 Let me get the key points.
 1. If you are not writing back to Kafka, the delay is stable? That is,
 instead of foreachRDD { // write to kafka }  if you do dstream.count,
 then the delay is stable. Right?
 2. If so, then Kafka is the bottleneck. Is the number of partitions, that
 you spoke of the in the second mail, that determines the parallelism in
 writes? Is it stable with 30 partitions?

 Regarding the block exception, could you give me a trace of info level
 logging that leads to this error? Basically I want trace the lifecycle of
 the block.

 TD



 On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote:

 Hi Gerard,

 Great write-up and really good guidance in there.

 I have to be honest, I don't know why but setting # of partitions for
 each dStream to a low number (5-10) just causes the app to choke/crash.
 Setting it to 20 gets the app going but with not so great delays. Bump it
 up to 30 and I start winning the war where processing time is consistently
 below batch time window (20 seconds) except for a batch every few batches
 where the compute time spikes 10x the usual.

 Following your guide, I took out some logInfo statements I had in the
 app but didn't seem to make much difference :(

 With a higher time window (20 seconds), I got the app to run stably for a
 few hours but then ran into the dreaded java.lang.Exception: Could not
 compute split, block input-0-1423761240800 not found. Wonder if I need to
 add RDD persistence back?

 Also, I am reaching out to Virdata with some ProServ inquiries.

 Thanks





 On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
 split into 40 partitions  I suspect that you're creating too many
 tasks for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
I replaced the writeToKafka statements with a rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here's the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time
142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time
142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time
142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time
142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time
142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time
142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time
142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time
142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time
142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time
142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time
142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time
142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time
142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time
142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time
142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time
142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time
142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time
142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time
142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time
142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time
142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time
142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time
142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time
142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time
142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time
142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time
142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time
142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time
142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time
142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time
142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time
142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time
142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time
142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time
142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time
142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time
142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith secs...@gmail.com wrote:

 TD - I will try count() and report back. Meanwhile, attached is the entire
 driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?
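
(For what it's worth, the simplest form of producer pooling is usually a
lazily-initialized producer per executor JVM, reused across batches and
partitions, rather than a full pool. A sketch using the plain Kafka producer
API; the broker list, topic name, and String payload type are illustrative,
and outdata stands for the output DStream[String] being written.)

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// One Kafka producer per executor JVM, created lazily on first use and
// reused, instead of a new producer in every foreachPartition.
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")   // illustrative
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

outdata.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val producer = ProducerHolder.producer   // executor-local, initialized once
    partition.foreach { msg =>
      producer.send(new ProducerRecord[String, String]("outputTopic", msg))
    }
  }
}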

 Saisai - I think, ideally, I'd rather not do any

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Saisai,

If I understand correctly, you are suggesting that I control parallelism by
having the number of consumers/executors at least 1:1 with the number of
kafka partitions. For example, if I have 50 partitions for a kafka topic,
then either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer
threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer
thread per topic

Actually, both executors and total consumers can be more than the number of
kafka partitions (some will probably sit idle).

But do away with dStream partitioning altogether.

Right?
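
(For reference, a minimal sketch of the DirectKafkaInputDStream route Saisai
mentions below, using the Spark 1.3 spark-streaming-kafka API; broker list
and topic name are illustrative, and myFunc/someParams are from the earlier
snippet. The number of Spark partitions simply follows the number of Kafka
partitions, with no receivers and no repartitioning.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("inputTopic")

// One Spark partition per Kafka partition; parallelism tracks the topic layout.
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val dataout = k.map { case (_, v) => myFunc(v, someParams) }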

Thanks,

- Tim




On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com
wrote:

 Hi Tim,

 I think maybe you can try this way:

  create Receiver per executor and specify thread for each topic larger than
  1, and the total number of consumer thread will be: total consumer =
 (receiver number) * (thread number), and make sure this total consumer is
 less than or equal to Kafka partition number. In this case, I think the
 parallelism is enough, received blocks are distributed to each executor. So
 you don't need to repartition to increase the parallelism.

 Besides for Kafka's high-level API, Kafka partitions may not be equally
 distributed to all the receivers, so some tasks may process more data than
 other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
 that will be more balanced because each Kafka partition mapping to Spark
 partition.


 Besides set partition count to 1 for each dStream means
 dstream.repartition(1) ? If so I think it will still introduce shuffle and
 move all the data into one partition.

 Thanks
 Saisai

 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the
 entire driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and didn't work. Even If I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
   for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
   val kInMsgParts =
  k.repartition(otherConf("dStreamPartitions").toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

   val writer = new
  KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

   writer.output(rec)

  }) )


 So this is creating a new kafka producer for every new output
 partition, right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some
 very light weight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
 partitions) -> Apply some transformation logic to each partition -> write
 out each partition to kafka (kafka has 23 partitions). Let me increase the
 number

Re: Streaming scheduling delay

2015-02-11 Thread Tim Smith
Just read the thread "Are these numbers abnormal for spark streaming?" and
I think I am seeing similar results - that is - increasing the window seems
to be the trick here. I will have to monitor for a few hours/days before I
can conclude (there are so many knobs/dials).



On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually within the window size (in seconds), the total delay ranges from
 minutes to hour(s) (and keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain point seems useless (I guess excess
 ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors (I have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for troubleshooting
 the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

 Could this be an issue with the driver being a bottleneck? All the
 executors posting their logs/stats to the driver?

 Thanks,

 Tim


















Streaming scheduling delay

2015-02-11 Thread Tim Smith
On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka
(different topic). My big problem has been Total Delay. While execution
time is usually within the window size (in seconds), the total delay ranges from
minutes to hour(s) (and keeps going up).

For a little while, I thought I had solved the issue by bumping up the
driver memory. Then I expanded my Kafka cluster to add more nodes and the
issue came up again. I tried a few things to smoke out the issue and
something tells me the driver is the bottleneck again:

1) From my app, I took out the entire write-out-to-kafka piece. Sure
enough, execution, scheduling delay and hence total delay fell to sub
second. This assured me that whatever processing I do before writing back
to kafka isn't the bottleneck.

2) In my app, I had RDD persistence set at different points but my code
wasn't really re-using any RDDs so I took out all explicit persist()
statements. And added, spar...unpersist to true in the context. After
this, it doesn't seem to matter how much memory I give my executor, the
total delay seems to be in the same range. I tried per executor memory from
2G to 12G with no change in total delay so executors aren't memory starved.
Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
used when per executor memory is set to 2GB, for example.

3) Input rate in the kafka consumer restricts spikes in incoming data.

4) Tried FIFO and FAIR but didn't make any difference.

5) Adding executors beyond a certain point seems useless (I guess excess
ones just sit idle).

At any given point in time, the SparkUI shows only one batch pending
processing. So with just one batch pending processing, why would the
scheduling delay run into minutes/hours if execution time is within the
batch window duration? There aren't any failed stages or jobs.

Right now, I have 100 executors (I have tried setting executors from
50-150), each with 2GB and 4 cores and the driver running with 16GB. There
are 5 kafka receivers and each incoming stream is split into 40 partitions.
Per receiver, input rate is restricted to 2 messages per second.

Can anyone help me with clues or areas to look into, for troubleshooting
the issue?

One nugget I found buried in the code says:
The scheduler delay includes the network delay to send the task to the
worker machine and to send back the result (but not the time to fetch the
task result, if it needed to be fetched from the block manager on the
worker).
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Could this be an issue with the driver being a bottleneck? All the executors
posting their logs/stats to the driver?

Thanks,

Tim


Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Tim Smith
Good to know it worked out and thanks for the update. I didn't realize
you need to provision for receiver workers + processing workers. One
would think a worker would process multiple stages of an app/job and
receiving is just a stage of the job.
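
The catch is that each receiver runs as a long-lived task that pins one core for
the life of the application, so the cluster needs more cores than receivers for
any batch processing to happen. A rough sketch of that arithmetic (illustrative
numbers; spark.cores.max applies to standalone/Mesos, while on Yarn the
equivalent budget is num-executors times executor-cores):

  import org.apache.spark.SparkConf

  val numReceivers = 5          // one core pinned per receiver
  val coresForProcessing = 8    // whatever the batch workload needs
  val conf = new SparkConf()
    .setAppName("receiver-sizing-sketch")
    .set("spark.cores.max", (numReceivers + coresForProcessing).toString)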



On Thu, Sep 25, 2014 at 12:05 PM, Matt Narrell matt.narr...@gmail.com wrote:
 Additionally,

 If I dial up/down the number of executor cores, this does what I want.
 Thanks for the extra eyes!

 mn

 On Sep 25, 2014, at 12:34 PM, Matt Narrell matt.narr...@gmail.com wrote:

 Tim,

 I think I understand this now.  I had a five node Spark cluster and a five
 partition topic, and I created five receivers.  I found this:
 http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
 Indicating that if I use all my workers as receivers, there are none left to
 do the processing.  If I drop the number of partitions/receivers down while
 still having multiple unioned receivers, I see messages.

 mn

 On Sep 25, 2014, at 10:18 AM, Matt Narrell matt.narr...@gmail.com wrote:

 I suppose I have other problems as I can’t get the Scala example to work
 either.  Puzzling, as I have literally coded like the examples (that are
 purported to work), but no luck.

 mn

 On Sep 24, 2014, at 11:27 AM, Tim Smith secs...@gmail.com wrote:

 Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?

 On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com
 wrote:

 The part that works is the commented out, single receiver stream below the
 loop.  It seems that when I call KafkaUtils.createStream more than once, I
 don’t receive any messages.

 I’ll dig through the logs, but at first glance yesterday I didn’t see
 anything suspect.  I’ll have to look closer.

 mn

 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:

 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see what are the object types being
 returned by each statement.

 Other than code, in the driver logs, you should see events that say
 Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135

 Now, if you go to node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.



 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com
 wrote:

 To my eyes, these are functionally equivalent.  I’ll try a Scala approach,
 but this may cause waves for me upstream (e.g., non-Java)

 Thanks for looking at this.  If anyone else can see a glaring issue in the
 Java approach that would be appreciated.

 Thanks,
 Matt

 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:

 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):

  val kInStreams = (1 to 10).map { _ =>
    KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER) } // Create 10 receivers across the cluster,
  // one for each partition, potentially; but active receivers are only as many
  // as the kafka partitions you have

  val kInMsg =
    ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)




 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com
 wrote:

 So, this is scrubbed some for confidentiality, but the meat of it is as
 follows.  Note, that if I substitute the commented section for the loop, I
 receive messages from the topic.

  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.streaming.unpersist", "true");
  sparkConf.set("spark.logConf", "true");

  Map<String, String> kafkaProps = new HashMap<>();
  kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
  kafkaProps.put("group.id", groupId);

  JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
  jsc.checkpoint("hdfs://some_location");

  List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);

  for (int i = 0; i < 5; i++) {
      streamList.add(KafkaUtils.createStream(jsc,
                                             String.class, ProtobufModel.class,
                                             StringDecoder.class, ProtobufModelDecoder.class,
                                             kafkaProps,
                                             Collections.singletonMap(topic, 1),
                                             StorageLevel.MEMORY_ONLY_SER()));
  }

  final JavaPairDStream<String, ProtobufModel> stream =
      jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

  //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
  //      KafkaUtils.createStream(jsc,
  //                              String.class, ProtobufModel.class,
  //                              StringDecoder.class

Re: Multiple Kafka Receivers and Union

2014-09-24 Thread Tim Smith
Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?

On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell matt.narr...@gmail.com wrote:
 The part that works is the commented out, single receiver stream below the 
 loop.  It seems that when I call KafkaUtils.createStream more than once, I 
 don’t receive any messages.

 I’ll dig through the logs, but at first glance yesterday I didn’t see 
 anything suspect.  I’ll have to look closer.

 mn

 On Sep 23, 2014, at 6:14 PM, Tim Smith secs...@gmail.com wrote:

 Maybe post the before-code as in what was the code before you did the
 loop (that worked)? I had similar situations where reviewing code
 before (worked) and after (does not work) helped. Also, what helped is
 the Scala REPL because I can see what are the object types being
 returned by each statement.

 Other than code, in the driver logs, you should see events that say
 Registered receiver for stream 0 from
 akka.tcp://sp...@node5.acme.net:53135

 Now, if you go to node5 and look at Spark or YarnContainer logs
 (depending on who's doing RM), you should be able to see if the
 receiver has any errors when trying to talk to kafka.



 On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell matt.narr...@gmail.com wrote:
 To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
 but this may cause waves for me upstream (e.g., non-Java)

 Thanks for looking at this.  If anyone else can see a glaring issue in the 
 Java approach that would be appreciated.

 Thanks,
 Matt

 On Sep 23, 2014, at 4:13 PM, Tim Smith secs...@gmail.com wrote:

 Sorry, I am almost Java illiterate but here's my Scala code to do the
 equivalent (that I have tested to work):

  val kInStreams = (1 to 10).map { _ =>
    KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER) } // Create 10 receivers across the cluster,
  // one for each partition, potentially; but active receivers are only as many
  // as the kafka partitions you have

  val kInMsg =
    ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)




 On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 So, this is scrubbed some for confidentiality, but the meat of it is as 
 follows.  Note, that if I substitute the commented section for the loop, 
 I receive messages from the topic.

  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.streaming.unpersist", "true");
  sparkConf.set("spark.logConf", "true");

  Map<String, String> kafkaProps = new HashMap<>();
  kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
  kafkaProps.put("group.id", groupId);

  JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
  jsc.checkpoint("hdfs://some_location");

  List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);

  for (int i = 0; i < 5; i++) {
      streamList.add(KafkaUtils.createStream(jsc,
                                             String.class, ProtobufModel.class,
                                             StringDecoder.class, ProtobufModelDecoder.class,
                                             kafkaProps,
                                             Collections.singletonMap(topic, 1),
                                             StorageLevel.MEMORY_ONLY_SER()));
  }

  final JavaPairDStream<String, ProtobufModel> stream =
      jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

  //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
  //      KafkaUtils.createStream(jsc,
  //                              String.class, ProtobufModel.class,
  //                              StringDecoder.class, ProtobufModelDecoder.class,
  //                              kafkaProps,
  //                              Collections.singletonMap(topic, 5),
  //                              StorageLevel.MEMORY_ONLY_SER());

  final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
      new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
              return new Tuple2(tuple._2().getDeviceId(), 1);
          }
      });

  … and further Spark functions ...

 On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote:

 Posting your code would be really helpful in figuring out gotchas.

 On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com 
 wrote:
 Hey,

 Spark 1.1.0
 Kafka 0.8.1.1
 Hadoop (YARN/HDFS) 2.5.1

 I have a five partition Kafka topic.  I can create a single Kafka 
 receiver
 via KafkaUtils.createStream with five threads in the topic map and 
 consume
  messages fine.  Sifting through the user list and Google, I see that it's
 possible to split the Kafka receiver among the Spark workers such that 
 I can
 have a receiver per topic, and have this distributed to workers rather

Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Tim Smith
Posting your code would be really helpful in figuring out gotchas.

On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote:
 Hey,

 Spark 1.1.0
 Kafka 0.8.1.1
 Hadoop (YARN/HDFS) 2.5.1

 I have a five partition Kafka topic.  I can create a single Kafka receiver
 via KafkaUtils.createStream with five threads in the topic map and consume
  messages fine.  Sifting through the user list and Google, I see that it's
 possible to split the Kafka receiver among the Spark workers such that I can
 have a receiver per topic, and have this distributed to workers rather than
 localized to the driver.  I’m following something like this:
 https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
  But for Kafka obviously.  From the Streaming Programming Guide: “Receiving
  multiple data streams can therefore be achieved by creating multiple input
  DStreams and configuring them to receive different partitions of the data
  stream from the source(s).”

 However, I’m not able to consume any messages from Kafka after I perform the
 union operation.  Again, if I create a single, multi-threaded, receiver I
 can consume messages fine.  If I create 5 receivers in a loop, and call
  jssc.union(…) I get:

 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks


 Do I need to do anything to the unioned DStream?  Am I going about this
 incorrectly?

 Thanks in advance.

 Matt

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Tim Smith
Sorry, I am almost Java illiterate but here's my Scala code to do the
equivalent (that I have tested to work):

val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER) } // Create 10 receivers across the cluster,
// one for each partition, potentially; but active receivers are only as many
// as the kafka partitions you have

val kInMsg =
  ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)




On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote:
 So, this is scrubbed some for confidentiality, but the meat of it is as 
 follows.  Note, that if I substitute the commented section for the loop, I 
 receive messages from the topic.

  SparkConf sparkConf = new SparkConf();
  sparkConf.set("spark.streaming.unpersist", "true");
  sparkConf.set("spark.logConf", "true");

  Map<String, String> kafkaProps = new HashMap<>();
  kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
  kafkaProps.put("group.id", groupId);

  JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
  jsc.checkpoint("hdfs://some_location");

  List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);

  for (int i = 0; i < 5; i++) {
      streamList.add(KafkaUtils.createStream(jsc,
                                             String.class, ProtobufModel.class,
                                             StringDecoder.class, ProtobufModelDecoder.class,
                                             kafkaProps,
                                             Collections.singletonMap(topic, 1),
                                             StorageLevel.MEMORY_ONLY_SER()));
  }

  final JavaPairDStream<String, ProtobufModel> stream =
      jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

  //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
  //      KafkaUtils.createStream(jsc,
  //                              String.class, ProtobufModel.class,
  //                              StringDecoder.class, ProtobufModelDecoder.class,
  //                              kafkaProps,
  //                              Collections.singletonMap(topic, 5),
  //                              StorageLevel.MEMORY_ONLY_SER());

  final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
      new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
              return new Tuple2(tuple._2().getDeviceId(), 1);
          }
      });

  … and further Spark functions ...

 On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote:

 Posting your code would be really helpful in figuring out gotchas.

 On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote:
 Hey,

 Spark 1.1.0
 Kafka 0.8.1.1
 Hadoop (YARN/HDFS) 2.5.1

 I have a five partition Kafka topic.  I can create a single Kafka receiver
 via KafkaUtils.createStream with five threads in the topic map and consume
  messages fine.  Sifting through the user list and Google, I see that it's
 possible to split the Kafka receiver among the Spark workers such that I can
 have a receiver per topic, and have this distributed to workers rather than
 localized to the driver.  I’m following something like this:
 https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
  But for Kafka obviously.  From the Streaming Programming Guide: “Receiving
  multiple data streams can therefore be achieved by creating multiple input
  DStreams and configuring them to receive different partitions of the data
  stream from the source(s).”

 However, I’m not able to consume any messages from Kafka after I perform the
 union operation.  Again, if I create a single, multi-threaded, receiver I
 can consume messages fine.  If I create 5 receivers in a loop, and call
  jssc.union(…) I get:

 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
 INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks


 Do I need to do anything to the unioned DStream?  Am I going about this
 incorrectly?

 Thanks in advance.

 Matt

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-18 Thread Tim Smith
Dibyendu - I am using the Kafka consumer built into Spark streaming.
Pulled the jar from here:
http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar

Thanks for the sbt-assembly link, Soumitra.

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Tim

 Just curious to know ; Which Kafka Consumer you have used ?

 Dib

 On Sep 18, 2014 4:40 AM, Tim Smith secs...@gmail.com wrote:

 Thanks :)

 On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote:
  Thanks Tim, this is super helpful!
 
  Question about jars and spark-submit:  why do you provide
  myawesomeapp.jar as the program jar but then include other jars via
  the --jars argument?  Have you tried building one uber jar with all
  dependencies and just sending that to Spark as your app jar?

 I guess that is mostly because I am Scala/sbt noob :) How do I create
 the uber jar? My .sbt file says:
 name := "My Awesome App"
 version := "1.025"
 scalaVersion := "2.10.4"
 resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

 Then I run sbt package to generate myawesomeapp.jar.
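
 For the uber jar question, a hedged sketch of the usual sbt-assembly setup from
 that era (the plugin version is illustrative, and depending on the version you
 may also need assemblySettings plus a merge strategy for duplicate META-INF
 entries); Spark itself is marked provided so the fat jar only carries the
 Kafka bits, and that single jar can then be handed to spark-submit without the
 --jars list:

   // project/plugins.sbt
   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

   // build.sbt
   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
     "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
     "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0",
     "org.apache.kafka" %% "kafka"                 % "0.8.1.1"
   )

   // then: sbt assembly  produces target/scala-2.10/<name>-assembly-<version>.jar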

 
  Also, have you ever seen any issues with Spark caching your app jar
  between runs even if it changes?

 Not that I can tell but then maybe because I use Yarn, I might be
 shielded from some jar distribution bugs in Spark?


 
  On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith secs...@gmail.com wrote:
  I don't have anything in production yet but I now at least have a
  stable (running for more than 24 hours) streaming app. Earlier, the
  app would crash for all sorts of reasons. Caveats/setup:
  - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
  - Yarn for RM
  - Input and Output to Kafka
  - CDH 5.1
  - 11 node cluster with 32-cores and 48G max container size for each
  node (Yarn managed)
  - 5 partition Kafka topic - both in and out
  - Roughly, an average of 25k messages per second
  - App written in Scala (warning: I am a Scala noob)
 
  Few things I had to add/tweak to get the app to be stable:
  - The executor JVMs did not have any GC options set, by default. This
  might be more of a CDH issue. I noticed that while the Yarn container
  and other Spark ancillary tasks had GC options set at launch but none
  for the executors. So I played with different GC options and this
  worked best:
   SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
   -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
   -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbose:gc
   -XX:+PrintGCDetails"
 
  I tried G1GC but for some reason it just didn't work. I am not a Java
  programmer or expert so my conclusion is purely trial and error based.
  The GC logs, with these flags, go to the stdout file in the Yarn
  container logs on each node/worker. You can set SPARK_JAVA_OPTS in
  spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
  specifically, even though you don't run Spark as a service (since you
   are using Yarn for RM), you can go to Spark Client Advanced
  Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and
  set SPARK_JAVA_OPTS there.
 
   - Set these two params - spark.yarn.executor.memoryOverhead and
   spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
  because the executors running the kafka receivers would get killed by
  Yarn for over utilization of memory. Now, these are my memory settings
  (I will paste the entire app launch params later in the email):
  --driver-memory 2G \
  --executor-memory 16G \
  --spark.yarn.executor.memoryOverhead 4096 \
  --spark.yarn.driver.memoryOverhead 1024 \
 
   Yarn sizes each container at executor-memory plus
   spark.yarn.executor.memoryOverhead, so in this case the executor JVM heap
   stays at 16G and each container gets roughly 20G.
 
  Here is how I launch my app:
  run=`date +%m-%d-%YT%T`; \
  nohup spark-submit --class myAwesomeApp \
  --master yarn myawesomeapp.jar \
  --jars
  spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
  \
  --driver-memory 2G \
  --executor-memory 16G \
  --executor-cores 16 \
  --num-executors 10 \
  --spark.serializer org.apache.spark.serializer.KryoSerializer \
  --spark.rdd.compress true \
  --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
  \
  --spark.akka.threads 64 \
  --spark.akka.frameSize 500 \
  --spark.task.maxFailures 64 \
  --spark.scheduler.mode FAIR \
  --spark.yarn.executor.memoryOverhead 4096 \
  --spark.yarn.driver.memoryOverhead 1024 \
  --spark.shuffle.consolidateFiles true \
  --spark.default.parallelism 528

Re: Kafka Spark Streaming on Spark 1.1

2014-09-18 Thread Tim Smith
What kafka receiver are you using? Did you build a new jar for your
app with the latest streaming-kafka code for 1.1?
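
If the jar was built against the 1.0-era streaming-kafka artifact, rebuilding
against artifacts that match the cluster's Spark version is the usual first
thing to try for this kind of AbstractMethodError (versions illustrative; this
is the jar rebuild being asked about above):

  libraryDependencies += "org.apache.spark" %% "spark-streaming"       % "1.1.0" % "provided"
  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.1.0"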


On Thu, Sep 18, 2014 at 11:47 AM, JiajiaJing jj.jing0...@gmail.com wrote:
 Hi Spark Users,

 We just upgrade our spark version from 1.0 to 1.1. And we are trying to
 re-run all the written and tested projects we implemented on Spark 1.0.
 However, when we try to execute the spark streaming project that stream data
 from Kafka topics, it yields the following error message. I have no idea
 about why this occurs because the same project runs successfully with Spark
 1.0.
 May I get some help on this please?

 Thank you very much!


 2014-09-18 11:06:08,841 ERROR [sparkDriver-akka.actor.default-dispatcher-4]
 scheduler.ReceiverTracker (Logging.scala:logError(75)) - Deregistered
 receiver for stream 0: Error starting receiver 0 -
 java.lang.AbstractMethodError
 at org.apache.spark.Logging$class.log(Logging.scala:52)
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)
 at org.apache.spark.Logging$class.logInfo(Logging.scala:59)
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
 at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
 at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)




 Best Regards,

 Jiajia



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-on-Spark-1-1-tp14597.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
 at 8:30 AM, Soumitra Kumar
kumar.soumi...@gmail.com wrote:
 Hmm, no response to this thread!

 Adding to it, please share experiences of building an enterprise grade 
 product based on Spark Streaming.

 I am exploring Spark Streaming for enterprise software and am cautiously 
 optimistic about it. I see huge potential to improve debuggability of Spark.

 - Original Message -
 From: Tim Smith secs...@gmail.com
 To: spark users user@spark.apache.org
 Sent: Friday, September 12, 2014 10:09:53 AM
 Subject: Stable spark streaming app

 Hi,

 Anyone have a stable streaming app running in production? Can you
 share some overview of the app and setup like number of nodes, events
 per second, broad stream processing workflow, config highlights etc?

 Thanks,

 Tim

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stable spark streaming app

2014-09-17 Thread Tim Smith
Thanks :)

On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais pw...@yelp.com wrote:
 Thanks Tim, this is super helpful!

 Question about jars and spark-submit:  why do you provide
 myawesomeapp.jar as the program jar but then include other jars via
 the --jars argument?  Have you tried building one uber jar with all
 dependencies and just sending that to Spark as your app jar?

I guess that is mostly because I am Scala/sbt noob :) How do I create
the uber jar? My .sbt file says:
name := "My Awesome App"
version := "1.025"
scalaVersion := "2.10.4"
resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"

Then I run sbt package to generate myawesomeapp.jar.


 Also, have you ever seen any issues with Spark caching your app jar
 between runs even if it changes?

Not that I can tell but then maybe because I use Yarn, I might be
shielded from some jar distribution bugs in Spark?



 On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith secs...@gmail.com wrote:
 I don't have anything in production yet but I now at least have a
 stable (running for more than 24 hours) streaming app. Earlier, the
 app would crash for all sorts of reasons. Caveats/setup:
 - Spark 1.0.0 (I have no input flow control unlike Spark 1.1)
 - Yarn for RM
 - Input and Output to Kafka
 - CDH 5.1
 - 11 node cluster with 32-cores and 48G max container size for each
 node (Yarn managed)
 - 5 partition Kafka topic - both in and out
 - Roughly, an average of 25k messages per second
 - App written in Scala (warning: I am a Scala noob)

 Few things I had to add/tweak to get the app to be stable:
 - The executor JVMs did not have any GC options set, by default. This
 might be more of a CDH issue. I noticed that while the Yarn container
 and other Spark ancillary tasks had GC options set at launch but none
 for the executors. So I played with different GC options and this
 worked best:
  SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m
  -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
  -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbose:gc
  -XX:+PrintGCDetails"

 I tried G1GC but for some reason it just didn't work. I am not a Java
 programmer or expert so my conclusion is purely trial and error based.
 The GC logs, with these flags, go to the stdout file in the Yarn
 container logs on each node/worker. You can set SPARK_JAVA_OPTS in
 spark-env.sh on the driver node and Yarn will respect these. On CDH/CM
 specifically, even though you don't run Spark as a service (since you
  are using Yarn for RM), you can go to Spark Client Advanced
 Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh and
 set SPARK_JAVA_OPTS there.

  - Set these two params - spark.yarn.executor.memoryOverhead and
  spark.yarn.driver.memoryOverhead. Earlier, my app would get killed
 because the executors running the kafka receivers would get killed by
 Yarn for over utilization of memory. Now, these are my memory settings
 (I will paste the entire app launch params later in the email):
 --driver-memory 2G \
 --executor-memory 16G \
 --spark.yarn.executor.memoryOverhead 4096 \
 --spark.yarn.driver.memoryOverhead 1024 \

  Yarn sizes each container at executor-memory plus
  spark.yarn.executor.memoryOverhead, so in this case the executor JVM heap
  stays at 16G and each container gets roughly 20G.

 Here is how I launch my app:
 run=`date +%m-%d-%YT%T`; \
 nohup spark-submit --class myAwesomeApp \
 --master yarn myawesomeapp.jar \
 --jars 
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 \
 --driver-memory 2G \
 --executor-memory 16G \
 --executor-cores 16 \
 --num-executors 10 \
 --spark.serializer org.apache.spark.serializer.KryoSerializer \
 --spark.rdd.compress true \
 --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
 --spark.akka.threads 64 \
 --spark.akka.frameSize 500 \
 --spark.task.maxFailures 64 \
 --spark.scheduler.mode FAIR \
 --spark.yarn.executor.memoryOverhead 4096 \
 --spark.yarn.driver.memoryOverhead 1024 \
 --spark.shuffle.consolidateFiles true \
 --spark.default.parallelism 528 \
  > logs/normRunLog-$run.log \
  2> logs/normRunLogError-$run.log & \
  echo $! > logs/current-run.pid

 Some code optimizations (or, goof ups that I fixed). I did not
 scientifically measure the impact of each but I think they helped:
 - Made all my classes and objects serializable and then use Kryo (as
 you see above)
 - I map one receive task for each kafka partition
 - Instead of doing a union on all the incoming streams and then
 repartition() I now repartition() each incoming stream and process
 them separately. I believe this reduces shuffle.
 - Reduced number of repartitions. I was doing 128 after

Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Tim Smith
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
kafka. If the stream coming in from kafka has higher message/second
rate than what a Spark job can process then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Alon,

 No this will not be guarantee that same set of messages will come in same
 RDD. This fix just re-play the messages from last processed offset in same
 order. Again this is just a interim fix we needed to solve our use case . If
 you do not need this message re-play feature, just do not perform the ack (
 Acknowledgement) call in the Driver code. Then the processed messages will
 not be written to ZK and hence replay will not happen.

 Regards,
 Dibyendu

 On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
 wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand
 Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Stable spark streaming app

2014-09-12 Thread Tim Smith
Hi,

Anyone have a stable streaming app running in production? Can you
share some overview of the app and setup like number of nodes, events
per second, broad stream processing workflow, config highlights etc?

Thanks,

Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Tim Smith
Similar issue (Spark 1.0.0). Streaming app runs for a few seconds
before these errors start to pop all over the driver logs:

14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I am using MEMORY_AND_DISK_SER for all my RDDs so I should not be
losing any blocks unless I run out of disk space, right?



On Fri, Sep 12, 2014 at 5:24 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 I agree,

 Even the Low Level Kafka Consumer which I have written has tunable IO
  throttling which helps me solve this issue ... But the question remains: even if
  there is a large backlog, why does Spark drop the unprocessed memory blocks?

 Dib

 On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim jeoffr...@gmail.com wrote:

 Our issue could be related to this problem as described in:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
 which the DStream is processed for every 1 hour batch duration.

 I have implemented IO throttling in the Receiver as well in our Kafka
 consumer, and our backlog is not that large.

  INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for
 dropping
 INFO : org.apache.spark.storage.BlockManager - Dropping block
 input-0-1410443074600 from memory
 INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600
 of size 12651900 dropped from memory (free 21220667)
 INFO : org.apache.spark.storage.BlockManagerInfo - Removed
 input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
 (size: 12.1 MB, free: 100.6 MB)

 The question that I have now is: how to prevent the
  MemoryStore/BlockManager from dropping the block inputs? And should they be
 logged in the level WARN/ERROR?


 Thanks.


 On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
 User List] [hidden email] wrote:

 Dear all,

 I am sorry. This was a false alarm

 There was some issue in the RDD processing logic which leads to large
 backlog. Once I fixed the issues in my processing logic, I can see all
 messages being pulled nicely without any Block Removed error. I need to tune
 certain configurations in my Kafka Consumer to modify the data rate and also
 the batch size.

 Sorry again.


 Regards,
 Dibyendu

 On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu [hidden email] wrote:

 This is my case about broadcast variable:

 14/07/21 19:49:13 INFO Executor: Running task ID 4
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on
 localhost (progress: 3/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for
 hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO Executor: Finished task ID 3
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885
 bytes in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 5
 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on
 localhost (progress: 4/106)
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO 

Where do logs go in StandAlone mode

2014-09-12 Thread Tim Smith
Spark 1.0.0

I write logs out from my app using this object:

object LogService extends Logging {

  /** Set reasonable logging levels for streaming if the user has not
    * configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging,
      // then we override the logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

Later, I set LogService.setStreamingLogLevels() and then use logInfo etc.

This works well when I run the app under Yarn, all the logs show up
under the container logs but when I run the app in Standalone mode, I
can't find these logs in neither the master, worker or driver logs. So
where do they go?

Thanks,

Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Executor garbage collection

2014-09-12 Thread Tim Smith
Hi,

Anyone setting any explicit GC options for the executor jvm? If yes,
what and how did you arrive at them?

Thanks,

- Tim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Announcing Spark 1.1.0!

2014-09-11 Thread Tim Smith
Thanks for all the good work. Very excited about seeing more features and
better stability in the framework.


On Thu, Sep 11, 2014 at 5:12 PM, Patrick Wendell pwend...@gmail.com wrote:

 I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is
 the second release on the API-compatible 1.X line. It is Spark's
 largest release ever, with contributions from 171 developers!

 This release brings operational and performance improvements in Spark
 core including a new implementation of the Spark shuffle designed for
 very large scale workloads. Spark 1.1 adds significant extensions to
 the newest Spark modules, MLlib and Spark SQL. Spark SQL introduces a
 JDBC server, byte code generation for fast expression evaluation, a
 public types API, JSON support, and other features and optimizations.
 MLlib introduces a new statistics library along with several new
 algorithms and optimizations. Spark 1.1 also builds out Spark's Python
 support and adds new components to the Spark Streaming module.

 Visit the release notes [1] to read about the new features, or
 download [2] the release today.

 [1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html
 [2] http://spark.eu.apache.org/downloads.html

 NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE FOR SEVERAL
 HOURS.

 Please e-mail me directly for any type-o's in the release notes or name
 listing.

 Thanks, and congratulations!
 - Patrick

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Tim Smith
How are you creating your kafka streams in Spark?

If you have 10 partitions for a topic, you can call createStream ten
times to create 10 parallel receivers/executors and then use union to
combine all the dStreams.



On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

 Hi (my previous post as been used by someone else)

  I'm building an application that reads events from a kafka stream. In production
  we've 5 consumers that share 10 partitions.
  But with Spark Streaming Kafka only 1 worker acts as a consumer and then
  distributes the tasks to workers, so I can have only 1 machine acting as a
  consumer, but I need more because only 1 consumer means lags.

  Do you have any idea what I can do? Another interesting point: the master
  is not loaded at all, it does not get above 10% CPU

 I've tried to increase the queued.max.message.chunks on the kafka client to
 read more records thinking it'll speed up the read but I only get

 ERROR consumer.ConsumerFetcherThread:

 [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId:

 SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
 ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] -
 PartitionFetchInfo(929838589,1048576),[IA2,6] -
 PartitionFetchInfo(929515796,1048576),[IA2,9] -
 PartitionFetchInfo(929577946,1048576),[IA2,8] -
 PartitionFetchInfo(930751599,1048576),[IA2,2] -
 PartitionFetchInfo(926457704,1048576),[IA2,5] -
 PartitionFetchInfo(930774385,1048576),[IA2,0] -
 PartitionFetchInfo(929913213,1048576),[IA2,3] -
 PartitionFetchInfo(929268891,1048576),[IA2,4] -
 PartitionFetchInfo(929949877,1048576),[IA2,1] -
 PartitionFetchInfo(930063114,1048576)
 java.lang.OutOfMemoryError: Java heap space

 Is someone have ideas ?
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help. So I
don't think stale RDDs are an issue here. I did a jmap -histo on a couple
of running receiver processes and in a heap of 30G, roughly ~16G is taken
by [B which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timeout streaming data, the difference is that
 “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
 input data, but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is reference-based cleaning mechanism,
 streaming data will be removed when out of slide duration.



 Both these two parameter can alleviate the memory occupation of Spark
 Streaming. But if the data is flooded into Spark Streaming when start up
 like your situation using Kafka, these two parameters cannot well mitigate
 the problem. Actually you need to control the input data rate to not inject
  so fast, you can try “spark.streaming.receiver.maxRate” to control the
 inject rate.
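
  A minimal sketch of wiring those knobs up on the driver (values are
  illustrative, and spark.streaming.receiver.maxRate is in records per second
  per receiver):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.streaming.receiver.maxRate", "10000") // throttle each receiver
      .set("spark.streaming.unpersist", "true")         // reference-based cleanup
      .set("spark.cleaner.ttl", "3600")                 // time-based cleanup, seconds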



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
  memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the kafka queues that are
 consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being processed.

 Regards,

 Luis





Re: how to choose right DStream batch interval

2014-09-10 Thread Tim Smith
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

Slide 39 covers it.
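
A small way to watch whether the chosen interval keeps up is a StreamingListener
(a sketch, assuming the listener API available in this Spark line; a stable
interval keeps processing time below the batch duration so total delay does not
keep growing):

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  class DelayLogger extends StreamingListener {
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted) {
      val info = batch.batchInfo
      println("batch " + info.batchTime +
        ": processing=" + info.processingDelay.getOrElse(-1L) + " ms" +
        ", total=" + info.totalDelay.getOrElse(-1L) + " ms")
    }
  }

  ssc.addStreamingListener(new DelayLogger)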

On Tue, Sep 9, 2014 at 9:23 PM, qihong qc...@pivotal.io wrote:

 Hi Mayur,

 Thanks for your response. I did write a simple test that set up a DStream
 with
 5 batches; The batch duration is 1 second, and the 3rd batch will take
 extra
 2 seconds, the output of the test shows that the 3rd batch causes backlog,
 and spark streaming does catch up on 4th and 5th batch (DStream.print
 was modified to output system time)

 ---
 Time: 1409959708000 ms, system time: 1409959708269
 ---
 1155
 ---
 Time: 1409959709000 ms, system time: 1409959709033
 ---
 2255
 delay 2000 ms
 ---
 Time: 1409959710000 ms, system time: 1409959712036
 ---
 3355
 ---
 Time: 1409959711000 ms, system time: 1409959712059
 ---
 4455
 ---
 Time: 1409959712000 ms, system time: 1409959712083
 ---
 

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark-streaming Could not compute split exception

2014-09-10 Thread Tim Smith
I had a similar issue and many others - all were basically symptoms of
yarn killing the container for high memory usage. Haven't gotten to root
cause yet.

On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Your executor is exiting or crashing unexpectedly:

 On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza
 pesp...@societyconsulting.com wrote:
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
  code from container container_1410224367331_0006_01_03 is : 1
  2014-09-09 21:47:26,345 WARN
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
  Exception from container-launch with container ID:
  container_1410224367331_0006_01_03 and exit code: 1

 You can check the app logs (yarn logs --applicationId [id]) and see
 why the container is exiting. There's probably an exception happening
 somewhere.


 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from Yarn to StandAlone mode and haven't had OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to Standalone, I tried looking at heaps of
running executors. What I found odd was that while both - jmap
histo:live and jmap histo showed heap usage in few hundreds of MBytes,
Yarn kept showing that memory utilization is in several Gigabytes -
eventually leading to the container being killed.

I would appreciate if someone can duplicate what I am seeing. Basically:
1. Tail your yarn container logs and see what it is reporting as
memory used by the JVM
2. In parallel, run jmap -histo:live <pid> or jmap -histo <pid> on
the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
unreachable objects (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:
 Actually, I am not doing any explicit shuffle/updateByKey or other
 transform functions. In my program flow, I take in data from Kafka,
 match each message against a list of regex and then if a msg matches a
 regex then extract groups, stuff them in json and push out back to
 kafka (different topic). So there is really no dependency between two
 messages in terms of processing. Here's my container histogram:
 http://pastebin.com/s3nAT3cY

 Essentially, my app is a cluster grep on steroids.



 On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com 
 wrote:
 Tim, I asked a similar question twice:
 here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
 and here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

 and have not yet received any responses. I noticed that the heapdump only
  contains a very large byte array consuming about 66% (the second link
 contains a picture of my heap -- I ran with a small heap to be able to get
 the failure quickly)

 I don't have solutions but wanted to affirm that I've observed a similar
 situation...

 On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because Yarn kills the containers for high
 memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
 don't think stale RDDs are an issue here. I did a jmap -histo on a couple
 of running receiver processes and in a heap of 30G, roughly ~16G is taken by
 [B which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

 Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timed-out streaming data. The difference is that
 “spark.cleaner.ttl” is a time-based cleaner: it cleans not only streaming
 input data but also Spark’s useless metadata; “spark.streaming.unpersist” is
 a reference-based cleaning mechanism: streaming data is removed once it falls
 out of the slide duration.



 Both of these parameters can alleviate the memory occupation of Spark
 Streaming. But if data floods into Spark Streaming at startup, as in your
 situation using Kafka, these two parameters cannot mitigate the problem very
 well. You actually need to control the input data rate so it is not injected
 so fast; you can try “spark.streaming.receiver.maxRate” to control the inject
 rate.
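
 A minimal sketch of the three settings above, with placeholder values rather
 than recommendations:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.cleaner.ttl", "3600")                  // time-based cleanup of old data and metadata (seconds)
   .set("spark.streaming.unpersist", "true")          // drop input RDDs once they fall out of the slide duration
   .set("spark.streaming.receiver.maxRate", "10000")  // cap on records per second ingested by each receiver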



 Thanks

 Jerry



 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Wednesday, September 10, 2014 5:21 AM
 To: user@spark.apache.org
 Subject: spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the kafka queues that are
 are
 consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being

Re: Low Level Kafka Consumer for Spark

2014-09-08 Thread Tim Smith
Thanks TD. Someone already pointed out to me that /repartition(...)/ isn't
the right way. You have to /val partedStream = repartition(...)/. Would be
nice to have it fixed in the docs.




On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current version (1.1, to be released) does not recover
 the raw data that has been received but not processed. This is because when
 the driver dies, the executors die and so does the raw data that was stored in
 them. Only for HDFS is the data not lost by driver recovery, as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 months from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide a strong guarantee,
 exactly-once semantics, in all transformations. To guarantee this for all
 kinds of streaming computations, stateful and non-stateful, it requires that
 the data be replayed through Kafka in exactly the same order, and that the
 underlying blocks of data in Spark be regenerated in exactly the way they
 would have been had there been no driver failure. This is quite tricky to
 implement: it requires manipulation of zookeeper offsets, etc., which is hard
 to do with the high level consumer that KafkaUtils uses. Dibyendu's low level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very soon.

 3. Repartitioning: I am trying to understand the repartition issue. One
 common mistake I have seen is that developers repartition a stream but do
 not use the repartitioned stream.

 WRONG:
 inputDStream.repartition(100)
 inputDStream.map(...).count().print()

 RIGHT:
 val repartitionedDStream = inputDStream.repartition(100)
 repartitionedDStream.map(...).count().print()

 Not sure if this helps solve the problem that you are all facing. I am
 going to add this to the streaming programming guide to make sure this
 common mistake is avoided.

 TD




 On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Sorry for the little delay. As discussed in this thread, I have modified the
 Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
 code to have a dedicated Receiver for every Topic Partition. You can see an
 example of how to create a Union of these receivers
 in consumer.kafka.client.Consumer.java.

 Thanks to Chris for suggesting this change.

 Regards,
 Dibyendu


 On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com
 wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to 0
 upon incoming state-changing events. Let's suppose there is at least one
 event since the last data checkpoint. This will lead to inconsistency
 upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with the
 wrong Duration value.
 To make things worse, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread
 and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by
 itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution lineage
 is checkpointed, but if we have big chunks of data being consumed by the
 Receiver node on, let's say, a per-second basis, then having it persisted to
 HDFS every second could be a big challenge for keeping JVM performance -
 maybe that could be the reason why it's not really
 implemented...assuming it isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev team
 will be very appreciated.

 tnks,
 Rod





Re: Publishing a transformed DStream to Kafka

2014-09-02 Thread Tim Smith
I'd be interested in finding the answer too. Right now, I do:

val kafkaOutMsgs = kafkInMessages.map(x => myFunc(x._2, someParam))
kafkaOutMsgs.foreachRDD((rdd, time) => { rdd.foreach(rec => {
writer.output(rec) }) } ) //where writer.output is a method that takes a
string and writer is an instance of a producer class.
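
A variation on the same idea (just a sketch, not something from this thread) is
to build one producer per partition inside foreachPartition, so the producer is
constructed on the executor instead of being serialized with the closure.
KafkaWriter, toTopic and propsMap below are placeholders for whatever producer
wrapper you already have:

// Sketch only: KafkaWriter stands in for your own producer wrapper.
kafkaOutMsgs.foreachRDD { rdd =>
  rdd.foreachPartition { recs =>
    val writer = new KafkaWriter(toTopic, propsMap) // created on the executor, once per partition
    recs.foreach(rec => writer.output(rec))
    writer.close()                                  // assumes your wrapper exposes a close()
  }
}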





On Tue, Sep 2, 2014 at 10:12 AM, Massimiliano Tomassi max.toma...@gmail.com
 wrote:

 Hello all,
 after having applied several transformations to a DStream I'd like to
 publish all the elements in all the resulting RDDs to Kafka. What would be
 the best way to do that? Just using DStream.foreach and then RDD.foreach?
 Is there any other built in utility for this use case?

 Thanks a lot,
 Max

 --
 
 Massimiliano Tomassi
 
 e-mail: max.toma...@gmail.com
 



Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed
task threshold to 64, my job aborts after 4 receiver task failures.
- Data loss recovery due to a failed receiver task/executor.


 For parallelism, I would expect a single createStream() to intelligently
map a receiver thread somewhere, one for each kafka partition, but in
different JVMs. Also, repartition() does not seem to work as advertised. A
repartition(512) should cause nodes other than the receiver nodes to get some
RDDs to process. No?


On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 I have this same question.  Isn't there somewhere that the Kafka range
 metadata can be saved?  From my naive perspective, it seems like it should
 be very similar to HDFS lineage.  The original HDFS blocks are kept
 somewhere (in the driver?) so that if an RDD partition is lost, it can be
 recomputed.  In this case, all we need is the Kafka topic, partition, and
 offset range.

 Can someone enlighten us on why two copies of the RDD are needed (or some
 other mechanism like a WAL) for fault tolerance when using Kafka but not
 when reading from say HDFS?



 On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com
 wrote:

 'this 2-node replication is mainly for failover in case the receiver
 dies while data is in flight.  there's still chance for data loss as
 there's no write ahead log on the hot path, but this is being addressed.'

 Can you comment a little on how this will be addressed, will there be a
 durable WAL?  Is there a JIRA for tracking this effort?

 I am curious without WAL if you can avoid this data loss with explicit
 management of Kafka offsets e.g. don't commit offset unless data is
 replicated to multiple nodes or maybe not until processed.  The incoming
 data will always be durably stored to disk in Kafka so can be replayed in
 failure scenarios to avoid data loss if the offsets are managed properly.




 On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming
 scales - as well as how it handles failover and checkpointing, but we can
 discuss that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.
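
 A rough sketch of that pattern for Kafka (topic and ZooKeeper strings are
 borrowed from elsewhere in this thread, so treat them and ssc as placeholders):

 import org.apache.spark.streaming.kafka.KafkaUtils

 // One receiver per created stream; union them back into a single DStream to process.
 val kInStreams = (1 to 5).map { _ =>
   KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
 }
 val unified = ssc.union(kInStreams)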

 a side note here is that each receiver running in the cluster will
 immediately replicate to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this is mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import
 org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and
 such (versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.
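
 As a tiny illustration of what that import unlocks (lines here is assumed to
 be a DStream[String]):

 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 // Without the implicit in scope, reduceByKey would not resolve on a DStream of pairs.
 val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)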


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if
 you wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area 

Re: Failed to run runJob at ReceiverTracker.scala

2014-08-29 Thread Tim Smith
I upped the ulimit to 128k files on all nodes. The job crashed again with
DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275.
Couldn't get the logs because I killed the job and it looks like yarn
wiped the container logs (not sure why it wipes the logs under
/var/log/hadoop-yarn/container). Next time, I will grab the logs while
the job is still active/zombie.

So is there a limit on how many times a receiver is re-spawned?

Thanks,

Tim


On Thu, Aug 28, 2014 at 10:06 PM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 It did. It failed and was respawned 4 times.
 In this case, the too many open files error is a sign that you need to
 increase the system-wide limit on open files.
 Try adding ulimit -n 16000 to your conf/spark-env.sh.

 TD


 On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith secs...@gmail.com wrote:

 Appeared after running for a while. I re-ran the job and this time, it
 crashed with:
 14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for
 stream 0: Error in block pushing thread - java.net.SocketException: Too many
 open files

 Shouldn't the failed receiver get re-spawned on a different worker?



 On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das
 tathagata.das1...@gmail.com wrote:

 Do you see this error right in the beginning or after running for
 sometime?

 The root cause seems to be that somehow your Spark executors got killed,
 which killed receivers and caused further errors. Please try to take a look
 at the executor logs of the lost executor to find what is the root cause
 that caused the executor to fail.

 TD


 On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died
 with:

 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
 ReceiverTracker.scala:275
 Exception in thread Thread-59 14/08/28 22:28:15 INFO
 YarnClientClusterScheduler: Cancelling stage 2
 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove
 executor 5 from BlockManagerMaster.
 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
 removeExecutor
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2.0:0 failed 4 times, most recent failure: TID 6481 on host
 node-dn1-1.ops.sfdc.net failed for unknown reason
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any insights into this error?

 Thanks,

 Tim





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I set partitions to 64:

//
 kInMsg.repartition(64)
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
//

Still see all activity only on the two nodes that seem to be receiving
from Kafka.

On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
 TD - Apologies, didn't realize I was replying to you instead of the list.

 What does numPartitions refer to when calling createStream? I read an
 earlier thread that seemed to suggest that numPartitions translates to
 partitions created on the Spark side?
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

 Actually, I re-tried with 64 numPartitions in createStream and that didn't
 work. I will manually set repartition to 64/128 and see how that goes.

 Thanks.




 On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Having 16 partitions in KafkaUtils.createStream does not translate to the
 RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
 best way to distribute the received data between all the nodes, as long as
 there is a sufficient number of partitions (try setting it to 2x the number
 of cores given to the application).
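
 A sketch of that suggestion against the kInMsg / normalizeLog code quoted in
 this thread (the partition count is a placeholder; with 8 executors of 8 cores
 that would be 64 cores, hence 128 partitions). Note the repartitioned stream
 has to be captured and used:

 // Capture the result of repartition; the original DStream is otherwise left untouched.
 val repartitioned = kInMsg.repartition(128) // ~2x the total executor cores
 val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))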

 Yeah, in 1.0.0, ttl should be unnecessary.



 On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:

 On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
 tathagata.das1...@gmail.com wrote:

 If you are repartitioning to 8 partitions, and your nodes happen to have
 at least 4 cores each, it's possible that all 8 partitions are assigned to
 only 2 nodes. Try increasing the number of partitions. Also make sure you
 have executors (allocated by YARN) running on more than two nodes if you
 want to use all 11 nodes in your yarn cluster.


 If you look at the code, I commented out the manual re-partitioning to 8.
 Instead, I am creating 16 partitions when I call createStream. But I will
 increase the partitions to, say, 64 and see if I get better parallelism.



 If you are using Spark 1.x, then you dont need to set the ttl for
 running Spark Streaming. In case you are using older version, why do you
 want to reduce it? You could reduce it, but it does increase the risk of 
 the
 premature cleaning, if once in a while things get delayed by 20 seconds. I
 dont see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
 shouldnt hurt performance).


 I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
 unless I have memory issues, more aggressive pruning won't help.

 Thanks,

 Tim




 TD


 On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 In my streaming app, I receive from kafka where I have tried setting
 the partitions when calling createStream or later, by calling 
 repartition
 - in both cases, the number of nodes running the tasks seems to be
 stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping 
 to
 use more nodes.

 I am starting the job as:
 nohup spark-submit --class logStreamNormalizer --master yarn
 log-stream-normalizer_2.10-1.0.jar --jars
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
 --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
 run-6.pid

 My main code is:
  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val kInMsg =
 KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct"
 -> 16))

  val propsMap = Map("metadata.broker.list" ->
 "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
 "kafka.serializer.StringEncoder", "producer.type" -> "async",
 "request.required.acks" -> "1")
  val to_topic = "normStruct"
  val writer = new KafkaOutputService(to_topic, propsMap)


  if (!configMap.keySet.isEmpty)
  {
   //kInMsg.repartition(8)
   val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
   outdata.foreachRDD((rdd, time) => { rdd.foreach(rec => {
 writer.output(rec) }) } )
  }

  ssc.start()
  ssc.awaitTermination()

 In terms of total delay, with a 5 second batch, the delays usually stay
 under 5 seconds, but sometimes jump to ~10 seconds. As a performance 
 tuning
 question, does this mean, I can reduce my cleaner ttl from 60 to say 25
 (still more than double of the peak delay)?

 Thanks

 Tim






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I wrote a long post about how I arrived here, but in a nutshell, I don't see
evidence of re-partitioning and workload distribution across the cluster.
My newfangled way of starting the job is:

run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid

Since the job spits out lots of logs, here is how I am trying to determine
if any tasks got assigned to non-local executors.
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
| grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

Yields no lines.

If I look at resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?

Thanks,

Tim







On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote:

 I set partitions to 64:

 //
  kInMsg.repartition(64)
  val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))
 //

 Still see all activity only on the two nodes that seem to be receiving
 from Kafka.

 On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
  TD - Apologies, didn't realize I was replying to you instead of the list.
 
  What does numPartitions refer to when calling createStream? I read an
  earlier thread that seemed to suggest that numPartitions translates to
  partitions created on the Spark side?
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
 
  Actually, I re-tried with 64 numPartitions in createStream and that
 didn't
  work. I will manually set repartition to 64/128 and see how that goes.
 
  Thanks.
 
 
 
 
  On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das 
 tathagata.das1...@gmail.com
  wrote:
 
  Having 16 partitions in KafkaUtils.createStream does not translate to
 the
  RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
  best way to distribute the received data between all the nodes, as long
 as
  there are sufficient number of partitions (try setting it to 2x the
 number
  cores given to the application).
 
  Yeah, in 1.0.0, ttl should be unnecessary.
 
 
 
  On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:
 
  On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
  tathagata.das1...@gmail.com wrote:
 
  If you are repartitioning to 8 partitions, and your node happen to
 have
  at least 4 cores each, its possible that all 8 partitions are
 assigned to
  only 2 nodes. Try increasing the number of partitions. Also make sure
 you
  have executors (allocated by YARN) running on more than two nodes if
 you
  want to use all 11 nodes in your yarn cluster.
 
 
  If you look at the code, I commented out the manual re-partitioning to
 8.
  Instead, I am created 16 partitions when I call createStream. But I
 will
  increase the partitions to, say, 64 and see if I get better
 parallelism.
 
 
 
  If you are using Spark 1.x, then you dont need to set the ttl for
  running Spark Streaming. In case you are using older version, why do
 you
  want to reduce it? You could reduce it, but it does increase the risk
 of the
  premature cleaning, if once in a while things get delayed by 20
 seconds. I
  dont see much harm in keeping the ttl at 60 seconds (a bit of extra
 garbage
  shouldnt hurt performance).
 
 
  I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are
 right,
  unless I have memory issues, more aggressive pruning won't help.
 
  Thanks,
 
  Tim
 
 
 
 
  TD
 
 
  On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:
 
  Hi,
 
  In my streaming app, I receive from kafka where I have tried setting
  the partitions when calling createStream or later, by calling
 repartition
  - in both cases, the number of nodes running the tasks seems to be
  stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was
 hoping to
  use more nodes.
 
  I am starting the job as:
  nohup spark-submit --class logStreamNormalizer --master yarn
  log-stream-normalizer_2.10-1.0.jar --jars
 
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
  --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
  --num-executors 8 normRunLog-6.log 2normRunLogError-6.log  echo
 $! 
  run-6.pid
 
  My main code is:
   val sparkConf = new SparkConf().setAppName(SparkKafkaTest)
   val ssc = new

Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
Crash again. On the driver, logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in
removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0:0 failed 4 times, most recent failure: TID 6383 on host
node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I went and looked at the OS on node-dn1-2 and the container logs for TID 6383
but found nothing.
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message in the container log is timestamped 19:04:51, which
tells me the executor was killed for some reason right before the driver
noticed that executor/task failure.

How come my task was retried only 4 times although my config says the failure
threshold is 64?
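
One thing worth double-checking (my assumption, not something confirmed in this
thread): arguments placed after the application jar on the spark-submit line are
handed to the application's own main(), not parsed as Spark options, so a bare
--spark.task.maxFailures 64 there may never reach Spark, leaving the default of
4 task failures per stage in effect. Setting it on the SparkConf removes the
ambiguity:

// Sketch: make sure the setting reaches Spark rather than the app's argv.
val sparkConf = new SparkConf()
  .setAppName("logStreamNormalizer")
  .set("spark.task.maxFailures", "64")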








On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith secs...@gmail.com wrote:

 I wrote a long post about how I arrived here but in a nutshell I don't see
 evidence of re-partitioning and workload distribution across the cluster.
 My new fangled way of starting the job is:

 run=`date +%m-%d-%YT%T`; \
 nohup spark-submit --class logStreamNormalizer \
 --master yarn log-stream-normalizer_2.10-1.0.jar \
 --jars
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 \
 --driver-memory 8G \
 --executor-memory 30G \
 --executor-cores 16 \
 --num-executors 8 \
 --spark.serializer org.apache.spark.serializer.KryoSerializer \
 --spark.rdd.compress true \
 --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
 --spark.akka.threads 16 \
 --spark.task.maxFailures 64 \
 --spark.scheduler.mode FAIR \
 logs/normRunLog-$run.log \
 2logs/normRunLogError-$run.log  \
 echo $!  logs/run-$run.pid

 Since the job spits out lots of logs, here is how I am trying to determine
 if any tasks got assigned to non-local executors.
 $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
 | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

 Yields no lines.

 If I look at resource pool usage in YARN, this app is assigned 252.5GB of
 memory, 128 VCores and 9 containers. Am I missing something here?

 Thanks,

 Tim







 On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote:

 I set partitions to 64:

 //
  kInMsg.repartition(64)
  val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))
 //

 Still see all activity only on the two nodes that seem to be receiving
 from Kafka.

 On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
  TD - Apologies, didn't realize I was replying to you instead of the
 list.
 
  What does numPartitions refer to when calling createStream? I read an
  earlier thread that seemed to suggest that numPartitions translates to
  partitions created on the Spark side?
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
 
  Actually, I re-tried with 64 numPartitions in createStream and that
 didn't
  work. I will manually set repartition to 64/128 and see how that goes.
 
  Thanks.
 
 
 
 
  On Thu, Aug 28

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:
 Chris,

 I did the Dstream.repartition mentioned in the document on parallelism in
 receiving, as well as set spark.default.parallelism and it still uses only
 2 nodes in my cluster.  I notice there is another email thread on the same
 topic:

 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

 My code is in Java and here is what I have:

JavaPairReceiverInputDStream<String, String> messages =

 KafkaUtils.createStream(ssc, zkQuorum,
 "cse-job-play-consumer", kafkaTopicMap);

 JavaPairDStream<String, String> newMessages =
 messages.repartition(partitionSize); // partitionSize=30

 JavaDStream<String> lines = newMessages.map(new
 Function<Tuple2<String, String>, String>() {
 ...

 public String call(Tuple2<String, String> tuple2) {
   return tuple2._2();
 }
   });

 JavaDStream<String> words = lines.flatMap(new
 MetricsComputeFunction()
 );

 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
 new PairFunction<String, String, Integer>() {
...
 }
 );

  wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>,
 Void>() {...});

 Thanks,
 Bharat



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg =
KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct"
-> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split by dstream into multiple
smaller streams? Should I manually create multiple Dstreams like this?:
val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) //where
normalizeLog takes a String and a Map of regexes and returns a string

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still uses
 only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
 JavaPairReceiverInputDStreamString, String messages =
 
  KafkaUtils.createStream(ssc, zkQuorum,
  cse-job-play-consumer, kafkaTopicMap);
 
  JavaPairDStreamString, String newMessages =
  messages.repartition(partitionSize);// partitionSize=30
 
  JavaDStreamString lines = newMessages.map(new
  FunctionTuple2lt;String, String, String() {
  ...
 
  public String call(Tuple2String, String tuple2) {
return tuple2._2();
  }
});
 
  JavaDStreamString words = lines.flatMap(new
  MetricsComputeFunction()
  );
 
  JavaPairDStreamString, Integer wordCounts = words.mapToPair(
  new PairFunctionString, String, Integer() {
 ...
  }
  );
 
   wordCounts.foreachRDD(new FunctionJavaPairRDDlt;String,
 Integer,
  Void() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map { _ =>
KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct"
-> 1)) }
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I
have Streams [0-9]. Of course, since the kafka topic has only three
partitions, only three of those streams are active, but I am seeing more
blocks being pulled across the three streams in total than what one was doing
earlier. Also, four nodes are now actively processing tasks (vs only two
earlier), which actually has me confused. If Streams are active on only
3 nodes, then how/why did a 4th node get work? And if a 4th got work, why
aren't more nodes getting work?
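
One follow-on experiment (a sketch, not something verified here) is to
repartition the unioned stream and keep the result, so the map work can be
scheduled beyond the three nodes hosting active receivers:

// Union the per-receiver streams, then explicitly spread the processing work.
// The result of repartition must be captured; calling it without assignment is a no-op.
val kInMsg  = ssc.union(kInStreams)
val spread  = kInMsg.repartition(64)
val outdata = spread.map(x => normalizeLog(x._2, configMap))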






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith secs...@gmail.com wrote:

 I create my DStream very simply as:
 val kInMsg =
 KafkaUtils.createStream(ssc,zkhost1:2181/zk_kafka,testApp,Map(rawunstruct
 - 8))
 .
 .
 eventually, before I operate on the DStream, I repartition it:
 kInMsg.repartition(512)

 Are you saying that ^^ repartition doesn't split by dstream into multiple
 smaller streams? Should I manually create multiple Dstreams like this?:
 val kInputs = (1 to 10).map {_= KafkaUtils.createStream()}

 Then I apply some custom logic to it as:
 val outdata = kInMsg.map(x=normalizeLog(x._2,configMap)) //where
 normalizeLog takes a String and Map of regex and returns a string

 In my case, I think I have traced the issue to the receiver executor being
 killed by Yarn:
 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
 node-dn1-4-acme.com: remote Akka client disassociated

 This be the root cause?

 http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
 https://issues.apache.org/jira/browse/SPARK-2121





 On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still
 uses only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
 JavaPairReceiverInputDStreamString, String messages =
 
  KafkaUtils.createStream(ssc, zkQuorum,
  cse-job-play-consumer, kafkaTopicMap);
 
  JavaPairDStreamString, String newMessages =
  messages.repartition(partitionSize);// partitionSize=30
 
  JavaDStreamString lines = newMessages.map(new
  FunctionTuple2lt;String, String, String() {
  ...
 
  public String call(Tuple2String, String tuple2) {
return tuple2._2();
  }
});
 
  JavaDStreamString words = lines.flatMap(new
  MetricsComputeFunction()
  );
 
  JavaPairDStreamString, Integer wordCounts = words.mapToPair(
  new PairFunctionString, String, Integer() {
 ...
  }
  );
 
   wordCounts.foreachRDD(new FunctionJavaPairRDDlt;String,
 Integer,
  Void() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Re: Failed to run runJob at ReceiverTracker.scala

2014-08-28 Thread Tim Smith
Appeared after running for a while. I re-ran the job and this time, it
crashed with:
14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for
stream 0: Error in block pushing thread - java.net.SocketException: Too
many open files

Shouldn't the failed receiver get re-spawned on a different worker?



On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Do you see this error right in the beginning or after running for sometime?

 The root cause seems to be that somehow your Spark executors got killed,
 which killed receivers and caused further errors. Please try to take a look
 at the executor logs of the lost executor to find what is the root cause
 that caused the executor to fail.

 TD


 On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died with:

 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
 ReceiverTracker.scala:275
 Exception in thread Thread-59 14/08/28 22:28:15 INFO
 YarnClientClusterScheduler: Cancelling stage 2
 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor
 5 from BlockManagerMaster.
 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
 removeExecutor
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2.0:0 failed 4 times, most recent failure: TID 6481 on host
 node-dn1-1.ops.sfdc.net failed for unknown reason
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any insights into this error?

 Thanks,

 Tim





Re: DStream repartitioning, performance tuning processing

2014-08-28 Thread Tim Smith
TD - Apologies, didn't realize I was replying to you instead of the list.

What does numPartitions refer to when calling createStream? I read an
earlier thread that seemed to suggest that numPartitions translates to
partitions created on the Spark side?
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

Actually, I re-tried with 64 numPartitions in createStream and that didn't
work. I will manually set repartition to 64/128 and see how that goes.

Thanks.




On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Having 16 partitions in KafkaUtils.createStream does not translate to the
 RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
 best way to distribute the received data between all the nodes, as long as
 there are sufficient number of partitions (try setting it to 2x the number
 cores given to the application).

 Yeah, in 1.0.0, ttl should be unnecessary.



 On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:

 On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 If you are repartitioning to 8 partitions, and your node happen to have
 at least 4 cores each, its possible that all 8 partitions are assigned to
 only 2 nodes. Try increasing the number of partitions. Also make sure you
 have executors (allocated by YARN) running on more than two nodes if you
 want to use all 11 nodes in your yarn cluster.


 If you look at the code, I commented out the manual re-partitioning to 8.
 Instead, I am created 16 partitions when I call createStream. But I will
 increase the partitions to, say, 64 and see if I get better parallelism.



 If you are using Spark 1.x, then you dont need to set the ttl for
 running Spark Streaming. In case you are using older version, why do you
 want to reduce it? You could reduce it, but it does increase the risk of
 the premature cleaning, if once in a while things get delayed by 20
 seconds. I dont see much harm in keeping the ttl at 60 seconds (a bit of
 extra garbage shouldnt hurt performance).


 I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
 unless I have memory issues, more aggressive pruning won't help.

 Thanks,

 Tim




  TD


 On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 In my streaming app, I receive from kafka where I have tried setting
 the partitions when calling createStream or later, by calling repartition
 - in both cases, the number of nodes running the tasks seems to be
 stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to
 use more nodes.

 I am starting the job as:
 nohup spark-submit --class logStreamNormalizer --master yarn
 log-stream-normalizer_2.10-1.0.jar --jars
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
 --num-executors 8 normRunLog-6.log 2normRunLogError-6.log  echo $! 
 run-6.pid

 My main code is:
  val sparkConf = new SparkConf().setAppName(SparkKafkaTest)
  val ssc = new StreamingContext(sparkConf,Seconds(5))
  val kInMsg =
 KafkaUtils.createStream(ssc,node-nn1-1:2181/zk_kafka,normApp,Map(rawunstruct
 - 16))

  val propsMap = Map(metadata.broker.list -
 node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092, serializer.class -
 kafka.serializer.StringEncoder, producer.type - async,
 request.required.acks - 1)
  val to_topic = normStruct
  val writer = new KafkaOutputService(to_topic, propsMap)


  if (!configMap.keySet.isEmpty)
  {
   //kInMsg.repartition(8)
   val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))
   outdata.foreachRDD((rdd,time) = { rdd.foreach(rec = {
 writer.output(rec) }) } )
  }

  ssc.start()
  ssc.awaitTermination()

 In terms of total delay, with a 5 second batch, the delays usually stay
 under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning
 question, does this mean, I can reduce my cleaner ttl from 60 to say 25
 (still more than double of the peak delay)?

 Thanks

 Tim







Kafka stream receiver stops input

2014-08-27 Thread Tim Smith
Hi,

I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.

I have a streaming job that reads from a kafka topic and writes
output to another kafka topic. The job starts fine but after a while
the input stream stops getting any data. I think these messages show
no incoming data on the stream:
14/08/28 00:42:15 INFO ReceiverTracker: Stream 0 received 0 blocks

I run the job as:
spark-submit --class logStreamNormalizer --master yarn
log-stream-normalizer_2.10-1.0.jar --jars
spark-streaming-kafka_2.10-1.0.2.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
--executor-memory 6G --spark.cleaner.ttl 60 --executor-cores 4

As soon as I start the job, I see an error like:

14/08/28 00:50:59 INFO BlockManagerInfo: Added input-0-1409187056800
in memory on node6-acme.com:39418 (size: 83.3 MB, free: 3.1 GB)
Exception in thread pool-1-thread-7 java.lang.OutOfMemoryError: Java
heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
at 
org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
at 
org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:42)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at 
org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
at 
org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

But not sure if that is the cause because even after that OOM message,
I see data coming in:
14/08/28 00:51:00 INFO ReceiverTracker: Stream 0 received 6 blocks

Appreciate any pointers or suggestions to troubleshoot the issue.
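
One knob that might be worth a try (an assumption on my part, not a confirmed
fix for this OOM): createStream takes an optional StorageLevel, so the
receiver's storage behaviour can be chosen explicitly, for example a
single-replica serialized level that spills to disk instead of keeping a second
in-memory copy. ZooKeeper, group and topic names below are reused from the code
earlier in this message:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: store received blocks serialized, single copy, spilling to disk under pressure.
val kInMsg = KafkaUtils.createStream(
  ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16),
  StorageLevel.MEMORY_AND_DISK_SER)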

Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Updating shared data structure between executors

2014-08-19 Thread Tim Smith
Hi,

I am writing some Scala code to normalize a stream of logs using an
input configuration file (multiple regex patterns). To avoid
re-starting the job, I can read in a new config file using fileStream
and then turn the config file into a map. But I am unsure about how to
update a shared map (since broadcast vars cannot be updated).

Any help or pointers will be appreciated.
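
For what it's worth, a minimal sketch of one way to do this (parseConfigLine,
normalizeLog, logStream, the directory path and the map's value type are all
placeholders): hold the latest map in a driver-side variable, refresh it as new
config files arrive via textFileStream (or fileStream), and snapshot it inside
transform, whose body runs on the driver for every batch so each batch's tasks
ship with the current copy:

// Sketch only; assumes logStream is the DStream of raw log lines.
var currentConfig: Map[String, String] = Map.empty   // lives on the driver

val configLines = ssc.textFileStream("/path/to/config/dir")
configLines.foreachRDD { rdd =>
  val lines = rdd.collect()                          // config files are small; pull them to the driver
  if (lines.nonEmpty) currentConfig = lines.map(parseConfigLine).toMap
}

// transform's body executes on the driver each batch, so cfg is a fresh snapshot every time.
val normalized = logStream.transform { rdd =>
  val cfg = currentConfig
  rdd.map(line => normalizeLog(line, cfg))
}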

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org