Repository: spark Updated Branches: refs/heads/branch-1.6 a0f9cd77a -> c37ed52ec
[SPARK-11522][SQL] input_file_name() returns "" for external tables When computing partition for non-parquet relation, `HadoopRDD.compute` is used. but it does not set the thread local variable `inputFileName` in `NewSqlHadoopRDD`, like `NewSqlHadoopRDD.compute` does.. Yet, when getting the `inputFileName`, `NewSqlHadoopRDD.inputFileName` is exptected, which is empty now. Adding the setting inputFileName in HadoopRDD.compute resolves this issue. Author: xin Wu <xi...@us.ibm.com> Closes #9542 from xwu0226/SPARK-11522. (cherry picked from commit 0e79604aed116bdcb40e03553a2d103b5b1cdbae) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c37ed52e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c37ed52e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c37ed52e Branch: refs/heads/branch-1.6 Commit: c37ed52ecd38a400f99cc82f07440e455b772db2 Parents: a0f9cd7 Author: xin Wu <xi...@us.ibm.com> Authored: Mon Nov 16 08:10:48 2015 -0800 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Nov 16 08:10:59 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 7 ++ .../spark/sql/hive/execution/HiveUDFSuite.scala | 93 +++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 0453614..7db5834 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -213,6 +213,12 @@ class HadoopRDD[K, V]( val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Sets the thread local variable for the file's name + split.inputSplit.value match { + case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString) + case _ => SqlNewHadoopRDD.unsetInputFileName() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { @@ -250,6 +256,7 @@ class HadoopRDD[K, V]( override def close() { if (reader != null) { + SqlNewHadoopRDD.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 5ab477e..9deb1a6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import java.io.{DataInput, DataOutput} +import java.io.{PrintWriter, File, DataInput, DataOutput} import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.util.Utils @@ -44,7 +45,7 @@ case class ListStringCaseClass(l: Seq[String]) /** * A test suite for Hive custom UDFs. */ -class HiveUDFSuite extends QueryTest with TestHiveSingleton { +class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { import hiveContext.{udf, sql} import hiveContext.implicits._ @@ -348,6 +349,94 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton { sqlContext.dropTempTable("testUDF") } + + test("SPARK-11522 select input_file_name from non-parquet table"){ + + withTempDir { tempDir => + + // EXTERNAL OpenCSVSerde table pointing to LOCATION + + val file1 = new File(tempDir + "/data1") + val writer1 = new PrintWriter(file1) + writer1.write("1,2") + writer1.close() + + val file2 = new File(tempDir + "/data2") + val writer2 = new PrintWriter(file2) + writer2.write("1,2") + writer2.close() + + sql( + s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) + ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + WITH SERDEPROPERTIES ( + \"separatorChar\" = \",\", + \"quoteChar\" = \"\\\"\", + \"escapeChar\" = \"\\\\\") + LOCATION '$tempDir' + """) + + val answer1 = + sql("SELECT input_file_name() FROM csv_table").head().getString(0) + assert(answer1.contains("data1") || answer1.contains("data2")) + + val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count() + assert(count1 == 2) + sql("DROP TABLE csv_table") + + // EXTERNAL pointing to LOCATION + + sql( + s"""CREATE EXTERNAL TABLE external_t5 (c1 int, c2 int) + ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + LOCATION '$tempDir' + """) + + val answer2 = + sql("SELECT input_file_name() as file FROM external_t5").head().getString(0) + assert(answer1.contains("data1") || answer1.contains("data2")) + + val count2 = sql("SELECT input_file_name() as file FROM external_t5").distinct().count + assert(count2 == 2) + sql("DROP TABLE external_t5") + } + + withTempDir { tempDir => + + // External parquet pointing to LOCATION + + val parquetLocation = tempDir + "/external_parquet" + sql("SELECT 1, 2").write.parquet(parquetLocation) + + sql( + s"""CREATE EXTERNAL TABLE external_parquet(c1 int, c2 int) + STORED AS PARQUET + LOCATION '$parquetLocation' + """) + + val answer3 = + sql("SELECT input_file_name() as file FROM external_parquet").head().getString(0) + assert(answer3.contains("external_parquet")) + + val count3 = sql("SELECT input_file_name() as file FROM external_parquet").distinct().count + assert(count3 == 1) + sql("DROP TABLE external_parquet") + } + + // Non-External parquet pointing to /tmp/... + + sql("CREATE TABLE parquet_tmp(c1 int, c2 int) " + + " STORED AS parquet " + + " AS SELECT 1, 2") + + val answer4 = + sql("SELECT input_file_name() as file FROM parquet_tmp").head().getString(0) + assert(answer4.contains("parquet_tmp")) + + val count4 = sql("SELECT input_file_name() as file FROM parquet_tmp").distinct().count + assert(count4 == 1) + sql("DROP TABLE parquet_tmp") + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org