[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-12-02 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r534671970



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(

Review comment:
   Ah, one solution is to follow `InsertIntoStatement` and not make `table` a 
child. Then we resolve the `UnresolvedRelation` inside `CacheTable` manually 
in `ResolveTempViews` and the other resolution rules.
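
A minimal sketch of that idea, assuming simplified stand-in types: `Plan`, `TempView`, and `resolveTempViews` below are hypothetical and are not the real Catalyst classes or rules. It only illustrates that a relation stored as a plain field is invisible to child traversal, so a rule has to match on the enclosing node and resolve the field itself.

```scala
object ResolveCacheTableSketch {
  // Simplified stand-ins for Catalyst plan nodes; not the real Spark classes.
  sealed trait Plan { def children: Seq[Plan] }

  final case class UnresolvedRelation(nameParts: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
  }

  final case class TempView(name: String) extends Plan {
    def children: Seq[Plan] = Nil
  }

  // `table` is an ordinary field rather than a child, so rules that only
  // walk `children` never see it and it must be resolved explicitly.
  final case class CacheTable(table: Plan, isLazy: Boolean) extends Plan {
    def children: Seq[Plan] = Nil
  }

  // Hypothetical rule in the spirit of ResolveTempViews: it reaches into
  // CacheTable and swaps a single-part name for the matching temp view.
  def resolveTempViews(tempViews: Set[String])(plan: Plan): Plan = plan match {
    case c @ CacheTable(UnresolvedRelation(Seq(name)), _) if tempViews.contains(name) =>
      c.copy(table = TempView(name))
    case other => other
  }
}
```

In this sketch a multi-part name is left untouched by the temp-view rule, so a later rule would still have to resolve it against a catalog.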





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-29 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r532360022



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(

Review comment:
   The next thing we can do is refactor it using the v2 framework (without 
adding a v2 version). The benefits are: 1. the logical plan moves to catalyst; 
2. the table is resolved in the analyzer. For example:
   ```
   CacheTable(UnresolvedRelation(...), ...)
   ...
   case class CacheTableExec(relation: LogicalPlan) {
     def run() {
       val df = Dataset.ofRows(spark, relation)

     }
   }
   ```








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-29 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r532357066



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
##
@@ -2555,11 +2557,15 @@ class DataSourceV2SQLSuite
 }
   }
 
-  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: 
String): Unit = {
+  private def testNotSupportedV2Command(

Review comment:
   Unnecessary change. This is minor, so let's fix it in your next PR.








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-27 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r531453069



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
 
 if (storageLevelValue.nonEmpty) {
   sparkSession.catalog.cacheTable(
-tableIdent.quotedString, 
StorageLevel.fromString(storageLevelValue.get))
+tableName, StorageLevel.fromString(storageLevelValue.get))
 } else {
-  sparkSession.catalog.cacheTable(tableIdent.quotedString)
+  sparkSession.catalog.cacheTable(tableName)
 }
 
 if (!isLazy) {
   // Performs eager caching
-  sparkSession.table(tableIdent).count()
+  sparkSession.table(tableName).count()
 }
 
 Seq.empty[Row]
   }
 }
 
-
 case class UncacheTableCommand(
-tableIdent: TableIdentifier,
+multipartIdentifier: Seq[String],
 ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-val tableId = tableIdent.quotedString
-if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
   yup








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-26 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r531426411



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
 
 if (storageLevelValue.nonEmpty) {
   sparkSession.catalog.cacheTable(
-tableIdent.quotedString, 
StorageLevel.fromString(storageLevelValue.get))
+tableName, StorageLevel.fromString(storageLevelValue.get))
 } else {
-  sparkSession.catalog.cacheTable(tableIdent.quotedString)
+  sparkSession.catalog.cacheTable(tableName)
 }
 
 if (!isLazy) {
   // Performs eager caching
-  sparkSession.table(tableIdent).count()
+  sparkSession.table(tableName).count()
 }
 
 Seq.empty[Row]
   }
 }
 
-
 case class UncacheTableCommand(
-tableIdent: TableIdentifier,
+multipartIdentifier: Seq[String],
 ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-val tableId = tableIdent.quotedString
-if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
   That's a good point. Then we can't change `uncacheTable` either...
   
   We need to create a custom `uncacheTable` method in this command as well.








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530759305



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
##
@@ -471,8 +471,11 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
* @since 2.0.0
*/
   override def uncacheTable(tableName: String): Unit = {
-val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+val multipartIdentifier =
+  sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+val cascade = (multipartIdentifier.length <= 2) &&
+  !sessionCatalog.isTemporaryTable(multipartIdentifier.asTableIdentifier)

Review comment:
   Can we add an overload of `isTemporaryTable` that takes `Seq[String]`?
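
A rough sketch of what such an overload could look like, assuming a heavily simplified catalog: `SessionCatalog` and `TableIdentifier` here are hypothetical stand-ins, and the rule that names with more than two parts are never temp views is an assumption of the sketch, not something this thread confirms.

```scala
object TempViewCheckSketch {
  // Stand-in for Spark's TableIdentifier; not the real class.
  final case class TableIdentifier(table: String, database: Option[String] = None)

  // Hypothetical, heavily simplified session catalog.
  final class SessionCatalog(
      localTempViews: Set[String],
      globalTempDb: String,
      globalTempViews: Set[String]) {

    // Existing-style check on a one- or two-part identifier.
    def isTemporaryTable(ident: TableIdentifier): Boolean = ident.database match {
      case None => localTempViews.contains(ident.table)
      case Some(db) => db == globalTempDb && globalTempViews.contains(ident.table)
    }

    // Proposed overload: accepts an n-part name directly. Assumption of this
    // sketch: a name with more than two parts is never a temp view.
    def isTemporaryTable(nameParts: Seq[String]): Boolean = nameParts match {
      case Seq(table) => isTemporaryTable(TableIdentifier(table))
      case Seq(db, table) => isTemporaryTable(TableIdentifier(table, Some(db)))
      case _ => false
    }
  }
}
```

With an overload like this, a caller holding a parsed multi-part name would not have to convert it to a `TableIdentifier` itself before asking the catalog.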








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530759172



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
 
 if (storageLevelValue.nonEmpty) {
   sparkSession.catalog.cacheTable(
-tableIdent.quotedString, 
StorageLevel.fromString(storageLevelValue.get))
+tableName, StorageLevel.fromString(storageLevelValue.get))
 } else {
-  sparkSession.catalog.cacheTable(tableIdent.quotedString)
+  sparkSession.catalog.cacheTable(tableName)
 }
 
 if (!isLazy) {
   // Performs eager caching
-  sparkSession.table(tableIdent).count()
+  sparkSession.table(tableName).count()
 }
 
 Seq.empty[Row]
   }
 }
 
-
 case class UncacheTableCommand(
-tableIdent: TableIdentifier,
+multipartIdentifier: Seq[String],
 ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-val tableId = tableIdent.quotedString
-if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
   We don't need to fix `CatalogImpl` completely in this PR, but the 
related methods should be fixed. For `tableExists`:
   ```
   def tableExists(table: String): Boolean = tableExists(parseMultiPartName(table))
   
   def tableExists(db: String, table: String): Boolean = tableExists(Seq(db, table))
   
   private def tableExists(name: Seq[String]): Boolean = ...
   ```
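
The shape above can be tried out in isolation. The following is a self-contained sketch under stated assumptions: the table registry, the `currentDatabase` value, and the naive dot-splitting parser are all hypothetical and ignore backtick quoting. It is not how `CatalogImpl` is implemented.

```scala
object TableExistsSketch {
  // Hypothetical registry of fully qualified table names.
  private val tables: Set[Seq[String]] =
    Set(Seq("default", "t"), Seq("testcat", "ns1", "ns2", "tbl"))

  // Assumed current database for single-part names.
  private val currentDatabase = "default"

  // Naive parser: splits on dots and does not handle backtick quoting.
  private def parseMultiPartName(name: String): Seq[String] = name.split('.').toList

  // Both public entry points funnel into the single n-part implementation.
  def tableExists(table: String): Boolean = tableExists(parseMultiPartName(table))

  def tableExists(db: String, table: String): Boolean = tableExists(Seq(db, table))

  private def tableExists(name: Seq[String]): Boolean = name match {
    case Seq(table) => tables.contains(Seq(currentDatabase, table))
    case parts => tables.contains(parts)
  }
}
```

Because every overload ends in the `Seq[String]` version, a name such as `testcat.ns1.ns2.tbl` goes through the same lookup path as a plain `t`.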








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530758190



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
 
 if (storageLevelValue.nonEmpty) {
   sparkSession.catalog.cacheTable(
-tableIdent.quotedString, 
StorageLevel.fromString(storageLevelValue.get))
+tableName, StorageLevel.fromString(storageLevelValue.get))
 } else {
-  sparkSession.catalog.cacheTable(tableIdent.quotedString)
+  sparkSession.catalog.cacheTable(tableName)
 }
 
 if (!isLazy) {
   // Performs eager caching
-  sparkSession.table(tableIdent).count()
+  sparkSession.table(tableName).count()
 }
 
 Seq.empty[Row]
   }
 }
 
-
 case class UncacheTableCommand(
-tableIdent: TableIdentifier,
+multipartIdentifier: Seq[String],
 ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-val tableId = tableIdent.quotedString
-if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
   This reminds me that we should check `CatalogImpl` and see whether n-part 
names are well supported. Seems like `tableExists` is broken...








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530756887



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
##
@@ -27,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.{Identifier, 
Table, TableCatalog}
  * Physical plan node for dropping a table.
  */
 case class DropTableExec(

Review comment:
   Can we avoid changing it? It's also being changed in 
https://github.com/apache/spark/pull/30491








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530608911



##
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
##
@@ -421,7 +421,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
   activateDatabase(db) {
 sql("REFRESH TABLE default.cachedTable")
 assertCached(
-  sql("SELECT * FROM default.cachedTable"), 
"`default`.`cachedTable`", DISK_ONLY)
+  sql("SELECT * FROM default.cachedTable"), "cachedTable", 
DISK_ONLY)

Review comment:
   Why is `default.` gone here?








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-25 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530337419



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
##
@@ -442,19 +442,11 @@ class ResolveSessionCatalog(
 ShowCreateTableCommand(ident.asTableIdentifier)
   }
 
-case CacheTableStatement(tbl, plan, isLazy, options) =>
-  val name = if (plan.isDefined) {
-// CACHE TABLE ... AS SELECT creates a temp view with the input query.
-// Temp view doesn't belong to any catalog and we shouldn't resolve 
catalog in the name.
-tbl
-  } else {
-parseTempViewOrV1Table(tbl, "CACHE TABLE")
-  }
-  CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
+case CacheTable(multipartIdent, plan, isLazy, options) =>
+  CacheTableCommand(multipartIdent, plan, isLazy, options)

Review comment:
   It seems unnecessary to have `CacheTable` now. Should the parser create 
`CacheTableCommand` directly?








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-24 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530055643



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
##
@@ -679,3 +679,30 @@ case class TruncateTable(
 partitionSpec: Option[TablePartitionSpec]) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the CACHE TABLE command.
+ */
+case class CacheTable(
+child: LogicalPlan,

Review comment:
   After more thought, I think CACHE TABLE is not a DDL command that needs 
to interact with catalogs, and it doesn't need a v2 version.
   
   The current problem is that `CacheTableCommand` only takes a v1 table 
identifier and can't cache v2 tables with n-part names. Maybe we can fix 
`CacheTableCommand` directly?
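
To make the n-part point concrete, here is a small sketch assuming a command that carries `Seq[String]` instead of a two-part identifier. The `CacheTableCommand` below is a hypothetical stand-in, and the quoting rule (quote a part only if it contains a dot or a backtick) is an assumption of the sketch rather than a statement about Spark's helper.

```scala
object MultipartNameSketch {
  // Assumed quoting rule for this sketch: wrap a part in backticks only when
  // it contains a dot or a backtick, doubling any embedded backticks.
  def quoteIfNeeded(part: String): String =
    if (part.contains(".") || part.contains("`")) "`" + part.replace("`", "``") + "`"
    else part

  // Render an n-part name as a single dotted string.
  def quoted(parts: Seq[String]): String = parts.map(quoteIfNeeded).mkString(".")

  // Hypothetical command holding an n-part name, so catalog and namespace
  // prefixes of any depth survive until execution.
  final case class CacheTableCommand(multipartIdentifier: Seq[String], isLazy: Boolean) {
    def tableName: String = quoted(multipartIdentifier)
  }
}
```

A two-part identifier has no slot for `testcat.ns1.ns2.tbl`, whereas the sequence form keeps all four parts and can still be rendered as one string when a name-based API needs it.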








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r529252117



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
##
@@ -2018,28 +2021,32 @@ class DataSourceV2SQLSuite
 }
   }
 
-  test("CACHE TABLE") {
+  test("CACHE/UNCACHE TABLE") {

Review comment:
   Shall we also test CACHE TABLE AS SELECT, both when the temp view already exists and when it does not?








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r529251673



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
##
@@ -710,6 +712,7 @@ class DataSourceV2SQLSuite
 
   test("DropTable: basic") {
 val tableName = "testcat.ns1.ns2.tbl"
+sql(s"EXPLAIN EXTENDED DROP TABLE IF EXISTS $tableName").show(false)

Review comment:
   Is this for debugging?








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528592990



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+session: SparkSession,

Review comment:
   We don't need this parameter; we can access 
`SparkPlan.sqlContext.sparkSession` instead.








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528592256



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+session: SparkSession,
+catalog: TableCatalog,
+table: Table,
+ident: Identifier,
+isLazy: Boolean,
+options: Map[String, String]) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+val v2Relation = DataSourceV2Relation.create(table, Some(catalog), 
Some(ident))
+val df = Dataset.ofRows(session, v2Relation)
+val tableName = Some(ident.quoted)
+val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+if (optStorageLevel.nonEmpty) {
+  session.sharedState.cacheManager.cacheQuery(
+df, tableName, StorageLevel.fromString(optStorageLevel.get))
+} else {
+  session.sharedState.cacheManager.cacheQuery(df, tableName)
+}
+
+if (!isLazy) {
+  // Performs eager caching.
+  df.count()
+}
+
+Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * Physical plan node for uncaching a table.
+ */
+case class UncacheTableExec(
+session: SparkSession,
+catalog: TableCatalog,
+table: Table,
+ident: Identifier) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+val v2Relation = DataSourceV2Relation.create(table, Some(catalog), 
Some(ident))
+val df = Dataset.ofRows(session, v2Relation)
+session.sharedState.cacheManager.uncacheQuery(df, cascade = true)

Review comment:
   We can call the `uncacheQuery` overload that takes a `LogicalPlan`.

[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528591659



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+session: SparkSession,

Review comment:
   we can pass `CacheManager` to be more specific.

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+session: SparkSession,
+catalog: TableCatalog,
+table: Table,
+ident: Identifier,
+isLazy: Boolean,
+options: Map[String, String]) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+val v2Relation = DataSourceV2Relation.create(table, Some(catalog), 
Some(ident))
+val df = Dataset.ofRows(session, v2Relation)
+val tableName = Some(ident.quoted)
+val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+if (optStorageLevel.nonEmpty) {
+  session.sharedState.cacheManager.cacheQuery(
+df, tableName, StorageLevel.fromString(optStorageLevel.get))
+} else {
+  session.sharedState.cacheManager.cacheQuery(df, tableName)
+}
+
+if (!isLazy) {
+  // Performs eager caching.
+  df.count()
+}
+
+Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * Physical plan node for uncaching a table.
+ */
+case class UncacheTableExec(
+session: SparkSession,

Review comment:
   ditto








[GitHub] [spark] cloud-fan commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables

2020-11-23 Thread GitBox


cloud-fan commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528589715



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
##
@@ -3343,14 +3343,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with SQLConfHelper with Logg
 "the table name in CACHE TABLE AS SELECT", ctx)
 }
 val options = 
Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-CacheTableStatement(tableName, query, ctx.LAZY != null, options)
+val isLazy = ctx.LAZY != null
+if (query.isDefined) {
+  CacheTableAsSelect(tableName.head, query.get, isLazy, options)

Review comment:
   nit: now we can move the `tableName.length > 1` check into this 
branch.
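
A sketch of the suggested control flow, with hypothetical stand-ins throughout: the query is kept as raw SQL text instead of a parsed plan, `require` replaces the parser's own exception, and the error message is made up for illustration.

```scala
object CacheTableParserSketch {
  sealed trait Parsed
  final case class CacheTableAsSelect(tempViewName: String, query: String, isLazy: Boolean)
      extends Parsed
  final case class CacheTable(nameParts: Seq[String], isLazy: Boolean) extends Parsed

  // `query` is raw SQL text here; this is a simplification for the sketch.
  def build(nameParts: Seq[String], query: Option[String], isLazy: Boolean): Parsed =
    query match {
      case Some(q) =>
        // The multi-part check lives inside the AS SELECT branch, the only
        // place where a prefixed name is rejected.
        require(
          nameParts.length == 1,
          "CACHE TABLE AS SELECT expects a single-part name (illustrative message)")
        CacheTableAsSelect(nameParts.head, q, isLazy)
      case None =>
        // Plain CACHE TABLE accepts an n-part name unchanged.
        CacheTable(nameParts, isLazy)
    }
}
```

Keeping the check next to `nameParts.head` also makes it obvious why taking only the first part is safe in that branch.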




