Repository: spark
Updated Branches:
  refs/heads/branch-1.5 ed73f5439 -> 3298fb69f


[SPARK-9449] [SQL] Include MetastoreRelation's inputFiles

Author: Michael Armbrust <mich...@databricks.com>

Closes #8119 from marmbrus/metastoreInputFiles.

(cherry picked from commit 660e6dcff8125b83cc73dbe00c90cbe58744bc66)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3298fb69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3298fb69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3298fb69

Branch: refs/heads/branch-1.5
Commit: 3298fb69ff118ea53c4d5b204638c61d016d0506
Parents: ed73f54
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Aug 12 17:07:29 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Aug 12 17:08:01 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 10 ++++---
 .../spark/sql/execution/FileRelation.scala      | 28 ++++++++++++++++++++
 .../apache/spark/sql/sources/interfaces.scala   |  6 +++--
 .../org/apache/spark/sql/DataFrameSuite.scala   | 26 +++++++++---------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 +++++++++--
 5 files changed, 66 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3298fb69/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 27b994f..c466d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -34,10 +34,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
FileRelation, LogicalRDD, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -1560,8 +1560,10 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
-        fsBasedRelation.paths.toSeq
+      case LogicalRelation(fsBasedRelation: FileRelation) =>
+        fsBasedRelation.inputFiles
+      case fr: FileRelation =>
+        fr.inputFiles
     }.flatten
     files.toSet.toArray
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3298fb69/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
new file mode 100644
index 0000000..7a2a9ee
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+/**
+ * An interface for relations that are backed by files.  When a class 
implements this interface,
+ * the list of paths that it returns will be returned to a user who calls 
`inputFiles` on any
+ * DataFrame that queries this relation.
+ */
+private[sql] trait FileRelation {
+  /** Returns the list of files that will be read when scanning this relation. 
*/
+  def inputFiles: Array[String]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3298fb69/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2f8417a..b3b326f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, 
PartitionSpec, Partition}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql._
@@ -406,7 +406,7 @@ abstract class OutputWriter {
  */
 @Experimental
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: 
Option[PartitionSpec])
-  extends BaseRelation with Logging {
+  extends BaseRelation with FileRelation with Logging {
 
   override def toString: String = getClass.getSimpleName + paths.mkString("[", 
",", "]")
 
@@ -516,6 +516,8 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
    */
   def paths: Array[String]
 
+  override def inputFiles: Array[String] = 
cachedLeafStatuses().map(_.getPath.toString).toArray
+
   /**
    * Partition columns.  Can be either defined by 
[[userDefinedPartitionColumns]] or automatically
    * discovered.  Note that they should always be nullable.

http://git-wip-us.apache.org/repos/asf/spark/blob/3298fb69/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index adbd951..2feec29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -485,21 +485,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
   }
 
   test("inputFiles") {
-    val fakeRelation1 = new ParquetRelation(Array("/my/path", 
"/my/other/path"),
-      Some(testData.schema), None, Map.empty)(sqlContext)
-    val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
-    assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet)
+    withTempDir { dir =>
+      val df = Seq((1, 22)).toDF("a", "b")
 
-    val fakeRelation2 = new JSONRelation(
-      None, 1, Some(testData.schema), None, None, 
Array("/json/path"))(sqlContext)
-    val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
-    assert(df2.inputFiles.toSet == fakeRelation2.paths.toSet)
+      val parquetDir = new File(dir, "parquet").getCanonicalPath
+      df.write.parquet(parquetDir)
+      val parquetDF = sqlContext.read.parquet(parquetDir)
+      assert(parquetDF.inputFiles.nonEmpty)
 
-    val unionDF = df1.unionAll(df2)
-    assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ 
fakeRelation2.paths)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      df.write.json(jsonDir)
+      val jsonDF = sqlContext.read.json(jsonDir)
+      assert(jsonDF.inputFiles.nonEmpty)
 
-    val filtered = df1.filter("false").unionAll(df2.intersect(df2))
-    assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ 
fakeRelation2.paths)
+      val unioned = jsonDF.unionAll(parquetDF).inputFiles.sorted
+      val allFiles = (jsonDF.inputFiles ++ 
parquetDF.inputFiles).toSet.toArray.sorted
+      assert(unioned === allFiles)
+    }
   }
 
   ignore("show") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3298fb69/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ac9aaed..5e54978 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
-import org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.execution.{FileRelation, datasources}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation, Partition => ParquetPartition, PartitionSpec, 
ResolvedDataSource}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
@@ -739,7 +739,7 @@ private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: HiveTable)
     (@transient sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with FileRelation {
 
   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
@@ -888,6 +888,18 @@ private[hive] case class MetastoreRelation
   /** An attribute map for determining the ordinal for non-partition columns. 
*/
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
+  override def inputFiles: Array[String] = {
+    val partLocations = 
table.getPartitions(Nil).map(_.storage.location).toArray
+    if (partLocations.nonEmpty) {  // partitions were returned: report their storage locations
+      partLocations
+    } else {  // no partitions: fall back to the table's own location, or fail if it is unset
+      Array(
+        table.location.getOrElse(
+          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+    }
+  }
+
+
   override def newInstance(): MetastoreRelation = {
     MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to