spark git commit: [SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD

2016-06-20 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9d513b8d2 -> 12f00b6ed


[SPARK-16044][SQL] input_file_name() returns empty strings in data sources 
based on NewHadoopRDD

## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return file paths rather than 
empty strings for external data sources based on `NewHadoopRDD`, such as 
[spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149)
 and 
[spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

Against one of these external data sources, the code below:

```scala
df.select(input_file_name).show()
```

will produce

- **Before**
  ```
  +-----------------+
  |input_file_name()|
  +-----------------+
  |                 |
  +-----------------+
  ```

- **After**
  ```
  +--------------------+
  |   input_file_name()|
  +--------------------+
  |file:/private/var...|
  +--------------------+
  ```
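
For context, here is a minimal, self-contained way to hit the patched code path directly (a sketch: the input path and app name are illustrative, but `newAPIHadoopFile` is exactly the API that `NewHadoopRDD`-based sources build on):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

object InputFileNameRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-16044").getOrCreate()
    import spark.implicits._

    // newAPIHadoopFile is backed by NewHadoopRDD, the code path patched here.
    val lines = spark.sparkContext
      .newAPIHadoopFile("/tmp/data.txt", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text])
      .map(_._2.toString)

    // input_file_name() reads the thread-local that NewHadoopRDD now sets
    // while the task consumes the split, so the projection sees the path.
    lines.toDF("value").select(input_file_name()).show(truncate = false)

    spark.stop()
  }
}
```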

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.
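
A sketch of the kind of regression test this covers (not the verbatim suite code; `spark` and `withTempPath` are assumed to come from Spark's shared test harness):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.functions.input_file_name

test("input_file_name over NewHadoopRDD") {
  withTempPath { dir =>
    import spark.implicits._
    // Write a small text file, then read it back through newAPIHadoopFile,
    // which is backed by NewHadoopRDD.
    spark.range(10).selectExpr("cast(id as string)").write.text(dir.getCanonicalPath)
    val rdd = spark.sparkContext.newAPIHadoopFile(
      dir.getCanonicalPath, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
    val answer = rdd.map(_._2.toString).toDF("value")
      .select(input_file_name()).head().getString(0)
    // After the fix the reported path is non-empty and points into the temp dir.
    assert(answer.contains(dir.getCanonicalPath))
  }
}
```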

Author: hyukjinkwon 

Closes #13759 from HyukjinKwon/SPARK-16044.

(cherry picked from commit 4f7f1c436205630ab77d3758d7210cc1a2f0d04a)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12f00b6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12f00b6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12f00b6e

Branch: refs/heads/branch-2.0
Commit: 12f00b6edde9b6f97d2450e2cd99edd5e31b9169
Parents: 9d513b8
Author: hyukjinkwon 
Authored: Mon Jun 20 21:55:34 2016 -0700
Committer: Reynold Xin 
Committed: Mon Jun 20 21:55:40 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/rdd/InputFileNameHolder.scala |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala    |  7 +++++++
 .../spark/sql/ColumnExpressionSuite.scala      | 34 ++++++++++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 108e9d2..f40d4c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * This holds file names of the current Spark task. This is used in HadoopRDD,
- * FileScanRDD and InputFileName function in Spark SQL.
+ * FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
  */
 private[spark] object InputFileNameHolder {
   /**
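
For reference, the holder itself is just a thread-local around the current file path, along these lines (a sketch consistent with the Spark 2.0 source, not a verbatim copy):

```scala
import org.apache.spark.unsafe.types.UTF8String

private[spark] object InputFileNameHolder {
  // Thread-local holding the path of the file the current task is reading,
  // defaulting to the empty string when no file-based split is active.
  private[this] val inputFileName: InheritableThreadLocal[UTF8String] =
    new InheritableThreadLocal[UTF8String] {
      override protected def initialValue(): UTF8String = UTF8String.fromString("")
    }

  def getInputFileName(): UTF8String = inputFileName.get()

  def setInputFileName(file: String): Unit = inputFileName.set(UTF8String.fromString(file))

  def unsetInputFileName(): Unit = inputFileName.remove()
}
```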

http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 189dc7b..b086baa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -135,6 +135,12 @@ class NewHadoopRDD[K, V](
       val inputMetrics = context.taskMetrics().inputMetrics
       val existingBytesRead = inputMetrics.bytesRead
 
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
@@ -201,6 +207,7 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         if (reader != null) {
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
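
On the SQL side, `input_file_name()` is backed by a Catalyst expression that reads this holder once per row. Trimmed to its core (a rough sketch of the 2.0-era expression, codegen omitted), it looks like:

```scala
import org.apache.spark.rdd.InputFileNameHolder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Nondeterministic}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class InputFileName() extends LeafExpression with Nondeterministic {

  override def nullable: Boolean = false

  override def dataType: DataType = StringType

  override protected def initInternal(): Unit = {}

  // Each evaluation simply reads the thread-local that NewHadoopRDD
  // (and HadoopRDD / FileScanRDD) set for the running task.
  override protected def evalInternal(input: InternalRow): UTF8String =
    InputFileNameHolder.getInputFileName()
}
```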

http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala

spark git commit: [SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD

2016-06-20 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 18a8a9b1f -> 4f7f1c436


[SPARK-16044][SQL] input_file_name() returns empty strings in data sources 
based on NewHadoopRDD

## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return file paths rather than 
empty strings for external data sources based on `NewHadoopRDD`, such as 
[spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149)
 and 
[spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

Against one of these external data sources, the code below:

```scala
df.select(input_file_name).show()
```

will produce

- **Before**
  ```
  +-----------------+
  |input_file_name()|
  +-----------------+
  |                 |
  +-----------------+
  ```

- **After**
  ```
  +--------------------+
  |   input_file_name()|
  +--------------------+
  |file:/private/var...|
  +--------------------+
  ```

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon 

Closes #13759 from HyukjinKwon/SPARK-16044.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7f1c43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7f1c43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7f1c43

Branch: refs/heads/master
Commit: 4f7f1c436205630ab77d3758d7210cc1a2f0d04a
Parents: 18a8a9b
Author: hyukjinkwon 
Authored: Mon Jun 20 21:55:34 2016 -0700
Committer: Reynold Xin 
Committed: Mon Jun 20 21:55:34 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/rdd/InputFileNameHolder.scala |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala    |  7 +++++++
 .../spark/sql/ColumnExpressionSuite.scala      | 34 ++++++++++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 108e9d2..f40d4c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * This holds file names of the current Spark task. This is used in HadoopRDD,
- * FileScanRDD and InputFileName function in Spark SQL.
+ * FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
  */
 private[spark] object InputFileNameHolder {
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 189dc7b..b086baa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -135,6 +135,12 @@ class NewHadoopRDD[K, V](
       val inputMetrics = context.taskMetrics().inputMetrics
       val existingBytesRead = inputMetrics.bytesRead
 
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
@@ -201,6 +207,7 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         if (reader != null) {
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala