minor improvements

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/acb03230
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/acb03230
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/acb03230

Branch: refs/heads/master
Commit: acb0323053d270a377e497e975b2dfe59e2f997c
Parents: d6cded7
Author: Hossein Falaki <fal...@gmail.com>
Authored: Tue Dec 31 15:34:26 2013 -0800
Committer: Hossein Falaki <fal...@gmail.com>
Committed: Tue Dec 31 15:34:26 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 5 ++---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala              | 4 +++-
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acb03230/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1dc5f8d..088b298 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
-      case (k, v) => (k, v.value.cardinality())
-    }
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acb03230/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 74fab48..161fd06 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag](
     }
     def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
 
-    mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+    mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters)
+      .value.cardinality()
   }
 
   /**

Reply via email to