You can use DStream.transform() to apply arbitrary RDD-to-RDD transformations to the RDDs generated by a DStream.
val coalescedDStream = myDStream.transform { _.coalesce(...) }

On Tue, Mar 3, 2015 at 1:47 PM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Sorry, I made a mistake in my code. Please ignore my question number 2.
> Different numbers of partitions give *the same* results!
>
> On Tue, Mar 3, 2015 at 7:32 PM, Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming application, running on a single node,
>> consisting mainly of map operations. I perform repartitioning to control
>> the number of CPU cores that I want to use. The code goes like this:
>>
>>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>> val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")
>>> val words = distFile.repartition(cores.toInt).flatMap(_.split(" "))
>>>   .filter(_.length > 3)
>>>
>>> val wordCharValues = words.map(word => {
>>>   var sum = 0
>>>   word.toCharArray.foreach(c => { sum += c.toInt })
>>>   sum.toDouble / word.length.toDouble
>>> }).foreachRDD(rdd => {
>>>   println("MEAN: " + rdd.mean())
>>> })
>>
>> I have 2 questions:
>>
>> 1) How can I use coalesce in this code instead of repartition?
>>
>> 2) Why, using the same dataset (which is a small file processed within a
>> single batch), does the result that I obtain for the mean vary with the
>> number of partitions? If I don't call the repartition method, the result
>> is always the same for every execution, as it should be. But
>> repartitioning into, for instance, 2 partitions gives a different mean
>> value than using 8 partitions. I really don't understand why, given that
>> my code is deterministic. Can someone enlighten me on this?
>>
>> Thanks.
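Applied to the pipeline in the quoted code, a minimal sketch might look like the following (this assumes the same `sparkConf` and `cores` values as in the original snippet; it is an illustration, not a tested program):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: sparkConf and cores are assumed to be defined as in the quoted code.
val ssc = new StreamingContext(sparkConf, Seconds(5))
val distFile = ssc.textFileStream("/home/myuser/spark-example/dump")

// coalesce() is an RDD method, not a DStream method, so it has to be wrapped
// in transform(), which applies an RDD-to-RDD function to each batch's RDD.
val words = distFile
  .transform(_.coalesce(cores.toInt))
  .flatMap(_.split(" "))
  .filter(_.length > 3)
```

One caveat: unlike repartition, coalesce(n) by default only decreases the number of partitions and avoids a shuffle; pass `shuffle = true` if you need it to increase parallelism as well.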