Hello Sean,

Thank you for your advice. Based on your suggestion, I've modified the code
as follows (and once again admired the easy (!) verbosity of Java compared
to the 'complex and hard to understand' brevity (!) of Scala):

javaDStream.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        stringJavaRDD.foreachPartition(
            new VoidFunction<Iterator<String>>() {
              @Override
              public void call(Iterator<String> iteratorString) {
                return;
              }
            });
        return null;
      }
    });
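
By the way, if we ever move to Java 8, I believe the same no-op can be
written far more compactly with lambdas, since Function and VoidFunction
are single-method interfaces. A rough sketch, assuming Java 8 on top of the
same Spark API (I haven't compiled this):

javaDStream.foreachRDD(stringJavaRDD -> {
    // No-op on every partition: running the action is enough to force
    // materialization of the RDD.
    stringJavaRDD.foreachPartition(iteratorString -> { });
    return null;
});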


I've tested the above in my application and also observed it with VisualVM,
but I could not see a dramatic difference in speed (and only a small
difference in heap usage) compared to my initial version, where I simply
call .count() inside the foreachRDD block.

Nevertheless, I'll run more experiments to see whether any differences show
up in terms of speed or memory.
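
For those experiments I plan to wrap the action in a simple driver-side
timer along the following lines; since foreachPartition blocks until the
job finishes, the elapsed time should reflect the cost of materializing the
RDD. (The timing code and log message below are my own sketch, not part of
any Spark API.)

javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    long start = System.nanoTime();
    // Materialize the RDD with a no-op on each partition.
    stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
      @Override
      public void call(Iterator<String> iteratorString) {
      }
    });
    long elapsedMs = (System.nanoTime() - start) / 1000000L;
    System.out.println("Materialized RDD in " + elapsedMs + " ms");
    return null;
  }
});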

Kind regards,

Emre Sevinç
http://www.bigindustries.be/

On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen <so...@cloudera.com> wrote:

> Those do quite different things. One counts the data; the other copies
> all of the data to the driver.
>
> The fastest way to materialize an RDD that I know of is
> foreachPartition(i => None)  (or equivalent no-op VoidFunction in
> Java)
>
> On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
> > Hello,
> >
> > I have a piece of code to force the materialization of RDDs in my Spark
> > Streaming program, and I'm trying to understand which method is faster
> > and has less memory consumption:
> >
> >   javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
> >       @Override
> >       public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
> >
> >         //stringJavaRDD.collect();
> >
> >        // or count?
> >
> >         //stringJavaRDD.count();
> >
> >         return null;
> >       }
> >     });
> >
> >
> > I've checked the source code of Spark at
> > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
> > and see that collect() is defined as:
> >
> >   def collect(): Array[T] = {
> >     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
> >     Array.concat(results: _*)
> >   }
> >
> > and count() defined as:
> >
> >   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
> >
> > Therefore I think calling the count() method is faster and/or consumes
> > less memory, but I wanted to be sure.
> >
> > Anyone cares to comment?
> >
> >
> > --
> > Emre Sevinç
> > http://www.bigindustries.be/
> >
>

-- 
Emre Sevinc
