This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 8bb054642f4 [SPARK-44805][SQL] getBytes/getShorts/getInts/etc. should work in a column vector that has a dictionary 8bb054642f4 is described below commit 8bb054642f4b6b829bed7ff9bd2f94cc43b0f481 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Fri Sep 8 12:57:42 2023 -0700 [SPARK-44805][SQL] getBytes/getShorts/getInts/etc. should work in a column vector that has a dictionary Change getBytes/getShorts/getInts/getLongs/getFloats/getDoubles in `OnHeapColumnVector` and `OffHeapColumnVector` to use the dictionary, if present. The following query gets incorrect results: ``` drop table if exists t1; create table t1 using parquet as select * from values (named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2))) as (value); select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from t1; {"f1":[1.0,2.0,3.0],"f2":[0,0,0]} ``` The result should be: ``` {"f1":[1.0,2.0,3.0],"f2":[1,1,2]} ``` The cast operation copies the second array by calling `ColumnarArray#copy`, which in turn calls `ColumnarArray#toIntArray`, which in turn calls `ColumnVector#getInts` on the underlying column vector (which is either an `OnHeapColumnVector` or an `OffHeapColumnVector`). The implementation of `getInts` in either concrete class assumes there is no dictionary and does not use it if it is present (in fact, it even asserts that there is no dictionary). However, in the above example, the col [...] ``` java -cp ~/github/parquet-mr/parquet-tools/target/parquet-tools-1.10.1.jar org.apache.parquet.tools.Main meta ./spark-warehouse/t1/part-00000-122fdd53-8166-407b-aec5-08e0c2845c3d-c000.snappy.parquet ... 
row group 1: RC:1 TS:112 OFFSET:4 ------------------------------------------------------------------------------------------------------------------------------------------------------- value: .f1: ..list: ...element: INT32 SNAPPY DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:RLE,PLAIN ST:[min: 1, max: 3, num_nulls: 0] .f2: ..list: ...element: INT32 SNAPPY DO:51 FPO:80 SZ:69/65/0.94 VC:3 ENC:RLE,PLAIN_DICTIONARY ST:[min: 1, max: 2, num_nulls: 0] ``` The same bug also occurs when field f2 is a map. This PR fixes that case as well. Does this PR introduce any user-facing change? No, except for fixing the correctness issue. How was this patch tested? New tests. Was this patch authored or co-authored using generative AI tooling? No. Closes #42850 from bersprockets/vector_oddity. Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit fac236e1350d1c71dd772251709db3af877a69c2) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/columnar/ColumnDictionary.java | 18 +++-- .../execution/vectorized/OffHeapColumnVector.java | 55 +++++++++++---- .../execution/vectorized/OnHeapColumnVector.java | 54 +++++++++++---- .../datasources/parquet/ParquetQuerySuite.scala | 10 +++ .../execution/vectorized/ColumnVectorSuite.scala | 80 +++++++++++++++++++++- 5 files changed, 186 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java index 419dda874d3..29271fc5c0a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java @@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.vectorized.Dictionary; public final class ColumnDictionary implements Dictionary { private int[] intDictionary; private long[] longDictionary; + private float[] floatDictionary; + private double[] doubleDictionary; public ColumnDictionary(int[] dictionary) { this.intDictionary = dictionary; @@ -31,6 
+33,14 @@ public final class ColumnDictionary implements Dictionary { this.longDictionary = dictionary; } + public ColumnDictionary(float[] dictionary) { + this.floatDictionary = dictionary; + } + + public ColumnDictionary(double[] dictionary) { + this.doubleDictionary = dictionary; + } + @Override public int decodeToInt(int id) { return intDictionary[id]; @@ -42,14 +52,10 @@ public final class ColumnDictionary implements Dictionary { } @Override - public float decodeToFloat(int id) { - throw new UnsupportedOperationException("Dictionary encoding does not support float"); - } + public float decodeToFloat(int id) { return floatDictionary[id]; } @Override - public double decodeToDouble(int id) { - throw new UnsupportedOperationException("Dictionary encoding does not support double"); - } + public double decodeToDouble(int id) { return doubleDictionary[id]; } @Override public byte[] decodeToBinary(int id) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 711c00856e9..6663c4ba995 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -210,9 +210,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public byte[] getBytes(int rowId, int count) { - assert(dictionary == null); byte[] array = new byte[count]; - Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getByte(rowId + i); + } + } return array; } @@ -266,9 +271,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public short[] getShorts(int rowId, int 
count) { - assert(dictionary == null); short[] array = new short[count]; - Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L); + } else { + for (int i = 0; i < count; i++) { + array[i] = getShort(rowId + i); + } + } return array; } @@ -327,9 +337,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public int[] getInts(int rowId, int count) { - assert(dictionary == null); int[] array = new int[count]; - Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L); + } else { + for (int i = 0; i < count; i++) { + array[i] = getInt(rowId + i); + } + } return array; } @@ -399,9 +414,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public long[] getLongs(int rowId, int count) { - assert(dictionary == null); long[] array = new long[count]; - Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L); + } else { + for (int i = 0; i < count; i++) { + array[i] = getLong(rowId + i); + } + } return array; } @@ -458,9 +478,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public float[] getFloats(int rowId, int count) { - assert(dictionary == null); float[] array = new float[count]; - Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L); + } else { + for (int i = 0; i < count; i++) { + array[i] = getFloat(rowId + i); + } + } return array; } @@ 
-518,9 +543,15 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public double[] getDoubles(int rowId, int count) { - assert(dictionary == null); double[] array = new double[count]; - Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8L); + if (dictionary == null) { + Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, + count * 8L); + } else { + for (int i = 0; i < count; i++) { + array[i] = getDouble(rowId + i); + } + } return array; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 505377bdb68..85f25b1bbbf 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -208,9 +208,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public byte[] getBytes(int rowId, int count) { - assert(dictionary == null); byte[] array = new byte[count]; - System.arraycopy(byteData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(byteData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getByte(rowId + i); + } + } return array; } @@ -263,9 +268,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public short[] getShorts(int rowId, int count) { - assert(dictionary == null); short[] array = new short[count]; - System.arraycopy(shortData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(shortData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getShort(rowId + i); + } + } return array; } @@ -319,9 +329,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public int[] getInts(int 
rowId, int count) { - assert(dictionary == null); int[] array = new int[count]; - System.arraycopy(intData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(intData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getInt(rowId + i); + } + } return array; } @@ -385,9 +400,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public long[] getLongs(int rowId, int count) { - assert(dictionary == null); long[] array = new long[count]; - System.arraycopy(longData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(longData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getLong(rowId + i); + } + } return array; } @@ -437,9 +457,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public float[] getFloats(int rowId, int count) { - assert(dictionary == null); float[] array = new float[count]; - System.arraycopy(floatData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(floatData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getFloat(rowId + i); + } + } return array; } @@ -491,9 +516,14 @@ public final class OnHeapColumnVector extends WritableColumnVector { @Override public double[] getDoubles(int rowId, int count) { - assert(dictionary == null); double[] array = new double[count]; - System.arraycopy(doubleData, rowId, array, 0, count); + if (dictionary == null) { + System.arraycopy(doubleData, rowId, array, 0, count); + } else { + for (int i = 0; i < count; i++) { + array[i] = getDouble(rowId + i); + } + } return array; } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4e236ad7865..51de8fa04c6 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -1014,6 +1014,16 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(sql("select * from tbl"), expected) } } + + test("SPARK-44805: cast of struct with two arrays") { + withTable("tbl") { + sql("create table tbl (value struct<f1:array<int>,f2:array<int>>) using parquet") + sql("insert into tbl values (named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2)))") + val df = sql("select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from tbl") + val expected = Row(Row(Array(1.0d, 2.0d, 3.0d), Array(1, 1, 2))) :: Nil + checkAnswer(df, expected) + } + } } class ParquetV1QuerySuite extends ParquetQuerySuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 4cf2376a3fc..c9fab1e9d15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow -import org.apache.spark.sql.execution.columnar.ColumnAccessor +import org.apache.spark.sql.execution.columnar.{ColumnAccessor, ColumnDictionary} import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarArray @@ -383,6 +383,84 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { assert(testVector.getStruct(1).get(1, DoubleType) === 5.67) } + testVectors("SPARK-44805: getInts with dictionary", 3, 
IntegerType) { testVector => + val dict = new ColumnDictionary(Array[Int](7, 8, 9)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getInts(0, 3)(0) == 7) + assert(testVector.getInts(0, 3)(1) == 8) + assert(testVector.getInts(0, 3)(2) == 9) + } + + testVectors("SPARK-44805: getShorts with dictionary", 3, ShortType) { testVector => + val dict = new ColumnDictionary(Array[Int](7, 8, 9)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getShorts(0, 3)(0) == 7) + assert(testVector.getShorts(0, 3)(1) == 8) + assert(testVector.getShorts(0, 3)(2) == 9) + } + + testVectors("SPARK-44805: getBytes with dictionary", 3, ByteType) { testVector => + val dict = new ColumnDictionary(Array[Int](7, 8, 9)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getBytes(0, 3)(0) == 7) + assert(testVector.getBytes(0, 3)(1) == 8) + assert(testVector.getBytes(0, 3)(2) == 9) + } + + testVectors("SPARK-44805: getLongs with dictionary", 3, LongType) { testVector => + val dict = new ColumnDictionary(Array[Long](2147483648L, 2147483649L, 2147483650L)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getLongs(0, 3)(0) == 2147483648L) + assert(testVector.getLongs(0, 3)(1) == 2147483649L) + assert(testVector.getLongs(0, 3)(2) == 2147483650L) + } + + testVectors("SPARK-44805: getFloats with dictionary", 3, FloatType) { 
testVector => + val dict = new ColumnDictionary(Array[Float](0.1f, 0.2f, 0.3f)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getFloats(0, 3)(0) == 0.1f) + assert(testVector.getFloats(0, 3)(1) == 0.2f) + assert(testVector.getFloats(0, 3)(2) == 0.3f) + } + + testVectors("SPARK-44805: getDoubles with dictionary", 3, DoubleType) { testVector => + val dict = new ColumnDictionary(Array[Double](1342.17727d, 1342.17728d, 1342.17729d)) + testVector.setDictionary(dict) + testVector.reserveDictionaryIds(3) + testVector.getDictionaryIds.putInt(0, 0) + testVector.getDictionaryIds.putInt(1, 1) + testVector.getDictionaryIds.putInt(2, 2) + + assert(testVector.getDoubles(0, 3)(0) == 1342.17727d) + assert(testVector.getDoubles(0, 3)(1) == 1342.17728d) + assert(testVector.getDoubles(0, 3)(2) == 1342.17729d) + } + test("[SPARK-22092] off-heap column vector reallocation corrupts array data") { withVector(new OffHeapColumnVector(8, arrayType)) { testVector => val data = testVector.arrayData() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org