Hi,
I am working with multiple Kafka streams (23 in total), and currently I process them separately, receiving one stream from each topic. I have the following questions.
1. The Spark Streaming guide suggests unioning these streams. Is it possible to get statistics for each individual stream even after they are unioned?
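What I have in mind is something like the sketch below: tag each record with its source topic before the union, so per-topic statistics can still be grouped out afterwards (just an illustration of the idea, not code I have tested at scale):

// Tag every record with its topic before unioning, so that statistics
// can still be computed per stream after the union.
val taggedStreams = args.toSeq.map { topic =>
  KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    .map { case (_, value) => (topic, value) }   // keep the topic as a key
}
val unioned = ssc.union(taggedStreams)           // one DStream for all topics
// e.g. per-topic record counts for each batch, even after the union:
unioned.map { case (topic, _) => (topic, 1L) }.reduceByKey(_ + _).print()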
2. My calculations are not complex. I use a 2-second batch interval, and with 2 streams the batches are easily processed within 2 seconds by a single core. There is some shuffling involved in my application. As I increase the number of streams (and the number of executors accordingly), the application's scheduling delay increases and becomes unmanageable within the 2 seconds. I believe this happens because with that many streams the number of tasks grows, so the shuffling magnifies, and all streams share the same executors. Is it possible to dedicate a subset of the executors to a particular stream while processing the streams simultaneously? E.g. if I have 15 cores in the cluster and 5 streams, 5 cores will be taken by the 5 receivers; of the remaining 10, can I give 2 cores to each of the 5 streams? Just to add: increasing the batch interval does help, but I don't want to increase the batch size due to application restrictions and delayed results (spark.streaming.blockInterval and spark.default.parallelism help only to a limited extent).
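To make the example concrete: the only way I currently know to get this split is to run one application per stream and cap each application's cores with spark.cores.max, roughly as below, but I would like the same effect inside a single application:

// One application per topic, capped at 3 cores each
// (1 receiver + 2 processing), so 5 apps fill the 15 cores.
val conf = new SparkConf()
  .setMaster("spark://10.1.4.90:7077")
  .setAppName("StreamAnomalyDetector-" + args(0))  // one topic per application
  .set("spark.cores.max", "3")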
Please see the attached code snippet, inlined below.
Regards,
Laeeq
// Setting system properties
val conf = new SparkConf()
  .setMaster("spark://10.1.4.90:7077")
  .setAppName("StreamAnomalyDetector")
  .setSparkHome(System.getenv("SPARK_HOME"))
  .setJars(List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
  .set("spark.executor.memory", "6g")
  .set("spark.executor.logs.rolling.strategy", "size")
  .set("spark.executor.logs.rolling.size.maxBytes", "1024")
  .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
  .set("spark.speculation", "true")
  .set("spark.locality.wait", "1000")
  .set("spark.streaming.unpersist", "true")
  .set("spark.streaming.blockInterval", "100")
  .set("spark.default.parallelism", "10")
val zkQuorum = "10.1.4.144:2181,10.1.4.145:2181,10.1.4.146:2181,10.1.4.147:2181,10.1.4.148:2181"
val group = "test-group"
// Create the context
val ssc = new StreamingContext(conf, Seconds(2))
// HDFS path for checkpointing state
ssc.checkpoint("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/checkpointing")
// Create one KafkaDStream per topic passed on the command line
for (a <- args.indices) {
  val eegStreams = KafkaUtils.createStream(ssc, zkQuorum, group,
    Map(args(a) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)

  // Each record is "time,file,value"
  val keyAndValues = eegStreams.map { x =>
    val token = x.split(",")
    (token(0), token(1), token(2))
  }.persist(StorageLevel.MEMORY_ONLY)

  // Earliest (time, file) pair of the current window, keyed for the later join
  val timeAndFile = keyAndValues.map(x => (math.round(x._1.toDouble), x._2))
    .window(Seconds(2), Seconds(2))
  val firstTimeAndFile = timeAndFile.transform(rdd =>
    rdd.context.makeRDD(rdd.sortByKey(true).take(1))).map(x => (1L, (x._1, x._2)))

  // Histogram of the rounded values over the window, then the top 60 counts
  val counts = keyAndValues.map(x => math.round(x._3.toDouble))
    .countByValueAndWindow(Seconds(2), Seconds(2))
  val topCounts = counts.map(_.swap).transform(rdd =>
    rdd.context.makeRDD(rdd.top(60), 10))

  // Frequency Weighted Amplitude (FWA) of the current window
  val absoluteTopCounts = topCounts.map(x => (math.abs(x._2) * x._1, x._1))
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
  val windowedFWA = absoluteTopCounts.map(x => x._1.toFloat / x._2)

  // CMA stands for Cumulative Moving Average of the Frequency Weighted
  // Amplitude for normal data; updateSum (defined elsewhere) maintains
  // (sum, count, anomalyFlag) as state
  val CMA = windowedFWA.map(r => (1, (r.toDouble, 1, 1)))
    .updateStateByKey[(Double, Int, Int)](updateSum).map(_._2)
  val anomaly = CMA.map(x => (1L, x._3))

  // Join the anomaly flag with the first (time, file) of the window and save
  val joinedResult = anomaly.join(firstTimeAndFile)
  joinedResult.map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
    .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/" + (a + 1))
  joinedResult.print()
}
ssc.start()
ssc.awaitTermination()
}
}
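P.S. updateSum is referenced above but omitted from the snippet. A simplified sketch of the shape of that state function, assuming the state tuple is (runningSum, windowCount, anomalyFlag); the 2x-of-CMA test is only a placeholder, not the real criterion:

// Simplified sketch only: state is (runningSum, windowCount, anomalyFlag).
// The CMA is runningSum / windowCount; the flag marks windows whose FWA
// deviates from it.
def updateSum(newValues: Seq[(Double, Int, Int)],
              state: Option[(Double, Int, Int)]): Option[(Double, Int, Int)] = {
  var (sum, count, flag) = state.getOrElse((0.0, 0, 0))
  for ((fwa, n, _) <- newValues) {
    val cma = if (count > 0) sum / count else fwa
    flag = if (fwa > 2 * cma) 1 else 0   // placeholder anomaly criterion
    sum += fwa
    count += n
  }
  Some((sum, count, flag))
}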