[SPARK-5310][SQL] Fixes to Docs and Datasources API - Various Fixes to docs - Make data source traits actually interfaces
Based on #4862 but with fixed conflicts. Author: Reynold Xin <r...@databricks.com> Author: Michael Armbrust <mich...@databricks.com> Closes #4868 from marmbrus/pr/4862 and squashes the following commits: fe091ea [Michael Armbrust] Merge remote-tracking branch 'origin/master' into pr/4862 0208497 [Reynold Xin] Test fixes. 34e0a28 [Reynold Xin] [SPARK-5310][SQL] Various fixes to Spark SQL docs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d19689 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d19689 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d19689 Branch: refs/heads/master Commit: 54d19689ff8d786acde5b8ada6741854ffadadea Parents: 1259994 Author: Reynold Xin <r...@databricks.com> Authored: Mon Mar 2 22:14:08 2015 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Mon Mar 2 22:14:08 2015 -0800 ---------------------------------------------------------------------- project/SparkBuild.scala | 29 +- .../scala/org/apache/spark/sql/DataFrame.scala | 36 +- .../scala/org/apache/spark/sql/RDDApi.scala | 4 +- .../apache/spark/sql/jdbc/JDBCRelation.scala | 3 +- .../apache/spark/sql/json/JSONRelation.scala | 5 +- .../apache/spark/sql/parquet/newParquet.scala | 3 +- .../apache/spark/sql/sources/interfaces.scala | 43 +- .../apache/spark/sql/sources/DDLTestSuite.scala | 2 +- .../spark/sql/sources/FilteredScanSuite.scala | 3 +- .../spark/sql/sources/PrunedScanSuite.scala | 3 +- .../spark/sql/sources/TableScanSuite.scala | 11 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 3 +- .../hive/execution/CreateTableAsSelect.scala | 3 +- .../execution/DescribeHiveTableCommand.scala | 4 +- .../sql/hive/execution/HiveNativeCommand.scala | 6 +- .../sql/hive/execution/HiveTableScan.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 6 +- .../hive/execution/ScriptTransformation.scala | 15 +- .../spark/sql/hive/execution/commands.scala | 27 +- .../spark/sql/hive/execution/package.scala | 25 - .../spark/sql/hive/HiveParquetSuite.scala | 92 +++ .../apache/spark/sql/hive/parquetSuites.scala | 767 +++++++++++++++++++ .../spark/sql/parquet/HiveParquetSuite.scala | 91 --- .../spark/sql/parquet/parquetSuites.scala | 766 ------------------ 24 files changed, 965 insertions(+), 986 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e4b1b96..4f17df5 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -357,6 +357,21 @@ object Unidoc { names.map(s => "org.apache.spark." + s).mkString(":") } + private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = { + packages + .map(_.filterNot(_.getName.contains("$"))) + .map(_.filterNot(_.getCanonicalPath.contains("akka"))) + .map(_.filterNot(_.getCanonicalPath.contains("deploy"))) + .map(_.filterNot(_.getCanonicalPath.contains("network"))) + .map(_.filterNot(_.getCanonicalPath.contains("shuffle"))) + .map(_.filterNot(_.getCanonicalPath.contains("executor"))) + .map(_.filterNot(_.getCanonicalPath.contains("python"))) + .map(_.filterNot(_.getCanonicalPath.contains("collection"))) + .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) + .map(_.filterNot(_.getCanonicalPath.contains("sql/execution"))) + .map(_.filterNot(_.getCanonicalPath.contains("sql/hive/test"))) + } + lazy val settings = scalaJavaUnidocSettings ++ Seq ( publish := {}, @@ -368,22 +383,12 @@ object Unidoc { // Skip actual catalyst, but include the subproject. // Catalyst is not public API and contains quasiquotes which break scaladoc. unidocAllSources in (ScalaUnidoc, unidoc) := { - (unidocAllSources in (ScalaUnidoc, unidoc)).value - .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) + ignoreUndocumentedPackages((unidocAllSources in (ScalaUnidoc, unidoc)).value) }, // Skip class names containing $ and some internal packages in Javadocs unidocAllSources in (JavaUnidoc, unidoc) := { - (unidocAllSources in (JavaUnidoc, unidoc)).value - .map(_.filterNot(_.getName.contains("$"))) - .map(_.filterNot(_.getCanonicalPath.contains("akka"))) - .map(_.filterNot(_.getCanonicalPath.contains("deploy"))) - .map(_.filterNot(_.getCanonicalPath.contains("network"))) - .map(_.filterNot(_.getCanonicalPath.contains("shuffle"))) - .map(_.filterNot(_.getCanonicalPath.contains("executor"))) - .map(_.filterNot(_.getCanonicalPath.contains("python"))) - .map(_.filterNot(_.getCanonicalPath.contains("collection"))) - .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) + ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value) }, // Javadoc options: create a window title, and group key packages on index page http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f3aac08..46f5070 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -64,7 +64,7 @@ private[sql] object DataFrame { * val people = sqlContext.parquetFile("...") * * // Create a DataFrame from data sources - * val df = + * val df = sqlContext.load("...", "json") * }}} * * Once created, it can be manipulated using the various domain-specific-language (DSL) functions @@ -80,9 +80,10 @@ private[sql] object DataFrame { * {{{ * // The following creates a new column that increases everybody's age by 10. * people("age") + 10 // in Scala + * people.col("age").plus(10); // in Java * }}} * - * A more concrete example: + * A more concrete example in Scala: * {{{ * // To create DataFrame using SQLContext * val people = sqlContext.parquetFile("...") @@ -94,6 +95,18 @@ private[sql] object DataFrame { * .agg(avg(people("salary")), max(people("age"))) * }}} * + * and in Java: + * {{{ + * // To create DataFrame using SQLContext + * DataFrame people = sqlContext.parquetFile("..."); + * DataFrame department = sqlContext.parquetFile("..."); + * + * people.filter("age".gt(30)) + * .join(department, people.col("deptId").equalTo(department("id"))) + * .groupBy(department.col("name"), "gender") + * .agg(avg(people.col("salary")), max(people.col("age"))); + * }}} + * * @groupname basic Basic DataFrame functions * @groupname dfops Language Integrated Queries * @groupname rdd RDD Operations @@ -102,7 +115,7 @@ private[sql] object DataFrame { */ // TODO: Improve documentation. @Experimental -class DataFrame protected[sql]( +class DataFrame private[sql]( @transient val sqlContext: SQLContext, @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends RDDApi[Row] with Serializable { @@ -295,12 +308,14 @@ class DataFrame protected[sql]( * 1984 04 0.450090 0.483521 * }}} * @param numRows Number of rows to show - * @group basic + * + * @group action */ def show(numRows: Int): Unit = println(showString(numRows)) /** * Displays the top 20 rows of [[DataFrame]] in a tabular form. + * @group action */ def show(): Unit = show(20) @@ -738,16 +753,19 @@ class DataFrame protected[sql]( /** * Returns the first `n` rows. + * @group action */ def head(n: Int): Array[Row] = limit(n).collect() /** * Returns the first row. + * @group action */ def head(): Row = head(1).head /** * Returns the first row. Alias for head(). + * @group action */ override def first(): Row = head() @@ -834,6 +852,11 @@ class DataFrame protected[sql]( /** * @group basic */ + override def cache(): this.type = persist() + + /** + * @group basic + */ override def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) this @@ -847,6 +870,11 @@ class DataFrame protected[sql]( this } + /** + * @group basic + */ + override def unpersist(): this.type = unpersist(blocking = false) + ///////////////////////////////////////////////////////////////////////////// // I/O ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala index df866fd..ba4373f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -29,13 +29,13 @@ import org.apache.spark.storage.StorageLevel */ private[sql] trait RDDApi[T] { - def cache(): this.type = persist() + def cache(): this.type def persist(): this.type def persist(newLevel: StorageLevel): this.type - def unpersist(): this.type = unpersist(blocking = false) + def unpersist(): this.type def unpersist(blocking: Boolean): this.type http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index beb76f2..1778d39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -119,7 +119,8 @@ private[sql] case class JDBCRelation( url: String, table: String, parts: Array[Partition])(@transient val sqlContext: SQLContext) - extends PrunedFilteredScan { + extends BaseRelation + with PrunedFilteredScan { override val schema = JDBCRDD.resolveTable(url, table) http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index f9d0ba2..b645199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -90,7 +90,10 @@ private[sql] case class JSONRelation( samplingRatio: Double, userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) - extends TableScan with InsertableRelation { + extends BaseRelation + with TableScan + with InsertableRelation { + // TODO: Support partitioned JSON relation. private def baseRDD = sqlContext.sparkContext.textFile(path) http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 8d95858..234e6bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -159,7 +159,8 @@ private[sql] case class ParquetRelation2( maybeSchema: Option[StructType] = None, maybePartitionSpec: Option[PartitionSpec] = None)( @transient val sqlContext: SQLContext) - extends CatalystScan + extends BaseRelation + with CatalystScan with InsertableRelation with SparkHadoopMapReduceUtil with Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 0c4b706..a046a48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.sql.sources import org.apache.spark.annotation.{Experimental, DeveloperApi} @@ -90,12 +91,6 @@ trait CreatableRelationProvider { * existing data is expected to be overwritten by the contents of the DataFrame. * ErrorIfExists mode means that when saving a DataFrame to a data source, * if data already exists, an exception is expected to be thrown. - * - * @param sqlContext - * @param mode - * @param parameters - * @param data - * @return */ def createRelation( sqlContext: SQLContext, @@ -138,7 +133,7 @@ abstract class BaseRelation { * A BaseRelation that can produce all of its tuples as an RDD of Row objects. */ @DeveloperApi -trait TableScan extends BaseRelation { +trait TableScan { def buildScan(): RDD[Row] } @@ -148,7 +143,7 @@ trait TableScan extends BaseRelation { * containing all of its tuples as Row objects. */ @DeveloperApi -trait PrunedScan extends BaseRelation { +trait PrunedScan { def buildScan(requiredColumns: Array[String]): RDD[Row] } @@ -162,25 +157,11 @@ trait PrunedScan extends BaseRelation { * as filtering partitions based on a bloom filter. */ @DeveloperApi -trait PrunedFilteredScan extends BaseRelation { +trait PrunedFilteredScan { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] } /** - * ::Experimental:: - * An interface for experimenting with a more direct connection to the query planner. Compared to - * [[PrunedFilteredScan]], this operator receives the raw expressions from the - * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this - * interface is not designed to be binary compatible across releases and thus should only be used - * for experimentation. - */ -@Experimental -trait CatalystScan extends BaseRelation { - def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] -} - -@DeveloperApi -/** * ::DeveloperApi:: * A BaseRelation that can be used to insert data into it through the insert method. * If overwrite in insert method is true, the old data in the relation should be overwritten with @@ -196,6 +177,20 @@ trait CatalystScan extends BaseRelation { * If a data source needs to check the actual nullability of a field, it needs to do it in the * insert method. */ -trait InsertableRelation extends BaseRelation { +@DeveloperApi +trait InsertableRelation { def insert(data: DataFrame, overwrite: Boolean): Unit } + +/** + * ::Experimental:: + * An interface for experimenting with a more direct connection to the query planner. Compared to + * [[PrunedFilteredScan]], this operator receives the raw expressions from the + * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this + * interface is NOT designed to be binary compatible across releases and thus should only be used + * for experimentation. + */ +@Experimental +trait CatalystScan { + def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] +} http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala index 0ec756b..54af50c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -29,7 +29,7 @@ class DDLScanSource extends RelationProvider { } case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) - extends TableScan { + extends BaseRelation with TableScan { override def schema = StructType(Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 41cd356..ffeccf0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -32,7 +32,8 @@ class FilteredScanSource extends RelationProvider { } case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) - extends PrunedFilteredScan { + extends BaseRelation + with PrunedFilteredScan { override def schema = StructType( http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index a33cf11..08fb538 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -31,7 +31,8 @@ class PrunedScanSource extends RelationProvider { } case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) - extends PrunedScan { + extends BaseRelation + with PrunedScan { override def schema = StructType( http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 0a4d4b6..7928600 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -33,7 +33,7 @@ class SimpleScanSource extends RelationProvider { } case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) - extends TableScan { + extends BaseRelation with TableScan { override def schema = StructType(StructField("i", IntegerType, nullable = false) :: Nil) @@ -51,10 +51,11 @@ class AllDataTypesScanSource extends SchemaRelationProvider { } case class AllDataTypesScan( - from: Int, - to: Int, - userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext) - extends TableScan { + from: Int, + to: Int, + userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext) + extends BaseRelation + with TableScan { override def schema = userSpecifiedSchema http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 86fc654..fe86bd2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -777,7 +777,8 @@ private[hive] case class MetastoreRelation val columnOrdinals = AttributeMap(attributes.zipWithIndex) } -object HiveMetastoreTypes { + +private[hive] object HiveMetastoreTypes { protected val ddlParser = new DDLParser(HiveQl.parseSql(_)) def toDataType(metastoreType: String): DataType = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index a547bab..a0c91cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.MetastoreRelation /** - * :: Experimental :: * Create table and insert the query result into it. * @param database the database name of the new relation * @param tableName the table name of the new relation @@ -38,7 +37,7 @@ import org.apache.spark.sql.hive.MetastoreRelation * @param desc the CreateTableDesc, which may contains serde, storage handler etc. */ -@Experimental +private[hive] case class CreateTableAsSelect( database: String, tableName: String, http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index 07b5a84..d0510aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -29,11 +29,9 @@ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.SQLContext /** - * :: DeveloperApi :: - * * Implementation for "describe [extended] table". */ -@DeveloperApi +private[hive] case class DescribeHiveTableCommand( table: MetastoreRelation, override val output: Seq[Attribute], http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala index 781a2e9..9636da2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -17,17 +17,13 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.StringType -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class HiveNativeCommand(sql: String) extends RunnableCommand { override def output = http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index b56175f..5b3cf28 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -26,21 +26,19 @@ import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.types.{BooleanType, DataType} /** - * :: DeveloperApi :: * The Hive table scan operator. Column and partition pruning are both handled. * * @param requestedAttributes Attributes to be fetched from the Hive table. * @param relation The Hive table be be scanned. * @param partitionPruningPred An optional partition pruning predicate for partitioned table. */ -@DeveloperApi +private[hive] case class HiveTableScan( requestedAttributes: Seq[Attribute], relation: MetastoreRelation, http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 91af35f..ba5c8e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{UnaryNode, SparkPlan} @@ -41,10 +40,7 @@ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.{SerializableWritable, SparkException, TaskContext} -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index c54fbb6..0c9aee3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -21,15 +21,12 @@ import java.io.{BufferedReader, InputStreamReader} import java.io.{DataInputStream, DataOutputStream, EOFException} import java.util.Properties +import scala.collection.JavaConversions._ + import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.AbstractSerDe -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.Deserializer import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema import org.apache.spark.sql.execution._ @@ -38,19 +35,14 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors} import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.util.Utils - -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** - * :: DeveloperApi :: * Transforms the input by forking and running the specified script. * * @param input the set of expression that should be passed to the script. * @param script the command that should be executed. * @param output the attributes that are produced by the script. */ -@DeveloperApi +private[hive] case class ScriptTransformation( input: Seq[Expression], script: String, @@ -175,6 +167,7 @@ case class ScriptTransformation( /** * The wrapper class of Hive input and output schema properties */ +private[hive] case class HiveScriptIOSchema ( inputRowFormat: Seq[(String, String)], outputRowFormat: Seq[(String, String)], http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 36bd3f8..63ad145 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.util._ @@ -30,14 +29,13 @@ import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.StructType /** - * :: DeveloperApi :: * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. * * Right now, it only supports Hive tables and it only updates the size of a Hive table * in the Hive metastore. */ -@DeveloperApi +private[hive] case class AnalyzeTable(tableName: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { @@ -47,10 +45,9 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand { } /** - * :: DeveloperApi :: * Drops a table from the metastore and removes it if it is cached. */ -@DeveloperApi +private[hive] case class DropTable( tableName: String, ifExists: Boolean) extends RunnableCommand { @@ -75,10 +72,7 @@ case class DropTable( } } -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class AddJar(path: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { @@ -89,10 +83,7 @@ case class AddJar(path: String) extends RunnableCommand { } } -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class AddFile(path: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { @@ -103,10 +94,7 @@ case class AddFile(path: String) extends RunnableCommand { } } -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class CreateMetastoreDataSource( tableName: String, userSpecifiedSchema: Option[StructType], @@ -146,10 +134,7 @@ case class CreateMetastoreDataSource( } } -/** - * :: DeveloperApi :: - */ -@DeveloperApi +private[hive] case class CreateMetastoreDataSourceAsSelect( tableName: String, provider: String, http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala deleted file mode 100644 index 4989c42..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -/** - * Physical execution operators used for running queries against data stored in Hive. These - * are not intended for use by users, but are documents so that it is easier to understand - * the output of EXPLAIN queries. - */ -package object execution http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala new file mode 100644 index 0000000..7ff5719 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetTest +import org.apache.spark.sql.{QueryTest, SQLConf} + +case class Cases(lower: String, UPPER: String) + +class HiveParquetSuite extends QueryTest with ParquetTest { + val sqlContext = TestHive + + import sqlContext._ + + def run(prefix: String): Unit = { + test(s"$prefix: Case insensitive attribute names") { + withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") { + val expected = (1 to 4).map(i => Row(i.toString)) + checkAnswer(sql("SELECT upper FROM cases"), expected) + checkAnswer(sql("SELECT LOWER FROM cases"), expected) + } + } + + test(s"$prefix: SELECT on Parquet table") { + val data = (1 to 4).map(i => (i, s"val_$i")) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple)) + } + } + + test(s"$prefix: Simple column projection + filter on Parquet table") { + withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") { + checkAnswer( + sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"), + Seq(Row(true, "val_2"), Row(true, "val_4"))) + } + } + + test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") { + withTempPath { dir => + sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).registerTempTable("p") + withTempTable("p") { + checkAnswer( + sql("SELECT * FROM src ORDER BY key"), + sql("SELECT * from p ORDER BY key").collect().toSeq) + } + } + } + + test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") { + withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") { + withTempPath { file => + sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath) + parquetFile(file.getCanonicalPath).registerTempTable("p") + withTempTable("p") { + // let's do three overwrites for good measure + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + sql("INSERT OVERWRITE TABLE p SELECT * FROM t") + checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq) + } + } + } + } + } + + withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") { + run("Parquet data source enabled") + } + + withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") { + run("Parquet data source disabled") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala new file mode 100644 index 0000000..1904f5f --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -0,0 +1,767 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode} +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} +import org.apache.spark.sql.hive.execution.HiveTableScan +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation} +import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.types._ + +// The data where the partitioning key exists only in the directory structure. +case class ParquetData(intField: Int, stringField: String) +// The data that also includes the partitioning key +case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) + +case class StructContainer(intStructField :Int, stringStructField: String) + +case class ParquetDataWithComplexTypes( + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) + +case class ParquetDataWithKeyAndComplexTypes( + p: Int, + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) + +/** + * A suite to test the automatic conversion of metastore tables with parquet data to use the + * built in parquet support. + */ +class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { + override def beforeAll(): Unit = { + super.beforeAll() + + sql(s""" + create external table partitioned_parquet + ( + intField INT, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDir.getCanonicalPath}' + """) + + sql(s""" + create external table partitioned_parquet_with_key + ( + intField INT, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDirWithKey.getCanonicalPath}' + """) + + sql(s""" + create external table normal_parquet + ( + intField INT, + stringField STRING + ) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${new File(normalTableDir, "normal").getCanonicalPath}' + """) + + sql(s""" + CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes + ( + intField INT, + stringField STRING, + structField STRUCT<intStructField: INT, stringStructField: STRING>, + arrayField ARRAY<INT> + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}' + """) + + sql(s""" + CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes + ( + intField INT, + stringField STRING, + structField STRUCT<intStructField: INT, stringStructField: STRING>, + arrayField ARRAY<INT> + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' + """) + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)") + } + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)") + } + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)") + } + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)") + } + + val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) + jsonRDD(rdd1).registerTempTable("jt") + val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}""")) + jsonRDD(rdd2).registerTempTable("jt_array") + + setConf("spark.sql.hive.convertMetastoreParquet", "true") + } + + override def afterAll(): Unit = { + sql("DROP TABLE partitioned_parquet") + sql("DROP TABLE partitioned_parquet_with_key") + sql("DROP TABLE partitioned_parquet_with_complextypes") + sql("DROP TABLE partitioned_parquet_with_key_and_complextypes") + sql("DROP TABLE normal_parquet") + sql("DROP TABLE IF EXISTS jt") + sql("DROP TABLE IF EXISTS jt_array") + setConf("spark.sql.hive.convertMetastoreParquet", "false") + } + + test(s"conversion is working") { + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: HiveTableScan => true + }.isEmpty) + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: ParquetTableScan => true + case _: PhysicalRDD => true + }.nonEmpty) + } +} + +class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { + val originalConf = conf.parquetUseDataSourceApi + + override def beforeAll(): Unit = { + super.beforeAll() + + sql( + """ + |create table test_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + } + + override def afterAll(): Unit = { + super.afterAll() + sql("DROP TABLE IF EXISTS test_parquet") + + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + } + + test("scan an empty parquet table") { + checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) + } + + test("scan an empty parquet table with upper case") { + checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0)) + } + + test("insert into an empty parquet table") { + sql( + """ + |create table test_insert_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + // Insert into am empty table. + sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"), + Row(6, "str6") :: Row(7, "str7") :: Nil + ) + // Insert overwrite. + sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"), + Row(3, "str3") :: Row(4, "str4") :: Nil + ) + sql("DROP TABLE IF EXISTS test_insert_parquet") + + // Create it again. + sql( + """ + |create table test_insert_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + // Insert overwrite an empty table. + sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"), + Row(3, "str3") :: Row(4, "str4") :: Nil + ) + // Insert into the table. + sql("insert into table test_insert_parquet select a, b from jt") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet"), + (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i")) + ) + sql("DROP TABLE IF EXISTS test_insert_parquet") + } + + test("scan a parquet table created through a CTAS statement") { + sql( + """ + |create table test_parquet_ctas ROW FORMAT + |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |AS select * from jt + """.stripMargin) + + checkAnswer( + sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"), + Seq(Row(1, "str1")) + ) + + table("test_parquet_ctas").queryExecution.analyzed match { + case LogicalRelation(p: ParquetRelation2) => // OK + case _ => + fail( + s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") + } + + sql("DROP TABLE IF EXISTS test_parquet_ctas") + } + + test("MetastoreRelation in InsertIntoTable will be converted") { + sql( + """ + |create table test_insert_parquet + |( + | intField INT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") + df.queryExecution.executedPlan match { + case ExecutedCommand( + InsertIntoDataSource( + LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK + case o => fail("test_insert_parquet should be converted to a " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + + s"However, found a ${o.toString} ") + } + + checkAnswer( + sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"), + sql("SELECT a FROM jt WHERE jt.a > 5").collect() + ) + + sql("DROP TABLE IF EXISTS test_insert_parquet") + } + + test("MetastoreRelation in InsertIntoHiveTable will be converted") { + sql( + """ + |create table test_insert_parquet + |( + | int_array array<int> + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") + df.queryExecution.executedPlan match { + case ExecutedCommand( + InsertIntoDataSource( + LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK + case o => fail("test_insert_parquet should be converted to a " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + + s"However, found a ${o.toString} ") + } + + checkAnswer( + sql("SELECT int_array FROM test_insert_parquet"), + sql("SELECT a FROM jt_array").collect() + ) + + sql("DROP TABLE IF EXISTS test_insert_parquet") + } +} + +class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase { + val originalConf = conf.parquetUseDataSourceApi + + override def beforeAll(): Unit = { + super.beforeAll() + conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false") + } + + override def afterAll(): Unit = { + super.afterAll() + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + } + + test("MetastoreRelation in InsertIntoTable will not be converted") { + sql( + """ + |create table test_insert_parquet + |( + | intField INT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") + df.queryExecution.executedPlan match { + case insert: execution.InsertIntoHiveTable => // OK + case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " + + s"However, found ${o.toString}.") + } + + checkAnswer( + sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"), + sql("SELECT a FROM jt WHERE jt.a > 5").collect() + ) + + sql("DROP TABLE IF EXISTS test_insert_parquet") + } + + // TODO: enable it after the fix of SPARK-5950. + ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") { + sql( + """ + |create table test_insert_parquet + |( + | int_array array<int> + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") + df.queryExecution.executedPlan match { + case insert: execution.InsertIntoHiveTable => // OK + case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " + + s"However, found ${o.toString}.") + } + + checkAnswer( + sql("SELECT int_array FROM test_insert_parquet"), + sql("SELECT a FROM jt_array").collect() + ) + + sql("DROP TABLE IF EXISTS test_insert_parquet") + } +} + +/** + * A suite of tests for the Parquet support through the data sources API. + */ +class ParquetSourceSuiteBase extends ParquetPartitioningTest { + override def beforeAll(): Unit = { + super.beforeAll() + + sql( s""" + create temporary table partitioned_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDir.getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table partitioned_parquet_with_key + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithKey.getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table normal_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${new File(partitionedTableDir, "p=1").getCanonicalPath}' + ) + """) + + sql( s""" + CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' + ) + """) + + sql( s""" + CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithComplexTypes.getCanonicalPath}' + ) + """) + } + + test("SPARK-6016 make sure to use the latest footers") { + sql("drop table if exists spark_6016_fix") + + // Create a DataFrame with two partitions. So, the created table will have two parquet files. + val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2)) + df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite) + checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) + ) + + // Create a DataFrame with four partitions. So, the created table will have four parquet files. + val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4)) + df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite) + // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then, + // since the new table has four parquet files, we are trying to read new footers from two files + // and then merge metadata in footers of these four (two outdated ones and two latest one), + // which will cause an error. + checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) + ) + + sql("drop table spark_6016_fix") + } +} + +class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { + val originalConf = conf.parquetUseDataSourceApi + + override def beforeAll(): Unit = { + super.beforeAll() + conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + } + + override def afterAll(): Unit = { + super.afterAll() + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + } + + test("values in arrays and maps stored in parquet are always nullable") { + val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a") + val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false) + val arrayType1 = ArrayType(IntegerType, containsNull = false) + val expectedSchema1 = + StructType( + StructField("m", mapType1, nullable = true) :: + StructField("a", arrayType1, nullable = true) :: Nil) + assert(df.schema === expectedSchema1) + + df.saveAsTable("alwaysNullable", "parquet") + + val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true) + val arrayType2 = ArrayType(IntegerType, containsNull = true) + val expectedSchema2 = + StructType( + StructField("m", mapType2, nullable = true) :: + StructField("a", arrayType2, nullable = true) :: Nil) + + assert(table("alwaysNullable").schema === expectedSchema2) + + checkAnswer( + sql("SELECT m, a FROM alwaysNullable"), + Row(Map(2 -> 3), Seq(4, 5, 6))) + + sql("DROP TABLE alwaysNullable") + } +} + +class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase { + val originalConf = conf.parquetUseDataSourceApi + + override def beforeAll(): Unit = { + super.beforeAll() + conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false") + } + + override def afterAll(): Unit = { + super.afterAll() + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + } +} + +/** + * A collection of tests for parquet data with various forms of partitioning. + */ +abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll { + var partitionedTableDir: File = null + var normalTableDir: File = null + var partitionedTableDirWithKey: File = null + var partitionedTableDirWithComplexTypes: File = null + var partitionedTableDirWithKeyAndComplexTypes: File = null + + override def beforeAll(): Unit = { + partitionedTableDir = File.createTempFile("parquettests", "sparksql") + partitionedTableDir.delete() + partitionedTableDir.mkdir() + + normalTableDir = File.createTempFile("parquettests", "sparksql") + normalTableDir.delete() + normalTableDir.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-$p")) + .toDF() + .saveAsParquetFile(partDir.getCanonicalPath) + } + + sparkContext + .makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-1")) + .toDF() + .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath) + + partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithKey.delete() + partitionedTableDirWithKey.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKey, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetDataWithKey(p, i, s"part-$p")) + .toDF() + .saveAsParquetFile(partDir.getCanonicalPath) + } + + partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithKeyAndComplexTypes.delete() + partitionedTableDirWithKeyAndComplexTypes.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => + ParquetDataWithKeyAndComplexTypes( + p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().saveAsParquetFile(partDir.getCanonicalPath) + } + + partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithComplexTypes.delete() + partitionedTableDirWithComplexTypes.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => + ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().saveAsParquetFile(partDir.getCanonicalPath) + } + } + + override protected def afterAll(): Unit = { + partitionedTableDir.delete() + normalTableDir.delete() + partitionedTableDirWithKey.delete() + partitionedTableDirWithComplexTypes.delete() + partitionedTableDirWithKeyAndComplexTypes.delete() + } + + Seq( + "partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes").foreach { table => + + test(s"ordering of the partitioning columns $table") { + checkAnswer( + sql(s"SELECT p, stringField FROM $table WHERE p = 1"), + Seq.fill(10)(Row(1, "part-1")) + ) + + checkAnswer( + sql(s"SELECT stringField, p FROM $table WHERE p = 1"), + Seq.fill(10)(Row("part-1", 1)) + ) + } + + test(s"project the partitioning column $table") { + checkAnswer( + sql(s"SELECT p, count(*) FROM $table group by p"), + Row(1, 10) :: + Row(2, 10) :: + Row(3, 10) :: + Row(4, 10) :: + Row(5, 10) :: + Row(6, 10) :: + Row(7, 10) :: + Row(8, 10) :: + Row(9, 10) :: + Row(10, 10) :: Nil + ) + } + + test(s"project partitioning and non-partitioning columns $table") { + checkAnswer( + sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"), + Row("part-1", 1, 10) :: + Row("part-2", 2, 10) :: + Row("part-3", 3, 10) :: + Row("part-4", 4, 10) :: + Row("part-5", 5, 10) :: + Row("part-6", 6, 10) :: + Row("part-7", 7, 10) :: + Row("part-8", 8, 10) :: + Row("part-9", 9, 10) :: + Row("part-10", 10, 10) :: Nil + ) + } + + test(s"simple count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table"), + Row(100)) + } + + test(s"pruned count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"), + Row(10)) + } + + test(s"non-existent partition $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"), + Row(0)) + } + + test(s"multi-partition pruned count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"), + Row(30)) + } + + test(s"non-partition predicates $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"), + Row(30)) + } + + test(s"sum $table") { + checkAnswer( + sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"), + Row(1 + 2 + 3)) + } + + test(s"hive udfs $table") { + checkAnswer( + sql(s"SELECT concat(stringField, stringField) FROM $table"), + sql(s"SELECT stringField FROM $table").map { + case Row(s: String) => Row(s + s) + }.collect().toSeq) + } + } + + Seq( + "partitioned_parquet_with_key_and_complextypes", + "partitioned_parquet_with_complextypes").foreach { table => + + test(s"SPARK-5775 read struct from $table") { + checkAnswer( + sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"), + (1 to 10).map(i => Row(1, i, f"${i}_string"))) + } + + // Re-enable this after SPARK-5508 is fixed + ignore(s"SPARK-5775 read array from $table") { + checkAnswer( + sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), + (1 to 10).map(i => Row(1 to i, 1))) + } + } + + + test("non-part select(*)") { + checkAnswer( + sql("SELECT COUNT(*) FROM normal_parquet"), + Row(10)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala deleted file mode 100644 index e89b448..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import org.apache.spark.sql.{SQLConf, QueryTest} -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.hive.test.TestHive - -case class Cases(lower: String, UPPER: String) - -class HiveParquetSuite extends QueryTest with ParquetTest { - val sqlContext = TestHive - - import sqlContext._ - - def run(prefix: String): Unit = { - test(s"$prefix: Case insensitive attribute names") { - withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") { - val expected = (1 to 4).map(i => Row(i.toString)) - checkAnswer(sql("SELECT upper FROM cases"), expected) - checkAnswer(sql("SELECT LOWER FROM cases"), expected) - } - } - - test(s"$prefix: SELECT on Parquet table") { - val data = (1 to 4).map(i => (i, s"val_$i")) - withParquetTable(data, "t") { - checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple)) - } - } - - test(s"$prefix: Simple column projection + filter on Parquet table") { - withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") { - checkAnswer( - sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"), - Seq(Row(true, "val_2"), Row(true, "val_4"))) - } - } - - test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") { - withTempPath { dir => - sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath) - parquetFile(dir.getCanonicalPath).registerTempTable("p") - withTempTable("p") { - checkAnswer( - sql("SELECT * FROM src ORDER BY key"), - sql("SELECT * from p ORDER BY key").collect().toSeq) - } - } - } - - test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") { - withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") { - withTempPath { file => - sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath) - parquetFile(file.getCanonicalPath).registerTempTable("p") - withTempTable("p") { - // let's do three overwrites for good measure - sql("INSERT OVERWRITE TABLE p SELECT * FROM t") - sql("INSERT OVERWRITE TABLE p SELECT * FROM t") - sql("INSERT OVERWRITE TABLE p SELECT * FROM t") - checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq) - } - } - } - } - } - - withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") { - run("Parquet data source enabled") - } - - withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") { - run("Parquet data source disabled") - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org