Forgot to mention: the reason I want the count is so that I can repartition
my data, so that when I save it to disk each file has at least 100 rows
instead of the data being split across lots of smaller files.
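
A minimal sketch of the arithmetic this implies, reading the target as a
minimum of roughly 100 rows per file. PartitionPlanner and partitionsFor are
hypothetical names made up for illustration, not part of Spark, and the
sketch assumes rows end up spread evenly across partitions, which is only
approximately true after a repartition.

```java
/**
 * Hypothetical helper (not part of Spark): given the number of rows in a
 * batch, pick how many partitions to write so that each output file holds
 * at least minRowsPerFile rows on average.
 */
final class PartitionPlanner {

    private PartitionPlanner() {
        // Static helper only; no instances.
    }

    static int partitionsFor(long rowCount, long minRowsPerFile) {
        if (rowCount < 0 || minRowsPerFile <= 0) {
            throw new IllegalArgumentException(
                "rowCount must be >= 0 and minRowsPerFile must be > 0");
        }
        // Floor division: rounding down keeps the average rows per file at
        // or above the minimum. Always keep at least one partition.
        long partitions = Math.max(1L, rowCount / minRowsPerFile);
        // Partition counts are ints, so clamp very large results.
        return (int) Math.min(partitions, (long) Integer.MAX_VALUE);
    }
}
```

Inside foreachRDD the result would feed something along the lines of
rdd.repartition(PartitionPlanner.partitionsFor(rdd.count(), 100)) before the
save. Note that rdd.count() runs a job of its own on every batch, and if the
only goal is fewer output files, coalesce() avoids a full shuffle.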

Kind regards

Andy

From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Thursday, January 28, 2016 at 6:41 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  looking for an easy way to count number of rows in JavaDStream

> There must be an easy way to count the number of rows in a JavaDStream.
> 
> 
> JavaDStream<String> words;
> 
> JavaDStream<Long> hardToUse = words.count();
> 
> 
> 
> 
> 
> JavaDStream does not seem to have a collect().
> 
> 
> 
> The following works but is very clumsy
> 
> 
> 
> Any suggestions would be greatly appreciated
> 
> 
> 
> Andy
> 
> 
> 
> public class JavaDStreamCount<T> implements Serializable {
>     private static final long serialVersionUID = -3600586183332429887L;
>     public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     /**
>      * TODO in 1.6 should be able to use a lambda function
>      * @see https://issues.apache.org/jira/browse/SPARK-4557
>      * @param total
>      * @param javaDStream
>      * @return
>      */
>     @Deprecated
>     public Double hack(Accumulator<Double> total, JavaDStream<T> javaDStream) {
>         Count c = new Count(total);
>         javaDStream.foreachRDD(c);
>         return c.getTotal().value();
>     }
>
>     class Count implements Function<JavaRDD<T>,Void> {
>         private static final long serialVersionUID = -5239727633710162488L;
>         private Accumulator<Double> total;
>
>         public Count(Accumulator<Double> total) {
>             this.total = total;
>         }
>
>         @Override
>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>             List<T> data = rdd.collect();
>             int dataSize = data.size();
>             logger.info("Accumulator name:{} data.size:{}", total.name(), dataSize);
>             long num = rdd.count();
>             logger.info("num:{}", num);
>             total.add(new Double(num));
>             return null;
>         }
>
>         public Accumulator<Double> getTotal() {
>             return total;
>         }
>     }
> }
> 
> 
> 
> 

