This is an automated email from the ASF dual-hosted git repository.
slfan1989 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 0e832f2c [AURON #2128] Implement native function of dayofweek (#2129)
0e832f2c is described below
commit 0e832f2cb9298a0c99bba5e5e261f6f518a974ed
Author: Ming Wei <[email protected]>
AuthorDate: Wed Apr 1 05:50:35 2026 +0800
[AURON #2128] Implement native function of dayofweek (#2129)
# Which issue does this PR close?
Closes https://github.com/apache/auron/issues/2128
# Rationale for this change
To achieve full compatibility with Spark’s date functions, we should
implement `dayofweek()` with the following characteristics:
_Expected behavior_
Function name: `dayofweek(expr)`
Return value: `Sunday = 1, Monday = 2, ..., Saturday = 7`
Example:
`dayofweek('2009-07-30')` → `5`
Supports: `DATE`, `TIMESTAMP`, and compatible string/date inputs
consistent with existing date extraction functions
Null-safe: should return `NULL` if input is `NULL`
Array and scalar inputs: consistent with current date extraction
function implementations
# What changes are included in this PR?
This PR adds native support for the `dayofweek()` function with
Spark-compatible semantics.
The following changes are included:
- Added native implementation of `spark_dayofweek()` in the expression
layer.
- Added `DayOfWeek` expression support in `NativeConverters` for proper
Spark → native translation.
- Added unit tests to verify correctness.
# Are there any user-facing changes?
No.
# How was this patch tested?
CI.
Signed-off-by: weimingdiit <[email protected]>
---
native-engine/datafusion-ext-functions/src/lib.rs | 1 +
.../datafusion-ext-functions/src/spark_dates.rs | 51 ++++++-
.../org/apache/auron/AuronFunctionSuite.scala | 27 ++++
.../apache/spark/sql/auron/NativeConverters.scala | 146 +++++++++++----------
4 files changed, 158 insertions(+), 67 deletions(-)
diff --git a/native-engine/datafusion-ext-functions/src/lib.rs
b/native-engine/datafusion-ext-functions/src/lib.rs
index 9464722e..4b636a40 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -74,6 +74,7 @@ pub fn create_auron_ext_function(
"Spark_Year" => Arc::new(spark_dates::spark_year),
"Spark_Month" => Arc::new(spark_dates::spark_month),
"Spark_Day" => Arc::new(spark_dates::spark_day),
+ "Spark_DayOfWeek" => Arc::new(spark_dates::spark_dayofweek),
"Spark_Quarter" => Arc::new(spark_dates::spark_quarter),
"Spark_Hour" => Arc::new(spark_dates::spark_hour),
"Spark_Minute" => Arc::new(spark_dates::spark_minute),
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs
b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 800b0e53..08ec40a3 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -16,7 +16,7 @@
use std::sync::Arc;
use arrow::{
- array::{ArrayRef, Int32Array, TimestampMillisecondArray},
+ array::{ArrayRef, Date32Array, Int32Array, TimestampMillisecondArray},
compute::{DatePart, date_part},
datatypes::{DataType, TimeUnit},
};
@@ -46,6 +46,32 @@ pub fn spark_day(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
Ok(ColumnarValue::Array(date_part(&input, DatePart::Day)?))
}
+/// `spark_dayofweek(date/timestamp/compatible-string)`
+///
+/// Matches Spark's `dayofweek()` semantics:
+/// Sunday = 1, Monday = 2, ..., Saturday = 7.
+pub fn spark_dayofweek(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?;
+ let input = input
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .expect("internal cast to Date32 must succeed");
+
+ // Date32 is days since 1970-01-01. 1970-01-01 is a Thursday.
+ // If we number weekdays so that Sunday = 0, ..., Saturday = 6,
+ // then 1970-01-01 corresponds to 4. For an offset `days`,
+ // weekday_index = (days + 4) mod 7 gives 0 = Sunday, ..., 6 = Saturday.
+ // Spark wants Sunday = 1, ..., Saturday = 7, so we add 1.
+ let dayofweek = Int32Array::from_iter(input.iter().map(|opt_days| {
+ opt_days.map(|days| {
+ let weekday_index = (days as i64 + 4).rem_euclid(7);
+ weekday_index as i32 + 1
+ })
+ }));
+
+ Ok(ColumnarValue::Array(Arc::new(dayofweek)))
+}
+
/// `spark_quarter(date/timestamp/compatible-string)`
///
/// Simulates Spark's `quarter()` function.
@@ -258,6 +284,29 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_spark_dayofweek() -> Result<()> {
+ let input = Arc::new(Date32Array::from(vec![
+ Some(-1),
+ Some(0),
+ Some(2),
+ Some(3),
+ Some(4),
+ None,
+ ]));
+ let args = vec![ColumnarValue::Array(input)];
+ let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(4),
+ Some(5),
+ Some(7),
+ Some(1),
+ Some(2),
+ None,
+ ]));
+ assert_eq!(&spark_dayofweek(&args)?.into_array(1)?, &expected_ret);
+ Ok(())
+ }
+
#[test]
fn test_spark_quarter_basic() -> Result<()> {
// Date32 days relative to 1970-01-01:
diff --git
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
index 06e564a9..442a6f51 100644
---
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
+++
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.auron
import java.text.SimpleDateFormat
import org.apache.spark.sql.{AuronQueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.auron.util.AuronTestUtils
@@ -117,6 +118,32 @@ class AuronFunctionSuite extends AuronQueryTest with
BaseAuronSQLSuite {
}
}
+ test("dayofweek function") {
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+ withTable("t1") {
+ sql("create table t1(c1 date, c2 timestamp) using parquet")
+ sql("""
+ |insert into t1 values
+ | (date'2009-07-30', timestamp'2009-07-30 12:34:56'),
+ | (date'2024-02-29', timestamp'2024-02-29 23:59:59'),
+ | (null, null)
+ |""".stripMargin)
+
+ // DATE column
+ checkSparkAnswerAndOperator("select dayofweek(c1) from t1 where c1 is
not null")
+
+ // NULL DATE input should return NULL
+ checkSparkAnswerAndOperator("select dayofweek(c1) from t1 where c1 is
null")
+
+ // TIMESTAMP column
+ checkSparkAnswerAndOperator("select dayofweek(c2) from t1 where c2 is
not null")
+
+ // NULL TIMESTAMP input should return NULL
+ checkSparkAnswerAndOperator("select dayofweek(c2) from t1 where c2 is
null")
+ }
+ }
+ }
+
test("stddev_samp function with UDAF fallback") {
withSQLConf("spark.auron.udafFallback.enable" -> "true") {
withTable("t1") {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index df5c0732..aee252f2 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -274,6 +274,54 @@ object NativeConverters extends Logging {
override def toString(): String = s"$getClass() dataType:$dataType)"
}
+ private def buildSparkUdfWrapperExpr(
+ sparkExpr: Expression,
+ fallback: Expression => pb.PhysicalExprNode): pb.PhysicalExprNode = {
+ // update subquery result if needed
+ sparkExpr.foreach {
+ case subquery: ExecSubqueryExpression =>
+ prepareExecSubquery(subquery)
+ case _ =>
+ }
+ val exprString = sparkExpr.toString()
+
+ // bind all convertible children
+ val convertedChildren = mutable.LinkedHashMap[pb.PhysicalExprNode,
BoundReference]()
+ val bound = sparkExpr.mapChildren(_.transformDown {
+ case p: Literal => p
+ case p =>
+ try {
+ val convertedChild = convertExprWithFallback(p, isPruningExpr =
false, fallback)
+ val nextBindIndex = convertedChildren.size
+ convertedChildren.getOrElseUpdate(
+ convertedChild,
+ BoundReference(nextBindIndex, p.dataType, p.nullable))
+ } catch {
+ case _: Exception | _: NotImplementedError => p
+ }
+ })
+
+ val paramsSchema = StructType(
+ convertedChildren.values
+ .map(ref => StructField("", ref.dataType, ref.nullable))
+ .toSeq)
+
+ val serialized =
+ serializeExpression(bound.asInstanceOf[Expression with Serializable],
paramsSchema)
+
+ pb.PhysicalExprNode
+ .newBuilder()
+ .setSparkUdfWrapperExpr(
+ pb.PhysicalSparkUDFWrapperExprNode
+ .newBuilder()
+ .setSerialized(ByteString.copyFrom(serialized))
+ .setReturnType(convertDataType(bound.dataType))
+ .setReturnNullable(bound.nullable)
+ .addAllParams(convertedChildren.keys.asJava)
+ .setExprString(exprString))
+ .build()
+ }
+
def convertExpr(sparkExpr: Expression): pb.PhysicalExprNode = {
def fallbackToError: Expression => pb.PhysicalExprNode = { e =>
throw new NotImplementedError(s"unsupported expression: (${e.getClass})
$e")
@@ -321,52 +369,7 @@ object NativeConverters extends Logging {
} catch {
case e: NotImplementedError =>
logWarning(s"Falling back expression: $e")
-
- // update subquery result if needed
- sparkExpr.foreach {
- case subquery: ExecSubqueryExpression =>
- prepareExecSubquery(subquery)
- case _ =>
- }
- val exprString = sparkExpr.toString()
-
- // bind all convertible children
- val convertedChildren = mutable.LinkedHashMap[pb.PhysicalExprNode,
BoundReference]()
- val bound = sparkExpr.mapChildren(_.transformDown {
- case p: Literal => p
- case p =>
- try {
- val convertedChild =
- convertExprWithFallback(p, isPruningExpr = false,
fallbackToError)
- val nextBindIndex = convertedChildren.size
- convertedChildren.getOrElseUpdate(
- convertedChild,
- BoundReference(nextBindIndex, p.dataType, p.nullable))
- } catch {
- case _: Exception | _: NotImplementedError => p
- }
- })
-
- val paramsSchema = StructType(
- convertedChildren.values
- .map(ref => StructField("", ref.dataType, ref.nullable))
- .toSeq)
-
- val serialized =
- serializeExpression(bound.asInstanceOf[Expression with
Serializable], paramsSchema)
-
- // build SparkUDFWrapperExpr
- pb.PhysicalExprNode
- .newBuilder()
- .setSparkUdfWrapperExpr(
- pb.PhysicalSparkUDFWrapperExprNode
- .newBuilder()
- .setSerialized(ByteString.copyFrom(serialized))
- .setReturnType(convertDataType(bound.dataType))
- .setReturnNullable(bound.nullable)
- .addAllParams(convertedChildren.keys.asJava)
- .setExprString(exprString))
- .build()
+ buildSparkUdfWrapperExpr(sparkExpr, fallbackToError)
}
}
@@ -467,27 +470,36 @@ object NativeConverters extends Logging {
}
// cast
- // not performing native cast for timestamp/dates (will use UDFWrapper
instead)
- case cast: Cast
- if !Seq(cast.dataType, cast.child.dataType).exists(t =>
- t.isInstanceOf[TimestampType] || t.isInstanceOf[DateType]) =>
- val castChild =
- if (cast.child.dataType == StringType &&
- (cast.dataType.isInstanceOf[NumericType] || cast.dataType
- .isInstanceOf[BooleanType]) &&
- castTrimStringEnabled) {
- // converting Cast(str as num) to StringTrim(Cast(str as num)) if
enabled
- StringTrim(cast.child)
- } else {
- cast.child
+ case cast: Cast =>
+ val involvesDateOrTimestamp =
+ Seq(cast.dataType, cast.child.dataType).exists {
+ case DateType | TimestampType => true
+ case _ => false
+ }
+
+ if (involvesDateOrTimestamp) {
+ // Keep timestamp/date casts executable in native projects by
wrapping
+ // the Spark expression, since native cast does not support these
types directly.
+ buildSparkUdfWrapperExpr(cast, fallback)
+ } else {
+ val castChild =
+ if (cast.child.dataType == StringType &&
+ (cast.dataType.isInstanceOf[NumericType] || cast.dataType
+ .isInstanceOf[BooleanType]) &&
+ castTrimStringEnabled) {
+ // converting Cast(str as num) to StringTrim(Cast(str as num))
if enabled
+ StringTrim(cast.child)
+ } else {
+ cast.child
+ }
+ buildExprNode {
+ _.setTryCast(
+ pb.PhysicalTryCastNode
+ .newBuilder()
+ .setExpr(convertExprWithFallback(castChild, isPruningExpr,
fallback))
+ .setArrowType(convertDataType(cast.dataType))
+ .build())
}
- buildExprNode {
- _.setTryCast(
- pb.PhysicalTryCastNode
- .newBuilder()
- .setExpr(convertExprWithFallback(castChild, isPruningExpr,
fallback))
- .setArrowType(convertDataType(cast.dataType))
- .build())
}
// in
@@ -926,6 +938,8 @@ object NativeConverters extends Logging {
case Year(child) => buildExtScalarFunction("Spark_Year", child :: Nil,
IntegerType)
case Month(child) => buildExtScalarFunction("Spark_Month", child :: Nil,
IntegerType)
case DayOfMonth(child) => buildExtScalarFunction("Spark_Day", child ::
Nil, IntegerType)
+ case DayOfWeek(child) =>
+ buildExtScalarFunction("Spark_DayOfWeek", child :: Nil, IntegerType)
case Quarter(child) => buildExtScalarFunction("Spark_Quarter", child ::
Nil, IntegerType)
case e: Levenshtein =>