Repository: spark
Updated Branches:
  refs/heads/master 0e4bdebec -> 264bc6362


[SPARK-16165][SQL] Fix the update logic for InMemoryTableScanExec.readBatches

## What changes were proposed in this pull request?

Currently, the `readBatches` accumulator of `InMemoryTableScanExec` is updated only 
when `spark.sql.inMemoryColumnarStorage.partitionPruning` is true. Although 
this metric is used only for testing purposes, it should report a correct value 
regardless of SQL options.

## How was this patch tested?

Passes the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13870 from dongjoon-hyun/SPARK-16165.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/264bc636
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/264bc636
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/264bc636

Branch: refs/heads/master
Commit: 264bc63623b20529abcf84abcb333e7c16ad1ef9
Parents: 0e4bdeb
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri Jun 24 07:19:20 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jun 24 07:19:20 2016 +0800

----------------------------------------------------------------------
 .../execution/columnar/InMemoryTableScanExec.scala   |  6 +++---
 .../columnar/PartitionBatchPruningSuite.scala        | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/264bc636/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 2695f35..183e494 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -147,9 +147,6 @@ private[sql] case class InMemoryTableScanExec(
               logInfo(s"Skipping partition based on stats $statsString")
               false
             } else {
-              if (enableAccumulators) {
-                readBatches.add(1)
-              }
               true
             }
           }
@@ -159,6 +156,9 @@ private[sql] case class InMemoryTableScanExec(
 
       // update SQL metrics
       val withMetrics = cachedBatchesToScan.map { batch =>
+        if (enableAccumulators) {
+          readBatches.add(1)
+        }
         numOutputRows += batch.numRows
         batch
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/264bc636/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index a118cec..7ca8e04 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -119,6 +119,21 @@ class PartitionBatchPruningSuite
     }
   }
 
+  // With disable IN_MEMORY_PARTITION_PRUNING option
+  test("disable IN_MEMORY_PARTITION_PRUNING") {
+    spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)
+
+    val df = sql("SELECT key FROM pruningData WHERE key = 1")
+    val result = df.collect().map(_(0)).toArray
+    assert(result.length === 1)
+
+    val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
+        case in: InMemoryTableScanExec => (in.readPartitions.value, 
in.readBatches.value)
+      }.head
+    assert(readPartitions === 5)
+    assert(readBatches === 10)
+  }
+
   def checkBatchPruning(
       query: String,
       expectedReadPartitions: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to