[SPARK-9646] [SQL] Add metrics for all join and aggregate operators

This PR added metrics for all join and aggregate operators. However, I found 
that the metrics may be confusing in the following two cases (see the sketch 
below):
1. If the iterator is not fully consumed, the metric values will be lower than 
the actual row counts.
2. If the iterators are recreated, the metric values will look larger than the 
size of the input source; `CartesianProduct` is one example.
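
For context, the counting pattern used throughout this change is to wrap each 
operator's iterator so that every consumed row increments a metric. The 
following is a minimal, self-contained Scala sketch of why the two cases above 
can skew the numbers; the `Counter` and `countRows` names are hypothetical and 
stand in for Spark's `LongSQLMetric`, they are not its actual API.

    object MetricCountingSketch {
      final class Counter { var value = 0L; def +=(n: Long): Unit = value += n }

      // Wrap an iterator so every consumed row bumps the counter, mirroring
      // the `numInputRows += 1` pattern added to the operators in this patch.
      def countRows[T](iter: Iterator[T], counter: Counter): Iterator[T] =
        iter.map { row => counter += 1; row }

      def main(args: Array[String]): Unit = {
        val input = Seq(1, 2, 3, 4, 5)

        // Case 1: the downstream consumer stops early (e.g. a limit), so the
        // metric reports fewer rows than the source actually contains.
        val c1 = new Counter
        countRows(input.iterator, c1).take(2).foreach(_ => ())
        println(s"case 1: counted ${c1.value} of ${input.size} rows")  // 2 of 5

        // Case 2: the same source is iterated once per partition of the other
        // side (as in a cartesian product), so the metric exceeds the input size.
        val c2 = new Counter
        (1 to 3).foreach(_ => countRows(input.iterator, c2).foreach(_ => ()))
        println(s"case 2: counted ${c2.value} of ${input.size} rows")  // 15 of 5
      }
    }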

Author: zsxwing <zsxw...@gmail.com>

Closes #8060 from zsxwing/sql-metrics and squashes the following commits:

40f3fc1 [zsxwing] Mark LongSQLMetric private[metric] to avoid incorrect usage and memory leaks
b1b9071 [zsxwing] Merge branch 'master' into sql-metrics
4bef25a [zsxwing] Add metrics for SortMergeOuterJoin
95ccfc6 [zsxwing] Merge branch 'master' into sql-metrics
67cb4dd [zsxwing] Add metrics for Project and TungstenProject; remove metrics 
from PhysicalRDD and LocalTableScan
0eb47d4 [zsxwing] Merge branch 'master' into sql-metrics
dd9d932 [zsxwing] Avoid creating new Iterators
589ea26 [zsxwing] Add metrics for all join and aggregate operators

(cherry picked from commit 5831294a7a8fa2524133c5d718cbc8187d2b0620)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767ee188
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767ee188
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767ee188

Branch: refs/heads/branch-1.5
Commit: 767ee1884b8ecba3afa8ed19a562626361d54f50
Parents: 71460b8
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Aug 11 12:39:13 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Aug 11 12:39:39 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Aggregate.scala  |  11 +
 .../spark/sql/execution/ExistingRDD.scala       |   2 -
 .../spark/sql/execution/LocalTableScan.scala    |   2 -
 .../apache/spark/sql/execution/SparkPlan.scala  |  25 +-
 .../aggregate/SortBasedAggregate.scala          |  12 +-
 .../SortBasedAggregationIterator.scala          |  18 +-
 .../execution/aggregate/TungstenAggregate.scala |  12 +-
 .../aggregate/TungstenAggregationIterator.scala |  11 +-
 .../spark/sql/execution/basicOperators.scala    |  36 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  30 +-
 .../joins/BroadcastHashOuterJoin.scala          |  40 +-
 .../joins/BroadcastLeftSemiJoinHash.scala       |  24 +-
 .../joins/BroadcastNestedLoopJoin.scala         |  27 +-
 .../sql/execution/joins/CartesianProduct.scala  |  25 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   7 +-
 .../sql/execution/joins/HashOuterJoin.scala     |  30 +-
 .../sql/execution/joins/HashSemiJoin.scala      |  23 +-
 .../sql/execution/joins/HashedRelation.scala    |   8 +-
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |  19 +-
 .../sql/execution/joins/LeftSemiJoinHash.scala  |  18 +-
 .../sql/execution/joins/ShuffledHashJoin.scala  |  16 +-
 .../execution/joins/ShuffledHashOuterJoin.scala |  29 +-
 .../sql/execution/joins/SortMergeJoin.scala     |  21 +-
 .../execution/joins/SortMergeOuterJoin.scala    |  38 +-
 .../spark/sql/execution/metric/SQLMetrics.scala |   6 +
 .../execution/joins/HashedRelationSuite.scala   |  14 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  | 450 ++++++++++++++++++-
 27 files changed, 847 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index e8c6a0f..f3b6a3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -45,6 +46,10 @@ case class Aggregate(
     child: SparkPlan)
   extends UnaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   override def requiredChildDistribution: List[Distribution] = {
     if (partial) {
       UnspecifiedDistribution :: Nil
@@ -121,12 +126,15 @@ case class Aggregate(
   }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()
         var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
+          numInputRows += 1
           var i = 0
           while (i < buffer.length) {
             buffer(i).update(currentRow)
@@ -142,6 +150,7 @@ case class Aggregate(
           i += 1
         }
 
+        numOutputRows += 1
         Iterator(resultProjection(aggregateResults))
       }
     } else {
@@ -152,6 +161,7 @@ case class Aggregate(
         var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
+          numInputRows += 1
           val currentGroup = groupingProjection(currentRow)
           var currentBuffer = hashTable.get(currentGroup)
           if (currentBuffer == null) {
@@ -180,6 +190,7 @@ case class Aggregate(
             val currentEntry = hashTableIter.next()
             val currentGroup = currentEntry.getKey
             val currentBuffer = currentEntry.getValue
+            numOutputRows += 1
 
             var i = 0
             while (i < currentBuffer.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cae7ca5..abb60cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -99,8 +99,6 @@ private[sql] case class PhysicalRDD(
     rdd: RDD[InternalRow],
     extraInformation: String) extends LeafNode {
 
-  override protected[sql] val trackNumOfRowsEnabled = true
-
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def simpleString: String = "Scan " + extraInformation + 
output.mkString("[", ",", "]")

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 858dd85..34e926e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,8 +30,6 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
-  override protected[sql] val trackNumOfRowsEnabled = true
-
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
   protected override def doExecute(): RDD[InternalRow] = rdd

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 9ba5cf2..72f5450 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -81,22 +81,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   }
 
   /**
-   * Whether track the number of rows output by this SparkPlan
-   */
-  protected[sql] def trackNumOfRowsEnabled: Boolean = false
-
-  private lazy val defaultMetrics: Map[String, SQLMetric[_, _]] =
-    if (trackNumOfRowsEnabled) {
-      Map("numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
rows"))
-    }
-    else {
-      Map.empty
-    }
-
-  /**
    * Return all metrics containing metrics of this SparkPlan.
    */
-  private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
+  private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
 
   /**
    * Return a LongSQLMetric according to the name.
@@ -150,15 +137,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
     }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
-      if (trackNumOfRowsEnabled) {
-        val numRows = longMetric("numRows")
-        doExecute().map { row =>
-          numRows += 1
-          row
-        }
-      } else {
-        doExecute()
-      }
+      doExecute()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index ad428ad..ab26f9c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, 
ClusteredDistribution, AllTuples, Distribution}
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, 
SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 
 case class SortBasedAggregate(
@@ -38,6 +39,10 @@ case class SortBasedAggregate(
     child: SparkPlan)
   extends UnaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   override def outputsUnsafeRows: Boolean = false
 
   override def canProcessUnsafeRows: Boolean = false
@@ -63,6 +68,8 @@ case class SortBasedAggregate(
   }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitions { iter =>
       // Because the constructor of an aggregation iterator will read at least 
the first row,
       // we need to get the value of iter.hasNext first.
@@ -84,10 +91,13 @@ case class SortBasedAggregate(
           newProjection _,
           child.output,
           iter,
-          outputsUnsafeRows)
+          outputsUnsafeRows,
+          numInputRows,
+          numOutputRows)
         if (!hasInput && groupingExpressions.isEmpty) {
           // There is no input and there is no grouping expressions.
           // We need to output a single row as the output.
+          numOutputRows += 1
           
Iterator[InternalRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
         } else {
           outputIter

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 67ebafd..73d50e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, 
AggregateFunction2}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.unsafe.KVIterator
 
 /**
@@ -37,7 +38,9 @@ class SortBasedAggregationIterator(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => 
MutableProjection),
-    outputsUnsafeRows: Boolean)
+    outputsUnsafeRows: Boolean,
+    numInputRows: LongSQLMetric,
+    numOutputRows: LongSQLMetric)
   extends AggregationIterator(
     groupingKeyAttributes,
     valueAttributes,
@@ -103,6 +106,7 @@ class SortBasedAggregationIterator(
       // Get the grouping key.
       val groupingKey = inputKVIterator.getKey
       val currentRow = inputKVIterator.getValue
+      numInputRows += 1
 
       // Check if the current row belongs the current input row.
       if (currentGroupingKey == groupingKey) {
@@ -137,7 +141,7 @@ class SortBasedAggregationIterator(
       val outputRow = generateOutput(currentGroupingKey, 
sortBasedAggregationBuffer)
       // Initialize buffer values for the next group.
       initializeBuffer(sortBasedAggregationBuffer)
-
+      numOutputRows += 1
       outputRow
     } else {
       // no more result
@@ -151,7 +155,7 @@ class SortBasedAggregationIterator(
 
       nextGroupingKey = inputKVIterator.getKey().copy()
       firstRowInNextGroup = inputKVIterator.getValue().copy()
-
+      numInputRows += 1
       sortedInputHasNewGroup = true
     } else {
       // This inputIter is empty.
@@ -181,7 +185,9 @@ object SortBasedAggregationIterator {
       newProjection: (Seq[Expression], Seq[Attribute]) => Projection,
       inputAttributes: Seq[Attribute],
       inputIter: Iterator[InternalRow],
-      outputsUnsafeRows: Boolean): SortBasedAggregationIterator = {
+      outputsUnsafeRows: Boolean,
+      numInputRows: LongSQLMetric,
+      numOutputRows: LongSQLMetric): SortBasedAggregationIterator = {
     val kvIterator = if (UnsafeProjection.canSupport(groupingExprs)) {
       AggregationIterator.unsafeKVIterator(
         groupingExprs,
@@ -202,7 +208,9 @@ object SortBasedAggregationIterator {
       initialInputBufferOffset,
       resultExpressions,
       newMutableProjection,
-      outputsUnsafeRows)
+      outputsUnsafeRows,
+      numInputRows,
+      numOutputRows)
   }
   // scalastyle:on
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 1694794..6b5935a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -24,6 +24,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, 
ClusteredDistribution, AllTuples, Distribution}
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 case class TungstenAggregate(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -35,6 +36,10 @@ case class TungstenAggregate(
     child: SparkPlan)
   extends UnaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
input rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   override def outputsUnsafeRows: Boolean = true
 
   override def canProcessUnsafeRows: Boolean = true
@@ -61,6 +66,8 @@ case class TungstenAggregate(
   }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitions { iter =>
       val hasInput = iter.hasNext
       if (!hasInput && groupingExpressions.nonEmpty) {
@@ -78,9 +85,12 @@ case class TungstenAggregate(
             newMutableProjection,
             child.output,
             iter,
-            testFallbackStartsAt)
+            testFallbackStartsAt,
+            numInputRows,
+            numOutputRows)
 
         if (!hasInput && groupingExpressions.isEmpty) {
+          numOutputRows += 1
           
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
         } else {
           aggregationIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 3216090..1f383dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, 
UnsafeFixedWidthAggregationMap}
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -83,7 +84,9 @@ class TungstenAggregationIterator(
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => 
MutableProjection),
     originalInputAttributes: Seq[Attribute],
     inputIter: Iterator[InternalRow],
-    testFallbackStartsAt: Option[Int])
+    testFallbackStartsAt: Option[Int],
+    numInputRows: LongSQLMetric,
+    numOutputRows: LongSQLMetric)
   extends Iterator[UnsafeRow] with Logging {
 
   ///////////////////////////////////////////////////////////////////////////
@@ -352,6 +355,7 @@ class TungstenAggregationIterator(
   private def processInputs(): Unit = {
     while (!sortBased && inputIter.hasNext) {
       val newInput = inputIter.next()
+      numInputRows += 1
       val groupingKey = groupProjection.apply(newInput)
       val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
       if (buffer == null) {
@@ -371,6 +375,7 @@ class TungstenAggregationIterator(
     var i = 0
     while (!sortBased && inputIter.hasNext) {
       val newInput = inputIter.next()
+      numInputRows += 1
       val groupingKey = groupProjection.apply(newInput)
       val buffer: UnsafeRow = if (i < fallbackStartsAt) {
         hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
@@ -439,6 +444,7 @@ class TungstenAggregationIterator(
       // Process the rest of input rows.
       while (inputIter.hasNext) {
         val newInput = inputIter.next()
+        numInputRows += 1
         val groupingKey = groupProjection.apply(newInput)
         buffer.copyFrom(initialAggregationBuffer)
         processRow(buffer, newInput)
@@ -462,6 +468,7 @@ class TungstenAggregationIterator(
       // Insert the rest of input rows.
       while (inputIter.hasNext) {
         val newInput = inputIter.next()
+        numInputRows += 1
         val groupingKey = groupProjection.apply(newInput)
         bufferExtractor(newInput)
         externalSorter.insertKV(groupingKey, buffer)
@@ -657,7 +664,7 @@ class TungstenAggregationIterator(
         TaskContext.get().internalMetricsToAccumulators(
           InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
       }
-
+      numOutputRows += 1
       res
     } else {
       // no more result

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf2de24..247c900 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -41,11 +41,20 @@ import org.apache.spark.{HashPartitioner, SparkEnv}
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) 
extends UnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
+  override private[sql] lazy val metrics = Map(
+    "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
   @transient lazy val buildProjection = newMutableProjection(projectList, 
child.output)
 
-  protected override def doExecute(): RDD[InternalRow] = 
child.execute().mapPartitions { iter =>
-    val reusableProjection = buildProjection()
-    iter.map(reusableProjection)
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numRows = longMetric("numRows")
+    child.execute().mapPartitions { iter =>
+      val reusableProjection = buildProjection()
+      iter.map { row =>
+        numRows += 1
+        reusableProjection(row)
+      }
+    }
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -57,19 +66,28 @@ case class Project(projectList: Seq[NamedExpression], 
child: SparkPlan) extends
  */
 case class TungstenProject(projectList: Seq[NamedExpression], child: 
SparkPlan) extends UnaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
+
   override def outputsUnsafeRows: Boolean = true
   override def canProcessUnsafeRows: Boolean = true
   override def canProcessSafeRows: Boolean = true
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
-  protected override def doExecute(): RDD[InternalRow] = 
child.execute().mapPartitions { iter =>
-    this.transformAllExpressions {
-      case CreateStruct(children) => CreateStructUnsafe(children)
-      case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numRows = longMetric("numRows")
+    child.execute().mapPartitions { iter =>
+      this.transformAllExpressions {
+        case CreateStruct(children) => CreateStructUnsafe(children)
+        case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+      }
+      val project = UnsafeProjection.create(projectList, child.output)
+      iter.map { row =>
+        numRows += 1
+        project(row)
+      }
     }
-    val project = UnsafeProjection.create(projectList, child.output)
-    iter.map(project)
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f7a68e4..2e108cb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{InternalAccumulator, TaskContext}
 
@@ -45,7 +46,10 @@ case class BroadcastHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
-  override protected[sql] val trackNumOfRowsEnabled = true
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
 
   val timeout: Duration = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
@@ -65,6 +69,11 @@ case class BroadcastHashJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
+    val numBuildRows = buildSide match {
+      case BuildLeft => longMetric("numLeftRows")
+      case BuildRight => longMetric("numRightRows")
+    }
+
     // broadcastFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     future {
@@ -73,8 +82,15 @@ case class BroadcastHashJoin(
       SQLExecution.withExecutionId(sparkContext, executionId) {
         // Note that we use .execute().collect() because we don't want to 
convert data to Scala
         // types
-        val input: Array[InternalRow] = 
buildPlan.execute().map(_.copy()).collect()
-        val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, 
input.size)
+        val input: Array[InternalRow] = buildPlan.execute().map { row =>
+          numBuildRows += 1
+          row.copy()
+        }.collect()
+        // The following line doesn't run in a job so we cannot track the 
metric value. However, we
+        // have already tracked it in the above lines. So here we can use
+        // `SQLMetrics.nullLongMetric` to ignore it.
+        val hashed = HashedRelation(
+          input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, 
input.size)
         sparkContext.broadcast(hashed)
       }
     }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -85,6 +101,12 @@ case class BroadcastHashJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numStreamedRows = buildSide match {
+      case BuildLeft => longMetric("numRightRows")
+      case BuildRight => longMetric("numLeftRows")
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>
@@ -95,7 +117,7 @@ case class BroadcastHashJoin(
             
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
         case _ =>
       }
-      hashJoin(streamedIter, hashedRelation)
+      hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index a3626de..69a8b95 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.{InternalAccumulator, TaskContext}
 
 /**
@@ -45,6 +46,11 @@ case class BroadcastHashOuterJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashOuterJoin {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   val timeout = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
@@ -63,6 +69,14 @@ case class BroadcastHashOuterJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
+    val numBuildRows = joinType match {
+      case RightOuter => longMetric("numLeftRows")
+      case LeftOuter => longMetric("numRightRows")
+      case x =>
+        throw new IllegalArgumentException(
+          s"HashOuterJoin should not take $x as the JoinType")
+    }
+
     // broadcastFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     future {
@@ -71,8 +85,15 @@ case class BroadcastHashOuterJoin(
       SQLExecution.withExecutionId(sparkContext, executionId) {
         // Note that we use .execute().collect() because we don't want to 
convert data to Scala
         // types
-        val input: Array[InternalRow] = 
buildPlan.execute().map(_.copy()).collect()
-        val hashed = HashedRelation(input.iterator, buildKeyGenerator, 
input.size)
+        val input: Array[InternalRow] = buildPlan.execute().map { row =>
+          numBuildRows += 1
+          row.copy()
+        }.collect()
+        // The following line doesn't run in a job so we cannot track the 
metric value. However, we
+        // have already tracked it in the above lines. So here we can use
+        // `SQLMetrics.nullLongMetric` to ignore it.
+        val hashed = HashedRelation(
+          input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, 
input.size)
         sparkContext.broadcast(hashed)
       }
     }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -83,6 +104,15 @@ case class BroadcastHashOuterJoin(
   }
 
   override def doExecute(): RDD[InternalRow] = {
+    val numStreamedRows = joinType match {
+      case RightOuter => longMetric("numRightRows")
+      case LeftOuter => longMetric("numLeftRows")
+      case x =>
+        throw new IllegalArgumentException(
+          s"HashOuterJoin should not take $x as the JoinType")
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>
@@ -101,16 +131,18 @@ case class BroadcastHashOuterJoin(
       joinType match {
         case LeftOuter =>
           streamedIter.flatMap(currentRow => {
+            numStreamedRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withLeft(currentRow)
-            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), 
resultProj)
+            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), 
resultProj, numOutputRows)
           })
 
         case RightOuter =>
           streamedIter.flatMap(currentRow => {
+            numStreamedRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withRight(currentRow)
-            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, 
resultProj)
+            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, 
resultProj, numOutputRows)
           })
 
         case x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 5bd06fb..78a8c16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -37,18 +38,31 @@ case class BroadcastLeftSemiJoinHash(
     right: SparkPlan,
     condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val input = right.execute().map(_.copy()).collect()
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
+    val input = right.execute().map { row =>
+      numRightRows += 1
+      row.copy()
+    }.collect()
 
     if (condition.isEmpty) {
-      val hashSet = buildKeyHashSet(input.toIterator)
+      val hashSet = buildKeyHashSet(input.toIterator, 
SQLMetrics.nullLongMetric)
       val broadcastedRelation = sparkContext.broadcast(hashSet)
 
       left.execute().mapPartitions { streamIter =>
-        hashSemiJoin(streamIter, broadcastedRelation.value)
+        hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, 
numOutputRows)
       }
     } else {
-      val hashRelation = HashedRelation(input.toIterator, rightKeyGenerator, 
input.size)
+      val hashRelation =
+        HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, 
rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)
 
       left.execute().mapPartitions { streamIter =>
@@ -59,7 +73,7 @@ case class BroadcastLeftSemiJoinHash(
               
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
           case _ =>
         }
-        hashSemiJoin(streamIter, hashedRelation)
+        hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 017a44b..28c88b1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, 
RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.collection.CompactBuffer
 
 /**
@@ -38,6 +39,11 @@ case class BroadcastNestedLoopJoin(
     condition: Option[Expression]) extends BinaryNode {
   // TODO: Override requiredChildDistribution.
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   /** BuildRight means the right relation <=> the broadcast relation. */
   private val (streamed, broadcast) = buildSide match {
     case BuildRight => (left, right)
@@ -75,9 +81,17 @@ case class BroadcastNestedLoopJoin(
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val (numStreamedRows, numBuildRows) = buildSide match {
+      case BuildRight => (longMetric("numLeftRows"), 
longMetric("numRightRows"))
+      case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastedRelation =
-      sparkContext.broadcast(broadcast.execute().map(_.copy())
-        .collect().toIndexedSeq)
+      sparkContext.broadcast(broadcast.execute().map { row =>
+        numBuildRows += 1
+        row.copy()
+      }.collect().toIndexedSeq)
 
     /** All rows that either match both-way, or rows from streamed joined with 
nulls. */
     val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { 
streamedIter =>
@@ -94,6 +108,7 @@ case class BroadcastNestedLoopJoin(
       streamedIter.foreach { streamedRow =>
         var i = 0
         var streamRowMatched = false
+        numStreamedRows += 1
 
         while (i < broadcastedRelation.value.size) {
           val broadcastedRow = broadcastedRelation.value(i)
@@ -162,6 +177,12 @@ case class BroadcastNestedLoopJoin(
 
     // TODO: Breaks lineage.
     sparkContext.union(
-      matchesOrStreamedRowsWithNulls.flatMap(_._1), 
sparkContext.makeRDD(broadcastRowsWithNulls))
+      matchesOrStreamedRowsWithNulls.flatMap(_._1),
+      sparkContext.makeRDD(broadcastRowsWithNulls)
+    ).map { row =>
+      // `broadcastRowsWithNulls` doesn't run in a job so that we have to 
track numOutputRows here.
+      numOutputRows += 1
+      row
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 261b472..2115f40 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -22,6 +22,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -30,13 +31,31 @@ import org.apache.spark.sql.execution.{BinaryNode, 
SparkPlan}
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends 
BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
+    val leftResults = left.execute().map { row =>
+      numLeftRows += 1
+      row.copy()
+    }
+    val rightResults = right.execute().map { row =>
+      numRightRows += 1
+      row.copy()
+    }
 
     leftResults.cartesian(rightResults).mapPartitions { iter =>
       val joinedRow = new JoinedRow
-      iter.map(r => joinedRow(r._1, r._2))
+      iter.map { r =>
+        numOutputRows += 1
+        joinedRow(r._1, r._2)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 22d46d1..7ce4a51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 
 
 trait HashJoin {
@@ -69,7 +70,9 @@ trait HashJoin {
 
   protected def hashJoin(
       streamIter: Iterator[InternalRow],
-      hashedRelation: HashedRelation): Iterator[InternalRow] =
+      numStreamRows: LongSQLMetric,
+      hashedRelation: HashedRelation,
+      numOutputRows: LongSQLMetric): Iterator[InternalRow] =
   {
     new Iterator[InternalRow] {
       private[this] var currentStreamedRow: InternalRow = _
@@ -98,6 +101,7 @@ trait HashJoin {
           case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), 
currentStreamedRow)
         }
         currentMatchPosition += 1
+        numOutputRows += 1
         resultProjection(ret)
       }
 
@@ -113,6 +117,7 @@ trait HashJoin {
 
         while (currentHashMatches == null && streamIter.hasNext) {
           currentStreamedRow = streamIter.next()
+          numStreamRows += 1
           val key = joinKeys(currentStreamedRow)
           if (!key.anyNull) {
             currentHashMatches = hashedRelation.get(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 701bd3c..6690334 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.util.collection.CompactBuffer
 
 @DeveloperApi
@@ -114,22 +115,28 @@ trait HashOuterJoin {
       key: InternalRow,
       joinedRow: JoinedRow,
       rightIter: Iterable[InternalRow],
-      resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+      resultProjection: InternalRow => InternalRow,
+      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = if (rightIter != null) {
           rightIter.collect {
-            case r if boundCondition(joinedRow.withRight(r)) => 
resultProjection(joinedRow).copy()
+            case r if boundCondition(joinedRow.withRight(r)) => {
+              numOutputRows += 1
+              resultProjection(joinedRow).copy()
+            }
           }
         } else {
           List.empty
         }
         if (temp.isEmpty) {
+          numOutputRows += 1
           resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
         } else {
           temp
         }
       } else {
+        numOutputRows += 1
         resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
       }
     }
@@ -140,22 +147,28 @@ trait HashOuterJoin {
       key: InternalRow,
       leftIter: Iterable[InternalRow],
       joinedRow: JoinedRow,
-      resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
+      resultProjection: InternalRow => InternalRow,
+      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = if (leftIter != null) {
           leftIter.collect {
-            case l if boundCondition(joinedRow.withLeft(l)) => 
resultProjection(joinedRow).copy()
+            case l if boundCondition(joinedRow.withLeft(l)) => {
+              numOutputRows += 1
+              resultProjection(joinedRow).copy()
+            }
           }
         } else {
           List.empty
         }
         if (temp.isEmpty) {
+          numOutputRows += 1
           resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
         } else {
           temp
         }
       } else {
+        numOutputRows += 1
         resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
       }
     }
@@ -164,7 +177,7 @@ trait HashOuterJoin {
 
   protected[this] def fullOuterIterator(
       key: InternalRow, leftIter: Iterable[InternalRow], rightIter: 
Iterable[InternalRow],
-      joinedRow: JoinedRow): Iterator[InternalRow] = {
+      joinedRow: JoinedRow, numOutputRows: LongSQLMetric): 
Iterator[InternalRow] = {
     if (!key.anyNull) {
       // Store the positions of records in right, if one of its associated row 
satisfy
       // the join condition.
@@ -177,6 +190,7 @@ trait HashOuterJoin {
           //    append them directly
 
           case (r, idx) if boundCondition(joinedRow.withRight(r)) =>
+            numOutputRows += 1
             matched = true
             // if the row satisfy the join condition, add its index into the 
matched set
             rightMatchedSet.add(idx)
@@ -189,6 +203,7 @@ trait HashOuterJoin {
           // as we don't know whether we need to append it until finish 
iterating all
           // of the records in right side.
           // If we didn't get any proper row, then append a single row with 
empty right.
+          numOutputRows += 1
           joinedRow.withRight(rightNullRow).copy()
         })
       } ++ rightIter.zipWithIndex.collect {
@@ -197,12 +212,15 @@ trait HashOuterJoin {
         // Re-visiting the records in right, and append additional row with 
empty left, if its not
         // in the matched set.
         case (r, idx) if !rightMatchedSet.contains(idx) =>
+          numOutputRows += 1
           joinedRow(leftNullRow, r).copy()
       }
     } else {
       leftIter.iterator.map[InternalRow] { l =>
+        numOutputRows += 1
         joinedRow(l, rightNullRow).copy()
       } ++ rightIter.iterator.map[InternalRow] { r =>
+        numOutputRows += 1
         joinedRow(leftNullRow, r).copy()
       }
     }
@@ -211,10 +229,12 @@ trait HashOuterJoin {
   // This is only used by FullOuter
   protected[this] def buildHashTable(
       iter: Iterator[InternalRow],
+      numIterRows: LongSQLMetric,
       keyGenerator: Projection): JavaHashMap[InternalRow, 
CompactBuffer[InternalRow]] = {
     val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]()
     while (iter.hasNext) {
       val currentRow = iter.next()
+      numIterRows += 1
       val rowKey = keyGenerator(currentRow)
 
       var existingMatchList = hashTable.get(rowKey)

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 82dd6eb..beb141a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 
 
 trait HashSemiJoin {
@@ -61,13 +62,15 @@ trait HashSemiJoin {
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output)
 
-  protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): 
java.util.Set[InternalRow] = {
+  protected def buildKeyHashSet(
+      buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): 
java.util.Set[InternalRow] = {
     val hashSet = new java.util.HashSet[InternalRow]()
 
     // Create a Hash set of buildKeys
     val rightKey = rightKeyGenerator
     while (buildIter.hasNext) {
       val currentRow = buildIter.next()
+      numBuildRows += 1
       val rowKey = rightKey(currentRow)
       if (!rowKey.anyNull) {
         val keyExists = hashSet.contains(rowKey)
@@ -82,25 +85,35 @@ trait HashSemiJoin {
 
   protected def hashSemiJoin(
     streamIter: Iterator[InternalRow],
-    hashSet: java.util.Set[InternalRow]): Iterator[InternalRow] = {
+    numStreamRows: LongSQLMetric,
+    hashSet: java.util.Set[InternalRow],
+    numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val joinKeys = leftKeyGenerator
     streamIter.filter(current => {
+      numStreamRows += 1
       val key = joinKeys(current)
-      !key.anyNull && hashSet.contains(key)
+      val r = !key.anyNull && hashSet.contains(key)
+      if (r) numOutputRows += 1
+      r
     })
   }
 
   protected def hashSemiJoin(
       streamIter: Iterator[InternalRow],
-      hashedRelation: HashedRelation): Iterator[InternalRow] = {
+      numStreamRows: LongSQLMetric,
+      hashedRelation: HashedRelation,
+      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val joinKeys = leftKeyGenerator
     val joinedRow = new JoinedRow
     streamIter.filter { current =>
+      numStreamRows += 1
       val key = joinKeys(current)
       lazy val rowBuffer = hashedRelation.get(key)
-      !key.anyNull && rowBuffer != null && rowBuffer.exists {
+      val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {
         (row: InternalRow) => boundCondition(joinedRow(current, row))
       }
+      if (r) numOutputRows += 1
+      r
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 63d35d0..c1bc794 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkSqlSerializer
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, 
TaskMemoryManager}
@@ -112,11 +113,13 @@ private[joins] object HashedRelation {
 
   def apply(
       input: Iterator[InternalRow],
+      numInputRows: LongSQLMetric,
       keyGenerator: Projection,
       sizeEstimate: Int = 64): HashedRelation = {
 
     if (keyGenerator.isInstanceOf[UnsafeProjection]) {
-      return UnsafeHashedRelation(input, 
keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+      return UnsafeHashedRelation(
+        input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], 
sizeEstimate)
     }
 
     // TODO: Use Spark's HashMap implementation.
@@ -130,6 +133,7 @@ private[joins] object HashedRelation {
     // Create a mapping of buildKeys -> rows
     while (input.hasNext) {
       currentRow = input.next()
+      numInputRows += 1
       val rowKey = keyGenerator(currentRow)
       if (!rowKey.anyNull) {
         val existingMatchList = hashTable.get(rowKey)
@@ -331,6 +335,7 @@ private[joins] object UnsafeHashedRelation {
 
   def apply(
       input: Iterator[InternalRow],
+      numInputRows: LongSQLMetric,
       keyGenerator: UnsafeProjection,
       sizeEstimate: Int): HashedRelation = {
 
@@ -340,6 +345,7 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
+      numInputRows += 1
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.anyNull) {
         val existingMatchList = hashTable.get(rowKey)

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 4443455..ad63625 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -35,6 +36,11 @@ case class LeftSemiJoinBNL(
   extends BinaryNode {
   // TODO: Override requiredChildDistribution.
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
   override def outputPartitioning: Partitioning = streamed.outputPartitioning
 
   override def output: Seq[Attribute] = left.output
@@ -52,13 +58,21 @@ case class LeftSemiJoinBNL(
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
     val broadcastedRelation =
-      sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+      sparkContext.broadcast(broadcast.execute().map { row =>
+        numRightRows += 1
+        row.copy()
+      }.collect().toIndexedSeq)
 
     streamed.execute().mapPartitions { streamedIter =>
       val joinedRow = new JoinedRow
 
       streamedIter.filter(streamedRow => {
+        numLeftRows += 1
         var i = 0
         var matched = false
 
@@ -69,6 +83,9 @@ case class LeftSemiJoinBNL(
           }
           i += 1
         }
+        if (matched) {
+          numOutputRows += 1
+        }
         matched
       })
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 68ccd34..18808ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, ClusteredDistribution}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -37,19 +38,28 @@ case class LeftSemiJoinHash(
     right: SparkPlan,
     condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def outputPartitioning: Partitioning = left.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
     right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
       if (condition.isEmpty) {
-        val hashSet = buildKeyHashSet(buildIter)
-        hashSemiJoin(streamIter, hashSet)
+        val hashSet = buildKeyHashSet(buildIter, numRightRows)
+        hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
       } else {
-        val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
-        hashSemiJoin(streamIter, hashRelation)
+        val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
+        hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index c923dc8..fc8c943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -38,7 +39,10 @@ case class ShuffledHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
-  override protected[sql] val trackNumOfRowsEnabled = true
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def outputPartitioning: Partitioning =
     PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -47,9 +51,15 @@ case class ShuffledHashJoin(
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val (numBuildRows, numStreamedRows) = buildSide match {
+      case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
+      case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
+    }
+    val numOutputRows = longMetric("numOutputRows")
+
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
-      val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
-      hashJoin(streamIter, hashed)
+      val hashed = HashedRelation(buildIter, numBuildRows, buildSideKeyGenerator)
+      hashJoin(streamIter, numStreamedRows, hashed, numOutputRows)
     }
   }
 }
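
The buildSide match above just relabels the two per-side metrics as build/streamed, so the shared hashJoin path updates whichever side is being hashed. A small standalone sketch of that selection under the same naming, with stand-in types (BuildSide, LongMetric) in place of Spark's internals:

    object BuildSideMetricSketch extends App {
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      final class LongMetric { var value: Long = 0L; def +=(v: Long): Unit = value += v }

      val numLeftRows = new LongMetric
      val numRightRows = new LongMetric

      // Same shape as the doExecute() change: pair the metrics as (build, streamed).
      def pick(buildSide: BuildSide): (LongMetric, LongMetric) = buildSide match {
        case BuildLeft  => (numLeftRows, numRightRows)
        case BuildRight => (numRightRows, numLeftRows)
      }

      val (numBuildRows, numStreamedRows) = pick(BuildRight)
      numBuildRows += 3        // e.g. three rows consumed while building the hash table
      numStreamedRows += 5     // e.g. five rows probed from the streamed side
      assert(numRightRows.value == 3 && numLeftRows.value == 5)
    }
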

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
index 6a8c35e..ed282f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * :: DeveloperApi ::
@@ -41,6 +42,11 @@ case class ShuffledHashOuterJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashOuterJoin {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
@@ -53,39 +59,48 @@ case class ShuffledHashOuterJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged join?)
       joinType match {
         case LeftOuter =>
-          val hashed = HashedRelation(rightIter, buildKeyGenerator)
+          val hashed = HashedRelation(rightIter, numRightRows, buildKeyGenerator)
           val keyGenerator = streamedKeyGenerator
           val resultProj = resultProjection
           leftIter.flatMap( currentRow => {
+            numLeftRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withLeft(currentRow)
-            leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
+            leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj, numOutputRows)
           })
 
         case RightOuter =>
-          val hashed = HashedRelation(leftIter, buildKeyGenerator)
+          val hashed = HashedRelation(leftIter, numLeftRows, buildKeyGenerator)
           val keyGenerator = streamedKeyGenerator
           val resultProj = resultProjection
           rightIter.flatMap ( currentRow => {
+            numRightRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withRight(currentRow)
-            rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
+            rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj, numOutputRows)
           })
 
         case FullOuter =>
           // TODO(davies): use UnsafeRow
-          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
-          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          val leftHashTable =
+            buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output))
+          val rightHashTable =
+            buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output))
           (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
             fullOuterIterator(key,
               leftHashTable.getOrElse(key, EMPTY_LIST),
               rightHashTable.getOrElse(key, EMPTY_LIST),
-              joinedRow)
+              joinedRow,
+              numOutputRows)
           }
 
         case x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 6d656ea..6b73226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
 
 /**
  * :: DeveloperApi ::
@@ -37,6 +38,11 @@ case class SortMergeJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override def outputPartitioning: Partitioning =
@@ -70,6 +76,10 @@ case class SortMergeJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       new RowIterator {
         // An ordering that can be used to compare keys from both sides.
@@ -82,7 +92,9 @@ case class SortMergeJoin(
           rightKeyGenerator,
           keyOrdering,
           RowIterator.fromScala(leftIter),
-          RowIterator.fromScala(rightIter)
+          numLeftRows,
+          RowIterator.fromScala(rightIter),
+          numRightRows
         )
         private[this] val joinRow = new JoinedRow
         private[this] val resultProjection: (InternalRow) => InternalRow = {
@@ -108,6 +120,7 @@ case class SortMergeJoin(
           if (currentLeftRow != null) {
             joinRow(currentLeftRow, currentRightMatches(currentMatchIdx))
             currentMatchIdx += 1
+            numOutputRows += 1
             true
           } else {
             false
@@ -144,7 +157,9 @@ private[joins] class SortMergeJoinScanner(
     bufferedKeyGenerator: Projection,
     keyOrdering: Ordering[InternalRow],
     streamedIter: RowIterator,
-    bufferedIter: RowIterator) {
+    numStreamedRows: LongSQLMetric,
+    bufferedIter: RowIterator,
+    numBufferedRows: LongSQLMetric) {
   private[this] var streamedRow: InternalRow = _
   private[this] var streamedRowKey: InternalRow = _
   private[this] var bufferedRow: InternalRow = _
@@ -269,6 +284,7 @@ private[joins] class SortMergeJoinScanner(
     if (streamedIter.advanceNext()) {
       streamedRow = streamedIter.getRow
       streamedRowKey = streamedKeyGenerator(streamedRow)
+      numStreamedRows += 1
       true
     } else {
       streamedRow = null
@@ -286,6 +302,7 @@ private[joins] class SortMergeJoinScanner(
     while (!foundRow && bufferedIter.advanceNext()) {
       bufferedRow = bufferedIter.getRow
       bufferedRowKey = bufferedKeyGenerator(bufferedRow)
+      numBufferedRows += 1
       foundRow = !bufferedRowKey.anyNull
     }
     if (!foundRow) {

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 5326966..dea9e5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, RowIterator, SparkPlan}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
 
 /**
  * :: DeveloperApi ::
@@ -40,6 +41,11 @@ case class SortMergeOuterJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override private[sql] lazy val metrics = Map(
+    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
+    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def output: Seq[Attribute] = {
     joinType match {
       case LeftOuter =>
@@ -108,6 +114,10 @@ case class SortMergeOuterJoin(
   }
 
   override def doExecute(): RDD[InternalRow] = {
+    val numLeftRows = longMetric("numLeftRows")
+    val numRightRows = longMetric("numRightRows")
+    val numOutputRows = longMetric("numOutputRows")
+
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // An ordering that can be used to compare keys from both sides.
       val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
@@ -133,10 +143,13 @@ case class SortMergeOuterJoin(
             bufferedKeyGenerator = createRightKeyGenerator(),
             keyOrdering,
             streamedIter = RowIterator.fromScala(leftIter),
-            bufferedIter = RowIterator.fromScala(rightIter)
+            numLeftRows,
+            bufferedIter = RowIterator.fromScala(rightIter),
+            numRightRows
           )
           val rightNullRow = new GenericInternalRow(right.output.length)
-          new LeftOuterIterator(smjScanner, rightNullRow, boundCondition, resultProj).toScala
+          new LeftOuterIterator(
+            smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala
 
         case RightOuter =>
           val smjScanner = new SortMergeJoinScanner(
@@ -144,10 +157,13 @@ case class SortMergeOuterJoin(
             bufferedKeyGenerator = createLeftKeyGenerator(),
             keyOrdering,
             streamedIter = RowIterator.fromScala(rightIter),
-            bufferedIter = RowIterator.fromScala(leftIter)
+            numRightRows,
+            bufferedIter = RowIterator.fromScala(leftIter),
+            numLeftRows
           )
           val leftNullRow = new GenericInternalRow(left.output.length)
-          new RightOuterIterator(smjScanner, leftNullRow, boundCondition, resultProj).toScala
+          new RightOuterIterator(
+            smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows).toScala
 
         case x =>
           throw new IllegalArgumentException(
@@ -162,7 +178,8 @@ private class LeftOuterIterator(
     smjScanner: SortMergeJoinScanner,
     rightNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
-    resultProj: InternalRow => InternalRow
+    resultProj: InternalRow => InternalRow,
+    numRows: LongSQLMetric
   ) extends RowIterator {
   private[this] val joinedRow: JoinedRow = new JoinedRow()
   private[this] var rightIdx: Int = 0
@@ -198,7 +215,9 @@ private class LeftOuterIterator(
   }
 
   override def advanceNext(): Boolean = {
-    advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+    val r = advanceRightUntilBoundConditionSatisfied() || advanceLeft()
+    if (r) numRows += 1
+    r
   }
 
   override def getRow: InternalRow = resultProj(joinedRow)
@@ -208,7 +227,8 @@ private class RightOuterIterator(
     smjScanner: SortMergeJoinScanner,
     leftNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
-    resultProj: InternalRow => InternalRow
+    resultProj: InternalRow => InternalRow,
+    numRows: LongSQLMetric
   ) extends RowIterator {
   private[this] val joinedRow: JoinedRow = new JoinedRow()
   private[this] var leftIdx: Int = 0
@@ -244,7 +264,9 @@ private class RightOuterIterator(
   }
 
   override def advanceNext(): Boolean = {
-    advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+    val r = advanceLeftUntilBoundConditionSatisfied() || advanceRight()
+    if (r) numRows += 1
+    r
   }
 
   override def getRow: InternalRow = resultProj(joinedRow)
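
In the LeftOuterIterator/RightOuterIterator changes above, the output metric is incremented only when advanceNext() actually advances, so the count stays accurate even if a downstream operator stops consuming early. A standalone sketch of that wrapper pattern (RowIter, CountingIter and the sample data are illustrative, not Spark's RowIterator):

    object OutputCountingSketch extends App {
      final class LongMetric { var value: Long = 0L; def +=(v: Long): Unit = value += v }

      trait RowIter { def advanceNext(): Boolean; def getRow: Int }

      // Counts a row only when advanceNext() reports that one was produced.
      class CountingIter(underlying: Iterator[Int], numRows: LongMetric) extends RowIter {
        private var current: Int = 0
        override def advanceNext(): Boolean = {
          val r = underlying.hasNext
          if (r) { current = underlying.next(); numRows += 1 }
          r
        }
        override def getRow: Int = current
      }

      val numOutputRows = new LongMetric
      val iter = new CountingIter(Iterator(10, 20, 30), numOutputRows)
      iter.advanceNext(); iter.advanceNext()   // consume only two rows, then stop
      assert(numOutputRows.value == 2)         // reflects rows produced, not total input
    }
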

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1b51a5e..7a2a98e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -112,4 +112,10 @@ private[sql] object SQLMetrics {
     sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
   }
+
+  /**
+   * A metric whose value will be ignored. Use this one when we need a metric parameter but
+   * don't care about the value.
+   */
+  val nullLongMetric = new LongSQLMetric("null")
 }
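
The new nullLongMetric is effectively a null object: code paths that require a metric argument but whose count nobody reads can pass it instead of threading an Option through every call. A self-contained sketch of that idea with stand-in names (not Spark's API):

    object NullMetricSketch extends App {
      final class LongMetric(val name: String) { var value: Long = 0L; def +=(v: Long): Unit = value += v }

      // Stand-in for SQLMetrics.nullLongMetric: accepts updates like any metric, but is never reported.
      val nullLongMetric = new LongMetric("null")

      def consume(rows: Iterator[Int], numInputRows: LongMetric): Int = {
        var n = 0
        while (rows.hasNext) { rows.next(); numInputRows += 1; n += 1 }
        n
      }

      // Callers that only care about the return value pass the throwaway metric.
      assert(consume(Iterator(1, 2, 3), nullLongMetric) == 3)
    }
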

http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 8b1a9b2..a1fa2c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.collection.CompactBuffer
 
@@ -35,7 +37,8 @@ class HashedRelationSuite extends SparkFunSuite {
 
   test("GeneralHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
-    val hashed = HashedRelation(data.iterator, keyProjection)
+    val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+    val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
     assert(hashed.isInstanceOf[GeneralHashedRelation])
 
     assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -45,11 +48,13 @@ class HashedRelationSuite extends SparkFunSuite {
     val data2 = CompactBuffer[InternalRow](data(2))
     data2 += data(2)
     assert(hashed.get(data(2)) === data2)
+    assert(numDataRows.value.value === data.length)
   }
 
   test("UniqueKeyHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
-    val hashed = HashedRelation(data.iterator, keyProjection)
+    val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
+    val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
     assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
 
     assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -62,17 +67,19 @@ class HashedRelationSuite extends SparkFunSuite {
     assert(uniqHashed.getValue(data(1)) === data(1))
     assert(uniqHashed.getValue(data(2)) === data(2))
     assert(uniqHashed.getValue(InternalRow(10)) === null)
+    assert(numDataRows.value.value === data.length)
   }
 
   test("UnsafeHashedRelation") {
     val schema = StructType(StructField("a", IntegerType, true) :: Nil)
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
+    val numDataRows = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "data")
     val toUnsafe = UnsafeProjection.create(schema)
     val unsafeData = data.map(toUnsafe(_).copy()).toArray
 
     val buildKey = Seq(BoundReference(0, IntegerType, false))
     val keyGenerator = UnsafeProjection.create(buildKey)
-    val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
+    val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
     assert(hashed.isInstanceOf[UnsafeHashedRelation])
 
     assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -94,5 +101,6 @@ class HashedRelationSuite extends SparkFunSuite {
     assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
     assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
     assert(hashed2.get(unsafeData(2)) === data2)
+    assert(numDataRows.value.value === data.length)
   }
 }

