This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new da52698  [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
da52698 is described below

commit da526985c7574dccdcc0cca7452e2e999a5b3012
Author: Huon Wilson <huon.wil...@data61.csiro.au>
AuthorDate: Thu Jan 31 17:27:11 2019 -0600

    [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

    ## What changes were proposed in this pull request?

    Previously, a `java.lang.UnsupportedOperationException: empty collection`
    exception would be thrown, because the implementations used `reduce` rather
    than `fold` or a similar operation that tolerates empty RDDs.

    This behaviour has existed for the Vertex RDDs since it was introduced in
    b30e0ae0351be1cbc0b1cf179293587b466ee026, and it appears to have been
    inherited by the Edge RDDs via copy-paste in
    ee29ef3800438501e0ff207feb00a28973fc0769.

    ## How was this patch tested?

    Two new unit tests.

    Closes #23681 from huonw/empty-graphx.

    Authored-by: Huon Wilson <huon.wil...@data61.csiro.au>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala  |  2 +-
 .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala     |  2 +-
 .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala   |  2 +-
 .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala  | 10 ++++++++++
 .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala    | 11 +++++++++++
 .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala   |  9 +++++++++
 6 files changed, 33 insertions(+), 3 deletions(-)
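For illustration only (not part of the commit): a minimal sketch of the `reduce` vs. `fold`
difference the fix relies on. The object name, app name, and local master setting below are
made up for the example; only `emptyRDD`, `reduce`, and `fold` are the APIs in question.

    import org.apache.spark.{SparkConf, SparkContext}

    object EmptyCountSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical local context, purely for demonstration
        val sc = new SparkContext(
          new SparkConf().setAppName("empty-count-sketch").setMaster("local[2]"))

        val empty = sc.emptyRDD[Long]

        // reduce has no zero element, so on an empty RDD it throws
        // java.lang.UnsupportedOperationException: empty collection
        // empty.reduce(_ + _)

        // fold starts from the supplied zero value, so an empty RDD simply
        // yields 0, which is what the patched count() implementations rely on
        println(empty.fold(0L)(_ + _)) // prints 0

        sc.stop()
      }
    }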
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 376c7b0..eb8abd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
 
   /** The number of edges in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }
 
   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3c6f22d..2da9762 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
 
   /** The number of vertices in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }
 
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 59fdd85..2847a4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {
 
     // calculate global rating mean
     edges.cache()
-    val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+    val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2))
     val u = rs / rc
 
     // construct graph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index 7a24e32..8fd3e6f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("count") {
+    withSpark { sc =>
+      val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+      assert(empty.count === 0)
+
+      val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+      val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+      assert(nonempty.count === edges.size)
+    }
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 8e63043..434e6a8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
       assert(verts.collect().toSeq === data) // test checkpointed RDD
     }
   }
+
+  test("count") {
+    withSpark { sc =>
+      val empty = VertexRDD(sc.emptyRDD[(Long, Unit)])
+      assert(empty.count === 0)
+
+      val n = 100
+      val nonempty = vertices(sc, n)
+      assert(nonempty.count === n + 1)
+    }
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
index 2991438..da0457c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
@@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("Test SVD++ with no edges") {
+    withSpark { sc =>
+      val edges = sc.emptyRDD[Edge[Double]]
+      val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+      val (graph, _) = SVDPlusPlus.run(edges, conf)
+      assert(graph.vertices.count == 0)
+      assert(graph.edges.count == 0)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org