Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a0f9cd77a -> c37ed52ec


[SPARK-11522][SQL] input_file_name() returns "" for external tables

When computing a partition of a non-parquet relation, `HadoopRDD.compute` is used,
but unlike `SqlNewHadoopRDD.compute` it does not set the thread-local variable
`inputFileName` in `SqlNewHadoopRDD`. When `input_file_name()` later reads
`SqlNewHadoopRDD.inputFileName`, it therefore sees the empty default value.
Setting `inputFileName` in `HadoopRDD.compute` (and unsetting it when the reader
is closed) resolves the issue.
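
For context, the mechanism is a per-thread holder that the RDD's compute method
populates and the `input_file_name()` expression reads. Below is a minimal sketch
of that pattern with simplified names; the real object is `SqlNewHadoopRDD` and it
stores a `UTF8String`, so treat this as an illustration rather than Spark's actual code:

    // Illustrative thread-local holder for the current input file name.
    // Spark 1.6 keeps the real one inside the SqlNewHadoopRDD companion object;
    // the method names mirror the calls seen in the diff, the rest is assumed.
    object InputFileNameHolder {
      private val inputFileName = new ThreadLocal[String] {
        override protected def initialValue(): String = ""
      }

      def getInputFileName(): String = inputFileName.get()
      def setInputFileName(file: String): Unit = inputFileName.set(file)
      def unsetInputFileName(): Unit = inputFileName.remove()
    }

Before this patch only `SqlNewHadoopRDD.compute` called the setter, so rows produced
through `HadoopRDD` (for example Hive SerDe tables) always saw the empty initial value.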

Author: xin Wu <xi...@us.ibm.com>

Closes #9542 from xwu0226/SPARK-11522.

(cherry picked from commit 0e79604aed116bdcb40e03553a2d103b5b1cdbae)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c37ed52e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c37ed52e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c37ed52e

Branch: refs/heads/branch-1.6
Commit: c37ed52ecd38a400f99cc82f07440e455b772db2
Parents: a0f9cd7
Author: xin Wu <xi...@us.ibm.com>
Authored: Mon Nov 16 08:10:48 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Nov 16 08:10:59 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  7 ++
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 93 +++++++++++++++++++-
 2 files changed, 98 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0453614..7db5834 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,6 +213,12 @@ class HadoopRDD[K, V](
 
       val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
 
+      // Sets the thread local variable for the file's name
+      split.inputSplit.value match {
+        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDD.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
@@ -250,6 +256,7 @@ class HadoopRDD[K, V](
 
       override def close() {
         if (reader != null) {
+          SqlNewHadoopRDD.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
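
The hunk above is the producer side of that pattern: publish the split's path before
reading and clear it when the reader is closed. A condensed, self-contained sketch of
the same set/clear discipline, reusing the hypothetical `InputFileNameHolder` from the
earlier snippet and clearing eagerly in a `finally` block rather than in `close()`:

    import org.apache.hadoop.mapred.{FileSplit, InputSplit}

    // Run `body` with the thread-local file name set for file-based splits and
    // cleared for everything else, then always clear it afterwards.
    def withInputFileName[T](split: InputSplit)(body: => T): T = {
      split match {
        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
        case _             => InputFileNameHolder.unsetInputFileName()
      }
      try body
      finally InputFileNameHolder.unsetInputFileName()
    }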

http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 5ab477e..9deb1a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.{DataInput, DataOutput}
+import java.io.{PrintWriter, File, DataInput, DataOutput}
 import java.util.{ArrayList, Arrays, Properties}
 
 import org.apache.hadoop.conf.Configuration
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.io.Writable
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.util.Utils
@@ -44,7 +45,7 @@ case class ListStringCaseClass(l: Seq[String])
 /**
  * A test suite for Hive custom UDFs.
  */
-class HiveUDFSuite extends QueryTest with TestHiveSingleton {
+class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
   import hiveContext.{udf, sql}
   import hiveContext.implicits._
@@ -348,6 +349,94 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
 
     sqlContext.dropTempTable("testUDF")
   }
+
+  test("SPARK-11522 select input_file_name from non-parquet table"){
+
+    withTempDir { tempDir =>
+
+      // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+      val file1 = new File(tempDir + "/data1")
+      val writer1 = new PrintWriter(file1)
+      writer1.write("1,2")
+      writer1.close()
+
+      val file2 = new File(tempDir + "/data2")
+      val writer2 = new PrintWriter(file2)
+      writer2.write("1,2")
+      writer2.close()
+
+      sql(
+        s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+        WITH SERDEPROPERTIES (
+          \"separatorChar\" = \",\",
+          \"quoteChar\"     = \"\\\"\",
+          \"escapeChar\"    = \"\\\\\")
+        LOCATION '$tempDir'
+      """)
+
+      val answer1 =
+        sql("SELECT input_file_name() FROM csv_table").head().getString(0)
+      assert(answer1.contains("data1") || answer1.contains("data2"))
+
+      val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count()
+      assert(count1 == 2)
+      sql("DROP TABLE csv_table")
+
+      // EXTERNAL pointing to LOCATION
+
+      sql(
+        s"""CREATE EXTERNAL TABLE external_t5 (c1 int, c2 int)
+        ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+        LOCATION '$tempDir'
+      """)
+
+      val answer2 =
+        sql("SELECT input_file_name() as file FROM external_t5").head().getString(0)
+      assert(answer2.contains("data1") || answer2.contains("data2"))
+
+      val count2 = sql("SELECT input_file_name() as file FROM external_t5").distinct().count
+      assert(count2 == 2)
+      sql("DROP TABLE external_t5")
+    }
+
+    withTempDir { tempDir =>
+
+      // External parquet pointing to LOCATION
+
+      val parquetLocation = tempDir + "/external_parquet"
+      sql("SELECT 1, 2").write.parquet(parquetLocation)
+
+      sql(
+        s"""CREATE EXTERNAL TABLE external_parquet(c1 int, c2 int)
+        STORED AS PARQUET
+        LOCATION '$parquetLocation'
+      """)
+
+      val answer3 =
+        sql("SELECT input_file_name() as file FROM 
external_parquet").head().getString(0)
+      assert(answer3.contains("external_parquet"))
+
+      val count3 = sql("SELECT input_file_name() as file FROM external_parquet").distinct().count
+      assert(count3 == 1)
+      sql("DROP TABLE external_parquet")
+    }
+
+    // Non-External parquet pointing to /tmp/...
+
+    sql("CREATE TABLE parquet_tmp(c1 int, c2 int) " +
+      " STORED AS parquet " +
+      " AS SELECT 1, 2")
+
+    val answer4 =
+      sql("SELECT input_file_name() as file FROM 
parquet_tmp").head().getString(0)
+    assert(answer4.contains("parquet_tmp"))
+
+    val count4 = sql("SELECT input_file_name() as file FROM parquet_tmp").distinct().count
+    assert(count4 == 1)
+    sql("DROP TABLE parquet_tmp")
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
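
With the fix in place, `input_file_name()` behaves the same for Hive SerDe tables as it
already did for Parquet ones. A minimal usage sketch against a 1.6-era `HiveContext`;
the table name `external_t5` is a placeholder for any external table created as in the
test above:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(
      new SparkConf().setAppName("input-file-name-demo").setMaster("local[*]"))
    val hiveContext = new HiveContext(sc)

    // Each row now reports the file it was read from instead of an empty string.
    hiveContext.sql("SELECT input_file_name() AS file, c1, c2 FROM external_t5").show()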

