The following code seems to do what I want. Note that I call repartition() on each RDD inside foreachRDD(), not on the DStream itself. I wonder if this has to do with the way windows work?

    // Imports needed at the top of the enclosing class (Spark 1.x DataFrame API):
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;

    private static void saveTweetsCSV(JavaSparkContext jsc,
            JavaDStream<TidyPojo> tidy, String outputURI) {

        tidy.foreachRDD(new VoidFunction2<JavaRDD<TidyPojo>, Time>() {

            private static final long serialVersionUID = 1L;

            // Typically we use the CSV file format for data a human needs to
            // work with. We want to repartition the data so that we write the
            // smallest number of files possible while keeping the max number
            // of rows in any given CSV file small enough for a human to work
            // with easily.
            final long maxNumRowsPerFile = 100;

            @Override
            public void call(JavaRDD<TidyPojo> rdd, Time time) throws Exception {
                // We need the count to size the partitions anyway, so count()
                // doubles as the emptiness check (no separate rdd.isEmpty()).
                long count = rdd.count();
                if (count > 0) {
                    // Integer division, plus one partition for the remainder.
                    int numPartitions = (int) (count / maxNumRowsPerFile + 1);
                    rdd = rdd.repartition(numPartitions);

                    String dirPath = outputURI + "_CSV" + "-" + time.milliseconds();

                    // http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
                    // Get the singleton instance of SQLContext
                    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

                    DataFrame df = sqlContext.createDataFrame(rdd, TidyPojo.class);
                    TidyPojo.saveCSV(df, dirPath);
                }
            }
        });
    }
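
In case it helps anyone, here is roughly how the method gets wired into a
streaming job. This is a minimal sketch only: the socket source, the 30-second
batch interval, and the toTidy() parser are hypothetical placeholders for
whatever your pipeline actually does (it also needs imports for SparkConf,
Durations, and JavaStreamingContext); only the saveTweetsCSV() call matches
the code above.

    SparkConf conf = new SparkConf().setAppName("tidyTweets");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(30));

    // Hypothetical source and parser; substitute your real receiver and mapping.
    JavaDStream<String> raw = jssc.socketTextStream("localhost", 9999);
    JavaDStream<TidyPojo> tidy = raw.map(line -> toTidy(line));

    saveTweetsCSV(jsc, tidy, "hdfs:///tmp/tweets");

    jssc.start();
    jssc.awaitTermination();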


From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Friday, January 29, 2016 at 1:54 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  How to use DStream<T> repartition() ?

> My Streaming app has a requirement that my output be saved in the smallest
> number of files possible such that each file does not exceed a max number of
> rows. Based on my experience, it appears that each partition is written to a
> separate output file.
> 
> This was really easy to do in my batch processing using data frames and
> RDDs. It's easy to call count(), decide how many partitions I want, and
> finally call repartition().
> 
> I am having a heck of a time trying to figure out how to do the same thing
> using Spark Streaming.
> 
> 
> JavaDStream<Pojo> tidy = …
> 
> JavaDStream<Long> counts = tidy.count();
> 
> 
> 
> Below is the documentation for count(). I do not see how I can use this to
> figure out how many partitions I need. DStream does not provide a collect(),
> and foreachRDD() cannot return a value. I tried using an accumulator, but
> that did not work.
> 
> 
> 
> Any suggestions would be greatly appreciated
> 
> 
> http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html
> 
> count
> JavaDStream<java.lang.Long> count()
> Return a new DStream in which each RDD has a single element generated by
> counting each RDD of this DStream.
> Returns: (undocumented)
> 
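
A note on why the DStream-level approach in the quoted message can't work:
tidy.count() returns a new JavaDStream<Long>, not a number, so there is no
value available at job-definition time to feed into repartition(). The count
has to happen per batch, inside foreachRDD(), as in the code at the top. For
comparison, here is a minimal batch-mode sketch of the same pattern; it reuses
the TidyPojo bean and the TidyPojo.saveCSV() helper from the code above:

    // Batch version: count() is an action, so the result is available
    // immediately and can drive the repartition decision.
    static void saveBatchCSV(SQLContext sqlContext, JavaRDD<TidyPojo> rdd,
            String dirPath) {
        final long maxNumRowsPerFile = 100;
        long count = rdd.count();
        // Integer division, plus one partition for the remainder.
        int numPartitions = (int) (count / maxNumRowsPerFile + 1);
        rdd = rdd.repartition(numPartitions);  // one output file per partition
        DataFrame df = sqlContext.createDataFrame(rdd, TidyPojo.class);
        TidyPojo.saveCSV(df, dirPath);
    }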

