This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fd8d5ad [SPARK-36928][SQL] Handle ANSI intervals in ColumnarRow, ColumnarBatchRow and ColumnarArray fd8d5ad is described below commit fd8d5ad2140d6405357b908dce2d00a21036dedb Author: PengLei <peng.8...@gmail.com> AuthorDate: Thu Oct 28 14:52:41 2021 +0300 [SPARK-36928][SQL] Handle ANSI intervals in ColumnarRow, ColumnarBatchRow and ColumnarArray ### What changes were proposed in this pull request? 1. Add handling of ANSI interval types to the `get` and `copy` methods of ColumnarArray 2. Add handling of ANSI interval types to the `get` and `copy` methods of ColumnarBatchRow 3. Add handling of ANSI interval types to the `get` and `copy` methods of ColumnarRow ### Why are the changes needed? [SPARK-36928](https://issues.apache.org/jira/browse/SPARK-36928) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added test cases Closes #34421 from Peng-Lei/SPARK-36928. Authored-by: PengLei <peng.8...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../apache/spark/sql/vectorized/ColumnarArray.java | 6 +- .../spark/sql/vectorized/ColumnarBatchRow.java | 8 +-- .../apache/spark/sql/vectorized/ColumnarRow.java | 8 +-- .../execution/vectorized/ColumnVectorSuite.scala | 69 ++++++++++++++++++++++ .../execution/vectorized/ColumnarBatchSuite.scala | 32 ++++++++++ 5 files changed, 113 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java index 147dd24..2fb6b3f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java @@ -57,9 +57,11 @@ public final class ColumnarArray extends ArrayData { return UnsafeArrayData.fromPrimitiveArray(toByteArray()); } else if (dt instanceof ShortType) { return UnsafeArrayData.fromPrimitiveArray(toShortArray()); - }
else if (dt instanceof IntegerType || dt instanceof DateType) { + } else if (dt instanceof IntegerType || dt instanceof DateType + || dt instanceof YearMonthIntervalType) { return UnsafeArrayData.fromPrimitiveArray(toIntArray()); - } else if (dt instanceof LongType || dt instanceof TimestampType) { + } else if (dt instanceof LongType || dt instanceof TimestampType + || dt instanceof DayTimeIntervalType) { return UnsafeArrayData.fromPrimitiveArray(toLongArray()); } else if (dt instanceof FloatType) { return UnsafeArrayData.fromPrimitiveArray(toFloatArray()); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java index c6b7287e7..8c32d5c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java @@ -52,9 +52,9 @@ public final class ColumnarBatchRow extends InternalRow { row.setByte(i, getByte(i)); } else if (dt instanceof ShortType) { row.setShort(i, getShort(i)); - } else if (dt instanceof IntegerType) { + } else if (dt instanceof IntegerType || dt instanceof YearMonthIntervalType) { row.setInt(i, getInt(i)); - } else if (dt instanceof LongType) { + } else if (dt instanceof LongType || dt instanceof DayTimeIntervalType) { row.setLong(i, getLong(i)); } else if (dt instanceof FloatType) { row.setFloat(i, getFloat(i)); @@ -151,9 +151,9 @@ public final class ColumnarBatchRow extends InternalRow { return getByte(ordinal); } else if (dataType instanceof ShortType) { return getShort(ordinal); - } else if (dataType instanceof IntegerType) { + } else if (dataType instanceof IntegerType || dataType instanceof YearMonthIntervalType) { return getInt(ordinal); - } else if (dataType instanceof LongType) { + } else if (dataType instanceof LongType || dataType instanceof DayTimeIntervalType) { return getLong(ordinal); } else if (dataType 
instanceof FloatType) { return getFloat(ordinal); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java index 4b9d3c5..da4b242 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java @@ -61,9 +61,9 @@ public final class ColumnarRow extends InternalRow { row.setByte(i, getByte(i)); } else if (dt instanceof ShortType) { row.setShort(i, getShort(i)); - } else if (dt instanceof IntegerType) { + } else if (dt instanceof IntegerType || dt instanceof YearMonthIntervalType) { row.setInt(i, getInt(i)); - } else if (dt instanceof LongType) { + } else if (dt instanceof LongType || dt instanceof DayTimeIntervalType) { row.setLong(i, getLong(i)); } else if (dt instanceof FloatType) { row.setFloat(i, getFloat(i)); @@ -160,9 +160,9 @@ public final class ColumnarRow extends InternalRow { return getByte(ordinal); } else if (dataType instanceof ShortType) { return getShort(ordinal); - } else if (dataType instanceof IntegerType) { + } else if (dataType instanceof IntegerType || dataType instanceof YearMonthIntervalType) { return getInt(ordinal); - } else if (dataType instanceof LongType) { + } else if (dataType instanceof LongType || dataType instanceof DayTimeIntervalType) { return getLong(ordinal); } else if (dataType instanceof FloatType) { return getFloat(ordinal); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 43f48ab..cdf41ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -209,6 +209,44 @@ class ColumnVectorSuite extends SparkFunSuite with 
BeforeAndAfterEach { } } + DataTypeTestUtils.yearMonthIntervalTypes.foreach { + dt => + testVectors(dt.typeName, + 10, + dt) { testVector => + (0 until 10).foreach { i => + testVector.appendInt(i) + } + + val array = new ColumnarArray(testVector, 0, 10) + val arrayCopy = array.copy() + + (0 until 10).foreach { i => + assert(array.get(i, dt) === i) + assert(arrayCopy.get(i, dt) === i) + } + } + } + + DataTypeTestUtils.dayTimeIntervalTypes.foreach { + dt => + testVectors(dt.typeName, + 10, + dt) { testVector => + (0 until 10).foreach { i => + testVector.appendLong(i) + } + + val array = new ColumnarArray(testVector, 0, 10) + val arrayCopy = array.copy() + + (0 until 10).foreach { i => + assert(array.get(i, dt) === i) + assert(arrayCopy.get(i, dt) === i) + } + } + } + testVectors("mutable ColumnarRow", 10, IntegerType) { testVector => val mutableRow = new MutableColumnarRow(Array(testVector)) (0 until 10).foreach { i => @@ -536,5 +574,36 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } } } + + DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt => + val structType = new StructType().add(dt.typeName, dt) + testVectors("ColumnarRow " + dt.typeName, 10, structType) { v => + val column = v.getChild(0) + (0 until 10).foreach { i => + column.putInt(i, i) + } + (0 until 10).foreach { i => + val row = v.getStruct(i) + val rowCopy = row.copy() + assert(row.get(0, dt) === i) + assert(rowCopy.get(0, dt) === i) + } + } + } + DataTypeTestUtils.dayTimeIntervalTypes.foreach { dt => + val structType = new StructType().add(dt.typeName, dt) + testVectors("ColumnarRow " + dt.typeName, 10, structType) { v => + val column = v.getChild(0) + (0 until 10).foreach { i => + column.putLong(i, i) + } + (0 until 10).foreach { i => + val row = v.getStruct(i) + val rowCopy = row.copy() + assert(row.get(0, dt) === i) + assert(rowCopy.get(0, dt) === i) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index f01b27e..8921500 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1758,4 +1758,36 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(ex.getMessage.contains( "Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow)")) } + + DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt => + testVector(dt.typeName, 10, dt) { + column => + (0 until 10).foreach{ i => + column.putInt(i, i) + } + val bachRow = new ColumnarBatchRow(Array(column)) + (0 until 10).foreach { i => + bachRow.rowId = i + assert(bachRow.get(0, dt) === i) + val batchRowCopy = bachRow.copy() + assert(batchRowCopy.get(0, dt) === i) + } + } + } + + DataTypeTestUtils.dayTimeIntervalTypes.foreach { dt => + testVector(dt.typeName, 10, dt) { + column => + (0 until 10).foreach{ i => + column.putLong(i, i) + } + val bachRow = new ColumnarBatchRow(Array(column)) + (0 until 10).foreach { i => + bachRow.rowId = i + assert(bachRow.get(0, dt) === i) + val batchRowCopy = bachRow.copy() + assert(batchRowCopy.get(0, dt) === i) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org