Hi Andy,

Regarding the foreachRDD return value, this JIRA, which will be in 1.6, should
take care of that and make things a little simpler:
https://issues.apache.org/jira/browse/SPARK-4557
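With that change foreachRDD accepts a VoidFunction, so the lambda no longer
needs the trailing "return null;". A minimal sketch of what your logging
example would look like on 1.6 (untested, reusing the data stream and logger
from your code):

    // Spark 1.6+: foreachRDD accepts a VoidFunction<JavaRDD<MyPojo>>,
    // so no "return null;" is required at the end of the lambda.
    data.foreachRDD(rdd -> {
        for (MyPojo pojo : rdd.collect()) {
            logger.info("\n{}", pojo.toString());
        }
    });
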
On Dec 15, 2015 6:55 PM, "Andy Davidson" <a...@santacruzintegration.com>
wrote:

> I am writing a JUnit test for some simple streaming code. I want to make
> assertions about how many things are in a given JavaDStream. I wonder if there
> is an easier way in Java to get the count?
>
> I think there are two points of friction.
>
>
>    1. It is easy to create an accumulator of type double or int; however,
>    Long is not supported (see the sketch just after this list).
>    2. We need to use javaDStream.foreachRDD. The Function interface must
>    return Void. I was not able to define an accumulator in my driver and
>    use it from a lambda function. (I am new to lambdas in Java.)
>
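> To make point 1 concrete, these are the built-in helpers I mean (the
> variable names are just for illustration):
>
>     Accumulator<Integer> intTotal = sc.accumulator(0);      // built-in int helper
>     Accumulator<Double> doubleTotal = sc.accumulator(0.0);  // built-in double helper
>     // There is no sc.accumulator(0L); a Long accumulator would need a
>     // custom AccumulatorParam in 1.5.
>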
> Here is a little lambda example that logs my test objects. I was not able
> to figure out how to get it to return a value or access an accumulator.
>
>        data.foreachRDD(rdd -> {
>             logger.info("Begin data.foreachRDD");
>             for (MyPojo pojo : rdd.collect()) {
>                 logger.info("\n{}", pojo.toString());
>             }
>             return null; // required by Function<JavaRDD<T>, Void> in 1.5
>         });
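>
> Conceptually, what I want for point 2 is to capture an accumulator created
> on the driver and update it inside the lambda, something like this sketch
> (I could not get a version of this working):
>
>         Accumulator<Double> total = sc.accumulator(0.0); // created on the driver
>
>         data.foreachRDD(rdd -> {
>             total.add((double) rdd.count()); // foreachRDD functions run on the driver
>             return null; // required by Function<JavaRDD<T>, Void> in 1.5
>         });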
>
>
> Any suggestions would be greatly appreciated
>
> Andy
>
> The following code works in my driver but is a lot of code for such a
> trivial computation. Because it needs the JavaSparkContext, I do not
> think it would work inside a closure. I assume the workers do not have access
> to the context as a global, and that shipping it in the closure is not a
> good idea?
>
> public class JavaDStreamCount<T> implements Serializable {
>
>     private static final long serialVersionUID = -3600586183332429887L;
>
>     public static Logger logger =
>             LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>         Count c = new Count(sc);
>         javaDStream.foreachRDD(c);
>         return c.getTotal().value();
>     }
>
>     class Count implements Function<JavaRDD<T>, Void> {
>
>         private static final long serialVersionUID = -5239727633710162488L;
>
>         Accumulator<Double> total;
>
>         public Count(JavaSparkContext sc) {
>             total = sc.accumulator(0.0);
>         }
>
>         @Override
>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>             // called once per batch; collect() is only here for debug logging
>             List<T> data = rdd.collect();
>             int dataSize = data.size();
>             logger.error("data.size:{}", dataSize);
>             long num = rdd.count();
>             logger.error("num:{}", num);
>             total.add(new Double(num));
>             return null;
>         }
>
>         public Accumulator<Double> getTotal() {
>             return total;
>         }
>     }
> }
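>
> For reference, the test calls it roughly like this (the expected value is
> just a placeholder, and the accumulator only reflects batches that have
> already run by the time value() is read):
>
>     JavaDStreamCount<MyPojo> counter = new JavaDStreamCount<MyPojo>();
>     double total = counter.hack(sc, data);
>     assertEquals(expectedCount, total, 0.0);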
>