Hi Mich

Yup, I was surprised to find empty files too. It's easy to work around. Note
that I should probably use coalesce() and not repartition().

In general I found I almost always need to repartition. I was getting
thousands of empty partitions, and it was really slowing my system down.

    private static void save(JavaDStream<String> json, String outputURIBase) {

        /*
        Using saveAsTextFiles() directly causes lots of empty directories
        to be created:

        DStream<String> data = json.dstream();
        data.saveAsTextFiles(outputURIBase, null);
        */

        json.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> rdd, Time time) throws Exception {
                long count = rdd.count();
                // skip empty micro-batches; rdd.isEmpty() would also work
                if (count > 0) {
                    rdd = repartition(rdd, (int) count);
                    String date = Utils.convertMillisecondsToDateStr(time.milliseconds());
                    String dirPath = outputURIBase
                            + File.separator + date
                            + File.separator + "tweet-" + time.milliseconds();
                    rdd.saveAsTextFile(dirPath);
                }
            }

            final int maxNumRowsPerFile = 200;

            JavaRDD<String> repartition(JavaRDD<String> rdd, int count) {
                int numPartitions = count / maxNumRowsPerFile + 1;
                // coalesce() merges existing partitions without the full
                // shuffle that repartition() triggers
                return rdd.coalesce(numPartitions);
            }
        });
    }
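To double-check the sizing math on its own: the goal is roughly one output file per 200 rows. A standalone sketch (class and method names here are hypothetical, not from the job above); ceiling division gives the smallest partition count such that no partition exceeds the cap, whereas `count / max + 1` allocates one extra partition whenever count is an exact multiple:

```java
// Standalone sketch of the partition-sizing arithmetic (names are
// hypothetical). The RDD would then be shrunk with
// coalesce(targetPartitions(count)).
public class PartitionMath {
    static final int MAX_ROWS_PER_FILE = 200;

    static int targetPartitions(long recordCount) {
        // ceiling division: (n + d - 1) / d
        return (int) ((recordCount + MAX_ROWS_PER_FILE - 1) / MAX_ROWS_PER_FILE);
    }

    public static void main(String[] args) {
        System.out.println(targetPartitions(500)); // 3 -> files of ~200/200/100 rows
        System.out.println(targetPartitions(200)); // 1 -> exactly one full file
    }
}
```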




From:  Mich Talebzadeh <mich.talebza...@gmail.com>
Date:  Tuesday, April 5, 2016 at 3:06 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  Saving Spark streaming RDD with saveAsTextFiles ends up creating
empty files on HDFS

> Spark 1.6.1
> 
> The following creates empty files. It prints lines OK with println
> 
> val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> result.saveAsTextFiles("/tmp/rdd_stuff")
> 
> I am getting zero length files
> 
> drwxr-xr-x   - hduser supergroup          0 2016-04-05 23:19
> /tmp/rdd_stuff-1459894755000
> drwxr-xr-x   - hduser supergroup          0 2016-04-05 23:20
> /tmp/rdd_stuff-1459894810000
> 
> Any ideas?
> 
> Thanks,
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
>  
> 
> http://talebzadehmich.wordpress.com
> 
>  

