Hi guys,
I have a Spark Streaming application and I want to improve its
performance. Basically, it's a design question. My input looks like time, s1, s2,
..., s23, and I have to apply the same operations to each of these columns. I am
running on 40 cores across 10 machines.
1. I am trying to get rid of the loop in the middle, but could not think of
anything useful.
2. When I remove saveAsTextFiles, processing gets much better: from 6 seconds
down to 2 seconds for each 2-second batch.
Any directions would be helpful.

val eegStream = KafkaUtils.createStream(ssc, zkQuorum, group,
  Map(args(0) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)
val windowedEEG = eegStream.window(Seconds(2), Seconds(2)).cache
val timeAndFile = windowedEEG.map(x => {
  val token = x.split(",")
  (math.round(token(1).toDouble), token(0))
})
val firstTimeAndFile = timeAndFile.transform(rdd =>
  rdd.context.makeRDD(rdd.sortByKey(true).take(1))).map(x => (1L, (x._1, x._2)))
ssc.sparkContext.parallelize((2 to 24).map(i => {
  val counts = windowedEEG.map(x => {
    val token = x.split(",")
    (math.round(token(i).toDouble))
  }).countByValue()
  val topCounts = counts.map(_.swap).transform(rdd =>
    rdd.context.makeRDD(rdd.top(60), 4))
  val absoluteTopCounts = topCounts.map(x => (math.abs(x._2) * x._1, x._1))
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
  // Frequency Weighted Amplitude for Normal Data
  val windowedFWA = absoluteTopCounts.map(x => (x._1.toFloat / x._2))
  val CMA = windowedFWA.map(r => (1, (r.toDouble, 1, 1)))
    .updateStateByKey[(Double, Int, Int)](updateSum).map(_._2)
  val anomaly = CMA.map(x => (1L, x._3))
  joinedResult(i) = anomaly.join(firstTimeAndFile)
  joinedResult(i).map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
    .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/")
  joinedResult(i).print
}))
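
To make clear what each pass of the loop computes for one column, here is a minimal sketch of the same arithmetic in plain Scala collections, without Spark or DStreams. The names FwaSketch and frequencyWeightedAmplitude and the topN parameter are only for illustration and are not part of the application above; it assumes one window of samples for a single column is already in memory.

```scala
object FwaSketch {
  // Frequency-weighted amplitude of one column over one window:
  // round each sample, count how often each rounded value occurs,
  // keep the topN (count, value) pairs, and return
  // sum(|value| * count) / sum(count).
  def frequencyWeightedAmplitude(samples: Seq[Double], topN: Int = 60): Float = {
    require(samples.nonEmpty, "need at least one sample in the window")

    // (count, value) pairs, mirroring countByValue() followed by map(_.swap)
    val counts: Seq[(Long, Long)] =
      samples.map(s => math.round(s))
        .groupBy(identity)
        .toSeq
        .map { case (value, occurrences) => (occurrences.size.toLong, value) }

    // Largest counts first (ties broken by larger value), like rdd.top(60)
    val top = counts.sorted(Ordering[(Long, Long)].reverse).take(topN)

    // Sum of |value| * count and sum of counts, like the map + reduce step
    val (weighted, total) = top
      .map { case (count, value) => (math.abs(value) * count, count) }
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))

    weighted.toFloat / total
  }
}
```

For example, FwaSketch.frequencyWeightedAmplitude(Seq(1.2, 0.9, -2.0, -2.4, -1.6, 3.0)) rounds the samples to 1, 1, -2, -2, -2, 3 and returns (1*2 + 2*3 + 3*1) / 6 = 11/6.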

Regards,
Laeeq