This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 80da53ed1dfd [SPARK-52143][TESTS] Add test for table constraints with `current_*` functions
80da53ed1dfd is described below

commit 80da53ed1dfd9ddd2ca9274f33b1fb20b7402c82
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri May 23 11:58:37 2025 -0700

    [SPARK-52143][TESTS] Add test for table constraints with `current_*` functions
    
    ### What changes were proposed in this pull request?
    
    Add test for table constraints with `current_*` functions
    
    ### Why are the changes needed?
    
    Improve test coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50993 from gengliangwang/testCurrent.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../command/v2/CheckConstraintSuite.scala          | 242 +++++++++++++++++++++
 1 file changed, 242 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index a734f8507dac..397d9248f628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -371,6 +371,248 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
     }
   }
 
+  test("Check constraint with current_timestamp function") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      // The CHECK expression calls current_timestamp(), so its bound is not a fixed literal
+      sql(s"CREATE TABLE $t (id INT, creation_time TIMESTAMP, " +
+        s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+      // Rows stamped with current_timestamp() or with a past literal satisfy the constraint
+      sql(s"INSERT INTO $t VALUES (1, current_timestamp()), (2, TIMESTAMP '2020-01-01 00:00:00')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // A null timestamp is accepted too: the insert succeeds and row 3 becomes visible
+      sql(s"INSERT INTO $t VALUES (3, null)")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
+
+      // A timestamp one day ahead must be rejected with CHECK_CONSTRAINT_VIOLATION
+      val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"INSERT INTO $t VALUES (4, $tomorrow)")
+      }
+      assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+      assert(error.getMessageParameters.get("constraintName") == "valid_time")
+      assert(error.getMessageParameters.get("expression") == "creation_time <= current_timestamp()")
+    }
+  }
+
+  test("Check constraint with current_timestamp function - update operation") {
+    withNamespaceAndTable("ns", "tbl", rowLevelOPCatalog) { t =>
+      // Table whose CHECK constraint compares creation_time against current_timestamp()
+      sql(s"CREATE TABLE $t (id INT, creation_time TIMESTAMP, " +
+        s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+      // Seed two rows that already satisfy the constraint
+      sql(s"INSERT INTO $t VALUES (1, current_timestamp()), (2, TIMESTAMP '2020-01-01 00:00:00')")
+
+      // Updating row 1 to a past timestamp is allowed and the new value is readable
+      sql(s"UPDATE $t SET creation_time = TIMESTAMP '2021-01-01 00:00:00' WHERE id = 1")
+      checkAnswer(spark.table(t).select("id")
+        .where("creation_time = TIMESTAMP '2021-01-01 00:00:00'"),
+        Seq(Row(1)))
+
+      // Updating row 2 to null is allowed as well
+      sql(s"UPDATE $t SET creation_time = null WHERE id = 2")
+      checkAnswer(sql(s"SELECT * FROM $t WHERE id = 2"), Seq(Row(2, null)))
+
+      // Updating to a timestamp one day ahead must be rejected with CHECK_CONSTRAINT_VIOLATION
+      val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"UPDATE $t SET creation_time = $tomorrow WHERE id = 1")
+      }
+      assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+      assert(error.getMessageParameters.get("constraintName") == "valid_time")
+      assert(error.getMessageParameters.get("expression") == "creation_time <= current_timestamp()")
+    }
+  }
+
+  test("Check constraint with current_timestamp function - merge operation") {
+    withNamespaceAndTable("ns", "tbl", rowLevelOPCatalog) { target =>
+      withNamespaceAndTable("ns", "source", rowLevelOPCatalog) { source =>
+        // Only the MERGE target carries the current_timestamp() CHECK constraint
+        sql(s"CREATE TABLE $target (id INT, creation_time TIMESTAMP, " +
+          s"CONSTRAINT valid_time CHECK (creation_time <= current_timestamp())) $defaultUsing")
+
+        // The source has no constraint, so it can hold rows the target must reject
+        sql(s"CREATE TABLE $source (id INT, creation_time TIMESTAMP) $defaultUsing")
+
+        // Seed data; source row 3 is not selected by either MERGE below
+        sql(s"INSERT INTO $target VALUES (1, TIMESTAMP '2020-01-01 00:00:00')")
+        sql(s"INSERT INTO $source VALUES " +
+          s"(2, TIMESTAMP '2021-01-01 00:00:00'), " +
+          s"(3, current_timestamp()), " +
+          s"(4, null)")
+
+        // Merging source rows 2 (past timestamp) and 4 (null) succeeds; both are inserted
+        sql(
+          s"""
+             |MERGE INTO $target t
+             |USING (SELECT * FROM $source WHERE id IN (2, 4)) s
+             |ON t.id = s.id
+             |WHEN MATCHED THEN UPDATE SET creation_time = s.creation_time
+             |WHEN NOT MATCHED THEN INSERT (id, creation_time) VALUES (s.id, s.creation_time)
+             |""".stripMargin)
+
+        checkAnswer(spark.table(target).orderBy("id"),
+          Seq(Row(1, java.sql.Timestamp.valueOf("2020-01-01 00:00:00")),
+              Row(2, java.sql.Timestamp.valueOf("2021-01-01 00:00:00")),
+              Row(4, null)))
+
+        // Stage a row one day ahead in the source; merging it into the target must fail
+        val tomorrow = "current_timestamp() + INTERVAL 1 DAY"
+        sql(s"INSERT INTO $source VALUES (5, $tomorrow)")
+
+        val error = intercept[SparkRuntimeException] {
+          sql(
+            s"""
+               |MERGE INTO $target t
+               |USING (SELECT * FROM $source WHERE id = 5) s
+               |ON t.id = s.id
+               |WHEN NOT MATCHED THEN INSERT (id, creation_time) VALUES (s.id, s.creation_time)
+               |""".stripMargin)
+        }
+
+        assert(error.getMessage.contains("CHECK_CONSTRAINT_VIOLATION"))
+        assert(error.getMessageParameters.get("constraintName") == "valid_time")
+        assert(error.getMessageParameters.get("expression") ==
+          "creation_time <= current_timestamp()")
+      }
+    }
+  }
+
+  test("Check constraint with current_date function") {
+    // Table whose CHECK constraint compares creation_date against current_date()
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id INT, creation_date DATE, " +
+        s"CONSTRAINT valid_date CHECK (creation_date <= current_date())) $defaultUsing")
+
+      // Insert valid data (current or past date)
+      sql(s"INSERT INTO $t VALUES (1, current_date()), (2, DATE'2020-01-01')")
+      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2)))
+
+      // A far-future date must fail; "values" shows it as 2932896 (presumably epoch days)
+      val farFutureDate = "DATE'9999-12-31'"
+      val error = intercept[SparkRuntimeException] {
+        sql(s"INSERT INTO $t VALUES (3, $farFutureDate)")
+      }
+      checkError(
+        exception = error,
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = Map(
+          "constraintName" -> "valid_date",
+          "expression" -> "creation_date <= current_date()",
+          "values" -> " - creation_date : 2932896"
+        )
+      )
+    }
+  }
+
+  test("Check constraint with current_database function") {
+    withNamespaceAndTable("test_db", "tbl", nonPartitionCatalog) { tableName =>
+      // Make test_db the current database so that current_database() returns it.
+      // NOTE(review): nothing visible here restores the previous namespace -- confirm.
+      sql(s"USE $nonPartitionCatalog.test_db")
+      sql(s"CREATE TABLE $tableName (id INT, db STRING, " +
+        s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+
+      // Both the function result and the matching literal pass the check
+      sql(s"INSERT INTO $tableName VALUES (1, current_database()), (2, 'test_db')")
+      checkAnswer(spark.table(tableName).select("id"), Seq(Row(1), Row(2)))
+
+      // A value that differs from the current database is rejected
+      val expectedParams = Map(
+        "constraintName" -> "valid_db",
+        "expression" -> "db = current_database()",
+        "values" -> " - db : invalid_db")
+      checkError(
+        exception = intercept[SparkRuntimeException] {
+          sql(s"INSERT INTO $tableName VALUES (3, 'invalid_db')")
+        },
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = expectedParams)
+    }
+  }
+
+  test("Check constraint with current_database function - update operation") {
+    withNamespaceAndTable("test_db", "tbl", rowLevelOPCatalog) { tableName =>
+      // Make test_db the current database so that current_database() returns it.
+      // NOTE(review): nothing visible here restores the previous namespace -- confirm.
+      sql(s"USE $rowLevelOPCatalog.test_db")
+      sql(s"CREATE TABLE $tableName (id INT, db STRING, " +
+        s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+
+      // Seed two rows that already satisfy the constraint
+      sql(s"INSERT INTO $tableName VALUES (1, current_database()), (2, 'test_db')")
+      checkAnswer(spark.table(tableName).select("id"), Seq(Row(1), Row(2)))
+
+      // Rewriting row 1 with the matching literal is allowed
+      sql(s"UPDATE $tableName SET db = 'test_db' WHERE id = 1")
+      checkAnswer(spark.table(tableName).where("id = 1"), Seq(Row(1, "test_db")))
+
+      // Rewriting row 2 with a different database name is rejected
+      val expectedParams = Map(
+        "constraintName" -> "valid_db",
+        "expression" -> "db = current_database()",
+        "values" -> " - db : invalid_db")
+      checkError(
+        exception = intercept[SparkRuntimeException] {
+          sql(s"UPDATE $tableName SET db = 'invalid_db' WHERE id = 2")
+        },
+        condition = "CHECK_CONSTRAINT_VIOLATION",
+        sqlState = "23001",
+        parameters = expectedParams)
+    }
+  }
+
+  test("Check constraint with current_database function - merge operation") {
+    withNamespaceAndTable("test_db", "target", rowLevelOPCatalog) { target =>
+      withNamespaceAndTable("test_db", "source", rowLevelOPCatalog) { source =>
+        // NOTE(review): USE is not visibly undone here -- confirm the suite resets it
+        sql(s"USE $rowLevelOPCatalog.test_db")
+        sql(s"CREATE TABLE $target (id INT, db STRING, " +
+          s"CONSTRAINT valid_db CHECK (db = current_database())) $defaultUsing")
+        sql(s"CREATE TABLE $source (id INT, db STRING) $defaultUsing")
+
+        // Seed the target with valid rows; the unconstrained source holds one bad row (id 4)
+        sql(s"INSERT INTO $target VALUES (1, current_database()), (2, 'test_db')")
+        sql(s"INSERT INTO $source VALUES (3, 'test_db'), (4, 'invalid_db')")
+
+        // Merging source row 3, whose db matches the current database, succeeds
+        sql(
+          s"""
+             |MERGE INTO $target t
+             |USING (SELECT * FROM $source WHERE id = 3) s
+             |ON t.id = s.id
+             |WHEN NOT MATCHED THEN INSERT (id, db) VALUES (s.id, s.db)
+             |""".stripMargin)
+        checkAnswer(spark.table(target).orderBy("id"),
+          Seq(Row(1, "test_db"), Row(2, "test_db"), Row(3, "test_db")))
+
+        // Merging source row 4, whose db differs, must fail the target's constraint
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            sql(
+              s"""
+                 |MERGE INTO $target t
+                 |USING (SELECT * FROM $source WHERE id = 4) s
+                 |ON t.id = s.id
+                 |WHEN NOT MATCHED THEN INSERT (id, db) VALUES (s.id, s.db)
+                 |""".stripMargin)
+          },
+          condition = "CHECK_CONSTRAINT_VIOLATION",
+          sqlState = "23001",
+          parameters = Map(
+            "constraintName" -> "valid_db",
+            "expression" -> "db = current_database()",
+            "values" -> " - db : invalid_db"
+          )
+        )
+      }
+    }
+  }
+
   test("Check constraint violation on table insert - nested column") {
     withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
       sql(s"CREATE TABLE $t (id INT," +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to