This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 80da53ed1dfd [SPARK-52143][TESTS] Add test for table constraints with `current_*` functions
80da53ed1dfd is described below

commit 80da53ed1dfd9ddd2ca9274f33b1fb20b7402c82
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri May 23 11:58:37 2025 -0700

    [SPARK-52143][TESTS] Add test for table constraints with `current_*` functions

    ### What changes were proposed in this pull request?

    Add test for table constraints with `current_*` functions

    ### Why are the changes needed?

    Improve test coverage

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50993 from gengliangwang/testCurrent.

    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../command/v2/CheckConstraintSuite.scala | 242 +++++++++++++++++++++
 1 file changed, 242 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index a734f8507dac..397d9248f628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -371,6 +371,248 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
     }
   }
 
+  test("Check constraint with current_timestamp function") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      // Create table with a constraint using current_timestamp
+      sql(s"CREATE TABLE $t (id INT, creation_time TIMESTAMP, " +
+        s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+      // Insert valid data (current or past timestamp)
+      sql(s"INSERT INTO $t VALUES (1, current_timestamp()), (2, TIMESTAMP '2020-01-01 00:00:00')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // Insert valid data with null timestamp
+      sql(s"INSERT INTO $t VALUES (3, null)")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
+
+      // Future timestamp should fail validation
+      val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"INSERT INTO $t VALUES (4, $tomorrow)")
+      }
+      assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+      assert(error.getMessageParameters.get("constraintName") == "valid_time")
+      assert(error.getMessageParameters.get("expression") == "creation_time <= current_timestamp()")
+    }
+  }
+
+  test("Check constraint with current_timestamp function - update operation") {
+    withNamespaceAndTable("ns", "tbl", rowLevelOPCatalog) { t =>
+      // Create table with a constraint using current_timestamp
+      sql(s"CREATE TABLE $t (id INT, creation_time TIMESTAMP, " +
+        s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+      // Insert initial data
+      sql(s"INSERT INTO $t VALUES (1, current_timestamp()), (2, TIMESTAMP '2020-01-01 00:00:00')")
+
+      // Valid update with current or past timestamp
+      sql(s"UPDATE $t SET creation_time = TIMESTAMP '2021-01-01 00:00:00' WHERE id = 1")
+      checkAnswer(spark.table(t).select("id")
+        .where("creation_time = TIMESTAMP '2021-01-01 00:00:00'"),
+        Seq(Row(1)))
+
+      // Valid update with null timestamp
+      sql(s"UPDATE $t SET creation_time = null WHERE id = 2")
+      checkAnswer(sql(s"SELECT * FROM $t WHERE id = 2"), Seq(Row(2, null)))
+
+      // Future timestamp should fail validation
+      val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"UPDATE $t SET creation_time = $tomorrow WHERE id = 1")
+      }
+      assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+      assert(error.getMessageParameters.get("constraintName") == "valid_time")
+      assert(error.getMessageParameters.get("expression") == "creation_time <= current_timestamp()")
+    }
+  }
+
+  test("Check constraint with current_timestamp function - merge operation") {
+    withNamespaceAndTable("ns", "tbl", rowLevelOPCatalog) { target =>
+      withNamespaceAndTable("ns", "source", rowLevelOPCatalog) { source =>
+        // Create target table with constraint using current_timestamp
+        sql(s"CREATE TABLE $target (id INT, creation_time TIMESTAMP, " +
+          s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+        // Create source table without constraints
+        sql(s"CREATE TABLE $source (id INT, creation_time TIMESTAMP) $defaultUsing")
+
+        // Insert initial data
+        sql(s"INSERT INTO $target VALUES (1, TIMESTAMP '2020-01-01 00:00:00')")
+        sql(s"INSERT INTO $source VALUES " +
+          s"(2, TIMESTAMP '2021-01-01 00:00:00'), " +
+          s"(3, current_timestamp()), " +
+          s"(4, null)")
+
+        // Valid merge with past timestamps or null
+        sql(
+          s"""
+             |MERGE INTO $target t
+             |USING (SELECT * FROM $source WHERE id IN (2, 4)) s
+             |ON t.id = s.id
+             |WHEN MATCHED THEN UPDATE SET creation_time = s.creation_time
+             |WHEN NOT MATCHED THEN INSERT (id, creation_time) VALUES (s.id, s.creation_time)
+             |""".stripMargin)
+
+        checkAnswer(spark.table(target).orderBy("id"),
+          Seq(Row(1, java.sql.Timestamp.valueOf("2020-01-01 00:00:00")),
+            Row(2, java.sql.Timestamp.valueOf("2021-01-01 00:00:00")),
+            Row(4, null)))
+
+        // Future timestamp should fail validation
+        val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+        sql(s"INSERT INTO $source VALUES (5, $tomorrow)")
+
+        val error = intercept[SparkRuntimeException] {
+          sql(
+            s"""
+               |MERGE INTO $target t
+               |USING (SELECT * FROM $source WHERE id = 5) s
+               |ON t.id = s.id
+               |WHEN NOT MATCHED THEN INSERT (id, creation_time) VALUES (s.id, s.creation_time)
+               |""".stripMargin)
+        }
+
+        assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+        assert(error.getMessageParameters.get("constraintName") == "valid_time")
+        assert(error.getMessageParameters.get("expression") ==
+          "creation_time <= current_timestamp()")
+      }
+    }
+  }
+
+  test("Check constraint with current_date function") {
+    // Create another table with other current_* functions
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id INT, creation_date DATE, " +
+        s"CONSTRAINT valid_date CHECK (creation_date <= current_date())) $defaultUsing")
+
+      // Insert valid data (current or past timestamp)
+      sql(s"INSERT INTO $t VALUES (1, current_date()), (2, DATE'2020-01-01')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // Future date should fail validation
+      val tomorrow = "DATE'9999-12-31'"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"INSERT INTO $t VALUES (3, $tomorrow)")
+      }
+      checkError(
+        exception = error,
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = Map(
+          "constraintName" -> "valid_date",
+          "expression" -> "creation_date <= current_date()",
+          "values" -> " - creation_date : 2932896"
+        )
+      )
+    }
+  }
+
+  test("Check constraint with current_database function") {
+    withNamespaceAndTable("test_db", "tbl", nonPartitionCatalog) { t =>
+      sql(s"USE $nonPartitionCatalog.test_db")
+      sql(s"CREATE TABLE $t (id INT, db STRING, " +
+        s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+
+      // Insert valid data (current database)
+      sql(s"INSERT INTO $t VALUES (1, current_database()), (2, 'test_db')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // Invalid database should fail validation
+      val error = intercept[SparkRuntimeException] {
+        sql(s"INSERT INTO $t VALUES (3, 'invalid_db')")
+      }
+      checkError(
+        exception = error,
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = Map(
+          "constraintName" -> "valid_db",
+          "expression" -> "db = current_database()",
+          "values" -> " - db : invalid_db"
+        )
+      )
+    }
+  }
+
+  test("Check constraint with current_database function - update operation") {
+    withNamespaceAndTable("test_db", "tbl", rowLevelOPCatalog) { t =>
+      sql(s"USE $rowLevelOPCatalog.test_db")
+      sql(s"CREATE TABLE $t (id INT, db STRING, " +
+        s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+
+      // Insert initial valid data
+      sql(s"INSERT INTO $t VALUES (1, current_database()), (2, 'test_db')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // Valid update with current database value
+      sql(s"UPDATE $t SET db = 'test_db' WHERE id = 1")
+      checkAnswer(spark.table(t).where("id = 1"), Seq(Row(1, "test_db")))
+
+      // Invalid database should fail validation
+      val error = intercept[SparkRuntimeException] {
+        sql(s"UPDATE $t SET db = 'invalid_db' WHERE id = 2")
+      }
+      checkError(
+        exception = error,
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = Map(
+          "constraintName" -> "valid_db",
+          "expression" -> "db = current_database()",
+          "values" -> " - db : invalid_db"
+        )
+      )
+    }
+  }
+
+  test("Check constraint with current_database function - merge operation") {
+    withNamespaceAndTable("test_db", "target", rowLevelOPCatalog) { target =>
+      withNamespaceAndTable("test_db", "source", rowLevelOPCatalog) { source =>
+        sql(s"USE $rowLevelOPCatalog.test_db")
+        sql(s"CREATE TABLE $target (id INT, db STRING, " +
+          s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+        sql(s"CREATE TABLE $source (id INT, db STRING) $defaultUsing")
+
+        // Insert initial valid data
+        sql(s"INSERT INTO $target VALUES (1, current_database()), (2, 'test_db')")
+        sql(s"INSERT INTO $source VALUES (3, 'test_db'), (4, 'invalid_db')")
+
+        // Valid merge with current database value
+        sql(
+          s"""
+             |MERGE INTO $target t
+             |USING (SELECT * FROM $source WHERE id = 3) s
+             |ON t.id = s.id
+             |WHEN NOT MATCHED THEN INSERT (id, db) VALUES (s.id, s.db)
+             |""".stripMargin)
+        checkAnswer(spark.table(target).orderBy("id"),
+          Seq(Row(1, "test_db"), Row(2, "test_db"), Row(3, "test_db")))
+
+        // Invalid database should fail validation
+        val error = intercept[SparkRuntimeException] {
+          sql(
+            s"""
+               |MERGE INTO $target t
+               |USING (SELECT * FROM $source WHERE id = 4) s
+               |ON t.id = s.id
+               |WHEN NOT MATCHED THEN INSERT (id, db) VALUES (s.id, s.db)
+               |""".stripMargin)
+        }
+        checkError(
+          exception = error,
+          condition = "CHECK_CONSTRAINT_VIOLATION",
+          sqlState = "23001",
+          parameters = Map(
+            "constraintName" -> "valid_db",
+            "expression" -> "db = current_database()",
+            "values" -> " - db : invalid_db"
+          )
+        )
+      }
+    }
+  }
+
   test("Check constraint violation on table insert - nested column") {
     withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
       sql(s"CREATE TABLE $t (id INT," +

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org