Repository: spark Updated Branches: refs/heads/branch-1.6 0cb06c993 -> 1ac830aca
[SPARK-16044][SQL] Backport input_file_name() for data source based on NewHadoopRDD to branch 1.6 ## What changes were proposed in this pull request? This PR backports https://github.com/apache/spark/pull/13759. (`SqlNewHadoopRDDState` was renamed to `InputFileNameHolder` and `spark` API does not exist in branch 1.6) ## How was this patch tested? Unit tests in `ColumnExpressionSuite`. Author: hyukjinkwon <gurwls...@gmail.com> Closes #13806 from HyukjinKwon/backport-SPARK-16044. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ac830ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ac830ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ac830ac Branch: refs/heads/branch-1.6 Commit: 1ac830aca089e9f0b9b0bf367236ffc1184eae7e Parents: 0cb06c9 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Wed Jun 29 13:11:56 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Jun 29 13:11:56 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++ .../spark/sql/ColumnExpressionSuite.scala | 39 ++++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index c8b4f30..46fe1ba 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -134,6 +134,12 @@ class NewHadoopRDD[K, V]( val inputMetrics = context.taskMetrics .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Sets the thread local variable for the file's name + split.serializableHadoopSplit.value match { + case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString) + case _ => SqlNewHadoopRDDState.unsetInputFileName() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { @@ -190,6 +196,7 @@ class NewHadoopRDD[K, V]( private def close() { if (reader != null) { + SqlNewHadoopRDDState.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic http://git-wip-us.apache.org/repos/asf/spark/blob/1ac830ac/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 38c0eb5..52b3d60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.scalatest.Matchers._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution.Project import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -591,15 +593,44 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("InputFileName") { + test("InputFileName - SqlNewHadoopRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName()) + val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name()) .head.getString(0) assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(inputFileName()).limit(1), Row("")) + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + + test("input_file_name - HadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val df = sparkContext.textFile(dir.getCanonicalPath).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) + } + } + + test("input_file_name - NewHadoopRDD") { + withTempPath { dir => + val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() + data.write.text(dir.getCanonicalPath) + val rdd = sparkContext.newAPIHadoopFile( + dir.getCanonicalPath, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]) + val df = rdd.map(pair => pair._2.toString).toDF() + val answer = df.select(input_file_name()).head.getString(0) + assert(answer.contains(dir.getCanonicalPath)) + + checkAnswer(data.select(input_file_name()).limit(1), Row("")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org