andygrove commented on issue #3232:
URL: https://github.com/apache/datafusion-comet/issues/3232#issuecomment-3778431439
## Fuzz Tests for Reproduction
Below are the fuzz tests that reproduce these edge cases. These can be added
to `CometCsvExpressionSuite.scala`:
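
The tests assume the following imports are in scope. This is a sketch reconstructed from the identifiers used below; the exact packages for Comet's test utilities (`ParquetGenerator`, `DataGenOptions`) should be verified against the suite's existing imports:

```scala
// Assumed import paths; verify against CometCsvExpressionSuite's existing imports.
import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.StructsToCsv
import org.apache.spark.sql.functions.{col, struct, to_csv}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}

import org.apache.comet.CometConf
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
```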
<details>
<summary>Click to expand test code</summary>
```scala
test("to_csv - fuzz test with random primitive data") {
// Generate random data with various primitive types
val numIterations = 10
val numRows = 100
for (seed <- 1 to numIterations) {
val random = new Random(seed)
withTempDir { dir =>
val path = new Path(dir.toURI.toString, s"test_$seed.parquet")
val filename = path.toString
// Generate schema with primitive types only (excluding incompatible
types)
val compatibleTypes = Seq(
DataTypes.BooleanType,
DataTypes.ByteType,
DataTypes.ShortType,
DataTypes.IntegerType,
DataTypes.LongType,
DataTypes.FloatType,
DataTypes.DoubleType,
DataTypes.createDecimalType(10, 2),
DataTypes.StringType)
val schema = StructType(compatibleTypes.zipWithIndex.map { case (dt,
i) =>
StructField(s"c$i", dt, nullable = true)
})
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val dataGenOptions = DataGenOptions(
allowNull = true,
generateNegativeZero = true,
generateNaN = true,
generateInfinity = true)
ParquetGenerator
.makeParquetFile(random, spark, filename, schema, numRows,
dataGenOptions)
}
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) ->
"true") {
val df = spark.read.parquet(filename)
val structCols = df.columns.map(col)
// Test with default options
checkSparkAnswerAndOperator(df.select(to_csv(struct(structCols:
_*))))
// Test with quoteAll
checkSparkAnswerAndOperator(
df.select(to_csv(struct(structCols: _*), Map("quoteAll" ->
"true").asJava)))
}
}
}
}
test("to_csv - fuzz test with random CSV write options") {
val table = "t_fuzz_options"
val random = new Random(42)
// Generate various delimiters to test
val delimiters = Seq(",", ";", "|", "\t", ":", "#", "~")
val quotes = Seq("\"", "'")
val escapes = Seq("\\", "/", "!")
val nullValues = Seq("", "NULL", "N/A", "null", "\\N", "<null>")
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
withTable(table) {
sql(s"create table $table(str_col string, int_col int, bool_col
boolean) using parquet")
sql(s"insert into $table values('hello', 1, true)")
sql(s"insert into $table values('world', 2, false)")
sql(s"insert into $table values(null, null, null)")
sql(s"insert into $table values('', 0, true)")
sql(s"insert into $table values('has,comma', 42, false)")
sql(s"""insert into $table values('has"quote', 99, true)""")
sql(s"insert into $table values('has\nnewline', -1, false)")
sql(s"insert into $table values(' spaces ', 100, true)")
val df = sql(s"select * from $table")
// Fuzz test with random combinations of options
for (_ <- 1 to 20) {
val delimiter = delimiters(random.nextInt(delimiters.length))
val quote = quotes(random.nextInt(quotes.length))
val escape = escapes(random.nextInt(escapes.length))
val nullValue = nullValues(random.nextInt(nullValues.length))
val quoteAll = random.nextBoolean()
val ignoreLeading = random.nextBoolean()
val ignoreTrailing = random.nextBoolean()
val options = Map(
"delimiter" -> delimiter,
"quote" -> quote,
"escape" -> escape,
"nullValue" -> nullValue,
"quoteAll" -> quoteAll.toString,
"ignoreLeadingWhiteSpace" -> ignoreLeading.toString,
"ignoreTrailingWhiteSpace" -> ignoreTrailing.toString).asJava
checkSparkAnswerAndOperator(
df.select(to_csv(struct(col("str_col"), col("int_col"),
col("bool_col")), options)))
}
}
}
}
test("to_csv - edge case: delimiter in null value representation") {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) ->
"true") {
val data = Seq(Row("hello", null), Row(null, "world"), Row(null, null))
val schema = StructType(
Seq(
StructField("a", StringType, nullable = true),
StructField("b", StringType, nullable = true)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
// Test when null value contains the delimiter
checkSparkAnswerAndOperator(
df.select(to_csv(struct(col("a"), col("b")), Map("nullValue" ->
"N,A").asJava)))
// Test when null value contains the quote character
checkSparkAnswerAndOperator(
df.select(to_csv(struct(col("a"), col("b")), Map("nullValue" ->
"N\"A").asJava)))
}
}
test("to_csv - edge case: single column struct") {
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) ->
"true") {
val data = Seq(Row("hello"), Row("world"), Row(null), Row(""),
Row("has,comma"))
val schema = StructType(Seq(StructField("single", StringType, nullable
= true)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
checkSparkAnswerAndOperator(df.select(to_csv(struct(col("single")))))
checkSparkAnswerAndOperator(
df.select(to_csv(struct(col("single")), Map("quoteAll" ->
"true").asJava)))
}
}
test("to_csv - fuzz test: comprehensive random data and options") {
val numIterations = 5
val numRows = 50
for (seed <- 1 to numIterations) {
val random = new Random(seed)
withTempDir { dir =>
val path = new Path(dir.toURI.toString,
s"comprehensive_$seed.parquet")
val filename = path.toString
val compatibleTypes = Seq(
DataTypes.BooleanType,
DataTypes.ByteType,
DataTypes.ShortType,
DataTypes.IntegerType,
DataTypes.LongType,
DataTypes.FloatType,
DataTypes.DoubleType,
DataTypes.createDecimalType(10, 2),
DataTypes.StringType)
val numCols = 3 + random.nextInt(6)
val selectedTypes =
(0 until numCols).map(_ =>
compatibleTypes(random.nextInt(compatibleTypes.length)))
val schema = StructType(selectedTypes.zipWithIndex.map { case (dt,
i) =>
StructField(s"c$i", dt, nullable = true)
})
// Custom strings that are problematic for CSV
val problematicStrings = Seq(
",", "\"", "\\", "\n", "\r", "\t",
"a,b", "a\"b", "a\\b", "a\nb",
" ", " ", "", "null", "NULL")
val dataGenOptions = DataGenOptions(
allowNull = true,
generateNegativeZero = true,
generateNaN = true,
generateInfinity = true,
customStrings = problematicStrings,
maxStringLength = 20)
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
ParquetGenerator
.makeParquetFile(random, spark, filename, schema, numRows,
dataGenOptions)
}
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) ->
"true") {
val df = spark.read.parquet(filename)
val structCols = df.columns.map(col)
val delimiters = Seq(",", ";", "|", "\t")
val quotes = Seq("\"", "'")
val escapes = Seq("\\", "/")
val nullValues = Seq("", "NULL", "N/A")
val options = Map(
"delimiter" -> delimiters(random.nextInt(delimiters.length)),
"quote" -> quotes(random.nextInt(quotes.length)),
"escape" -> escapes(random.nextInt(escapes.length)),
"nullValue" -> nullValues(random.nextInt(nullValues.length)),
"quoteAll" -> random.nextBoolean().toString,
"ignoreLeadingWhiteSpace" -> random.nextBoolean().toString,
"ignoreTrailingWhiteSpace" ->
random.nextBoolean().toString).asJava
checkSparkAnswerAndOperator(df.select(to_csv(struct(structCols:
_*), options)))
}
}
}
}
```
</details>
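
For a quick manual check outside the test suite, the "delimiter inside `nullValue`" case can be exercised in `spark-shell` along these lines. This is a minimal sketch; compare the output with `spark.comet.enabled` set to `true` and to `false`:

```scala
// Minimal repro sketch for the nullValue-contains-delimiter case.
// Assumes a spark-shell session (spark.implicits._ already imported).
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.{col, struct, to_csv}

val df = Seq(("hello", null: String), (null: String, "world")).toDF("a", "b")
// "N,A" contains the default delimiter; Spark's output is the reference behavior.
df.select(to_csv(struct(col("a"), col("b")), Map("nullValue" -> "N,A").asJava)).show(false)
```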