This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new edec4612a Chore: to_json unit/benchmark tests (#3011)
edec4612a is described below
commit edec4612a038f06920092ff87558108b0de43c21
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Tue Jan 6 13:24:20 2026 -0800
Chore: to_json unit/benchmark tests (#3011)
---
.../scala/org/apache/comet/serde/structs.scala | 39 +++++++------
.../apache/comet/testing/FuzzDataGenerator.scala | 11 ++--
.../apache/comet/CometJsonExpressionSuite.scala | 39 ++++++++++++-
.../benchmark/CometJsonExpressionBenchmark.scala | 64 +++++++++++++++++++++-
4 files changed, 123 insertions(+), 30 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala
b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index 55e031d34..b76c64bac 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -111,26 +111,6 @@ object CometStructsToJson extends
CometExpressionSerde[StructsToJson] {
withInfo(expr, "StructsToJson with options is not supported")
None
} else {
-
- def isSupportedType(dt: DataType): Boolean = {
- dt match {
- case StructType(fields) =>
- fields.forall(f => isSupportedType(f.dataType))
- case DataTypes.BooleanType | DataTypes.ByteType |
DataTypes.ShortType |
- DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType
|
- DataTypes.DoubleType | DataTypes.StringType =>
- true
- case DataTypes.DateType | DataTypes.TimestampType =>
- // TODO implement these types with tests for formatting options
and timezone
- false
- case _: MapType | _: ArrayType =>
- // Spark supports map and array in StructsToJson but this is not
yet
- // implemented in Comet
- false
- case _ => false
- }
- }
-
val isSupported = expr.child.dataType match {
case s: StructType =>
s.fields.forall(f => isSupportedType(f.dataType))
@@ -166,6 +146,25 @@ object CometStructsToJson extends
CometExpressionSerde[StructsToJson] {
}
}
}
+
+ def isSupportedType(dt: DataType): Boolean = {
+ dt match {
+ case StructType(fields) =>
+ fields.forall(f => isSupportedType(f.dataType))
+ case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
+ DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
+ DataTypes.DoubleType | DataTypes.StringType =>
+ true
+ case DataTypes.DateType | DataTypes.TimestampType =>
+ // TODO implement these types with tests for formatting options and
timezone
+ false
+ case _: MapType | _: ArrayType =>
+ // Spark supports map and array in StructsToJson but this is not yet
+ // implemented in Comet
+ false
+ case _ => false
+ }
+ }
}
object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {
diff --git
a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index 00a85930b..24daebe13 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -229,8 +229,8 @@ object FuzzDataGenerator {
Range(0, numRows).map(_ => {
r.nextInt(20) match {
case 0 if options.allowNull => null
- case 1 => Float.NegativeInfinity
- case 2 => Float.PositiveInfinity
+ case 1 if options.generateInfinity => Float.NegativeInfinity
+ case 2 if options.generateInfinity => Float.PositiveInfinity
case 3 => Float.MinValue
case 4 => Float.MaxValue
case 5 => 0.0f
@@ -243,8 +243,8 @@ object FuzzDataGenerator {
Range(0, numRows).map(_ => {
r.nextInt(20) match {
case 0 if options.allowNull => null
- case 1 => Double.NegativeInfinity
- case 2 => Double.PositiveInfinity
+ case 1 if options.generateInfinity => Double.NegativeInfinity
+ case 2 if options.generateInfinity => Double.PositiveInfinity
case 3 => Double.MinValue
case 4 => Double.MaxValue
case 5 => 0.0
@@ -329,4 +329,5 @@ case class DataGenOptions(
generateNaN: Boolean = true,
baseDate: Long = FuzzDataGenerator.defaultBaseDate,
customStrings: Seq[String] = Seq.empty,
- maxStringLength: Int = 8)
+ maxStringLength: Int = 8,
+ generateInfinity: Boolean = true)
diff --git
a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
index 38f576526..64c330dbd 100644
--- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala
@@ -19,24 +19,59 @@
package org.apache.comet
+import scala.util.Random
+
import org.scalactic.source.Position
import org.scalatest.Tag
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.CometTestBase
-import org.apache.spark.sql.catalyst.expressions.JsonToStructs
+import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions._
+
+import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.serde.CometStructsToJson
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator,
SchemaGenOptions}
class CometJsonExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
-
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) ->
"true") {
+ withSQLConf(
+ CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) ->
"true",
+ CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) ->
"true") {
testFun
}
}
}
+ test("to_json - all supported types") {
+ assume(!isSpark40Plus)
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ val filename = path.toString
+ val random = new Random(42)
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ ParquetGenerator.makeParquetFile(
+ random,
+ spark,
+ filename,
+ 100,
+ SchemaGenOptions(generateArray = false, generateStruct = false,
generateMap = false),
+ DataGenOptions(generateNaN = false, generateInfinity = false))
+ }
+ val table = spark.read.parquet(filename)
+ val fieldsNames = table.schema.fields
+ .filter(sf => CometStructsToJson.isSupportedType(sf.dataType))
+ .map(sf => col(sf.name))
+ .toSeq
+ val df = table.select(to_json(struct(fieldsNames: _*)))
+ checkSparkAnswerAndOperator(df)
+ }
+ }
+
test("from_json - basic primitives") {
Seq(true, false).foreach { dictionaryEnabled =>
withParquetTable(
diff --git
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
index 5b4741ba6..5f1365bd7 100644
---
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.benchmark
-import org.apache.spark.sql.catalyst.expressions.JsonToStructs
+import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
import org.apache.comet.CometConf
@@ -106,6 +106,44 @@ object CometJsonExpressionBenchmark extends
CometBenchmarkBase {
FROM $tbl
""")
+ case "to_json - simple primitives" =>
+ spark.sql(
+ s"""SELECT named_struct("a", CAST(value AS INT), "b",
concat("str_", CAST(value AS STRING))) AS json_struct FROM $tbl""")
+
+ case "to_json - all primitive types" =>
+ spark.sql(s"""
+ SELECT named_struct(
+ "i32", CAST(value % 1000 AS INT),
+ "i64", CAST(value * 1000000000L AS LONG),
+ "f32", CAST(value * 1.5 AS FLOAT),
+ "f64", CAST(value * 2.5 AS DOUBLE),
+ "bool", CASE WHEN value % 2 = 0 THEN true ELSE false END,
+ "str", concat("value_", CAST(value AS STRING))
+ ) AS json_struct FROM $tbl
+ """)
+
+ case "to_json - with nulls" =>
+ spark.sql(s"""
+ SELECT
+ CASE
+ WHEN value % 10 = 0 THEN CAST(NULL AS STRUCT<a: INT, b:
STRING>)
+ WHEN value % 5 = 0 THEN named_struct("a", CAST(NULL AS INT),
"b", "test")
+ WHEN value % 3 = 0 THEN named_struct("a", CAST(123 AS INT),
"b", CAST(NULL AS STRING))
+ ELSE named_struct("a", CAST(value AS INT), "b",
concat("str_", CAST(value AS STRING)))
+ END AS json_struct
+ FROM $tbl
+ """)
+
+ case "to_json - nested struct" =>
+ spark.sql(s"""
+ SELECT named_struct(
+ "outer", named_struct(
+ "inner_a", CAST(value AS INT),
+ "inner_b", concat("nested_", CAST(value AS STRING))
+ )
+ ) AS json_struct FROM $tbl
+ """)
+
case _ =>
spark.sql(s"""
SELECT
@@ -117,8 +155,9 @@ object CometJsonExpressionBenchmark extends
CometBenchmarkBase {
prepareTable(dir, jsonData)
val extraConfigs = Map(
+ CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) ->
"true",
CometConf.getExprAllowIncompatConfigKey(
- classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
+ classOf[StructsToJson]) -> "true") ++ config.extraCometConfigs
runExpressionBenchmark(config.name, values, config.query, extraConfigs)
}
@@ -127,6 +166,7 @@ object CometJsonExpressionBenchmark extends
CometBenchmarkBase {
// Configuration for all JSON expression benchmarks
private val jsonExpressions = List(
+ // from_json tests
JsonExprConfig(
"from_json - simple primitives",
"a INT, b STRING",
@@ -146,7 +186,25 @@ object CometJsonExpressionBenchmark extends
CometBenchmarkBase {
JsonExprConfig(
"from_json - field access",
"a INT, b STRING",
- "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"))
+ "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"),
+
+ // to_json tests
+ JsonExprConfig(
+ "to_json - simple primitives",
+ "a INT, b STRING",
+ "SELECT to_json(json_struct) FROM parquetV1Table"),
+ JsonExprConfig(
+ "to_json - all primitive types",
+ "i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING",
+ "SELECT to_json(json_struct) FROM parquetV1Table"),
+ JsonExprConfig(
+ "to_json - with nulls",
+ "a INT, b STRING",
+ "SELECT to_json(json_struct) FROM parquetV1Table"),
+ JsonExprConfig(
+ "to_json - nested struct",
+ "outer STRUCT<inner_a: INT, inner_b: STRING>",
+ "SELECT to_json(json_struct) FROM parquetV1Table"))
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = 1024 * 1024
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]