This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e90d484  feat: Disable cast string to timestamp by default (#337)
e90d484 is described below

commit e90d484e7524a068a2382a1304fd1a7acf4f251a
Author: Andy Grove <[email protected]>
AuthorDate: Mon Apr 29 07:37:00 2024 -0600

    feat: Disable cast string to timestamp by default (#337)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  7 +++++
 docs/source/user-guide/compatibility.md            |  6 ++++
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 17 ++++++++++-
 .../scala/org/apache/comet/CometCastSuite.scala    | 34 ++++++++++++++++++++--
 4 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index d993d6d..b245953 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -365,6 +365,13 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(
+    "spark.comet.cast.stringToTimestamp")
+    .doc(
+      "Comet is not currently fully compatible with Spark when casting from 
String to Timestamp.")
+    .booleanConf
+    .createWithDefault(false)
+
 }
 
 object ConfigHelpers {
diff --git a/docs/source/user-guide/compatibility.md 
b/docs/source/user-guide/compatibility.md
index d817ba5..b4b4c92 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -38,3 +38,9 @@ Comet currently delegates to Apache DataFusion for most cast 
operations, and thi
 guaranteed to be consistent with Spark.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/286) 
where we are tracking the work to implement Spark-compatible cast expressions.
+
+### Cast from String to Timestamp
+
+Casting from String to Timestamp is disabled by default due to 
incompatibilities with Spark, including timezone
+issues, and can be enabled by setting
`spark.comet.cast.stringToTimestamp=true`. See the
+[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for 
more information.
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 57b15e2..e1e7a71 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, 
isCometScan, isSpark32, isSpark34Plus, withInfo}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => 
ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, 
DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -584,7 +585,21 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
               // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
               evalMode.toString
             }
-            castToProto(timeZoneId, dt, childExpr, evalModeStr)
+            val supportedCast = (child.dataType, dt) match {
+              case (DataTypes.StringType, DataTypes.TimestampType)
+                  if !CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() =>
+                // https://github.com/apache/datafusion-comet/issues/328
+                withInfo(expr, 
s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled")
+                false
+              case _ => true
+            }
+            if (supportedCast) {
+              castToProto(timeZoneId, dt, childExpr, evalModeStr)
+            } else {
+              // no need to call withInfo here since it was called when 
determining
+              // the value for `supportedCast`
+              None
+            }
           } else {
             withInfo(expr, child)
             None
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 8abd245..669a855 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -98,12 +98,22 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   ignore("cast string to date") {
-    castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DoubleType)
+    castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType)
+  }
+
+  test("cast string to timestamp disabled by default") {
+    val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
+    castFallbackTest(
+      values.toDF("a"),
+      DataTypes.TimestampType,
+      "spark.comet.cast.stringToTimestamp is disabled")
   }
 
   ignore("cast string to timestamp") {
-    val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ 
generateStrings(timestampPattern, 8)
-    castTest(values.toDF("a"), DataTypes.DoubleType)
+    withSQLConf((CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true")) {
+      val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ 
generateStrings(timestampPattern, 8)
+      castTest(values.toDF("a"), DataTypes.TimestampType)
+    }
   }
 
   private def generateFloats(): DataFrame = {
@@ -126,6 +136,24 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     Range(0, dataSize).map(_ => generateString(r, chars, maxLen))
   }
 
+  private def castFallbackTest(
+      input: DataFrame,
+      toType: DataType,
+      expectedMessage: String): Unit = {
+    withTempPath { dir =>
+      val data = roundtripParquet(input, dir).coalesce(1)
+      data.createOrReplaceTempView("t")
+
+      withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
+        val df = data.withColumn("converted", col("a").cast(toType))
+        df.collect()
+        val str =
+          new 
ExtendedExplainInfo().generateExtendedInfo(df.queryExecution.executedPlan)
+        assert(str.contains(expectedMessage))
+      }
+    }
+  }
+
   private def castTest(input: DataFrame, toType: DataType): Unit = {
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to