Repository: spark Updated Branches: refs/heads/master f8db8945f -> 18ee55dd5
[SPARK-19148][SQL] do not expose the external table concept in Catalog ## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/16296 , we reached a consensus that we should hide the external/managed table concept from users and only expose the custom table path. This PR renames `Catalog.createExternalTable` to `createTable` (still keeping the old versions for backward compatibility), and only sets the table type to EXTERNAL if `path` is specified in options. ## How was this patch tested? new tests in `CatalogSuite` Author: Wenchen Fan <wenc...@databricks.com> Closes #16528 from cloud-fan/create-table. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18ee55dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18ee55dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18ee55dd Branch: refs/heads/master Commit: 18ee55dd5de0597d7fb69e8e16ac3744356a6918 Parents: f8db894 Author: Wenchen Fan <wenc...@databricks.com> Authored: Tue Jan 17 12:54:50 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jan 17 12:54:50 2017 +0800 ---------------------------------------------------------------------- project/MimaExcludes.scala | 5 +- python/pyspark/sql/catalog.py | 27 +++- .../org/apache/spark/sql/catalog/Catalog.scala | 129 ++++++++++++++++--- .../command/createDataSourceTables.scala | 9 -- .../apache/spark/sql/internal/CatalogImpl.scala | 78 ++++------- .../spark/sql/internal/CatalogSuite.scala | 66 +++++++--- 6 files changed, 211 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 2314d7f..e0ee00e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ 
-43,7 +43,10 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"), // [SPARK-18537] Add a REST api to spark streaming - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"), + + // [SPARK-19148][SQL] do not expose the external table concept in Catalog + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable") ) // Exclude rules for 2.1.x http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/python/pyspark/sql/catalog.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 30c7a3f..253a750 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -15,6 +15,7 @@ # limitations under the License. # +import warnings from collections import namedtuple from pyspark import since @@ -138,7 +139,27 @@ class Catalog(object): @since(2.0) def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): - """Creates an external table based on the dataset in a data source. + """Creates a table based on the dataset in a data source. + + It returns the DataFrame associated with the external table. + + The data source is specified by the ``source`` and a set of ``options``. + If ``source`` is not specified, the default data source configured by + ``spark.sql.sources.default`` will be used. + + Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and + created external table. 
+ + :return: :class:`DataFrame` + """ + warnings.warn( + "createExternalTable is deprecated since Spark 2.2, please use createTable instead.", + DeprecationWarning) + return self.createTable(tableName, path, source, schema, **options) + + @since(2.2) + def createTable(self, tableName, path=None, source=None, schema=None, **options): + """Creates a table based on the dataset in a data source. It returns the DataFrame associated with the external table. @@ -157,12 +178,12 @@ class Catalog(object): source = self._sparkSession.conf.get( "spark.sql.sources.default", "org.apache.spark.sql.parquet") if schema is None: - df = self._jcatalog.createExternalTable(tableName, source, options) + df = self._jcatalog.createTable(tableName, source, options) else: if not isinstance(schema, StructType): raise TypeError("schema should be StructType") scala_datatype = self._jsparkSession.parseDataType(schema.json()) - df = self._jcatalog.createExternalTable(tableName, source, scala_datatype, options) + df = self._jcatalog.createTable(tableName, source, scala_datatype, options) return DataFrame(df, self._sparkSession._wrapped) @since(2.0) http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6b061f8..41e781e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalog +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -187,82 +189,169 @@ abstract class Catalog { def 
functionExists(dbName: String, functionName: String): Boolean /** - * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String): DataFrame = { + createTable(tableName, path) + } + + /** + * :: Experimental :: + * Creates a table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String): DataFrame + def createTable(tableName: String, path: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, path, source) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String, source: String): DataFrame + def createTable(tableName: String, path: String, source: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. 
* * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame /** * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. 
* * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } + + /** + * :: Experimental :: + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, schema: StructType, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. 
* - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, schema: StructType, http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 90aeebd..beeba05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo options = table.storage.properties ++ pathOption, catalogTable = Some(tableWithDefaultOptions)).resolveRelation() - dataSource match { - case fs: HadoopFsRelation => - if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) { - throw new AnalysisException( - "Cannot create a file-based external data source table without path") - } - case _ => - } - val partitionColumnNames = if (table.schema.nonEmpty) { table.partitionColumnNames } else { http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 8244b21..9136a83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -17,7 +17,6 @@ package 
org.apache.spark.sql.internal -import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String): DataFrame = { + override def createTable(tableName: String, path: String): DataFrame = { val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName - createExternalTable(tableName, path, dataSourceName) + createTable(tableName, path, dataSourceName) } /** * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String, source: String): DataFrame = { - createExternalTable(tableName, source, Map("path" -> path)) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. 
- * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.asScala.toMap) + override def createTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, source, Map("path" -> path)) } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame = { - createExternalTable(tableName, source, new StructType, options) - } - - /** - * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.asScala.toMap) + createTable(tableName, source, new StructType, options) } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. 
* * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + val storage = DataSource.buildStorageFormatFromOptions(options) + val tableType = if (storage.locationUri.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } val tableDesc = CatalogTable( identifier = tableIdent, - tableType = CatalogTableType.EXTERNAL, - storage = DataSource.buildStorageFormatFromOptions(options), + tableType = tableType, + storage = storage, schema = schema, provider = Some(source) ) http://git-wip-us.apache.org/repos/asf/spark/blob/18ee55dd/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 5dd0454..801912f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.internal +import java.io.File +import java.net.URI + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.StructType /** @@ -37,6 +40,7 @@ class CatalogSuite extends SparkFunSuite with BeforeAndAfterEach with SharedSQLContext { + import testImplicits._ private def 
sessionCatalog: SessionCatalog = spark.sessionState.catalog @@ -306,22 +310,6 @@ class CatalogSuite columnFields.foreach { f => assert(columnString.contains(f.toString)) } } - test("createExternalTable should fail if path is not given for file-based data source") { - val e = intercept[AnalysisException] { - spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String]) - } - assert(e.message.contains("Unable to infer schema")) - - val e2 = intercept[AnalysisException] { - spark.catalog.createExternalTable( - "tbl", - "json", - new StructType().add("i", IntegerType), - Map.empty[String, String]) - } - assert(e2.message == "Cannot create a file-based external data source table without path") - } - test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { withTable("same_name") { spark.range(10).write.saveAsTable("same_name") @@ -460,6 +448,50 @@ class CatalogSuite } } + test("createTable with 'path' in options") { + withTable("t") { + withTempDir { dir => + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map("path" -> dir.getAbsolutePath)) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.EXTERNAL) + assert(table.storage.locationUri.get == dir.getAbsolutePath) + + Seq((1)).toDF("i").write.insertInto("t") + assert(dir.exists() && dir.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path and data files are still there after DROP TABLE, if custom table path is + // specified. 
+ assert(dir.exists() && dir.listFiles().nonEmpty) + } + } + } + + test("createTable without 'path' in options") { + withTable("t") { + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map.empty[String, String]) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.MANAGED) + val tablePath = new File(new URI(table.storage.locationUri.get)) + assert(tablePath.exists() && tablePath.listFiles().isEmpty) + + Seq((1)).toDF("i").write.insertInto("t") + assert(tablePath.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path is removed after DROP TABLE, if custom table path is not specified. + assert(!tablePath.exists()) + } + } + // TODO: add tests for the rest of them } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org