The following code seems to do what I want. I repartition the RDD, not the DStream. I wonder if this has to do with the way windows work?
private static void saveTweetsCSV(JavaSparkContext jsc, JavaDStream<TidyPojo> tidy, String outputURI) {
    tidy.foreachRDD(new VoidFunction2<JavaRDD<TidyPojo>, Time>() {
        private static final long serialVersionUID = 1L;

        // Typically we use the CSV file format for data a human needs to work with.
        // We want to repartition the data so that we write the smallest number
        // of files possible while keeping the max number of rows in a given CSV
        // file small enough for a human to work with easily.
        final long maxNumRowsPerFile = 100;

        @Override
        public void call(JavaRDD<TidyPojo> rdd, Time time) throws Exception {
            long count = rdd.count();
            // if (!rdd.isEmpty()) {
            if (count > 0) {
                long numPartitions = count / maxNumRowsPerFile + 1;
                rdd = rdd.repartition((int) numPartitions);
                String dirPath = outputURI + "_CSV" + "-" + time.milliseconds();

                // http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
                // Get the singleton instance of SQLContext
                SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
                DataFrame df = sqlContext.createDataFrame(rdd, TidyTwitterMLPojo.class);
                TidyPojo.saveCSV(df, dirPath);
            }
        }
    });
}

From: Andrew Davidson <a...@santacruzintegration.com>
Date: Friday, January 29, 2016 at 1:54 PM
To: "user @spark" <user@spark.apache.org>
Subject: How to use DStream<T> repartition() ?

> My Streaming app has a requirement that my output be saved in the smallest
> number of files possible, such that each file does not exceed a max number of
> rows. Based on my experience, it appears that each partition is written to a
> separate output file.
>
> This was really easy to do in my batch processing using data frames and RDDs.
> It's easy to call count(), then decide how many partitions I want, and
> finally call repartition().
>
> I am having a heck of a time trying to figure out how to do the same thing
> using Spark Streaming.
>
> JavaDStream<Pojo> tidy =
> JavaDStream<Long> counts = tidy.count();
>
> Below is the documentation for count(). I do not see how I can use this to
> figure out how many partitions I need. DStream does not provide a collect(),
> and foreachRDD() cannot return a value. I tried using an accumulator, but
> that did not work.
>
> Any suggestions would be greatly appreciated.
>
> http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html
>
> count
> JavaDStream<java.lang.Long> count()
> Return a new DStream in which each RDD has a single element generated by
> counting each RDD of this DStream.
> Returns: (undocumented)
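FWIW, the partition-count arithmetic in the snippet above (count / maxNumRowsPerFile + 1) can be sanity-checked in isolation, without Spark. A minimal sketch (plain Java; the helper name computeNumPartitions is made up for illustration):

```java
public class PartitionMath {

    // Mirrors the formula used inside foreachRDD above: with maxRowsPerFile
    // rows per partition, count / maxRowsPerFile + 1 partitions guarantees no
    // partition (and so no output file) exceeds the row limit. Note it rounds
    // up even on exact multiples, e.g. 100 rows with a limit of 100 still
    // yields 2 partitions.
    static int computeNumPartitions(long rowCount, long maxRowsPerFile) {
        return (int) (rowCount / maxRowsPerFile + 1);
    }

    public static void main(String[] args) {
        System.out.println(computeNumPartitions(250, 100)); // 250 rows -> 3 files
        System.out.println(computeNumPartitions(100, 100)); // exact multiple -> 2 files
        System.out.println(computeNumPartitions(1, 100));   // tiny batch -> 1 file
    }
}
```

If the extra near-empty file on exact multiples matters, (rowCount + maxRowsPerFile - 1) / maxRowsPerFile is the usual ceiling-division alternative.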