Hi Andy,

Regarding the foreachRDD return value, this JIRA, which will be in 1.6, should take care of that and make things a little simpler: https://issues.apache.org/jira/browse/SPARK-4557

On Dec 15, 2015 6:55 PM, "Andy Davidson" <a...@santacruzintegration.com> wrote:
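[For context, a minimal, self-contained sketch of the difference that JIRA makes to the foreachRDD lambda. The two interfaces below are local stand-ins with the same shape as Spark's org.apache.spark.api.java.function types, so this compiles and runs without Spark on the classpath; the real foreachRDD overloads take these function types applied to JavaRDD<T> rather than a plain List.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ForeachRddSignatures {
    // Local stand-ins mirroring the shape of Spark's Java function
    // interfaces (org.apache.spark.api.java.function.Function and
    // VoidFunction) so this example is runnable without Spark.
    interface Function<T, R> { R call(T t) throws Exception; }
    interface VoidFunction<T> { void call(T t) throws Exception; }

    public static long runBoth() {
        AtomicLong total = new AtomicLong();

        // Pre-1.6 shape: foreachRDD takes Function<JavaRDD<T>, Void>,
        // so the lambda body must end with "return null" just to
        // satisfy the Void return type.
        Function<List<String>, Void> oldStyle = batch -> {
            total.addAndGet(batch.size());
            return null;
        };

        // SPARK-4557 (1.6) shape: an overload taking
        // VoidFunction<JavaRDD<T>> lets the lambda simply perform its
        // side effect -- no return statement needed.
        VoidFunction<List<String>> newStyle =
            batch -> total.addAndGet(batch.size());

        try {
            oldStyle.call(Arrays.asList("a", "b")); // adds 2
            newStyle.call(Arrays.asList("c"));      // adds 1
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(runBoth()); // 3
    }
}
```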
> I am writing a JUnit test for some simple streaming code. I want to make
> assertions about how many things are in a given JavaDStream. I wonder if
> there is an easier way in Java to get the count?
>
> I think there are two points of friction:
>
> 1. It is easy to create an accumulator of type double or int; however,
>    Long is not supported.
> 2. We need to use javaDStream.foreachRDD. The Function interface must
>    return Void. I was not able to define an accumulator in my driver and
>    use a lambda function. (I am new to lambdas in Java.)
>
> Here is a little lambda example that logs my test objects. I was not able
> to figure out how to return a value or access an accumulator:
>
>     data.foreachRDD(rdd -> {
>         logger.info("Begin data.foreachRDD");
>         for (MyPojo pojo : rdd.collect()) {
>             logger.info("\n{}", pojo.toString());
>         }
>         return null;
>     });
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
> The following code works in my driver, but it is a lot of code for such a
> trivial computation. Because it needs the JavaSparkContext, I do not
> think it would work inside a closure. I assume the workers do not have
> access to the context as a global, and that shipping it in the closure is
> not a good idea?
>
>     public class JavaDStreamCount<T> implements Serializable {
>         private static final long serialVersionUID = -3600586183332429887L;
>         public static Logger logger =
>                 LoggerFactory.getLogger(JavaDStreamCount.class);
>
>         public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>             Count c = new Count(sc);
>             javaDStream.foreachRDD(c);
>             return c.getTotal().value();
>         }
>
>         class Count implements Function<JavaRDD<T>, Void> {
>             private static final long serialVersionUID = -5239727633710162488L;
>             Accumulator<Double> total;
>
>             public Count(JavaSparkContext sc) {
>                 total = sc.accumulator(0.0);
>             }
>
>             @Override
>             public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>                 List<T> data = rdd.collect();
>                 int dataSize = data.size();
>                 logger.error("data.size:{}", dataSize);
>                 long num = rdd.count();
>                 logger.error("num:{}", num);
>                 total.add(new Double(num));
>                 return null;
>             }
>
>             public Accumulator<Double> getTotal() {
>                 return total;
>             }
>         }
>     }
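[On the first point of friction -- no built-in Long accumulator -- Spark 1.x also lets you pass a custom AccumulatorParam to sc.accumulator(initialValue, param), if I remember the 1.x API right. Below is a sketch of such a param; the AccumulatorParam interface here is a local stand-in with the same zero/addInPlace shape as org.apache.spark.AccumulatorParam, so the example runs without Spark, and sumBatchSizes just simulates what the driver would read back after a few batches.]

```java
public class LongAccumulatorSketch {
    // Local stand-in with the same shape as Spark 1.x's
    // org.apache.spark.AccumulatorParam<T>: a zero value plus a merge
    // operation that Spark uses to combine per-task partial results.
    interface AccumulatorParam<T> {
        T zero(T initialValue);
        T addInPlace(T r1, T r2);
    }

    // A Long param you could (assumption: Spark 1.x API shape) hand to
    // sc.accumulator(0L, new LongParam()) in a real driver.
    static class LongParam implements AccumulatorParam<Long> {
        public Long zero(Long initialValue) { return 0L; }
        public Long addInPlace(Long r1, Long r2) { return r1 + r2; }
    }

    // Simulate accumulating one rdd.count() per batch, the way the
    // Count helper above adds to its accumulator in each foreachRDD call.
    public static long sumBatchSizes(int... batchSizes) {
        LongParam param = new LongParam();
        Long total = param.zero(0L);
        for (int size : batchSizes) {
            total = param.addInPlace(total, (long) size);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sumBatchSizes(2, 3, 5)); // 10
    }
}
```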