Repository: spark
Updated Branches:
  refs/heads/branch-1.6 0cb06c993 -> 1ac830aca


[SPARK-16044][SQL] Backport input_file_name() for data sources based on NewHadoopRDD to branch 1.6

## What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/13759.

(Note: in master, `SqlNewHadoopRDDState` has since been renamed to `InputFileNameHolder`, and the `spark` (SparkSession) API does not exist in branch 1.6, so this backport keeps the original class name and the `sqlContext` API.)
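
For reference, a minimal usage sketch against the branch-1.6 `SQLContext` API, assuming a spark-shell session where `sc` and `sqlContext` are in scope (the input path is illustrative, not part of this patch):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.spark.sql.functions.input_file_name
import sqlContext.implicits._

// Read text files through the new Hadoop API, which is backed by NewHadoopRDD.
val rdd = sc.newAPIHadoopFile(
  "/tmp/example-input",            // hypothetical path
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text])

// Before this backport, input_file_name() returned "" for this read path;
// with it, each row reports the file it was read from.
val df = rdd.map(_._2.toString).toDF()
df.select(input_file_name()).show(false)
```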

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13806 from HyukjinKwon/backport-SPARK-16044.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ac830ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ac830ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ac830ac

Branch: refs/heads/branch-1.6
Commit: 1ac830aca089e9f0b9b0bf367236ffc1184eae7e
Parents: 0cb06c9
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Jun 29 13:11:56 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 29 13:11:56 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  7 ++++
 .../spark/sql/ColumnExpressionSuite.scala       | 39 ++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index c8b4f30..46fe1ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -134,6 +134,12 @@ class NewHadoopRDD[K, V](
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
 
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDDState.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
@@ -190,6 +196,7 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         if (reader != null) {
+          SqlNewHadoopRDDState.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
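
For context, the hunk above sets and clears a per-thread file name in `SqlNewHadoopRDDState` (renamed to `InputFileNameHolder` in master); the `input_file_name()` expression simply reads that value for the current thread. A minimal sketch of this thread-local holder pattern, simplified rather than the verbatim branch-1.6 source:

```scala
import org.apache.spark.unsafe.types.UTF8String

// Sketch of a per-thread holder for the current input file name: the task's
// record-reading thread sets it before reading a split and clears it on close,
// and the SQL expression reads the value belonging to the current thread.
object InputFileNameSketch {
  private val inputFileName = new ThreadLocal[UTF8String] {
    override protected def initialValue(): UTF8String = UTF8String.fromString("")
  }

  def get(): UTF8String = inputFileName.get()

  def set(file: String): Unit = inputFileName.set(UTF8String.fromString(file))

  def unset(): Unit = inputFileName.set(UTF8String.fromString(""))
}
```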

http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 38c0eb5..52b3d60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.scalatest.Matchers._
 
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.execution.Project
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -591,15 +593,44 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }
 
-  test("InputFileName") {
+  test("InputFileName - SqlNewHadoopRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)
-      val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName())
+      val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name())
         .head.getString(0)
       assert(answer.contains(dir.getCanonicalPath))
 
-      checkAnswer(data.select(inputFileName()).limit(1), Row(""))
+      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+    }
+  }
+
+  test("input_file_name - HadoopRDD") {
+    withTempPath { dir =>
+      val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF()
+      data.write.text(dir.getCanonicalPath)
+      val df = sparkContext.textFile(dir.getCanonicalPath).toDF()
+      val answer = df.select(input_file_name()).head.getString(0)
+      assert(answer.contains(dir.getCanonicalPath))
+
+      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+    }
+  }
+
+  test("input_file_name - NewHadoopRDD") {
+    withTempPath { dir =>
+      val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF()
+      data.write.text(dir.getCanonicalPath)
+      val rdd = sparkContext.newAPIHadoopFile(
+        dir.getCanonicalPath,
+        classOf[NewTextInputFormat],
+        classOf[LongWritable],
+        classOf[Text])
+      val df = rdd.map(pair => pair._2.toString).toDF()
+      val answer = df.select(input_file_name()).head.getString(0)
+      assert(answer.contains(dir.getCanonicalPath))
+
+      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
     }
   }
 

