Repository: spark
Updated Branches:
  refs/heads/master 5207a005c -> 33c6eb521
[SPARK-15171][SQL] Deprecate registerTempTable and add dataset.createTempView

## What changes were proposed in this pull request?

Deprecates registerTempTable and adds dataset.createTempView and dataset.createOrReplaceTempView.

## How was this patch tested?

Unit tests.

Author: Sean Zhong <seanzh...@databricks.com>

Closes #12945 from clockfly/spark-15171.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33c6eb52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33c6eb52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33c6eb52

Branch: refs/heads/master
Commit: 33c6eb5218ce3c31cc9f632a67fd2c7057569683
Parents: 5207a00
Author: Sean Zhong <seanzh...@databricks.com>
Authored: Thu May 12 15:51:53 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu May 12 15:51:53 2016 +0800

----------------------------------------------------------------------
 .../sbt_app_sql/src/main/scala/SqlApp.scala        |  4 +-
 .../apache/spark/examples/sql/JavaSparkSQL.java    |  8 +--
 .../streaming/JavaSqlNetworkWordCount.java         |  2 +-
 examples/src/main/python/sql.py                    |  2 +-
 .../python/streaming/sql_network_wordcount.py      |  2 +-
 .../apache/spark/examples/sql/RDDRelation.scala    |  4 +-
 .../streaming/SqlNetworkWordCount.scala            |  2 +-
 .../spark/ml/feature/SQLTransformer.scala          |  5 +-
 python/pyspark/sql/catalog.py                      | 26 +++-------
 python/pyspark/sql/context.py                      |  4 +-
 python/pyspark/sql/dataframe.py                    | 51 ++++++++++++++++++--
 python/pyspark/sql/session.py                      |  6 +--
 .../analysis/DistinctAggregationRewriter.scala     |  2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../sql/catalyst/analysis/AnalysisTest.scala       |  2 +-
 .../analysis/DecimalPrecisionSuite.scala           |  2 +-
 .../catalyst/catalog/SessionCatalogSuite.scala     | 26 +++++-----
 .../org/apache/spark/sql/DataFrameWriter.scala     |  2 +-
 .../scala/org/apache/spark/sql/Dataset.scala       | 30 +++++++++++-
 .../scala/org/apache/spark/sql/SQLContext.scala    |  4 +-
 .../org/apache/spark/sql/SparkSession.scala        | 17 +++----
 .../org/apache/spark/sql/catalog/Catalog.scala     |  8 +--
 .../spark/sql/execution/SparkSqlParser.scala       |  2 +-
 .../spark/sql/execution/command/cache.scala        |  3 +-
 .../spark/sql/execution/command/views.scala        |  2 +-
 .../spark/sql/execution/datasources/ddl.scala      |  4 +-
 .../apache/spark/sql/internal/CatalogImpl.scala    | 12 ++---
 .../org/apache/spark/sql/CachedTableSuite.scala    |  6 +--
 .../org/apache/spark/sql/DataFrameSuite.scala      | 19 ++++----
 .../spark/sql/DataFrameTimeWindowingSuite.scala    |  2 +-
 .../org/apache/spark/sql/DatasetSuite.scala        | 18 +++++++
 .../org/apache/spark/sql/ListTablesSuite.scala     |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala       |  6 +--
 .../scala/org/apache/spark/sql/UDFSuite.scala      |  4 +-
 .../columnar/InMemoryColumnarQuerySuite.scala      |  2 +-
 .../parquet/ParquetReadBenchmark.scala             |  2 +-
 .../spark/sql/internal/CatalogSuite.scala          |  2 +-
 .../apache/spark/sql/test/SQLTestUtils.scala       |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../spark/sql/hive/ErrorPositionSuite.scala        |  6 +--
 .../spark/sql/hive/HiveDDLCommandSuite.scala       |  2 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala    |  2 +-
 .../hive/execution/AggregationQuerySuite.scala     |  2 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala    |  2 +-
 .../hive/execution/SQLWindowFunctionSuite.scala    |  2 +-
 45 files changed, 197 insertions(+), 120 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala
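Before the diff, a minimal usage sketch of the API change this commit introduces (illustrative only, not part of the patch; it assumes an existing SparkSession named `spark`):

    import org.apache.spark.sql.AnalysisException

    val df = spark.range(10).toDF("id")

    // Deprecated by this patch; registerTempTable now simply forwards
    // to createOrReplaceTempView.
    df.registerTempTable("nums")

    // New API: creates the view, silently replacing any existing view
    // with the same name.
    df.createOrReplaceTempView("nums")
    spark.sql("SELECT count(*) FROM nums").show()

    // createTempView instead fails when the name is already taken.
    try {
      df.createTempView("nums")
    } catch {
      case e: AnalysisException => println(s"Temp view already exists: ${e.getMessage}")
    }

    // dropTempView is the new name for catalog.dropTempTable.
    spark.catalog.dropTempView("nums")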
---------------------------------------------------------------------- diff --git a/dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala b/dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala index 69c1154..1002631 100644 --- a/dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala +++ b/dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala @@ -41,7 +41,7 @@ object SparkSqlExample { import sqlContext._ val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x)).toDF() - people.registerTempTable("people") + people.createOrReplaceTempView("people") val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect() teenagerNames.foreach(println) @@ -52,7 +52,7 @@ object SparkSqlExample { System.exit(-1) } } - + test(teenagerNames.size == 7, "Unexpected number of selected elements: " + teenagerNames) println("Test succeeded") sc.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index d956750..cf0167f 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -75,7 +75,7 @@ public class JavaSparkSQL { // Apply a schema to an RDD of Java Beans and register it as a table. Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class); - schemaPeople.registerTempTable("people"); + schemaPeople.createOrReplaceTempView("people"); // SQL can be run over RDDs that have been registered as tables. Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); @@ -102,7 +102,7 @@ public class JavaSparkSQL { Dataset<Row> parquetFile = spark.read().parquet("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. - parquetFile.registerTempTable("parquetFile"); + parquetFile.createOrReplaceTempView("parquetFile"); Dataset<Row> teenagers2 = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() { @@ -131,7 +131,7 @@ public class JavaSparkSQL { // |-- name: StringType // Register this DataFrame as a table. 
- peopleFromJsonFile.registerTempTable("people"); + peopleFromJsonFile.createOrReplaceTempView("people"); // SQL statements can be run by using the sql methods provided by `spark` Dataset<Row> teenagers3 = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); @@ -163,7 +163,7 @@ public class JavaSparkSQL { // | |-- state: StringType // |-- name: StringType - peopleFromJsonRDD.registerTempTable("people2"); + peopleFromJsonRDD.createOrReplaceTempView("people2"); Dataset<Row> peopleWithCity = spark.sql("SELECT name, address.city FROM people2"); List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index 57953ef..5130522 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -95,7 +95,7 @@ public final class JavaSqlNetworkWordCount { Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); // Register as table - wordsDataFrame.registerTempTable("words"); + wordsDataFrame.createOrReplaceTempView("words"); // Do word count on table using SQL and print it Dataset<Row> wordCountsDataFrame = http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/python/sql.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py index d2e895d..2340240 100644 --- a/examples/src/main/python/sql.py +++ b/examples/src/main/python/sql.py @@ -67,7 +67,7 @@ if __name__ == "__main__": # |-- name: string (nullable = true) # Register this DataFrame as a temporary table. 
- people.registerTempTable("people") + people.createOrReplaceTempView("people") # SQL statements can be run by using the sql methods provided by `spark` teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/python/streaming/sql_network_wordcount.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py index f8801d4..25e8215 100644 --- a/examples/src/main/python/streaming/sql_network_wordcount.py +++ b/examples/src/main/python/streaming/sql_network_wordcount.py @@ -71,7 +71,7 @@ if __name__ == "__main__": wordsDataFrame = spark.createDataFrame(rowRdd) # Register as table - wordsDataFrame.registerTempTable("words") + wordsDataFrame.createOrReplaceTempView("words") # Do word count on table using SQL and print it wordCountsDataFrame = \ http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 94c378a..d1bda0f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -37,7 +37,7 @@ object RDDRelation { val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) // Any RDD containing case classes can be registered as a table. The schema of the table is // automatically inferred using scala reflection. - df.registerTempTable("records") + df.createOrReplaceTempView("records") // Once tables have been registered, you can run SQL queries over them. println("Result of SELECT *:") @@ -67,7 +67,7 @@ object RDDRelation { parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println) // These files can also be registered as tables. 
- parquetFile.registerTempTable("parquetFile") + parquetFile.createOrReplaceTempView("parquetFile") spark.sql("SELECT * FROM parquetFile").collect().foreach(println) spark.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala index 9aba4a0..688c5b2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala @@ -67,7 +67,7 @@ object SqlNetworkWordCount { val wordsDataFrame = rdd.map(w => Record(w)).toDF() // Register as table - wordsDataFrame.registerTempTable("words") + wordsDataFrame.createOrReplaceTempView("words") // Do word count on table using SQL and print it val wordCountsDataFrame = http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala index 400435d..2d4cac6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala @@ -48,6 +48,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor /** * SQL statement parameter. The statement is provided in string form. + * * @group param */ @Since("1.6.0") @@ -66,7 +67,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { val tableName = Identifiable.randomUID(uid) - dataset.registerTempTable(tableName) + dataset.createOrReplaceTempView(tableName) val realStatement = $(statement).replace(tableIdentifier, tableName) dataset.sparkSession.sql(realStatement) } @@ -79,7 +80,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor val dummyDF = sqlContext.createDataFrame(dummyRDD, schema) val tableName = Identifiable.randomUID(uid) val realStatement = $(statement).replace(tableIdentifier, tableName) - dummyDF.registerTempTable(tableName) + dummyDF.createOrReplaceTempView(tableName) val outputSchema = sqlContext.sql(realStatement).schema sqlContext.dropTempTable(tableName) outputSchema http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/python/pyspark/sql/catalog.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 9cfdd0a..812dbba 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -166,34 +166,20 @@ class Catalog(object): return DataFrame(df, self._sparkSession._wrapped) @since(2.0) - def dropTempTable(self, tableName): - """Drops the temporary table with the given table name in the catalog. - If the table has been cached before, then it will also be uncached. + def dropTempView(self, viewName): + """Drops the temporary view with the given view name in the catalog. + If the view has been cached before, then it will also be uncached. 
- >>> spark.createDataFrame([(1, 1)]).registerTempTable("my_table") + >>> spark.createDataFrame([(1, 1)]).createTempView("my_table") >>> spark.table("my_table").collect() [Row(_1=1, _2=1)] - >>> spark.catalog.dropTempTable("my_table") + >>> spark.catalog.dropTempView("my_table") >>> spark.table("my_table") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... AnalysisException: ... """ - self._jcatalog.dropTempTable(tableName) - - @since(2.0) - def registerTable(self, df, tableName): - """Registers the given :class:`DataFrame` as a temporary table in the catalog. - - >>> df = spark.createDataFrame([(2, 1), (3, 1)]) - >>> spark.catalog.registerTable(df, "my_cool_table") - >>> spark.table("my_cool_table").collect() - [Row(_1=2, _2=1), Row(_1=3, _2=1)] - """ - if isinstance(df, DataFrame): - self._jsparkSession.registerTable(df._jdf, tableName) - else: - raise ValueError("Can only register DataFrame as table") + self._jcatalog.dropTempView(viewName) @ignore_unicode_prefix @since(2.0) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 02e742c..ca111ae 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -302,7 +302,7 @@ class SQLContext(object): >>> sqlContext.registerDataFrameAsTable(df, "table1") """ - self.sparkSession.catalog.registerTable(df, tableName) + df.createOrReplaceTempView(tableName) @since(1.6) def dropTempTable(self, tableName): @@ -311,7 +311,7 @@ class SQLContext(object): >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> sqlContext.dropTempTable("table1") """ - self.sparkSession.catalog.dropTempTable(tableName) + self.sparkSession.catalog.dropTempView(tableName) @since(1.3) def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 49b4818..a0264ce 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -119,11 +119,55 @@ class DataFrame(object): that was used to create this :class:`DataFrame`. >>> df.registerTempTable("people") - >>> df2 = sqlContext.sql("select * from people") + >>> df2 = spark.sql("select * from people") >>> sorted(df.collect()) == sorted(df2.collect()) True + >>> spark.catalog.dropTempView("people") + + .. note:: Deprecated in 2.0, use createOrReplaceTempView instead. + """ + self._jdf.createOrReplaceTempView(name) + + @since(2.0) + def createTempView(self, name): + """Creates a temporary view with this DataFrame. + + The lifetime of this temporary table is tied to the :class:`SparkSession` + that was used to create this :class:`DataFrame`. + throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the + catalog. + + >>> df.createTempView("people") + >>> df2 = spark.sql("select * from people") + >>> sorted(df.collect()) == sorted(df2.collect()) + True + >>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + Py4JJavaError: ... + : org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException... 
+ >>> spark.catalog.dropTempView("people") + + """ + self._jdf.createTempView(name) + + @since(2.0) + def createOrReplaceTempView(self, name): + """Creates or replaces a temporary view with this DataFrame. + + The lifetime of this temporary table is tied to the :class:`SparkSession` + that was used to create this :class:`DataFrame`. + + >>> df.createOrReplaceTempView("people") + >>> df2 = df.filter(df.age > 3) + >>> df2.createOrReplaceTempView("people") + >>> df3 = spark.sql("select * from people") + >>> sorted(df3.collect()) == sorted(df2.collect()) + True + >>> spark.catalog.dropTempView("people") + """ - self._jdf.registerTempTable(name) + self._jdf.createOrReplaceTempView(name) @property @since(1.4) @@ -1479,12 +1523,13 @@ class DataFrameStatFunctions(object): def _test(): import doctest from pyspark.context import SparkContext - from pyspark.sql import Row, SQLContext + from pyspark.sql import Row, SQLContext, SparkSession import pyspark.sql.dataframe globs = pyspark.sql.dataframe.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) + globs['spark'] = SparkSession(sc) globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\ .toDF(StructType([StructField('age', IntegerType()), StructField('name', StringType())])) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 4ee9ab8..ae31435 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -160,7 +160,7 @@ class SparkSession(object): ... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), ... time=datetime(2014, 8, 1, 14, 1, 5))]) >>> df = allTypes.toDF() - >>> df.registerTempTable("allTypes") + >>> df.createOrReplaceTempView("allTypes") >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a ' ... 
'from allTypes where b and i > 0').collect() [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \ @@ -484,7 +484,7 @@ class SparkSession(object): :return: :class:`DataFrame` - >>> spark.catalog.registerTable(df, "table1") + >>> df.createOrReplaceTempView("table1") >>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> df2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] @@ -497,7 +497,7 @@ class SparkSession(object): :return: :class:`DataFrame` - >>> spark.catalog.registerTable(df, "table1") + >>> df.createOrReplaceTempView("table1") >>> df2 = spark.table("table1") >>> sorted(df.collect()) == sorted(df2.collect()) True http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 2e30d83..063eff4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.types.IntegerType * ("a", "ca1", "cb2", 5), * ("b", "ca1", "cb1", 13)) * .toDF("key", "cat1", "cat2", "value") - * data.registerTempTable("data") + * data.createOrReplaceTempView("data") * * val agg = data.groupBy($"key") * .agg( http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f53311c5..0fc4ab5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -315,7 +315,7 @@ class SessionCatalog( /** * Create a temporary table. 
*/ - def createTempTable( + def createTempView( name: String, tableDefinition: LogicalPlan, overrideIfExists: Boolean): Unit = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index b1fcf01..3acb261 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest { private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { val conf = new SimpleCatalystConf(caseSensitive) val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) - catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true) + catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true) new Analyzer(catalog, conf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b3b1f5b..66d9b4c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { private val b: Expression = UnresolvedAttribute("b") before { - catalog.createTempTable("table", relation, overrideIfExists = true) + catalog.createTempView("table", relation, overrideIfExists = true) } private def checkType(expression: Expression, expectedType: DataType): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 80422c2..726b7a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -199,17 +199,17 @@ class SessionCatalogSuite extends SparkFunSuite { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable1 = Range(1, 10, 1, 10, Seq()) val tempTable2 = Range(1, 20, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) - catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false) + catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) + catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) assert(catalog.getTempTable("tbl1") == Option(tempTable1)) 
assert(catalog.getTempTable("tbl2") == Option(tempTable2)) assert(catalog.getTempTable("tbl3").isEmpty) // Temporary table already exists intercept[TempTableAlreadyExistsException] { - catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) + catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) } // Temporary table already exists but we override it - catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true) + catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) assert(catalog.getTempTable("tbl1") == Option(tempTable2)) } @@ -244,7 +244,7 @@ class SessionCatalogSuite extends SparkFunSuite { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable = Range(1, 10, 2, 10, Seq()) - sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) + sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) @@ -256,7 +256,7 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) // If database is specified, temp tables are never dropped - sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) + sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) @@ -305,7 +305,7 @@ class SessionCatalogSuite extends SparkFunSuite { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable = Range(1, 10, 2, 10, Seq()) - sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) + sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) @@ -385,7 +385,7 @@ class SessionCatalogSuite extends SparkFunSuite { val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable1 = Range(1, 10, 1, 10, Seq()) val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") - sessionCatalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) + sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) @@ -423,7 +423,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) // If database is explicitly specified, do not check temporary tables val tempTable = Range(1, 10, 1, 10, Seq()) - catalog.createTempTable("tbl3", tempTable, overrideIfExists = false) + catalog.createTempView("tbl3", tempTable, overrideIfExists = false) assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) // If database is not explicitly specified, check the current database catalog.setCurrentDatabase("db2") @@ -435,8 +435,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("list 
tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable, overrideIfExists = false) - catalog.createTempTable("tbl4", tempTable, overrideIfExists = false) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))) assert(catalog.listTables("db2").toSet == @@ -452,8 +452,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables with pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable, overrideIfExists = false) - catalog.createTempTable("tbl4", tempTable, overrideIfExists = false) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet) assert(catalog.listTables("db2", "tbl*").toSet == http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a9e8329..6f5fb69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -321,7 +321,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { val sink = new MemorySink(df.schema) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) - resultDf.registerTempTable(queryName) + resultDf.createOrReplaceTempView(queryName) val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 45a69ca..210ad95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2303,13 +2303,39 @@ class Dataset[T] private[sql]( /** * Registers this [[Dataset]] as a temporary table using the given name. The lifetime of this - * temporary table is tied to the [[SQLContext]] that was used to create this Dataset. + * temporary table is tied to the [[SparkSession]] that was used to create this Dataset. * * @group basic * @since 1.6.0 */ + @deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0") def registerTempTable(tableName: String): Unit = { - sparkSession.registerTable(toDF(), tableName) + createOrReplaceTempView(tableName) + } + + /** + * Creates a temporary view using the given name. The lifetime of this + * temporary view is tied to the [[SparkSession]] that was used to create this Dataset. 
+ * + * @throws AnalysisException if the view name already exists + * + * @group basic + * @since 2.0.0 + */ + @throws[AnalysisException] + def createTempView(viewName: String): Unit = { + sparkSession.createTempView(viewName, toDF(), replaceIfExists = false) + } + + /** + * Creates a temporary view using the given name. The lifetime of this + * temporary view is tied to the [[SparkSession]] that was used to create this Dataset. + * + * @group basic + * @since 2.0.0 + */ + def createOrReplaceTempView(viewName: String): Unit = { + sparkSession.createTempView(viewName, toDF(), replaceIfExists = true) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 168ac7e..c64e284 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -597,7 +597,7 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sparkSession.registerTable(df, tableName) + sparkSession.createTempView(tableName, df, replaceIfExists = true) } /** @@ -609,7 +609,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def dropTempTable(tableName: String): Unit = { - sparkSession.catalog.dropTempTable(tableName) + sparkSession.catalog.dropTempView(tableName) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index c7fa8f7..02c9dc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -284,7 +284,7 @@ class SparkSession private( * // |-- name: string (nullable = false) * // |-- age: integer (nullable = true) * - * dataFrame.registerTempTable("people") + * dataFrame.createOrReplaceTempView("people") * sparkSession.sql("select name from people").collect.foreach(println) * }}} * @@ -515,17 +515,16 @@ class SparkSession private( } /** - * Registers the given [[DataFrame]] as a temporary table in the catalog. - * Temporary tables exist only during the lifetime of this instance of [[SparkSession]]. + * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to + * this [[SparkSession]]. 
*/ - protected[sql] def registerTable(df: DataFrame, tableName: String): Unit = { - sessionState.catalog.createTempTable( - sessionState.sqlParser.parseTableIdentifier(tableName).table, - df.logicalPlan, - overrideIfExists = true) + protected[sql] def createTempView( + viewName: String, df: DataFrame, replaceIfExists: Boolean) = { + sessionState.catalog.createTempView( + sessionState.sqlParser.parseTableIdentifier(viewName).table, + df.logicalPlan, replaceIfExists) } - /* ----------------- * | Everything else | * ----------------- */ http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 7a815c1..49c0742 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -175,13 +175,13 @@ abstract class Catalog { options: Map[String, String]): DataFrame /** - * Drops the temporary table with the given table name in the catalog. - * If the table has been cached before, then it will also be uncached. + * Drops the temporary view with the given view name in the catalog. + * If the view has been cached before, then it will also be uncached. * - * @param tableName the name of the table to be dropped. + * @param viewName the name of the view to be dropped. * @since 2.0.0 */ - def dropTempTable(tableName: String): Unit + def dropTempView(viewName: String): Unit /** * Returns true if the table is currently cached in-memory. http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index b6e074b..3045f3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -753,7 +753,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (temp) { throw new ParseException( "CREATE TEMPORARY TABLE is not supported yet. " + - "Please use registerTempTable as an alternative.", ctx) + "Please use CREATE TEMPORARY VIEW as an alternative.", ctx) } if (ctx.skewSpec != null) { throw operationNotAllowed("CREATE TABLE ... 
SKEWED BY", ctx) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index f05401b..31dc016 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -30,7 +30,8 @@ case class CacheTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => - sparkSession.registerTable(Dataset.ofRows(sparkSession, logicalPlan), tableName) + sparkSession.createTempView( + tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true) } sparkSession.catalog.cacheTable(tableName) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 70ce5c8..075849a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -136,7 +136,7 @@ case class CreateViewCommand( } } - catalog.createTempTable(table.table, logicalPlan, replace) + catalog.createTempView(table.table, logicalPlan, replace) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 3863be5..68238db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -82,7 +82,7 @@ case class CreateTempTableUsing( userSpecifiedSchema = userSpecifiedSchema, className = provider, options = options) - sparkSession.sessionState.catalog.createTempTable( + sparkSession.sessionState.catalog.createTempView( tableIdent.table, Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan, overrideIfExists = true) @@ -113,7 +113,7 @@ case class CreateTempTableUsingAsSelect( bucketSpec = None, options = options) val result = dataSource.write(mode, df) - sparkSession.sessionState.catalog.createTempTable( + sparkSession.sessionState.catalog.createTempView( tableIdent.table, Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan, overrideIfExists = true) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index fdfb188..473e827 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -283,16 
+283,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Drops the temporary table with the given table name in the catalog. - * If the table has been cached/persisted before, it's also unpersisted. + * Drops the temporary view with the given view name in the catalog. + * If the view has been cached/persisted before, it's also unpersisted. * - * @param tableName the name of the table to be unregistered. + * @param viewName the name of the view to be dropped. * @group ddl_ops * @since 2.0.0 */ - override def dropTempTable(tableName: String): Unit = { - sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName)) - sessionCatalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true) + override def dropTempView(viewName: String): Unit = { + sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(viewName)) + sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 800316c..6d8de80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -288,7 +288,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext test("Drops temporary table") { testData.select('key).registerTempTable("t1") spark.table("t1") - spark.catalog.dropTempTable("t1") + spark.catalog.dropTempView("t1") intercept[AnalysisException](spark.table("t1")) } @@ -300,7 +300,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext assert(spark.catalog.isCached("t1")) assert(spark.catalog.isCached("t2")) - spark.catalog.dropTempTable("t1") + spark.catalog.dropTempView("t1") intercept[AnalysisException](spark.table("t1")) assert(!spark.catalog.isCached("t2")) } @@ -382,7 +382,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sql("SELECT key, count(*) FROM orderedTable GROUP BY key ORDER BY key"), sql("SELECT key, count(*) FROM testData3x GROUP BY key ORDER BY key").collect()) spark.catalog.uncacheTable("orderedTable") - spark.catalog.dropTempTable("orderedTable") + spark.catalog.dropTempView("orderedTable") // Set up two tables distributed in the same way. Try this with the data distributed into // different number of partitions. 
http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f77403c..f573abf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -994,17 +994,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { // pass case: parquet table (HadoopFsRelation) df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath) val pdf = spark.read.parquet(tempParquetFile.getCanonicalPath) - pdf.registerTempTable("parquet_base") + pdf.createOrReplaceTempView("parquet_base") + insertion.write.insertInto("parquet_base") // pass case: json table (InsertableRelation) df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath) val jdf = spark.read.json(tempJsonFile.getCanonicalPath) - jdf.registerTempTable("json_base") + jdf.createOrReplaceTempView("json_base") insertion.write.mode(SaveMode.Overwrite).insertInto("json_base") // error cases: insert into an RDD - df.registerTempTable("rdd_base") + df.createOrReplaceTempView("rdd_base") val e1 = intercept[AnalysisException] { insertion.write.insertInto("rdd_base") } @@ -1012,14 +1013,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { // error case: insert into a logical plan that is not a LeafNode val indirectDS = pdf.select("_1").filter($"_1" > 5) - indirectDS.registerTempTable("indirect_ds") + indirectDS.createOrReplaceTempView("indirect_ds") val e2 = intercept[AnalysisException] { insertion.write.insertInto("indirect_ds") } assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed.")) // error case: insert into an OneRowRelation - Dataset.ofRows(spark, OneRowRelation).registerTempTable("one_row") + Dataset.ofRows(spark, OneRowRelation).createOrReplaceTempView("one_row") val e3 = intercept[AnalysisException] { insertion.write.insertInto("one_row") } @@ -1443,13 +1444,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("SPARK-12982: Add table name validation in temp table registration") { val df = Seq("foo", "bar").map(Tuple1.apply).toDF("col") // invalid table name test as below - intercept[AnalysisException](df.registerTempTable("t~")) + intercept[AnalysisException](df.createOrReplaceTempView("t~")) // valid table name test as below - df.registerTempTable("table1") + df.createOrReplaceTempView("table1") // another invalid table name test as below - intercept[AnalysisException](df.registerTempTable("#$@sum")) + intercept[AnalysisException](df.createOrReplaceTempView("#$@sum")) // another invalid table name test as below - intercept[AnalysisException](df.registerTempTable("table!#")) + intercept[AnalysisException](df.createOrReplaceTempView("table!#")) } test("assertAnalyzed shouldn't replace original stack trace") { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index a957d5b..4ee2006 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -249,7 +249,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B try { f(tableName) } finally { - spark.catalog.dropTempTable(tableName) + spark.catalog.dropTempView(tableName) } } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 3c8c862..0784041 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -22,6 +22,8 @@ import java.sql.{Date, Timestamp} import scala.language.postfixOps +import org.scalatest.words.MatcherWords.be + import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ @@ -674,6 +676,22 @@ class DatasetSuite extends QueryTest with SharedSQLContext { }.getMessage assert(message.contains("The 0th field of input row cannot be null")) } + + test("createTempView") { + val dataset = Seq(1, 2, 3).toDS() + dataset.createOrReplaceTempView("tempView") + + // Overrides the existing temporary view with the same name + // No exception should be thrown here. + dataset.createOrReplaceTempView("tempView") + + // Throws AnalysisException if a temp view with the same name already exists + val e = intercept[AnalysisException]( + dataset.createTempView("tempView")) + intercept[AnalysisException](dataset.createTempView("tempView")) + assert(e.message.contains("already exists")) + dataset.sparkSession.catalog.dropTempView("tempView") + } } case class OtherTuple(_1: String, _2: Int) http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index c88dfe5..1c6e6cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -83,7 +83,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex checkAnswer( spark.wrapped.tables().filter("tableName = 'tables'").select("tableName", "isTemporary"), Row("tables", true)) - spark.catalog.dropTempTable("tables") + spark.catalog.dropTempView("tables") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3bbe87a..7020841 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -333,7 +333,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { "SELECT sum('a'), avg('a'), count(null) FROM testData", Row(null, null, 0) :: Nil) } finally { - spark.catalog.dropTempTable("testData3x")
+ spark.catalog.dropTempView("testData3x") } } @@ -1453,12 +1453,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)) .registerTempTable("data") checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1)) - spark.catalog.dropTempTable("data") + spark.catalog.dropTempView("data") spark.read.json( sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data") checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2)) - spark.catalog.dropTempTable("data") + spark.catalog.dropTempView("data") } test("SPARK-4432 Fix attribute reference resolution error when using ORDER BY") { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 427f24a..9221543 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -55,7 +55,7 @@ class UDFSuite extends QueryTest with SharedSQLContext { val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying") df.registerTempTable("tmp_table") checkAnswer(sql("select spark_partition_id() from tmp_table").toDF(), Row(0)) - spark.catalog.dropTempTable("tmp_table") + spark.catalog.dropTempView("tmp_table") } test("SPARK-8005 input_file_name") { @@ -66,7 +66,7 @@ class UDFSuite extends QueryTest with SharedSQLContext { val answer = sql("select input_file_name() from test_table").head().getString(0) assert(answer.contains(dir.getCanonicalPath)) assert(sql("select input_file_name() from test_table").distinct().collect().length >= 2) - spark.catalog.dropTempTable("test_table") + spark.catalog.dropTempView("test_table") } } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 88269a6..2099d4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -191,7 +191,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"), spark.table("InMemoryCache_different_data_types").collect()) - spark.catalog.dropTempTable("InMemoryCache_different_data_types") + spark.catalog.dropTempView("InMemoryCache_different_data_types") } test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala index 373d3a3..69a600a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala @@ -52,7 +52,7 @@ object ParquetReadBenchmark { } def withTempTable(tableNames: String*)(f: => Unit): Unit = { - try f finally tableNames.foreach(spark.catalog.dropTempTable) + try f finally tableNames.foreach(spark.catalog.dropTempView) } def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index d8a2c38..e4d4cec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -58,7 +58,7 @@ class CatalogSuite } private def createTempTable(name: String): Unit = { - sessionCatalog.createTempTable(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true) + sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true) } private def dropTable(name: String, db: Option[String] = None): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index a49a8c9..45a9c9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -153,7 +153,7 @@ private[sql] trait SQLTestUtils try f finally { // If the test failed part way, we don't want to mask the failure by failing to remove // temp tables that never got created. 
- try tableNames.foreach(spark.catalog.dropTempTable) catch { + try tableNames.foreach(spark.catalog.dropTempView) catch { case _: NoSuchTableException => } } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2f20cde..4c528fb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log options = options) LogicalRelation( - dataSource.resolveRelation(), + dataSource.resolveRelation(checkPathExist = true), metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database)))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala index 093cd3a..d96eb01 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala @@ -31,7 +31,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd override protected def beforeEach(): Unit = { super.beforeEach() if (spark.wrapped.tableNames().contains("src")) { - spark.catalog.dropTempTable("src") + spark.catalog.dropTempView("src") } Seq((1, "")).toDF("key", "value").registerTempTable("src") Seq((1, 1, 1)).toDF("a", "a", "b").registerTempTable("dupAttributes") @@ -39,8 +39,8 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd override protected def afterEach(): Unit = { try { - spark.catalog.dropTempTable("src") - spark.catalog.dropTempTable("dupAttributes") + spark.catalog.dropTempView("src") + spark.catalog.dropTempView("dupAttributes") } finally { super.afterEach() } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 538e218..2d8b1f3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -348,7 +348,7 @@ class HiveDDLCommandSuite extends PlanTest { test("create table - temporary") { val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)" val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.message.contains("registerTempTable")) + assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet")) } test("create table - external") { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index e8188e5..8dc756b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def beforeAll(): Unit = { super.beforeAll() // The catalog in HiveContext is a case insensitive one. - sessionState.catalog.createTempTable( + sessionState.catalog.createTempView( "ListTablesSuiteTable", df.logicalPlan, overrideIfExists = true) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 0f416eb..c97b3f3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -193,7 +193,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te spark.sql("DROP TABLE IF EXISTS agg1") spark.sql("DROP TABLE IF EXISTS agg2") spark.sql("DROP TABLE IF EXISTS agg3") - spark.catalog.dropTempTable("emptyTable") + spark.catalog.dropTempView("emptyTable") } finally { super.afterAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 51d537d..521964e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -347,7 +347,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode") } - spark.catalog.dropTempTable("testUDF") + spark.catalog.dropTempView("testUDF") } test("Hive UDF in group by") { http://git-wip-us.apache.org/repos/asf/spark/blob/33c6eb52/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala index cbbeacf..4d284e1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -353,7 +353,7 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi checkAnswer(actual, expected) - spark.catalog.dropTempTable("nums") + spark.catalog.dropTempView("nums") } test("SPARK-7595: Window will cause resolve failed with self join") { 
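For completeness, here is the create/query/drop round trip that SQLTransformer and several of the tests above now follow, as a short self-contained sketch (not from the patch; the data and view name are illustrative, and a SparkSession named `spark` is assumed):

    // Requires the implicits of the assumed `spark` session for toDF.
    import spark.implicits._

    val input = Seq((1, "a"), (2, "b")).toDF("id", "label")
    val viewName = "sqltrans_tmp"  // hypothetical view name

    // Create (or replace) the view, run SQL against it, then drop it.
    input.createOrReplaceTempView(viewName)
    spark.sql(s"SELECT id * 2 AS doubled FROM $viewName").show()
    spark.catalog.dropTempView(viewName)

Dropping the view at the end mirrors SQLTransformer's transformSchema, which registers a dummy view only long enough to derive the output schema.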