Repository: spark Updated Branches: refs/heads/branch-2.2 7f35f5b99 -> 9a4a8e1b0
[SPARK-19236][SQL][BACKPORT-2.2] Added createOrReplaceGlobalTempView method ### What changes were proposed in this pull request? This PR is to backport two PRs for adding the `createOrReplaceGlobalTempView` method https://github.com/apache/spark/pull/18147 https://github.com/apache/spark/pull/16598 --- Added the createOrReplaceGlobalTempView method for dataset API ### How was this patch tested? N/A Author: Xiao Li <gatorsm...@gmail.com> Closes #18167 from gatorsmile/Backport18147. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a4a8e1b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a4a8e1b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a4a8e1b Branch: refs/heads/branch-2.2 Commit: 9a4a8e1b010bcfa187360c8331ef897195732638 Parents: 7f35f5b Author: Xiao Li <gatorsm...@gmail.com> Authored: Fri Jun 2 11:57:22 2017 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Fri Jun 2 11:57:22 2017 -0700 ---------------------------------------------------------------------- python/pyspark/sql/dataframe.py | 17 ++++++ .../scala/org/apache/spark/sql/Dataset.scala | 16 ++++++ .../sql/execution/GlobalTempViewSuite.scala | 60 +++++++++++--------- 3 files changed, 67 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9a4a8e1b/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 8a59fcd..b1eb80e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -191,6 +191,23 @@ class DataFrame(object): """ self._jdf.createGlobalTempView(name) + @since(2.2) + def createOrReplaceGlobalTempView(self, name): + """Creates or replaces a global temporary view using the given name. 
+ + The lifetime of this temporary view is tied to this Spark application. + + >>> df.createOrReplaceGlobalTempView("people") + >>> df2 = df.filter(df.age > 3) + >>> df2.createOrReplaceGlobalTempView("people") + >>> df3 = spark.sql("select * from global_temp.people") + >>> sorted(df3.collect()) == sorted(df2.collect()) + True + >>> spark.catalog.dropGlobalTempView("people") + + """ + self._jdf.createOrReplaceGlobalTempView(name) + @property @since(1.4) def write(self): http://git-wip-us.apache.org/repos/asf/spark/blob/9a4a8e1b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f491e3c..503b540 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2657,6 +2657,22 @@ class Dataset[T] private[sql]( createTempViewCommand(viewName, replace = false, global = true) } + /** + * Creates or replaces a global temporary view using the given name. The lifetime of this + * temporary view is tied to this Spark application. + * + * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application, + * i.e. it will be automatically dropped when the application terminates. It's tied to a system + * preserved database `global_temp`, and we must use the qualified name to refer to a global temp + * view, e.g. `SELECT * FROM global_temp.view1`. 
+ * + * @group basic + * @since 2.2.0 + */ + def createOrReplaceGlobalTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = true, global = true) + } + private def createTempViewCommand( viewName: String, replace: Boolean, http://git-wip-us.apache.org/repos/asf/spark/blob/9a4a8e1b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index 5c63c6a..a3d75b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -35,39 +35,47 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { private var globalTempDB: String = _ test("basic semantic") { - sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'") + try { + sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'") + + // If there is no database in table name, we should try local temp view first, if not found, + // try table/view in current database, which is "default" in this case. So we expect + // NoSuchTableException here. + intercept[NoSuchTableException](spark.table("src")) - // If there is no database in table name, we should try local temp view first, if not found, - // try table/view in current database, which is "default" in this case. So we expect - // NoSuchTableException here. - intercept[NoSuchTableException](spark.table("src")) + // Use qualified name to refer to the global temp view explicitly. + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) - // Use qualified name to refer to the global temp view explicitly. - checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) + // Table name without database will never refer to a global temp view. 
+ intercept[NoSuchTableException](sql("DROP VIEW src")) - // Table name without database will never refer to a global temp view. - intercept[NoSuchTableException](sql("DROP VIEW src")) + sql(s"DROP VIEW $globalTempDB.src") + // The global temp view should be dropped successfully. + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) - sql(s"DROP VIEW $globalTempDB.src") - // The global temp view should be dropped successfully. - intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) + // We can also use Dataset API to create global temp view + Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) - // We can also use Dataset API to create global temp view - Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src") - checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) + // Use qualified name to rename a global temp view. + sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2") + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) + checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a")) - // Use qualified name to rename a global temp view. - sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2") - intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) - checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a")) + // Use qualified name to alter a global temp view. + sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'") + checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b")) - // Use qualified name to alter a global temp view. 
- sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'") - checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b")) + // We can also use Catalog API to drop global temp view + spark.catalog.dropGlobalTempView("src2") + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2")) - // We can also use Catalog API to drop global temp view - spark.catalog.dropGlobalTempView("src2") - intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2")) + // We can also use Dataset API to replace global temp view + Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(2, "b")) + } finally { + spark.catalog.dropGlobalTempView("src") + } } test("global temp view is shared among all sessions") { @@ -106,7 +114,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { test("CREATE TABLE LIKE should work for global temp view") { try { sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b") - sql(s"CREATE TABLE cloned LIKE ${globalTempDB}.src") + sql(s"CREATE TABLE cloned LIKE $globalTempDB.src") val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned")) assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false)) } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org