[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r534502351

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ##
@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 case class CacheTableCommand(

Review comment:
One issue I am encountering by moving to the v2 framework (for v2 tables) is the following. When `CACHE TABLE testcat.tbl` is run, `tbl` is changed from `DataSourceV2Relation` to `DataSourceV2ScanRelation` by the `V2ScanRelationPushDown` rule, now that the plan goes through the analyzer, optimizer, etc. But if I run `spark.table("testcat.tbl")`, the query execution has `tbl` as `DataSourceV2Relation`, so the cache is not applied.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
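The mismatch described in this comment can be illustrated with a toy model. This is only a sketch: `V2Relation`, `V2ScanRelation`, `optimize` and `PlanCache` are hypothetical stand-ins written for illustration, not Spark classes, and plain equality is used where Spark's cache manager compares plans in its own way.

```scala
// Toy model only: these types mimic the shape of the plans discussed above
// but are NOT Spark classes.
object PlanCacheSketch {
  sealed trait Plan
  // Stand-in for DataSourceV2Relation (the form seen by spark.table(...)).
  final case class V2Relation(table: String) extends Plan
  // Stand-in for DataSourceV2ScanRelation (the form after scan pushdown).
  final case class V2ScanRelation(table: String) extends Plan

  // Stand-in for the V2ScanRelationPushDown optimizer rule.
  def optimize(plan: Plan): Plan = plan match {
    case V2Relation(t) => V2ScanRelation(t)
    case other => other
  }

  // A cache that matches entries by plan, using plain equality (an assumption).
  final class PlanCache {
    private var entries = Set.empty[Plan]
    def cache(plan: Plan): Unit = entries += plan
    def isCached(plan: Plan): Boolean = entries.contains(plan)
  }
}
```

If the command caches `optimize(V2Relation("testcat.tbl"))`, a later lookup with the unoptimized `V2Relation("testcat.tbl")` misses, which is the symptom reported above.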
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r533823907

## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala ##
@@ -2555,11 +2557,15 @@ class DataSourceV2SQLSuite
     }
   }
-  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
+  private def testNotSupportedV2Command(

Review comment:
Ok, will fix.

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ##
@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 case class CacheTableCommand(

Review comment:
OK, will do.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r531440623

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
     if (storageLevelValue.nonEmpty) {
       sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+        tableName, StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+      sparkSession.catalog.cacheTable(tableName)
     }
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableIdent).count()
+      sparkSession.table(tableName).count()
     }
     Seq.empty[Row]
   }
 }
-
 case class UncacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     ifExists: Boolean) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableId = tableIdent.quotedString
-    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
Ah, I see your point. `CatalogImpl.cacheTable` already supports v2 tables since it resolves the table name via `sparkSession.table(tableName)`. But I think `CatalogImpl.cacheTable` and `CatalogImpl.uncacheTable` should only support v1 tables, and `CacheTableCommand` and `UncacheTableCommand` should go through `sparkSession.sharedState.cacheManager`, right? I will update accordingly.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530763650

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ##
@@ -51,32 +54,35 @@ case class CacheTableCommand(
     if (storageLevelValue.nonEmpty) {
       sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+        tableName, StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+      sparkSession.catalog.cacheTable(tableName)
     }
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableIdent).count()
+      sparkSession.table(tableName).count()
     }
     Seq.empty[Row]
   }
 }
-
 case class UncacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     ifExists: Boolean) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableId = tableIdent.quotedString
-    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {

Review comment:
`CatalogImpl` is for session catalog, so it cannot support v2 tables, no?
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530762754

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala ##
@@ -27,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
  * Physical plan node for dropping a table.
  */
 case class DropTableExec(

Review comment:
OK, will revert.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530762698

## File path: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ##
@@ -471,8 +471,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    val multipartIdentifier =
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    val cascade = (multipartIdentifier.length <= 2) &&
+      !sessionCatalog.isTemporaryTable(multipartIdentifier.asTableIdentifier)

Review comment:
Looks like the overload already exists as `isTempView`. :)
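A rough sketch of how a lookup that accepts name parts directly could replace the length check and the conversion to a table identifier. `ToySessionCatalog` and `shouldCascade` are hypothetical names for illustration, and the rule that a name with more than two parts is never a temp view is an assumption of this toy model, not something stated in the thread.

```scala
object CascadeSketch {
  // Hypothetical stand-in for a session catalog that tracks temp views by name parts.
  final class ToySessionCatalog(tempViews: Set[Seq[String]]) {
    // Assumption: names with more than two parts include a catalog,
    // so they are never treated as session temp views here.
    def isTempView(nameParts: Seq[String]): Boolean =
      nameParts.length <= 2 && tempViews.contains(nameParts)
  }

  // Cascade unless the name refers to a temporary view.
  def shouldCascade(catalog: ToySessionCatalog, nameParts: Seq[String]): Boolean =
    !catalog.isTempView(nameParts)
}
```

In this sketch a three-part name such as `Seq("testcat", "ns", "tbl")` always cascades. Whether that matches the behaviour intended for the expression in the diff is not settled in this comment.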
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530652131

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ##
@@ -421,7 +421,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       activateDatabase(db) {
         sql("REFRESH TABLE default.cachedTable")
         assertCached(
-          sql("SELECT * FROM default.cachedTable"), "`default`.`cachedTable`", DISK_ONLY)
+          sql("SELECT * FROM default.cachedTable"), "cachedTable", DISK_ONLY)

Review comment:
Because the cache was created with `CACHE TABLE cachedTable OPTIONS('storageLevel' 'DISK_ONLY')`. Before this change, `cachedTable` was resolved to `default.cachedTable` in `ResolveSessionCatalog`. Note that this is just the name of the cache builder and doesn't affect `spark.catalog.isCached(tableName: String)`.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530571035

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ##
@@ -421,7 +421,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       activateDatabase(db) {
         sql("REFRESH TABLE default.cachedTable")
         assertCached(
-          sql("SELECT * FROM default.cachedTable"), "`default`.`cachedTable`", DISK_ONLY)
+          sql("SELECT * FROM default.cachedTable"), "cachedTable", DISK_ONLY)

Review comment:
~~@cloud-fan Looks like we need to resolve catalog/current namespace for this scenario?~~
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530606256

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ##
@@ -421,7 +421,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       activateDatabase(db) {
         sql("REFRESH TABLE default.cachedTable")
         assertCached(
-          sql("SELECT * FROM default.cachedTable"), "`default`.`cachedTable`", DISK_ONLY)
+          sql("SELECT * FROM default.cachedTable"), "cachedTable", DISK_ONLY)

Review comment:
I guess this is OK as long as `spark.catalog.isCached` works fine, since it's just the name of the cache builder.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530121226

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala ##
@@ -679,3 +679,30 @@ case class TruncateTable(
     partitionSpec: Option[TablePartitionSpec]) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the CACHE TABLE command.
+ */
+case class CacheTable(
+    child: LogicalPlan,

Review comment:
Also, since it's not resolving to catalogs, we should move it out of `ResolveSessionCatalog`?
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r530117606

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala ##
@@ -679,3 +679,30 @@ case class TruncateTable(
     partitionSpec: Option[TablePartitionSpec]) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the CACHE TABLE command.
+ */
+case class CacheTable(
+    child: LogicalPlan,

Review comment:
I updated `CacheTableCommand` and `uncacheTable` to support multipart names (without resolving the identifier). Please let me know what you think of the new approach. Thanks.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r529273422

## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala ##
@@ -2018,28 +2021,32 @@ class DataSourceV2SQLSuite
     }
   }
-  test("CACHE TABLE") {
+  test("CACHE/UNCACHE TABLE") {

Review comment:
Added a test with an existing temp view to `CachedTableSuite`.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r529272864

## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala ##
@@ -710,6 +712,7 @@ class DataSourceV2SQLSuite
   test("DropTable: basic") {
     val tableName = "testcat.ns1.ns2.tbl"
+    sql(s"EXPLAIN EXTENDED DROP TABLE IF EXISTS $tableName").show(false)

Review comment:
Oops, removed.
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528907060

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala ##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+    session: SparkSession,
+    catalog: TableCatalog,
+    table: Table,
+    ident: Identifier,
+    isLazy: Boolean,
+    options: Map[String, String]) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    val df = Dataset.ofRows(session, v2Relation)
+    val tableName = Some(ident.quoted)
+    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+    if (optStorageLevel.nonEmpty) {
+      session.sharedState.cacheManager.cacheQuery(
+        df, tableName, StorageLevel.fromString(optStorageLevel.get))

Review comment:
`tableName` is used only for display purposes (e.g., in `InMemoryTableScanExec`). The `cachedData` is matched by the logical plan, so I think the current approach is OK.
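The point that the name is only a label while matching happens on the plan can be sketched with a toy cache. `Plan`, `CachedEntry` and `ToyCacheManager` below are hypothetical types for illustration only; they are not Spark's classes, and plain equality stands in for however the real cache manager compares plans.

```scala
object CacheLabelSketch {
  // Toy logical plan; equality on the description stands in for plan matching.
  final case class Plan(description: String)
  // The label is carried along for display only.
  final case class CachedEntry(plan: Plan, label: Option[String])

  final class ToyCacheManager {
    private var cachedData = Vector.empty[CachedEntry]

    // Register a plan under an optional display label; duplicates are ignored.
    def cacheQuery(plan: Plan, label: Option[String] = None): Unit =
      if (lookup(plan).isEmpty) cachedData = cachedData :+ CachedEntry(plan, label)

    // Lookup consults only the plan, never the label.
    def lookup(plan: Plan): Option[CachedEntry] = cachedData.find(_.plan == plan)
  }
}
```

Under this model, caching the same plan with the label `cachedTable` or `default.cachedTable` changes only what is displayed, not whether a later query finds the entry.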
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528399664

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala ##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+    session: SparkSession,
+    catalog: TableCatalog,
+    table: Table,
+    ident: Identifier,
+    isLazy: Boolean,
+    options: Map[String, String]) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    val df = Dataset.ofRows(session, v2Relation)
+    val tableName = Some(ident.quoted)
+    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+    if (optStorageLevel.nonEmpty) {
+      session.sharedState.cacheManager.cacheQuery(
+        df, tableName, StorageLevel.fromString(optStorageLevel.get))
+    } else {
+      session.sharedState.cacheManager.cacheQuery(df, tableName)
+    }
+
+    if (!isLazy) {
+      // Performs eager caching.
+      df.count()
+    }
+
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * Physical plan node for uncaching a table.
+ */
+case class UncacheTableExec(
+    session: SparkSession,
+    catalog: TableCatalog,
+    table: Table,
+    ident: Identifier) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    val df = Dataset.ofRows(session, v2Relation)
+    // Cascade should be true unless a temporary view is uncached.

Review comment:
OK, I removed this comment.
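The `isLazy` branch in the diff above registers the cache entry first and then, unless the command is lazy, forces it with `count()`. A minimal sketch of that eager-versus-lazy pattern in plain Scala follows; `ToyCachedTable` and `cacheTable` are hypothetical names for illustration, and a `lazy val` stands in for cache materialization.

```scala
object EagerCacheSketch {
  // Toy cached table: rows are computed at most once, on first access.
  final class ToyCachedTable(compute: () => Seq[Int]) {
    private var computations = 0
    private lazy val rows: Seq[Int] = { computations += 1; compute() }

    // The first call materializes the rows, like the count() in the diff.
    def count(): Long = rows.size.toLong
    def timesComputed: Int = computations
  }

  // Register the table; when not lazy, force materialization right away.
  def cacheTable(compute: () => Seq[Int], isLazy: Boolean): ToyCachedTable = {
    val cached = new ToyCachedTable(compute)
    if (!isLazy) cached.count()
    cached
  }
}
```

With `isLazy = true` nothing is computed until the first `count()`. With `isLazy = false` the cost is paid when the table is cached.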
[GitHub] [spark] imback82 commented on a change in pull request #30403: [SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
imback82 commented on a change in pull request #30403:
URL: https://github.com/apache/spark/pull/30403#discussion_r528399639

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ##
@@ -3323,7 +3323,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
   /**
-   * Create a [[CacheTableStatement]].
+   * Create a [[CacheTable]] pr [[CacheTableAsSelect]].

Review comment:
Thanks, fixed.