Forgot to mention: the reason I want the count is so that I can repartition my data, so that when I save it to disk each file has at least 100 rows instead of lots of smaller files.
Kind regards

Andy

From: Andrew Davidson <a...@santacruzintegration.com>
Date: Thursday, January 28, 2016 at 6:41 PM
To: "user @spark" <user@spark.apache.org>
Subject: looking for an easy way to count number of rows in JavaDStream

> There must be an easy way to count the number of rows in a JavaDStream.
>
> JavaDStream<String> words;
> JavaDStream<Long> hardToUse = words.count();
>
> JavaDStream does not seem to have a collect().
>
> The following works but is very clumsy.
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
> import java.io.Serializable;
> import java.util.List;
>
> import org.apache.spark.Accumulator;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class JavaDStreamCount<T> implements Serializable {
>     private static final long serialVersionUID = -3600586183332429887L;
>     public static Logger logger =
>             LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     /**
>      * TODO in 1.6 should be able to use a lambda function
>      * @see https://issues.apache.org/jira/browse/SPARK-4557
>      * @param total
>      * @param javaDStream
>      * @return
>      */
>     @Deprecated
>     public Double hack(Accumulator<Double> total, JavaDStream<T> javaDStream) {
>         Count c = new Count(total);
>         javaDStream.foreachRDD(c);
>         // foreachRDD only registers the output operation, so this value
>         // is read before any batch has been processed.
>         return c.getTotal().value();
>     }
>
>     class Count implements Function<JavaRDD<T>, Void> {
>         private static final long serialVersionUID = -5239727633710162488L;
>         private Accumulator<Double> total;
>
>         public Count(Accumulator<Double> total) {
>             this.total = total;
>         }
>
>         @Override
>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>             // collect() pulls the whole batch to the driver; it is only used for logging here
>             List<T> data = rdd.collect();
>             int dataSize = data.size();
>             logger.info("Accumulator name:{} data.size:{}", total.name(), dataSize);
>             long num = rdd.count();
>             logger.info("num:{}", num);
>             total.add((double) num);
>             return null;
>         }
>
>         public Accumulator<Double> getTotal() {
>             return total;
>         }
>     }
> }
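Going from a per-batch count to the "at least 100 rows per file" layout is mostly a small piece of arithmetic: divide the batch's row count by the target rows per file and round down, never going below one partition. The sketch below shows only that step, assuming the count is obtained per batch with `rdd.count()` inside `foreachRDD`, as in the quoted code. `PartitionPlanner` and `partitionsFor` are hypothetical names used for illustration; they are not part of Spark.

```java
// Hypothetical helper (not a Spark API): picks a partition count so that
// each output file holds roughly rowsPerFile rows or more.
final class PartitionPlanner {

    private PartitionPlanner() {}

    /**
     * Returns how many partitions to use for a batch of rowCount rows.
     * Rounding down means the average partition size is at least
     * rowsPerFile whenever rowCount >= rowsPerFile; smaller batches
     * (including empty ones) get a single partition.
     */
    static int partitionsFor(long rowCount, long rowsPerFile) {
        if (rowsPerFile <= 0) {
            throw new IllegalArgumentException("rowsPerFile must be positive");
        }
        // Floor division: e.g. 250 rows / 100 per file -> 2 partitions (~125 rows each).
        long partitions = rowCount / rowsPerFile;
        // Keep at least one partition and stay within int range for repartition(int).
        return (int) Math.max(1L, Math.min(partitions, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        System.out.println(partitionsFor(0, 100));    // 1
        System.out.println(partitionsFor(99, 100));   // 1
        System.out.println(partitionsFor(250, 100));  // 2
        System.out.println(partitionsFor(1000, 100)); // 10
    }
}
```

Inside `foreachRDD`, the result could be passed to `rdd.repartition(n)` before `saveAsTextFile(...)`. `repartition` shuffles the data and spreads rows only approximately evenly, so each file gets roughly, not exactly, 100 rows or more; `coalesce(n)` avoids the full shuffle but can leave files of uneven size. Because `count()` and the save each evaluate the batch, caching the RDD first avoids computing it twice.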