This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2a449df  [SPARK-31088][SQL] Add back HiveContext and createExternalTable
2a449df is described below

commit 2a449df305d5f8495959fd71d937e0f5f4fff87d
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Thu Mar 26 23:51:15 2020 -0700

    [SPARK-31088][SQL] Add back HiveContext and createExternalTable

    ### What changes were proposed in this pull request?
    Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance cost is relatively small:

    - HiveContext
    - createExternalTable APIs

    ### Why are the changes needed?
    Avoid breaking APIs that are commonly used.

    ### Does this PR introduce any user-facing change?
    Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released.

    ### How was this patch tested?
    Added a new test suite for the createExternalTable APIs.

    Closes #27815 from gatorsmile/addAPIsBack.

    Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
    Co-authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
    (cherry picked from commit b9eafcb52658b7f5ec60bb4ebcc9da0fde94e105)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 docs/sql-migration-guide.md                        |   4 -
 project/MimaExcludes.scala                         |   2 -
 python/pyspark/__init__.py                         |   2 +-
 python/pyspark/sql/__init__.py                     |   4 +-
 python/pyspark/sql/catalog.py                      |  20 ++++
 python/pyspark/sql/context.py                      |  67 +++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  91 ++++++++++++++++
 .../org/apache/spark/sql/catalog/Catalog.scala     | 102 +++++++++++++++-
 .../DeprecatedCreateExternalTableSuite.scala       |  85 +++++++++++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala    |  63 +++++++++++++
 .../sql/hive/HiveContextCompatibilitySuite.scala   | 103 +++++++++++++++++++++
 11 files changed, 532 insertions(+), 11 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index d2773d8..ab35e1f 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -309,10 +309,6 @@ license: |
 
 ### Others
 
-  - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
-
-  - In Spark 3.0, the deprecated `HiveContext` class has been removed. Use `SparkSession.builder.enableHiveSupport()` instead.
-
 - In Spark version 2.4, when a spark session is created via `cloneSession()`, the newly created spark session inherits its configuration from its parent `SparkContext` even though the same configuration may exist with a different value in its parent spark session. Since Spark 3.0, the configurations of a parent `SparkSession` have a higher precedence over the parent `SparkContext`. The old behavior can be restored by setting `spark.sql.legacy.sessionInitWithConfigDefaults` to `true`.
 
 - Since Spark 3.0, if `hive.default.fileformat` is not found in `Spark SQL configuration` then it will fallback to hive-site.xml present in the `Hadoop configuration` of `SparkContext`.
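For orientation, here is a minimal sketch (not part of the patch) of the restored entry points next to the non-deprecated calls their deprecation notes point to. The session setup, table names, and path are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("restored-apis-sketch")
  .getOrCreate()

// Restored by this commit, still marked deprecated since 2.2.0:
val t1 = spark.catalog.createExternalTable("t1", "/tmp/data/people.parquet")

// The documented replacement, unchanged by this commit:
val t2 = spark.catalog.createTable("t2", "/tmp/data/people.parquet")
```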
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f8ad60b..9a5029e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -48,8 +48,6 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),
 
     // [SPARK-28980][SQL][CORE][MLLIB] Remove more old deprecated items in Spark 3
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.createExternalTable"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createExternalTable"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.classification.LogisticRegressionWithSGD$"),
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 76a5bd0..70c0b27 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -113,7 +113,7 @@ def keyword_only(func):
 
 # for back compatibility
-from pyspark.sql import SQLContext, Row
+from pyspark.sql import SQLContext, HiveContext, Row
 
 __all__ = [
     "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 0a8d71c..c28cb8c 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -43,7 +43,7 @@ from __future__ import absolute_import
 from pyspark.sql.types import Row
-from pyspark.sql.context import SQLContext, UDFRegistration
+from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration
 from pyspark.sql.session import SparkSession
 from pyspark.sql.column import Column
 from pyspark.sql.catalog import Catalog
@@ -55,7 +55,7 @@ from pyspark.sql.pandas.group_ops import PandasCogroupedOps
 
 __all__ = [
-    'SparkSession', 'SQLContext', 'UDFRegistration',
+    'SparkSession', 'SQLContext', 'HiveContext', 'UDFRegistration',
     'DataFrame', 'GroupedData', 'Column', 'Catalog', 'Row',
     'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
     'DataFrameReader', 'DataFrameWriter', 'PandasCogroupedOps'
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 08cf6ee..974251f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -138,6 +138,26 @@ class Catalog(object):
                     isBucket=jcolumn.isBucket()))
         return columns
 
+    @since(2.0)
+    def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
+        """Creates a table based on the dataset in a data source.
+
+        It returns the DataFrame associated with the external table.
+
+        The data source is specified by the ``source`` and a set of ``options``.
+        If ``source`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
+        created external table.
+
+        :return: :class:`DataFrame`
+        """
+        warnings.warn(
+            "createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
+            DeprecationWarning)
+        return self.createTable(tableName, path, source, schema, **options)
+
     @since(2.2)
     def createTable(self, tableName, path=None, source=None, schema=None, **options):
         """Creates a table based on the dataset in a data source.
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index f203e1c..0b7a7da 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -32,7 +32,7 @@ from pyspark.sql.types import IntegerType, Row, StringType
 from pyspark.sql.udf import UDFRegistration
 from pyspark.sql.utils import install_exception_handler
 
-__all__ = ["SQLContext"]
+__all__ = ["SQLContext", "HiveContext"]
 
 
 class SQLContext(object):
@@ -338,6 +338,24 @@ class SQLContext(object):
         """
         self.sparkSession.catalog.dropTempView(tableName)
 
+    @since(1.3)
+    def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
+        """Creates an external table based on the dataset in a data source.
+
+        It returns the DataFrame associated with the external table.
+
+        The data source is specified by the ``source`` and a set of ``options``.
+        If ``source`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
+        created external table.
+
+        :return: :class:`DataFrame`
+        """
+        return self.sparkSession.catalog.createExternalTable(
+            tableName, path, source, schema, **options)
+
     @ignore_unicode_prefix
     @since(1.0)
     def sql(self, sqlQuery):
@@ -461,6 +479,53 @@ class SQLContext(object):
         return StreamingQueryManager(self._ssql_ctx.streams())
 
 
+class HiveContext(SQLContext):
+    """A variant of Spark SQL that integrates with data stored in Hive.
+
+    Configuration for Hive is read from ``hive-site.xml`` on the classpath.
+    It supports running both SQL and HiveQL commands.
+
+    :param sparkContext: The SparkContext to wrap.
+    :param jhiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
+        :class:`HiveContext` in the JVM, instead we make all calls to this object.
+
+    .. note:: Deprecated in 2.0.0. Use SparkSession.builder.enableHiveSupport().getOrCreate().
+    """
+
+    def __init__(self, sparkContext, jhiveContext=None):
+        warnings.warn(
+            "HiveContext is deprecated in Spark 2.0.0. Please use " +
+            "SparkSession.builder.enableHiveSupport().getOrCreate() instead.",
+            DeprecationWarning)
+        if jhiveContext is None:
+            sparkContext._conf.set("spark.sql.catalogImplementation", "hive")
+            sparkSession = SparkSession.builder._sparkContext(sparkContext).getOrCreate()
+        else:
+            sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
+        SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext)
+
+    @classmethod
+    def _createForTesting(cls, sparkContext):
+        """(Internal use only) Create a new HiveContext for testing.
+
+        All test code that touches HiveContext *must* go through this method. Otherwise,
+        you may end up launching multiple derby instances and encounter incredibly
+        confusing error messages.
+        """
+        jsc = sparkContext._jsc.sc()
+        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
+        return cls(sparkContext, jtestHive)
+
+    def refreshTable(self, tableName):
+        """Invalidate and refresh all the cached metadata of the given
+        table. For performance reasons, Spark SQL or the external data source
+        library it uses might cache certain metadata about a table, such as the
+        location of blocks. When those change outside of Spark SQL, users should
+        call this function to invalidate the cache.
+ """ + self._ssql_ctx.refreshTable(tableName) + + def _test(): import os import doctest diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 592c64c..bbcc842 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -480,6 +480,97 @@ class SQLContext private[sql](val sparkSession: SparkSession) /** + * Creates an external table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String): DataFrame = { + sparkSession.catalog.createTable(tableName, path) + } + + /** + * Creates an external table from the given path based on a data source + * and returns the corresponding DataFrame. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + path: String, + source: String): DataFrame = { + sparkSession.catalog.createTable(tableName, path, source) + } + + /** + * Creates an external table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + sparkSession.catalog.createTable(tableName, source, options) + } + + /** + * (Scala-specific) + * Creates an external table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: Map[String, String]): DataFrame = { + sparkSession.catalog.createTable(tableName, source, options) + } + + /** + * Create an external table from the given path based on a data source, a schema and + * a set of options. Then, returns the corresponding DataFrame. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + sparkSession.catalog.createTable(tableName, source, schema, options) + } + + /** + * (Scala-specific) + * Create an external table from the given path based on a data source, a schema and + * a set of options. Then, returns the corresponding DataFrame. + * + * @group ddl_ops + * @since 1.3.0 + */ + @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: Map[String, String]): DataFrame = { + sparkSession.catalog.createTable(tableName, source, schema, options) + } + + /** * Registers the given `DataFrame` as a temporary table in the catalog. Temporary tables exist * only during the lifetime of this instance of SQLContext. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 318cc62..60738e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalog
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.Stable
+import org.apache.spark.annotation.{Evolving, Experimental, Stable}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
@@ -215,6 +215,20 @@ abstract class Catalog {
    * @param tableName is either a qualified or unqualified name that designates a table.
    *                  If no database identifier is provided, it refers to a table in
    *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String): DataFrame = {
+    createTable(tableName, path)
+  }
+
+  /**
+   * Creates a table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
    * @since 2.2.0
    */
   def createTable(tableName: String, path: String): DataFrame
@@ -226,11 +240,42 @@ abstract class Catalog {
    * @param tableName is either a qualified or unqualified name that designates a table.
    *                  If no database identifier is provided, it refers to a table in
    *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, path, source)
+  }
+
+  /**
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
    * @since 2.2.0
    */
   def createTable(tableName: String, path: String, source: String): DataFrame
 
   /**
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }
+
+  /**
    * Creates a table based on the dataset in a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
@@ -248,6 +293,24 @@ abstract class Catalog {
 
   /**
    * (Scala-specific)
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }
+
+  /**
+   * (Scala-specific)
    * Creates a table based on the dataset in a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
@@ -262,6 +325,24 @@ abstract class Catalog {
       options: Map[String, String]): DataFrame
 
   /**
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }
+
+  /**
    * Create a table based on the dataset in a data source, a schema and a set of options.
    * Then, returns the corresponding DataFrame.
    *
@@ -280,6 +361,25 @@ abstract class Catalog {
 
   /**
    * (Scala-specific)
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table.
+   *                  If no database identifier is provided, it refers to a table in
+   *                  the current database.
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }
+
+  /**
+   * (Scala-specific)
    * Create a table based on the dataset in a data source, a schema and a set of options.
    * Then, returns the corresponding DataFrame.
    *
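For the schema-taking variants restored above, a sketch under the same assumptions (a live `spark` session; the path and column names are hypothetical) shows how an explicit schema is passed through to createTable:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema for CSV files at the path below.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Deprecated since 2.2.0 but callable again as of this commit; it simply
// forwards to createTable(tableName, source, schema, options).
val people = spark.catalog.createExternalTable(
  "people", "csv", schema, Map("path" -> "/tmp/data/people"))
```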
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala
new file mode 100644
index 0000000..0b5cd3d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/DeprecatedCreateExternalTableSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+class DeprecatedCreateExternalTableSuite extends SharedSparkSession {
+  test("createExternalTable with explicit path") {
+    withTable("t") {
+      withTempDir { dir =>
+        val path = new File(dir, "test")
+        spark.range(100).write.parquet(path.getAbsolutePath)
+        spark.catalog.createExternalTable(
+          tableName = "t",
+          path = path.getAbsolutePath
+        )
+        assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.tableType === CatalogTableType.EXTERNAL)
+        assert(table.provider === Some("parquet"))
+        assert(table.schema === new StructType().add("id", "long"))
+        assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+      }
+    }
+  }
+
+  test("createExternalTable with 'path' options") {
+    withTable("t") {
+      withTempDir { dir =>
+        val path = new File(dir, "test")
+        spark.range(100).write.parquet(path.getAbsolutePath)
+        spark.catalog.createExternalTable(
+          tableName = "t",
+          source = "parquet",
+          options = Map("path" -> path.getAbsolutePath))
+        assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.tableType === CatalogTableType.EXTERNAL)
+        assert(table.provider === Some("parquet"))
+        assert(table.schema === new StructType().add("id", "long"))
+        assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+      }
+    }
+  }
+
+  test("createExternalTable with explicit schema") {
+    withTable("t") {
+      withTempDir { dir =>
+        val path = new File(dir, "test")
+        spark.range(100).write.parquet(path.getAbsolutePath)
+        spark.catalog.createExternalTable(
+          tableName = "t",
+          source = "parquet",
+          schema = new StructType().add("i", "int"),
+          options = Map("path" -> path.getAbsolutePath))
+        assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.tableType === CatalogTableType.EXTERNAL)
+        assert(table.provider === Some("parquet"))
+        assert(table.schema === new StructType().add("i", "int"))
+        assert(table.storage.locationUri.get == makeQualifiedPath(path.getAbsolutePath))
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
new file mode 100644
index 0000000..02a5117
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
+
+/**
+ * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
+ * Configuration for Hive is read from hive-site.xml on the classpath.
+ */
+@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0")
+class HiveContext private[hive](_sparkSession: SparkSession)
+  extends SQLContext(_sparkSession) with Logging {
+
+  self =>
+
+  def this(sc: SparkContext) = {
+    this(SparkSession.builder().sparkContext(HiveUtils.withHiveExternalCatalog(sc)).getOrCreate())
+  }
+
+  def this(sc: JavaSparkContext) = this(sc.sc)
+
+  /**
+   * Returns a new HiveContext as a new session, which will have separated SQLConf, UDF/UDAF,
+   * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
+   * and Hive client (both of execution and metadata) with the existing HiveContext.
+   */
+  override def newSession(): HiveContext = {
+    new HiveContext(sparkSession.newSession())
+  }
+
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   *
+   * @since 1.3.0
+   */
+  def refreshTable(tableName: String): Unit = {
+    sparkSession.catalog.refreshTable(tableName)
+  }
+
+}
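A hedged usage sketch of the restored class (again, not part of the patch; the SparkContext setup and table name are hypothetical). New code should still prefer the enableHiveSupport() builder noted in the deprecation message:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("hive-context-sketch"))

// Legacy entry point, restored by this commit (compiles with a deprecation warning):
val hc = new HiveContext(sc)
hc.sql("SHOW DATABASES").show()
hc.refreshTable("some_table")  // hypothetical table name

// Preferred replacement for new code (instead of the HiveContext above):
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
```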
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
new file mode 100644
index 0000000..a80db76
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+
+class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  override protected val enableAutoThreadAudit = false
+  private var sc: SparkContext = null
+  private var hc: HiveContext = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("test"))
+    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) =>
+      sc.hadoopConfiguration.set(k, v)
+    }
+    hc = new HiveContext(sc)
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      hc.sharedState.cacheManager.clearCache()
+      hc.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      sc = null
+      hc = null
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("basic operations") {
+    val _hc = hc
+    import _hc.implicits._
+    val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x")
+    val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c")
+      .select($"a", $"b")
+      .filter($"a" > 10 && $"b" > 6 && $"c")
+    val df3 = df1.join(df2, "a")
+    val res = df3.collect()
+    val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect()
+    assert(res.toSeq == expected.toSeq)
+    df3.createOrReplaceTempView("mai_table")
+    val df4 = hc.table("mai_table")
+    val res2 = df4.collect()
+    assert(res2.toSeq == expected.toSeq)
+  }
+
+  test("basic DDLs") {
+    val _hc = hc
+    import _hc.implicits._
+    val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+    assert(databases.toSeq == Seq("default"))
+    hc.sql("CREATE DATABASE mee_db")
+    hc.sql("USE mee_db")
+    val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+    assert(databases2.toSet == Set("default", "mee_db"))
+    val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age")
+    df.createOrReplaceTempView("mee_table")
+    hc.sql("CREATE TABLE moo_table (name string, age int)")
+    hc.sql("INSERT INTO moo_table SELECT * FROM mee_table")
+    assert(
+      hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
+        df.collect().toSeq.sortBy(_.getString(0)))
+    val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
+    assert(tables.toSet == Set("moo_table", "mee_table"))
+    hc.sql("DROP TABLE moo_table")
+    hc.sql("DROP TABLE mee_table")
+    val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
+    assert(tables2.isEmpty)
+    hc.sql("USE default")
+    hc.sql("DROP DATABASE mee_db CASCADE")
+    val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+    assert(databases3.toSeq == Seq("default"))
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org