Hi,
I am working with multiple Kafka streams (23 streams), and currently I am
processing them separately, receiving one stream from each topic. I have the
following questions.
1.    The Spark Streaming guide suggests unioning these streams. Is it possible
to get statistics for each stream even after they are unioned?
2.    My calculations are not complex. I use a 2-second batch interval, and
with 2 streams a single core easily processes each batch in under 2 seconds.
There is some shuffling involved in my application. As I increase the number of
streams and the number of executors accordingly, the application's scheduling
delay increases and becomes unmanageable within 2 seconds. I believe this
happens because, with that many streams, the number of tasks increases, which
magnifies the shuffling, and because all streams share the same executors. Is
it possible to dedicate a subset of the executors to a particular stream while
processing the streams simultaneously? E.g. if I have 15 cores in the cluster
and 5 streams, 5 cores will be taken by the 5 receivers; of the remaining 10,
can I give 2 cores to each of the 5 streams? Just to add, increasing the batch
interval does help, but I don't want to increase it because of application
restrictions and delayed results (tuning blockInterval and defaultParallelism
helps only to a limited extent).
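To make question 1 concrete, here is a minimal sketch of the kind of per-stream statistic I mean, assuming each record is tagged with its topic before the streams are merged. It uses plain Scala collections as a stand-in for one batch of the unioned stream; `tagWithTopic` and `countPerTopic` are hypothetical names for illustration, not Spark APIs.

```scala
// Plain-collection stand-in for one batch; the helper names are hypothetical.
object PerTopicStats {
  // Tag every record of one stream with the topic it came from.
  def tagWithTopic(topic: String, records: Seq[String]): Seq[(String, String)] =
    records.map(r => (topic, r))

  // Merge the tagged streams and count records per topic. On DStreams the
  // analogous steps would be a union followed by map and reduceByKey.
  def countPerTopic(tagged: Seq[Seq[(String, String)]]): Map[String, Long] =
    tagged.flatten.groupBy(_._1).map { case (topic, recs) => (topic, recs.size.toLong) }

  def main(args: Array[String]): Unit = {
    val batchA = tagWithTopic("topicA", Seq("1,f1,10", "2,f1,12"))
    val batchB = tagWithTopic("topicB", Seq("1,f2,7"))
    // Print one (topic, count) pair per line, ordered by topic name.
    countPerTopic(Seq(batchA, batchB)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the topic travels with each record, any per-key aggregation applied after the merge still yields one result per original stream.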
Please see the attached file for the code snippet.
Regards,
Laeeq
        // Imports the snippet relies on (Spark 1.x package names)
        import org.apache.spark.SparkConf
        import org.apache.spark.SparkContext._
        import org.apache.spark.storage.StorageLevel
        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.StreamingContext._
        import org.apache.spark.streaming.kafka.KafkaUtils

        //Setting system properties

        val conf = new SparkConf()
            .setMaster("spark://10.1.4.90:7077")
            .setAppName("StreamAnomalyDetector")
            .setSparkHome(System.getenv("SPARK_HOME"))
            .setJars(List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
            .set("spark.executor.memory", "6g")
            .set("spark.executor.logs.rolling.strategy", "size")
            .set("spark.executor.logs.rolling.size.maxBytes", "1024")
            .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
            .set("spark.speculation", "true")
            .set("spark.locality.wait", "1000")
            .set("spark.streaming.unpersist", "true")
            .set("spark.streaming.blockInterval", "100")
            .set("spark.default.parallelism", "10")

        val zkQuorum = "10.1.4.144:2181,10.1.4.145:2181,10.1.4.146:2181,10.1.4.147:2181,10.1.4.148:2181"
        val group = "test-group"
    
        // Create the context
        val ssc = new StreamingContext(conf, Seconds(2))
                
        // HDFS path for checkpointing old data
        ssc.checkpoint("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/checkpointing")
        
        // Create one Kafka DStream per topic (one topic per command-line argument)
        for (a <- 0 to (args.length - 1))
        {
        val eegStreams = KafkaUtils.createStream(ssc, zkQuorum, group,
            Map(args(a) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)
        // Split each comma-separated record and keep its first three fields
        val keyAndValues = eegStreams.map(x => {
            val token = x.split(",")
            (token(0), token(1), token(2))
        }).persist(StorageLevel.MEMORY_ONLY)
                
        val timeAndFile = keyAndValues.map(x =>
            (math.round(x._1.toDouble), x._2)).window(Seconds(2), Seconds(2))
        // Keep only the earliest (time, file) pair of each window
        val firstTimeAndFile = timeAndFile.transform(rdd =>
            rdd.context.makeRDD(rdd.sortByKey(true).take(1))).map(x => (1L, (x._1, x._2)))
        
        val counts = keyAndValues.map(x =>
            math.round(x._3.toDouble)).countByValueAndWindow(Seconds(2), Seconds(2))
        val topCounts = counts.map(_.swap).transform(rdd =>
            rdd.context.makeRDD(rdd.top(60), 10))
        val absoluteTopCounts = topCounts.map(x => (math.abs(x._2) * x._1, x._1))
            .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
        // Frequency Weighted Amplitude for normal data
        val windowedFWA = absoluteTopCounts.map(x => (x._1.toFloat / x._2))

        // CMA stands for Cumulative Moving Average of Frequency Weighted Amplitude
        val CMA = windowedFWA.map(r =>
            (1, (r.toDouble, 1, 1))).updateStateByKey[(Double, Int, Int)](updateSum).map(_._2)

        val anomaly = CMA.map(x => (1L , x._3))
        val joinedResult = anomaly.join(firstTimeAndFile)
        joinedResult.map(x => "%s,%s,%s".format(x._2._2._2, x._2._2._1, x._2._1))
            .saveAsTextFiles("hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/" + (a + 1))
        joinedResult.print()
        }

    ssc.start()
    ssc.awaitTermination()
    }
}
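
For context on question 2, here is a rough estimate of the task load implied by the settings above. It assumes, as the Spark Streaming guide describes, about one map-side task per received block, so roughly (batch interval / block interval) tasks per receiver per batch. Shuffle tasks are not counted, and the helper names are hypothetical.

```scala
// Back-of-the-envelope arithmetic only; helper names are hypothetical.
object TaskEstimate {
  // Approximate map-side tasks per receiver per batch:
  // one task per received block, i.e. batch interval / block interval.
  def tasksPerReceiver(batchMs: Long, blockMs: Long): Long = batchMs / blockMs

  // Cores left for processing once every receiver has pinned one core.
  def processingCores(totalCores: Int, receivers: Int): Int = totalCores - receivers

  def main(args: Array[String]): Unit = {
    // 2-second batches with spark.streaming.blockInterval = 100 ms
    val perReceiver = tasksPerReceiver(batchMs = 2000, blockMs = 100)
    println(s"map tasks per receiver per batch: $perReceiver")
    println(s"map tasks per batch for 23 streams: ${perReceiver * 23}")
    // The 15-core, 5-stream example from question 2
    println(s"processing cores with 15 cores and 5 receivers: ${processingCores(15, 5)}")
  }
}
```

With these numbers each stream contributes about 20 map-side tasks every 2 seconds, so 23 streams schedule several hundred tasks per batch before any shuffle stage is counted.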