Hello Sean,

Thank you for your advice. Based on your suggestion, I've modified the code as follows (and once again admired the easy (!) verbosity of Java compared to the 'complex and hard to understand' brevity (!) of Scala):
javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
      @Override
      public void call(Iterator<String> iteratorString) {
        return;
      }
    });
    return null;
  }
});

I've tested the above in my application, and also observed it with VisualVM, but could not see a dramatic difference in speed (or much difference in heap usage) compared to my initial version, where I just use .count() in a foreachRDD block. Nevertheless, I'll run more experiments to see whether differences show up in speed or memory.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/


On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen <so...@cloudera.com> wrote:
> Those do quite different things. One counts the data; the other copies
> all of the data to the driver.
>
> The fastest way to materialize an RDD that I know of is
> foreachPartition(i => None) (or the equivalent no-op VoidFunction in
> Java)
>
> On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
> > Hello,
> >
> > I have a piece of code to force the materialization of RDDs in my Spark
> > Streaming program, and I'm trying to understand which method is faster and
> > has less memory consumption:
> >
> > javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
> >   @Override
> >   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
> >     //stringJavaRDD.collect();
> >     // or count?
> >     //stringJavaRDD.count();
> >     return null;
> >   }
> > });
> >
> > I've checked the source code of Spark at
> > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
> > and see that collect() is defined as:
> >
> > def collect(): Array[T] = {
> >   val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
> >   Array.concat(results: _*)
> > }
> >
> > and count() is defined as:
> >
> > def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
> >
> > Therefore I think calling the count() method is faster and/or consumes
> > less memory, but I wanted to be sure.
> >
> > Anyone care to comment?
> >
> > --
> > Emre Sevinç
> > http://www.bigindustries.be/

--
Emre Sevinc
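P.S. The memory argument from the quoted source can be illustrated outside Spark as well: count() (via Utils.getIteratorSize) walks each partition's iterator while keeping only a running counter, whereas collect() copies every element into an in-memory array first. Below is a minimal plain-Java sketch of that difference; getIteratorSize and collectAll here are hypothetical re-implementations for illustration, not Spark's own code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeDemo {
    // Analogue of Spark's Utils.getIteratorSize: consumes the iterator,
    // keeping only a running counter -- O(1) extra memory.
    static long getIteratorSize(Iterator<?> it) {
        long count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    // Analogue of collect(): copies every element into a list -- O(n) memory.
    static <T> List<T> collectAll(Iterator<T> it) {
        List<T> result = new ArrayList<>();
        while (it.hasNext()) {
            result.add(it.next());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d");
        // Both materialize (fully consume) the iterator, but only
        // collectAll retains the elements afterwards.
        System.out.println(getIteratorSize(data.iterator()));
        System.out.println(collectAll(data.iterator()).size());
    }
}
```

Both calls force every element to be produced, which is all that "materializing" an RDD requires; the counting version simply discards each element as it goes, which is why a no-op foreachPartition (or count()) is preferable to collect() when the results themselves are not needed.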