This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f003453a6117 [SPARK-52882][SQL] Implement the current_time function in Scala
f003453a6117 is described below

commit f003453a6117bd49c4d94a8bd909f862f77ed084
Author: Uros Bojanic <uros.boja...@databricks.com>
AuthorDate: Thu Jul 24 08:19:57 2025 +0200

    [SPARK-52882][SQL] Implement the current_time function in Scala
    
    ### What changes were proposed in this pull request?
    Implement the `current_time` function in the Scala API.
    
    ### Why are the changes needed?
    Expand API support for the `CurrentTime` expression.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the new function is now available in the Scala API.
    
    ### How was this patch tested?
    Added appropriate Scala function tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51574 from uros-db/scala-current_time.
    
    Authored-by: Uros Bojanic <uros.boja...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 python/pyspark/sql/tests/test_functions.py         |  3 +-
 .../scala/org/apache/spark/sql/functions.scala     | 30 ++++++++
 .../apache/spark/sql/TimeFunctionsSuiteBase.scala  | 79 ++++++++++++++++++++++
 3 files changed, 111 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 3a4dfd8f6e4d..1bdb9dd76455 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -83,9 +83,10 @@ class FunctionsTestsMixin:
         # Functions that we expect to be missing in python until they are added to pyspark
         expected_missing_in_py = set(
             # TODO(SPARK-52888): Implement the make_time function in Python
+            # TODO(SPARK-52889): Implement the current_time function in Python
             # TODO(SPARK-52890): Implement the to_time function in Python
             # TODO(SPARK-52891): Implement the try_to_time function in Python
-            ["make_time", "to_time", "try_to_time"]
+            ["current_time", "make_time", "to_time", "try_to_time"]
         )
 
         self.assertEqual(
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index e16554eaddbf..4d8f658ca32d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1392,6 +1392,36 @@
    */
   def count_if(e: Column): Column = Column.fn("count_if", e)
 
+  /**
+   * Returns the current time at the start of query evaluation. Note that the result will contain
+   * 6 fractional digits of seconds.
+   *
+   * @return
+   *   A time.
+   *
+   * @group datetime_funcs
+   * @since 4.1.0
+   */
+  def current_time(): Column = {
+    Column.fn("current_time")
+  }
+
+  /**
+   * Returns the current time at the start of query evaluation.
+   *
+   * @param precision
+   *   An integer literal in the range [0..6], indicating how many fractional digits of seconds to
+   *   include in the result.
+   * @return
+   *   A time.
+   *
+   * @group datetime_funcs
+   * @since 4.1.0
+   */
+  def current_time(precision: Int): Column = {
+    Column.fn("current_time", lit(precision))
+  }
+
   /**
    * Aggregate function: computes a histogram on numeric 'expr' using nb bins. The return value is
    * an array of (x,y) pairs representing the centers of the histogram's bins.
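As a usage illustration of the API added above, here is a minimal sketch against a running `spark` session (the column aliases and the example values in the comments are illustrative, not actual output):

    import org.apache.spark.sql.functions.current_time

    // current_time() keeps the maximum of 6 fractional digits of seconds,
    // while current_time(p) keeps p fractional digits, with p in [0..6].
    val df = spark.range(1).select(
      current_time().as("t_default"),   // e.g. 08:19:57.123456
      current_time(2).as("t_trimmed")   // e.g. 08:19:57.12
    )
    df.show(truncate = false)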
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
index 7d7c4597ddfe..8506ab4527c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.time.LocalTime
+import java.time.temporal.ChronoUnit
 
 import org.apache.spark.{SparkConf, SparkDateTimeException}
 import org.apache.spark.sql.functions._
@@ -28,6 +29,84 @@ import org.apache.spark.sql.types._
 abstract class TimeFunctionsSuiteBase extends QueryTest with SharedSparkSession {
   import testImplicits._
 
+  // Helper method to assert that two DataFrames with TimeType values are approximately equal.
+  // This method assumes that the two DataFrames (df1 and df2) have the same schemas and sizes.
+  // Also, only 1 column is expected in each DataFrame, and that column must be of TimeType.
+  private def assertTwoTimesAreApproximatelyEqual(df1: DataFrame, df2: DataFrame) = {
+    // Check that both DataFrames have the same schema.
+    val schema1 = df1.schema
+    val schema2 = df2.schema
+    require(schema1 == schema2, "Both DataFrames must have the same schema, but got " +
+      s"$schema1 and $schema2 for the two given DataFrames df1 and df2, respectively.")
+    // Check that both DataFrames have the same number of rows.
+    val numRows1 = df1.count()
+    val numRows2 = df2.count()
+    require(numRows1 == numRows2, "Both DataFrames must have the same number of rows, but got " +
+      s"$numRows1 and $numRows2 rows in the two given DataFrames df1 and df2, respectively.")
+    // Check that both DataFrames have only 1 column.
+    val fields1 = schema1.fields.length
+    require(fields1 == 1, s"The first DataFrame must have only one column, but got $fields1.")
+    val fields2 = schema2.fields.length
+    require(fields2 == 1, s"The second DataFrame must have only one column, but got $fields2.")
+    // Check that the column type is TimeType.
+    val columnType1 = schema1.fields.head.dataType
+    require(columnType1.isInstanceOf[TimeType], "The column type of the first DataFrame " +
+      s"must be TimeType, but got $columnType1.")
+    val columnType2 = schema2.fields.head.dataType
+    require(columnType2.isInstanceOf[TimeType], "The column type of the second DataFrame " +
+      s"must be TimeType, but got $columnType2.")
+
+    // Extract the LocalTime values from the input DataFrames.
+    val time1: LocalTime = df1.collect().head.get(0).asInstanceOf[LocalTime]
+    val time2: LocalTime = df2.collect().head.get(0).asInstanceOf[LocalTime]
+
+    // Check that the time difference is within a set number of minutes.
+    val maxTimeDiffInMinutes = 15 // This should be enough time to ensure correctness.
+    val timeDiffInMillis = Math.abs(ChronoUnit.MILLIS.between(time1, time2))
+    assert(
+      timeDiffInMillis <= maxTimeDiffInMinutes * 60 * 1000,
+      s"Time difference exceeds $maxTimeDiffInMinutes minutes: $timeDiffInMillis ms."
+    )
+  }
+
+  test("SPARK-52882: current_time function with default precision") {
+    // Create a dummy DataFrame with a single row to test the current_time() function.
+    val df = spark.range(1)
+
+    // Test the function using both `selectExpr` and `select`.
+    val result1 = df.selectExpr(
+      "current_time()"
+    )
+    val result2 = df.select(
+      current_time()
+    )
+
+    // Check that both methods produce approximately the same result.
+    assertTwoTimesAreApproximatelyEqual(result1, result2)
+  }
+
+  test("SPARK-52882: current_time function with specified precision") {
+    (0 to 6).foreach { precision: Int =>
+      // Create a dummy DataFrame with a single row to test the current_time(precision) function.
+      val df = spark.range(1)
+
+      // Test the function using both `selectExpr` and `select`.
+      val result1 = df.selectExpr(
+        s"current_time($precision)"
+      )
+      val result2 = df.select(
+        current_time(precision)
+      )
+
+      // Confirm that the precision is correctly set.
+      assert(result1.schema.fields.head.dataType == TimeType(precision))
+      assert(result2.schema.fields.head.dataType == TimeType(precision))
+
+      // Check that both methods produce approximately the same result.
+      assertTwoTimesAreApproximatelyEqual(result1, result2)
+    }
+  }
+
   test("SPARK-52881: make_time function") {
     // Input data for the function.
     val schema = StructType(Seq(
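The new tests above also pin down the result type. A standalone check mirroring that assertion, assuming a running `spark` session:

    import org.apache.spark.sql.functions.current_time
    import org.apache.spark.sql.types.TimeType

    // The precision argument is reflected directly in the column's data type:
    // current_time(3) should yield a column of TimeType(3).
    val df = spark.range(1).select(current_time(3))
    assert(df.schema.fields.head.dataType == TimeType(3))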
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org