Forgot to mention. The reason I want the count is so that I can repartition
my data so that when I save it to disk each file has at least 100 rows instead of
lots of smaller files.
Kind regards
Andy
From: Andrew Davidson
Date: Thursday, January 28, 2016 at 6:41 PM
To: "user @spark"
Subject: looking for an easy way to count number of rows in JavaDStream
> There must be an easy way to count the number of rows in JavaDStream.
>
>
> JavaDStream words;
>
> JavaDStream hardToUse = words();
>
>
>
>
>
> JavaDStream does not seem to have a collect().
>
>
>
> The following works but is very clumsy
>
>
>
> Any suggestions would be greatly appreciated
>
>
>
> Andy
>
>
>
> /**
>  * Adds the row count of every batch of a {@code JavaDStream} to a Spark
>  * {@code Accumulator} so the running total can be read back on the driver.
>  */
> public class JavaDStreamCount implements Serializable {
>
>     private static final long serialVersionUID = -3600586183332429887L;
>
>     public static Logger logger =
>             LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     /**
>      * Registers a per-batch counting function on the stream and returns the
>      * accumulator's current value.
>      *
>      * TODO in 1.6 should be able to use a lambda function
>      *
>      * NOTE(review): the value is read immediately after {@code foreachRDD}
>      * is called, so it presumably does not yet include batches that are
>      * processed later -- confirm against the caller; reading {@code total}
>      * again after batches have run is likely what is wanted.
>      *
>      * @see <a href="https://issues.apache.org/jira/browse/SPARK-4557">SPARK-4557</a>
>      *
>      * @param <T> element type of the stream (the rows being counted)
>      * @param total accumulator that receives each batch's row count
>      * @param javaDStream stream whose rows are to be counted
>      * @return the value held by {@code total} at the time of the call
>      */
>     @Deprecated
>     public <T> Double hack(Accumulator<Double> total, JavaDStream<T> javaDStream) {
>         Count<T> c = new Count<T>(total);
>         javaDStream.foreachRDD(c);
>         return c.getTotal().value();
>     }
>
>     /**
>      * Function applied to each batch RDD: counts its rows and adds the count
>      * to the wrapped accumulator.
>      *
>      * @param <T> element type of the RDDs being counted
>      */
>     class Count<T> implements Function<JavaRDD<T>, Void> {
>
>         private static final long serialVersionUID = -5239727633710162488L;
>
>         private Accumulator<Double> total;
>
>         /**
>          * @param total accumulator that receives each batch's row count
>          */
>         public Count(Accumulator<Double> total) {
>             this.total = total;
>         }
>
>         /**
>          * Counts the rows of one batch, logs the count and adds it to the
>          * accumulator.
>          *
>          * @param rdd the batch to count
>          * @return always {@code null}
>          */
>         @Override
>         public Void call(JavaRDD<T> rdd) throws Exception {
>             // Count once and reuse the value. The previous version also
>             // called rdd.collect() just to log the list size, which ran a
>             // second job and copied every row to the driver.
>             long num = rdd.count();
>             logger.info("Accumulator name:{} data.size:{}", total.name(), num);
>             logger.info("num:{}", num);
>             total.add(Double.valueOf(num));
>             return null;
>         }
>
>         /**
>          * @return the accumulator this function adds to
>          */
>         public Accumulator<Double> getTotal() {
>             return total;
>         }
>     }
> }
>
>
>
>