Repository: spark
Updated Branches:
  refs/heads/branch-2.4 3644c84f5 -> f9b476c6a


[SPARK-25237][SQL] Remove updateBytesReadWithFileSize in FileScanRDD

## What changes were proposed in this pull request?
This pr removed the method `updateBytesReadWithFileSize` in `FileScanRDD` 
because it computes input metrics by file size supported in Hadoop 2.5 and 
earlier. The current Spark does not support the versions, so it causes wrong 
input metric numbers.

This is rework from #22232.

Closes #22232

## How was this patch tested?
Added tests in `FileBasedDataSourceSuite`.

Closes #22324 from maropu/pr22232-2.

Lead-authored-by: dujunling <dujunl...@huawei.com>
Co-authored-by: Takeshi Yamamuro <yamam...@apache.org>
Signed-off-by: Sean Owen <sean.o...@databricks.com>
(cherry picked from commit ed249db9c464062fbab7c6f68ad24caaa95cec82)
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9b476c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9b476c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9b476c6

Branch: refs/heads/branch-2.4
Commit: f9b476c6ad629007d9334409e4dda99119cf0053
Parents: 3644c84
Author: dujunling <dujunl...@huawei.com>
Authored: Thu Sep 6 21:44:46 2018 -0700
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Thu Sep 6 21:44:53 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 10 --------
 .../spark/sql/FileBasedDataSourceSuite.scala    | 24 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9b476c6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 99fc78f..345c9d8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -89,14 +89,6 @@ class FileScanRDD(
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
-      // If we can't get the bytes read from the FS stats, fall back to the 
file size,
-      // which may be inaccurate.
-      private def updateBytesReadWithFileSize(): Unit = {
-        if (currentFile != null) {
-          inputMetrics.incBytesRead(currentFile.length)
-        }
-      }
-
       private[this] val files = 
split.asInstanceOf[FilePartition].files.toIterator
       private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
@@ -139,7 +131,6 @@ class FileScanRDD(
 
       /** Advances to the next file. Returns true if a new non-empty iterator 
is available. */
       private def nextIterator(): Boolean = {
-        updateBytesReadWithFileSize()
         if (files.hasNext) {
           currentFile = files.next()
           logInfo(s"Reading File $currentFile")
@@ -208,7 +199,6 @@ class FileScanRDD(
 
       override def close(): Unit = {
         updateBytesRead()
-        updateBytesReadWithFileSize()
         InputFileBlockHolder.unset()
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f9b476c6/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 4aa6afd..304ede9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql
 import java.io.{File, FileNotFoundException}
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, 
NullUDT}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with 
SharedSQLContext with Befo
       }
     }
   }
+
+  test("SPARK-25237 compute correct input metrics in FileScanRDD") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).repartition(1).write.csv(path)
+      val bytesReads = new mutable.ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      sparkContext.addSparkListener(bytesReadListener)
+      try {
+        spark.read.csv(path).limit(1).collect()
+        sparkContext.listenerBus.waitUntilEmpty(1000L)
+        assert(bytesReads.sum === 7860)
+      } finally {
+        sparkContext.removeSparkListener(bytesReadListener)
+      }
+    }
+  }
 }
 
 object TestingUDT {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to