Hi,

I am working with multiple Kafka streams (23 of them, one stream per topic) and currently I process them separately. I have the following questions.

1. The Spark Streaming guide suggests unioning these streams. Is it possible to get statistics for each individual stream even after they are unioned? (A sketch of what I mean follows the code snippet below.)

2. My calculations are not complex. I use a 2 second batch interval, and with 2 streams a single core easily processes them within 2 seconds. My application involves some shuffling. As I increase the number of streams (and the number of executors accordingly), the application's scheduling delay increases and batches no longer finish within 2 seconds. I believe this happens because with that many streams the number of tasks increases, which magnifies the shuffling, and because all streams share the same executors. Is it possible to dedicate part of the executors to a particular stream while still processing the streams simultaneously? E.g. if I have 15 cores on the cluster and 5 streams, 5 cores will be taken by the 5 receivers; of the remaining 10, can I give 2 cores each to one of the 5 streams? Just to add, increasing the batch interval does help, but I don't want to increase the batch size due to application restrictions and delayed results (spark.streaming.blockInterval and spark.default.parallelism help, but only to a limited extent).

Please see the attached file for the code snippet (included below).

Regards,
Laeeq
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamAnomalyDetector {

  // updateSum (the updateStateByKey function, of type
  // (Seq[(Double, Int, Int)], Option[(Double, Int, Int)]) => Option[(Double, Int, Int)])
  // is defined elsewhere in the application.

  def main(args: Array[String]) {
    // Setting system properties
    val conf = new SparkConf()
      .setMaster("spark://10.1.4.90:7077")
      .setAppName("StreamAnomalyDetector")
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
      .set("spark.executor.memory", "6g")
      .set("spark.executor.logs.rolling.strategy", "size")
      .set("spark.executor.logs.rolling.size.maxBytes", "1024")
      .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
      .set("spark.speculation", "true")
      .set("spark.locality.wait", "1000")
      .set("spark.streaming.unpersist", "true")
      .set("spark.streaming.blockInterval", "100")
      .set("spark.default.parallelism", "10")

    val zkQuorum = "10.1.4.144:2181,10.1.4.145:2181,10.1.4.146:2181,10.1.4.147:2181,10.1.4.148:2181"
    val group = "test-group"

    // Create the context with a 2 second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))

    // HDFS path for checkpointing old data
    ssc.checkpoint("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/checkpointing")

    // Create one Kafka DStream per topic passed on the command line
    for (a <- 0 to (args.length - 1)) {
      val eegStreams = KafkaUtils.createStream(ssc, zkQuorum, group,
        Map(args(a) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)

      // Parse each CSV record into (time, file, value)
      val keyAndValues = eegStreams.map { x =>
        val token = x.split(",")
        (token(0), token(1), token(2))
      }.persist(StorageLevel.MEMORY_ONLY)

      // Earliest (time, file) pair in each 2 second window, keyed for the later join
      val timeAndFile = keyAndValues.map(x => (math.round(x._1.toDouble), x._2))
        .window(Seconds(2), Seconds(2))
      val firstTimeAndFile = timeAndFile
        .transform(rdd => rdd.context.makeRDD(rdd.sortByKey(true).take(1)))
        .map(x => (1L, (x._1, x._2)))

      // Top 60 value counts per 2 second window
      val counts = keyAndValues.map(x => math.round(x._3.toDouble))
        .countByValueAndWindow(Seconds(2), Seconds(2))
      val topCounts = counts.map(_.swap)
        .transform(rdd => rdd.context.makeRDD(rdd.top(60), 10))
      val absoluteTopCounts = topCounts
        .map(x => (math.abs(x._2) * x._1, x._1))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))

      // Frequency Weighted Amplitude (FWA) for normal data
      val windowedFWA = absoluteTopCounts.map(x => x._1.toFloat / x._2)

      // CMA stands for Cumulative Moving Average of the FWA
      val CMA = windowedFWA.map(r => (1, (r.toDouble, 1, 1)))
        .updateStateByKey[(Double, Int, Int)](updateSum)
        .map(_._2)
      val anomaly = CMA.map(x => (1L, x._3))

      val joinedResult = anomaly.join(firstTimeAndFile)
      joinedResult
        .map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
        .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/" + (a + 1))
      joinedResult.print()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
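P.S. To make question 1 concrete, here is a rough sketch of what I mean by keeping per-stream statistics after a union. This is not my actual code: topics and the per-topic record count are just placeholders, while ssc, zkQuorum and group are as in the snippet above. The idea is to tag every record with its source topic before unioning, so statistics can later be keyed by topic:

// Sketch only: tag each Kafka stream with its topic before the union
val topics = args.toSeq // one topic per command line argument, as above
val taggedStreams = topics.map { topic =>
  KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    .map { case (_, value) => (topic, value) } // tag each record with its source topic
}
val unioned = ssc.union(taggedStreams)

// Per-stream statistics survive the union because the topic is part of the key,
// e.g. a record count per topic for every batch:
val recordsPerTopic = unioned.map { case (topic, _) => (topic, 1L) }.reduceByKey(_ + _)
recordsPerTopic.print()

This keeps all topics in a single DStream (one set of stages per batch) while the per-topic results remain separable by key.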