Hi all,

I have a Spark Streaming application and I would like to improve its performance; it is basically a design question. Each input record looks like: time, s1, s2, ..., s23, and I have to apply the same operations to each of the 23 signal columns. I am running on 40 cores across 10 machines.

1. I am trying to get rid of the per-column loop in the middle, but could not think of anything useful.
2. When I remove the saveAsTextFiles calls, each 2-second batch drops from about 6 seconds to about 2 seconds.

Any directions would be helpful.

val eegStream = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(0) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)

val windowedEEG = eegStream.window(Seconds(2), Seconds(2)).cache()

val timeAndFile = windowedEEG.map { x =>
  val token = x.split(",")
  (math.round(token(1).toDouble), token(0))
}

val firstTimeAndFile = timeAndFile
  .transform(rdd => rdd.context.makeRDD(rdd.sortByKey(true).take(1)))
  .map(x => (1L, (x._1, x._2)))

// Note: I originally wrapped this loop in ssc.sparkContext.parallelize(...),
// but the DStream graph is built on the driver anyway, so the wrapper has no effect.
(2 to 24).foreach { i =>
  val counts = windowedEEG.map { x =>
    val token = x.split(",")
    math.round(token(i).toDouble)
  }.countByValue()

  val topCounts = counts.map(_.swap)
    .transform(rdd => rdd.context.makeRDD(rdd.top(60), 4))

  val absoluteTopCounts = topCounts
    .map(x => (math.abs(x._2) * x._1, x._1))
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))

  // Frequency Weighted Amplitude for normal data
  val windowedFWA = absoluteTopCounts.map(x => x._1.toFloat / x._2)

  val CMA = windowedFWA.map(r => (1, (r.toDouble, 1, 1)))
    .updateStateByKey[(Double, Int, Int)](updateSum)
    .map(_._2)

  val anomaly = CMA.map(x => (1L, x._3))

  joinedResult(i) = anomaly.join(firstTimeAndFile)
  joinedResult(i)
    .map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
    .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/")
  joinedResult(i).print()
}
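One idea I have been considering for question 1 (just a sketch, not tested against this pipeline): instead of building 23 separate DStream lineages, key every measurement by its column index in a single flatMap, so the per-column counting collapses into one keyed aggregation over (columnIndex, value) pairs:

```scala
// Sketch only: replace the (2 to 24) driver-side loop with one keyed pass.
// Assumes the same `windowedEEG` DStream of CSV lines as above.
val keyedCounts = windowedEEG
  .flatMap { line =>
    val token = line.split(",")
    // emit one (columnIndex, roundedValue) pair per signal column
    (2 to 24).map(i => (i, math.round(token(i).toDouble)))
  }
  .countByValue() // occurrences of each (column, value) pair across the window
// keyedCounts is a DStream[((Int, Long), Long)]: ((column, value), count).
// The per-column top-60 / weighted-amplitude steps would then be one keyed
// aggregation instead of 23 separate jobs, and a single saveAsTextFiles on the
// combined result would replace the 23 separate output actions.
```

This is only the counting step; the top-60 selection per column would still need a per-key aggregation (e.g. aggregating a bounded priority queue per column), so I am not sure how much it saves end to end.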
Regards,
Laeeq