This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 04c3125 [SPARK-34360][SQL] Support truncation of v2 tables 04c3125 is described below commit 04c3125dcfb2a40b13eef443e5b543795aa31c34 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Sun Feb 21 17:50:38 2021 +0900 [SPARK-34360][SQL] Support truncation of v2 tables ### What changes were proposed in this pull request? 1. Add a new interface `TruncatableTable` which represents tables that allow atomic truncation. 2. Make `SupportsDelete` extend `TruncatableTable`, with a default `truncateTable()` implementation that removes all rows by calling `deleteWhere` with an `AlwaysTrue` filter (when `canDeleteWhere` accepts that filter). 3. Support the `AlwaysTrue` filter in `InMemoryTable`, so that `InMemoryTable` and `InMemoryPartitionTable` can be truncated. ### Why are the changes needed? To support `TRUNCATE TABLE` for v2 tables. ### Does this PR introduce _any_ user-facing change? Should not. ### How was this patch tested? Added new tests to `TableCatalogSuite` that check truncation of non-partitioned and partitioned tables: ``` $ build/sbt "test:testOnly *TableCatalogSuite" ``` Closes #31475 from MaxGekk/dsv2-truncate-table. Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../sql/connector/catalog/SupportsDelete.java | 13 ++++++- .../sql/connector/catalog/TruncatableTable.java | 35 ++++++++++++++++++ .../apache/spark/sql/connector/InMemoryTable.scala | 4 +- .../sql/connector/catalog/TableCatalogSuite.scala | 43 +++++++++++++++++++++- 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java index 8f51f4e..6a28bca 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.AlwaysTrue; import org.apache.spark.sql.sources.Filter; /** @@ -27,7 +28,7 @@ import 
org.apache.spark.sql.sources.Filter; * @since 3.0.0 */ @Evolving -public interface SupportsDelete { +public interface SupportsDelete extends TruncatableTable { /** * Checks whether it is possible to delete data from a data source table that matches filter @@ -68,4 +69,14 @@ public interface SupportsDelete { * @throws IllegalArgumentException If the delete is rejected due to required effort */ void deleteWhere(Filter[] filters); + + @Override + default boolean truncateTable() { + Filter[] filters = new Filter[] { new AlwaysTrue() }; + boolean canDelete = canDeleteWhere(filters); + if (canDelete) { + deleteWhere(filters); + } + return canDelete; + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java new file mode 100644 index 0000000..a69f384 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; + +/** + * Represents a table which can be atomically truncated. 
+ */ +@Evolving +public interface TruncatableTable extends Table { + /** + * Truncate a table by removing all rows from the table atomically. + * + * @return true if a table was truncated successfully otherwise false + * + * @since 3.2.0 + */ + boolean truncateTable(); +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala index df531b1..00de3a4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransfor import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.write._ import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} -import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull} +import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull} import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String @@ -419,6 +419,7 @@ object InMemoryTable { null == extractValue(attr, partitionNames, partValues) case IsNotNull(attr) => null != extractValue(attr, partitionNames, partValues) + case AlwaysTrue() => true case f => throw new IllegalArgumentException(s"Unsupported filter type: $f") } @@ -431,6 +432,7 @@ object InMemoryTable { case _: EqualNullSafe => true case _: IsNull => true case _: IsNotNull => true + case _: AlwaysTrue => true case _ => false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala index ef342e7..485e41f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala @@ -23,9 +23,11 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.connector.InMemoryTableCatalog +import org.apache.spark.sql.connector.{BufferedRows, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTable, InMemoryTableCatalog} +import org.apache.spark.sql.connector.expressions.LogicalExpressions import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -887,4 +889,43 @@ class TableCatalogSuite extends SparkFunSuite { assert(exc.getMessage.contains(testNs.quoted)) } + + test("truncate non-partitioned table") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + .asInstanceOf[InMemoryTable] + table.withData(Array( + new BufferedRows("3").withRow(InternalRow(0, "abc", "3")), + new BufferedRows("4").withRow(InternalRow(1, "def", "4")))) + assert(table.truncateTable()) + assert(table.rows.isEmpty) + } + + test("truncate partitioned table") { + val partCatalog = new InMemoryPartitionTableCatalog + partCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + + val table = partCatalog.createTable( + testIdent, + new StructType() + .add("col0", IntegerType) + 
.add("part0", IntegerType), + Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))), + util.Collections.emptyMap[String, String]) + val partTable = table.asInstanceOf[InMemoryPartitionTable] + val partIdent = InternalRow.apply(0) + val partIdent1 = InternalRow.apply(1) + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + partTable.createPartition(partIdent1, new util.HashMap[String, String]()) + partTable.withData(Array( + new BufferedRows("0").withRow(InternalRow(0, 0)), + new BufferedRows("1").withRow(InternalRow(1, 1)) + )) + assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) + assert(!partTable.rows.isEmpty) + assert(partTable.truncateTable()) + assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) + assert(partTable.rows.isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org