spark git commit: [SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9d513b8d2 -> 12f00b6ed

[SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD

## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return file paths instead of empty strings for external data sources based on `NewHadoopRDD`, such as [spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149) and [spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

With these external data sources, the code below:

```scala
df.select(input_file_name).show()
```

will produce

- **Before**

```
+-----------------+
|input_file_name()|
+-----------------+
|                 |
+-----------------+
```

- **After**

```
+--------------------+
|   input_file_name()|
+--------------------+
|file:/private/var...|
+--------------------+
```

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon

Closes #13759 from HyukjinKwon/SPARK-16044.

(cherry picked from commit 4f7f1c436205630ab77d3758d7210cc1a2f0d04a)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12f00b6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12f00b6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12f00b6e

Branch: refs/heads/branch-2.0
Commit: 12f00b6edde9b6f97d2450e2cd99edd5e31b9169
Parents: 9d513b8
Author: hyukjinkwon
Authored: Mon Jun 20 21:55:34 2016 -0700
Committer: Reynold Xin
Committed: Mon Jun 20 21:55:40 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/rdd/InputFileNameHolder.scala  |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  7 +++++++
 .../spark/sql/ColumnExpressionSuite.scala       | 34 ++++++++++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 108e9d2..f40d4c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * This holds file names of the current Spark task. This is used in HadoopRDD,
- * FileScanRDD and InputFileName function in Spark SQL.
+ * FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
  */
 private[spark] object InputFileNameHolder {
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 189dc7b..b086baa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -135,6 +135,12 @@ class NewHadoopRDD[K, V](
       val inputMetrics = context.taskMetrics().inputMetrics
       val existingBytesRead = inputMetrics.bytesRead
 
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
@@ -201,6 +207,7 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         if (reader != null) {
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/12f00b6e/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
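The fix hinges on a small thread-local holder that the file-reading RDDs write into and Spark SQL's `InputFileName` expression reads from. As background, the sketch below approximates the shape of `InputFileNameHolder`, reconstructed from the calls visible in this diff (`setInputFileName`, `unsetInputFileName`) and the `UTF8String` import in its header; the use of `InheritableThreadLocal` and the exact method bodies are assumptions, not quoted from the patch.

```scala
package org.apache.spark.rdd

import org.apache.spark.unsafe.types.UTF8String

// Sketch of the holder's likely shape. A thread-local keeps the value per
// task thread; the default is an empty string, which is exactly what
// callers of input_file_name() saw before this patch.
private[spark] object InputFileNameHolder {

  private[this] val inputFileName: InheritableThreadLocal[UTF8String] =
    new InheritableThreadLocal[UTF8String] {
      override protected def initialValue(): UTF8String = UTF8String.fromString("")
    }

  // Read by Spark SQL's InputFileName expression when input_file_name()
  // is evaluated for a row.
  def getInputFileName(): UTF8String = inputFileName.get()

  // Called by HadoopRDD, FileScanRDD and (after this patch) NewHadoopRDD
  // before records of a FileSplit are handed to downstream operators.
  def setInputFileName(file: String): Unit = inputFileName.set(UTF8String.fromString(file))

  // Called when the reader is closed, so a stale name cannot leak into the
  // next task scheduled on the same thread.
  def unsetInputFileName(): Unit = inputFileName.remove()
}
```

Setting the name at partition start and unsetting it in `close()` brackets the reader's lifetime, which is why the patch touches both ends of `NewHadoopRDD.compute`.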
spark git commit: [SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD
Repository: spark
Updated Branches:
  refs/heads/master 18a8a9b1f -> 4f7f1c436

[SPARK-16044][SQL] input_file_name() returns empty strings in data sources based on NewHadoopRDD

## What changes were proposed in this pull request?

This PR makes the `input_file_name()` function return file paths instead of empty strings for external data sources based on `NewHadoopRDD`, such as [spark-redshift](https://github.com/databricks/spark-redshift/blob/cba5eee1ab79ae8f0fa9e668373a54d2b5babf6b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L149) and [spark-xml](https://github.com/databricks/spark-xml/blob/master/src/main/scala/com/databricks/spark/xml/util/XmlFile.scala#L39-L47).

With these external data sources, the code below:

```scala
df.select(input_file_name).show()
```

will produce

- **Before**

```
+-----------------+
|input_file_name()|
+-----------------+
|                 |
+-----------------+
```

- **After**

```
+--------------------+
|   input_file_name()|
+--------------------+
|file:/private/var...|
+--------------------+
```

## How was this patch tested?

Unit tests in `ColumnExpressionSuite`.

Author: hyukjinkwon

Closes #13759 from HyukjinKwon/SPARK-16044.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7f1c43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7f1c43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7f1c43

Branch: refs/heads/master
Commit: 4f7f1c436205630ab77d3758d7210cc1a2f0d04a
Parents: 18a8a9b
Author: hyukjinkwon
Authored: Mon Jun 20 21:55:34 2016 -0700
Committer: Reynold Xin
Committed: Mon Jun 20 21:55:34 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/rdd/InputFileNameHolder.scala  |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  7 +++++++
 .../spark/sql/ColumnExpressionSuite.scala       | 34 ++++++++++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 108e9d2..f40d4c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * This holds file names of the current Spark task. This is used in HadoopRDD,
- * FileScanRDD and InputFileName function in Spark SQL.
+ * FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
  */
 private[spark] object InputFileNameHolder {
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 189dc7b..b086baa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -135,6 +135,12 @@ class NewHadoopRDD[K, V](
       val inputMetrics = context.taskMetrics().inputMetrics
       val existingBytesRead = inputMetrics.bytesRead
 
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
@@ -201,6 +207,7 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         if (reader != null) {
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7f1c43/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
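As a self-contained illustration of the behavior this commit fixes (not part of the patch; the object name and input path below are made up), the sketch drives `input_file_name()` through `SparkContext.newAPIHadoopFile`, which is backed by `NewHadoopRDD`, in the same way spark-xml reads its input before handing rows to Spark SQL.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

object InputFileNameSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-16044 smoke test")
      .getOrCreate()
    import spark.implicits._

    // newAPIHadoopFile is backed by NewHadoopRDD, the code path this commit
    // fixes. The input path is a placeholder; point it at any existing
    // text file.
    val rdd = spark.sparkContext.newAPIHadoopFile(
      "/tmp/some-input.txt",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])

    // Parse outside Spark SQL, then convert to a DataFrame, mimicking how
    // spark-xml and spark-redshift build their relations.
    val df = rdd.map(_._2.toString).toDF("value")

    // Before the patch this column was empty; with it, the split's path
    // (e.g. "file:/tmp/some-input.txt") appears, because the Hadoop scan
    // and the expression evaluation run pipelined in the same task thread,
    // so the thread-local set by NewHadoopRDD is still in scope.
    df.select(input_file_name()).show(truncate = false)

    spark.stop()
  }
}
```

This is also why the fix can live entirely inside `NewHadoopRDD`: no plumbing through the external data source is needed, since the holder is read lazily at expression-evaluation time on the same thread.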