Repository: spark Updated Branches: refs/heads/branch-2.1 80d583bd0 -> 47ab4afed
[SPARK-19003][DOCS] Add Java example in Spark Streaming Guide, section Design Patterns for using foreachRDD ## What changes were proposed in this pull request? Added missing Java example under section "Design Patterns for using foreachRDD". Now this section has examples in all 3 languages, improving consistency of documentation. ## How was this patch tested? Manual. Generated docs using command "SKIP_API=1 jekyll build" and verified generated HTML page manually. The syntax of example has been tested for correctness using sample code on Java1.7 and Spark 2.2.0-SNAPSHOT. Author: adesharatushar <tushar_adesh...@persistent.com> Closes #16408 from adesharatushar/streaming-doc-fix. (cherry picked from commit dba81e1dcdea1e8bd196c88d4810f9a04312acbf) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47ab4afe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47ab4afe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47ab4afe Branch: refs/heads/branch-2.1 Commit: 47ab4afed69bb019b4e0f85e26e52dc5cee338df Parents: 80d583b Author: adesharatushar <tushar_adesh...@persistent.com> Authored: Thu Dec 29 22:03:34 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Thu Dec 29 22:03:42 2016 +0000 ---------------------------------------------------------------------- docs/streaming-programming-guide.md | 72 ++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/47ab4afe/docs/streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1fcd198..38b4f78 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1246,6 +1246,22 @@ dstream.foreachRDD { rdd => } {% endhighlight %} </div> +<div data-lang="java" markdown="1"> +{% highlight java %} +dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() { + @Override + public void call(JavaRDD<String> rdd) { + final Connection connection = createNewConnection(); // executed at the driver + rdd.foreach(new VoidFunction<String>() { + @Override + public void call(String record) { + connection.send(record); // executed at the worker + } + }); + } +}); +{% endhighlight %} +</div> <div data-lang="python" markdown="1"> {% highlight python %} def sendRecord(rdd): @@ -1279,6 +1295,23 @@ dstream.foreachRDD { rdd => } {% endhighlight %} </div> +<div data-lang="java" markdown="1"> +{% highlight java %} +dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() { + @Override + public void call(JavaRDD<String> rdd) { + rdd.foreach(new VoidFunction<String>() { + @Override + public void call(String record) { + Connection connection = createNewConnection(); + connection.send(record); + connection.close(); + } + }); + } +}); +{% endhighlight %} +</div> <div data-lang="python" markdown="1"> {% highlight python %} def sendRecord(record): @@ -1309,6 +1342,25 @@ dstream.foreachRDD { rdd => } {% endhighlight %} </div> +<div data-lang="java" markdown="1"> +{% highlight java %} +dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() { + @Override + public void call(JavaRDD<String> rdd) { + rdd.foreachPartition(new VoidFunction<Iterator<String>>() { + @Override + public void call(Iterator<String> partitionOfRecords) { + Connection connection = createNewConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + connection.close(); + } + }); + } +}); +{% endhighlight %} +</div> <div data-lang="python" markdown="1"> {% highlight python %} def sendPartition(iter): @@ -1342,6 +1394,26 @@ dstream.foreachRDD { rdd => {% endhighlight %} </div> +<div data-lang="java" markdown="1"> +{% highlight java %} +dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() { + @Override + public void call(JavaRDD<String> rdd) { + rdd.foreachPartition(new VoidFunction<Iterator<String>>() { + @Override + public void call(Iterator<String> partitionOfRecords) { + // ConnectionPool is a static, lazily initialized pool of connections + Connection connection = ConnectionPool.getConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + ConnectionPool.returnConnection(connection); // return to the pool for future reuse + } + }); + } +}); +{% endhighlight %} +</div> <div data-lang="python" markdown="1"> {% highlight python %} def sendPartition(iter): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org