cfmcgrady commented on code in PR #2643:
URL: https://github.com/apache/datafusion-comet/pull/2643#discussion_r2535050650
##########
spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala:
##########
@@ -777,4 +778,28 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
}
}
+
+ test("array_reverse 2") {
+ // This test validates data correctness for array<binary> columns with nullable elements.
+ // See https://github.com/apache/datafusion-comet/issues/2612
+ withTempDir { dir =>
Review Comment:
It’s exercising a long‑standing null-handling issue in Comet.
Minimal reproducible snippet:
```scala
sql("select cast(array(null) as array<binary>) as c1").write
.mode("overwrite")
.save("/tmp/parquet/t1")
sql("select c1, reverse(c1) from parquet.`/tmp/parquet/t1`").show
```
```
# current output
+------+-----------+
| c1|reverse(c1)|
+------+-----------+
|[NULL]| [[]]|
+------+-----------+
# expected output
+------+-----------+
| c1|reverse(c1)|
+------+-----------+
|[NULL]| [NULL]|
+------+-----------+
```
Why this happens:
- reverse for `array<binary>` isn’t implemented natively by Comet, so the
operator falls back to vanilla Spark execution. The root scan, however, is
still CometNativeScan.
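One way to confirm the fallback (a minimal sketch; any explain mode that prints operator names works) is to dump the physical plan of the repro query:
```scala
// Print the physical plan to see which operators are Comet-native and which fell back to Spark.
sql("select c1, reverse(c1) from parquet.`/tmp/parquet/t1`").explain("formatted")
```
The (abbreviated) plan looks like this: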
```
== Physical Plan ==
CollectLimit (4)
+- * Project (3)
+- * ColumnarToRow (2)
+- CometNativeScan parquet (1)
```
Note the scan is CometNativeScan. The bug is the mismatch between Comet’s
columnar getters and Spark’s expectations when nulls are present.
Relevant generated code (codegenStageId=1) highlights:
- The reverse logic is here:
```java
/* 047 */ for (int project_i_0 = 0; project_i_0 < project_numElements_1; project_i_0++) {
/* 048 */ int project_j_0 = project_numElements_1 - project_i_0 - 1;
/* 049 */ project_arrayData_0.update(project_i_0, project_expr_0_0.getBinary(project_j_0));
/* 050 */ }
/* 051 */ project_value_2 = project_arrayData_0;
```
Observation 1: When constructing the reversed array, Spark’s code directly
calls getBinary(j) and does not check element nullability at this point. It
relies on getBinary(j) returning null for null elements.
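To make Observation 1 concrete, here is a minimal sketch (plain Catalyst classes, not Comet code; the single-element input is an assumption) of what that loop stores when the getter returns an empty byte array instead of null:
```scala
import org.apache.spark.sql.catalyst.util.GenericArrayData

// What the codegen loop effectively does for a one-element array<binary> value.
val fromGetter: Array[Byte] = Array.emptyByteArray // a getter that returns "" instead of null
val reversed = new GenericArrayData(new Array[Any](1))
reversed.update(0, fromGetter) // stores an empty byte array, later rendered as [[]]
// reversed.update(0, null)    // storing null instead is rendered as [NULL]
```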
- When writing out the array, Spark does distinguish nulls:
```java
/* 099 */ for (int project_index_2 = 0; project_index_2 < project_numElements_3; project_index_2++) {
/* 100 */ if (project_tmpInput_2.isNullAt(project_index_2)) {
/* 101 */ columnartorow_mutableStateArray_4[3].setNull8Bytes(project_index_2);
/* 102 */ } else {
/* 103 */ columnartorow_mutableStateArray_4[3].write(project_index_2, project_tmpInput_2.getBinary(project_index_2));
/* 104 */ }
```
Observation 2: Spark uses isNullAt to mark nulls, and only calls
getBinary(i) for non-null elements. Therefore, Comet must return null from
getBinary(i) when the element is null; returning an empty byte array leads to
[[]] instead of [NULL].
To fix this, the PR makes Comet’s ColumnVector getters (`getBinary`, `getUTF8String`, `getArray`, `getMap`, `getDecimal`) return null when `isNullAt(i)` is true.
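A minimal sketch of the getter contract the fix enforces, written against a toy class (not Comet's actual ColumnVector implementation):
```scala
// Toy stand-in for a binary column vector; the point is only the contract:
// a value getter must return null whenever isNullAt(i) is true.
final class ToyBinaryVector(values: Array[Array[Byte]], nulls: Array[Boolean]) {
  def isNullAt(i: Int): Boolean = nulls(i)

  def getBinary(i: Int): Array[Byte] =
    if (isNullAt(i)) null // the behaviour the PR enforces for Comet's getters
    else values(i)
}

val v = new ToyBinaryVector(Array(Array.emptyByteArray), Array(true))
assert(v.getBinary(0) == null) // an empty byte array here is what produced [[]]
```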
##########
spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala:
##########
@@ -777,4 +778,28 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
}
}
+
+ test("array_reverse 2") {
+ // This test validates data correctness for array<binary> columns with nullable elements.
+ // See https://github.com/apache/datafusion-comet/issues/2612
+ withTempDir { dir =>
Review Comment:
<details>
<summary>FYI, the full generated code</summary>
```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private int columnartorow_batchIdx_0;
/* 010 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[1];
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
/* 012 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 013 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] columnartorow_mutableStateArray_4 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[4];
/* 015 */
/* 016 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 017 */ this.references = references;
/* 018 */ }
/* 019 */
/* 020 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 021 */ partitionIndex = index;
/* 022 */ this.inputs = inputs;
/* 023 */ columnartorow_mutableStateArray_0[0] = inputs[0];
/* 024 */
/* 025 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 026 */ columnartorow_mutableStateArray_4[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[0], 8);
/* 027 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 028 */ columnartorow_mutableStateArray_4[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[1], 8);
/* 029 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
/* 030 */ columnartorow_mutableStateArray_4[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[2], 8);
/* 031 */ columnartorow_mutableStateArray_4[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[2], 8);
/* 032 */
/* 033 */ }
/* 034 */
/* 035 */ private void project_doConsume_0(ArrayData project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 036 */ // common sub-expressions
/* 037 */
/* 038 */ boolean project_isNull_2 = project_exprIsNull_0_0;
/* 039 */ ArrayData project_value_2 = null;
/* 040 */
/* 041 */ if (!project_exprIsNull_0_0) {
/* 042 */ final int project_numElements_1 = project_expr_0_0.numElements();
/* 043 */
/* 044 */ ArrayData project_arrayData_0 = ArrayData.allocateArrayData(
/* 045 */ -1, project_numElements_1, " reverse failed.");
/* 046 */
/* 047 */ for (int project_i_0 = 0; project_i_0 < project_numElements_1; project_i_0++) {
/* 048 */ int project_j_0 = project_numElements_1 - project_i_0 - 1;
/* 049 */ project_arrayData_0.update(project_i_0, project_expr_0_0.getBinary(project_j_0));
/* 050 */ }
/* 051 */ project_value_2 = project_arrayData_0;
/* 052 */
/* 053 */ }
/* 054 */ columnartorow_mutableStateArray_3[2].reset();
/* 055 */
/* 056 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
/* 057 */
/* 058 */ if (project_exprIsNull_0_0) {
/* 059 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
/* 060 */ } else {
/* 061 */ // Remember the current cursor so that we can calculate how many bytes are
/* 062 */ // written later.
/* 063 */ final int project_previousCursor_1 = columnartorow_mutableStateArray_3[2].cursor();
/* 064 */
/* 065 */ final ArrayData project_tmpInput_1 = project_expr_0_0;
/* 066 */ if (project_tmpInput_1 instanceof UnsafeArrayData) {
/* 067 */ columnartorow_mutableStateArray_3[2].write((UnsafeArrayData) project_tmpInput_1);
/* 068 */ } else {
/* 069 */ final int project_numElements_2 = project_tmpInput_1.numElements();
/* 070 */ columnartorow_mutableStateArray_4[2].initialize(project_numElements_2);
/* 071 */
/* 072 */ for (int project_index_1 = 0; project_index_1 < project_numElements_2; project_index_1++) {
/* 073 */ if (project_tmpInput_1.isNullAt(project_index_1)) {
/* 074 */ columnartorow_mutableStateArray_4[2].setNull8Bytes(project_index_1);
/* 075 */ } else {
/* 076 */ columnartorow_mutableStateArray_4[2].write(project_index_1, project_tmpInput_1.getBinary(project_index_1));
/* 077 */ }
/* 078 */
/* 079 */ }
/* 080 */ }
/* 081 */
/* 082 */ columnartorow_mutableStateArray_3[2].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_1);
/* 083 */ }
/* 084 */
/* 085 */ if (project_isNull_2) {
/* 086 */ columnartorow_mutableStateArray_3[2].setNullAt(1);
/* 087 */ } else {
/* 088 */ // Remember the current cursor so that we can calculate how many bytes are
/* 089 */ // written later.
/* 090 */ final int project_previousCursor_2 = columnartorow_mutableStateArray_3[2].cursor();
/* 091 */
/* 092 */ final ArrayData project_tmpInput_2 = project_value_2;
/* 093 */ if (project_tmpInput_2 instanceof UnsafeArrayData) {
/* 094 */ columnartorow_mutableStateArray_3[2].write((UnsafeArrayData) project_tmpInput_2);
/* 095 */ } else {
/* 096 */ final int project_numElements_3 = project_tmpInput_2.numElements();
/* 097 */ columnartorow_mutableStateArray_4[3].initialize(project_numElements_3);
/* 098 */
/* 099 */ for (int project_index_2 = 0; project_index_2 < project_numElements_3; project_index_2++) {
/* 100 */ if (project_tmpInput_2.isNullAt(project_index_2)) {
/* 101 */ columnartorow_mutableStateArray_4[3].setNull8Bytes(project_index_2);
/* 102 */ } else {
/* 103 */ columnartorow_mutableStateArray_4[3].write(project_index_2, project_tmpInput_2.getBinary(project_index_2));
/* 104 */ }
/* 105 */
/* 106 */ }
/* 107 */ }
/* 108 */
/* 109 */ columnartorow_mutableStateArray_3[2].setOffsetAndSizeFromPreviousCursor(1, project_previousCursor_2);
/* 110 */ }
/* 111 */ append((columnartorow_mutableStateArray_3[2].getRow()));
/* 112 */
/* 113 */ }
/* 114 */
/* 115 */ private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 116 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 117 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 118 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numInputBatches */).add(1);
/* 119 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 120 */ columnartorow_batchIdx_0 = 0;
/* 121 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 122 */
/* 123 */ }
/* 124 */ }
/* 125 */
/* 126 */ protected void processNext() throws java.io.IOException {
/* 127 */ if (columnartorow_mutableStateArray_1[0] == null) {
/* 128 */ columnartorow_nextBatch_0();
/* 129 */ }
/* 130 */ while ( columnartorow_mutableStateArray_1[0] != null) {
/* 131 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 132 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 133 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 134 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 135 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 136 */ ArrayData columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getArray(columnartorow_rowIdx_0));
/* 137 */
/* 138 */ project_doConsume_0(columnartorow_value_0, columnartorow_isNull_0);
/* 139 */ if (shouldStop()) { columnartorow_batchIdx_0 = columnartorow_rowIdx_0 + 1; return; }
/* 140 */ }
/* 141 */ columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 142 */ columnartorow_mutableStateArray_1[0] = null;
/* 143 */ columnartorow_nextBatch_0();
/* 144 */ }
/* 145 */ }
/* 146 */
/* 147 */ }
```
</details>