Hi guys,
I have a Spark Streaming application and I want to increase its performance. Basically it's a design question. My input lines look like: time, s1, s2, ..., s23. I have to apply the same operations to each of these columns. I am running on 40 cores across 10 machines.
1. I am trying to get rid of the loop in the middle but could not think of anything useful.
2. When I remove saveAsTextFiles, each 2-second batch drops from 6 seconds to 2 seconds.
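For point 2, the only idea I have so far (untested; `results` stands for the joined stream built in the code below) is to coalesce each batch's RDD before the write, so every 2-second batch produces one HDFS file instead of one file per partition:

// Untested idea: shrink each batch to a single partition before writing,
// so saveAsTextFiles creates one part file per batch instead of many small ones.
val compacted = results.transform(rdd => rdd.coalesce(1))
compacted.saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/")

Not sure whether the write itself or the number of small files is the bottleneck, though.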
Any directions would be helpful. Here is the current code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// Each Kafka record is a CSV line: time, s1, s2, ..., s23.
val eegStream = KafkaUtils.createStream(ssc, zkQuorum, group,
  Map(args(0) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)
val windowedEEG = eegStream.window(Seconds(2), Seconds(2)).cache

// Sort by the rounded second field and keep the first record of the window,
// keyed by 1L so it can be joined with every channel's result below.
val timeAndFile = windowedEEG.map { x =>
  val token = x.split(",")
  (math.round(token(1).toDouble), token(0))
}
val firstTimeAndFile = timeAndFile
  .transform(rdd => rdd.context.makeRDD(rdd.sortByKey(true).take(1)))
  .map(x => (1L, (x._1, x._2)))

// The loop I want to get rid of: the same pipeline repeated per channel column.
ssc.sparkContext.parallelize((2 to 24).map { i =>
  // Histogram of the rounded values of column i in this window.
  val counts = windowedEEG.map { x =>
    val token = x.split(",")
    math.round(token(i).toDouble)
  }.countByValue()
  // Keep the 60 most frequent values.
  val topCounts = counts.map(_.swap)
    .transform(rdd => rdd.context.makeRDD(rdd.top(60), 4))
  // Frequency Weighted Amplitude for normal data:
  // sum(|value| * count) / sum(count) over the top values.
  val absoluteTopCounts = topCounts
    .map(x => (math.abs(x._2) * x._1, x._1))
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
  val windowedFWA = absoluteTopCounts.map(x => x._1.toFloat / x._2)
  // Cumulative moving average, kept across batches with updateStateByKey
  // (updateSum is defined elsewhere).
  val CMA = windowedFWA.map(r => (1, (r.toDouble, 1, 1)))
    .updateStateByKey[(Double, Int, Int)](updateSum)
    .map(_._2)
  val anomaly = CMA.map(x => (1L, x._3))
  // joinedResult is an array of DStreams declared elsewhere.
  joinedResult(i) = anomaly.join(firstTimeAndFile)
  joinedResult(i).map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
    .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/")
  joinedResult(i).print
})
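For point 1, the only restructuring I could think of so far (a rough, untested sketch) is to parse each line once and emit (channel, value) pairs with flatMap, so all 23 channels flow through one keyed DStream instead of 23 separate pipelines:

// Rough sketch, untested: one parse per line, all channels in one DStream.
val channelValues = windowedEEG.flatMap { x =>
  val token = x.split(",")
  (2 to 24).map(i => (i, math.round(token(i).toDouble)))
}
// ((channel, value), count) for the whole window in a single pass.
val countsPerChannel = channelValues.countByValue()

What I don't see yet is how to get the per-channel top-60 and the join from here without a groupByKey on the channel index, so I'm not sure this is a real improvement.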
Regards,
Laeeq