This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bf8f5598d [parquet] Fix that cannot read parquet ROW<DECIMAL> data (#4533)
bf8f5598d is described below
commit bf8f5598d9a48f907f2346df39507c27877a5952
Author: yuzelin <[email protected]>
AuthorDate: Thu Nov 14 20:28:30 2024 +0800
[parquet] Fix that cannot read parquet ROW<DECIMAL> data (#4533)
---
.../data/columnar/heap/AbstractHeapVector.java | 4 +-
.../data/columnar/heap/ElementCountable.java | 23 ++-------
.../apache/paimon/flink/BatchFileStoreITCase.java | 19 +++++++
.../format/parquet/ParquetReaderFactory.java | 6 ++-
.../format/parquet/reader/NestedColumnReader.java | 3 +-
.../reader/NestedPrimitiveColumnReader.java | 6 +--
.../parquet/reader/ParquetDecimalVector.java | 16 +++++-
.../format/parquet/reader/RowColumnReader.java | 59 ----------------------
8 files changed, 50 insertions(+), 86 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
index 702877642..f0e82eac4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
@@ -25,7 +25,8 @@ import java.nio.ByteOrder;
import java.util.Arrays;
/** Heap vector that nullable shared structure. */
-public abstract class AbstractHeapVector extends AbstractWritableVector {
+public abstract class AbstractHeapVector extends AbstractWritableVector
+ implements ElementCountable {
public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() ==
ByteOrder.LITTLE_ENDIAN;
@@ -116,6 +117,7 @@ public abstract class AbstractHeapVector extends
AbstractWritableVector {
return dictionaryIds;
}
+ @Override
public int getLen() {
return this.len;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
similarity index 60%
rename from
paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
rename to
paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
index fb6378349..a32762d65 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
@@ -16,25 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.format.parquet.position;
+package org.apache.paimon.data.columnar.heap;
-import javax.annotation.Nullable;
+/** Container with a known number of elements. */
+public interface ElementCountable {
-/** To represent struct's position in repeated type. */
-public class RowPosition {
- @Nullable private final boolean[] isNull;
- private final int positionsCount;
-
- public RowPosition(boolean[] isNull, int positionsCount) {
- this.isNull = isNull;
- this.positionsCount = positionsCount;
- }
-
- public boolean[] getIsNull() {
- return isNull;
- }
-
- public int getPositionsCount() {
- return positionsCount;
- }
+ int getLen();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index c30e6cd56..cdc114b04 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -573,6 +574,24 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
validateCount1NotPushDown(sql);
}
+ @Test
+ public void testParquetRowDecimalAndTimestamp() {
+ sql(
+ "CREATE TABLE parquet_row_decimal(`row` ROW<f0 DECIMAL(2,1)>) WITH ('file.format' = 'parquet')");
+ sql("INSERT INTO parquet_row_decimal VALUES ( (ROW(1.2)) )");
+
+ assertThat(sql("SELECT * FROM parquet_row_decimal"))
+ .containsExactly(Row.of(Row.of(new BigDecimal("1.2"))));
+
+ sql(
+ "CREATE TABLE parquet_row_timestamp(`row` ROW<f0 TIMESTAMP(0)>) WITH ('file.format' = 'parquet')");
+ sql("INSERT INTO parquet_row_timestamp VALUES ( (ROW(TIMESTAMP'2024-11-13 18:00:00')) )");
+
+ assertThat(sql("SELECT * FROM parquet_row_timestamp"))
+ .containsExactly(
+ Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
+ }
+
private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv,
sql);
while (!transformation.getInputs().isEmpty()) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 53b4b1634..f0151d6f3 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
@@ -293,7 +294,10 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
case DECIMAL:
- vectors[i] = new ParquetDecimalVector(writableVectors[i]);
+ vectors[i] =
+ new ParquetDecimalVector(
+ writableVectors[i],
+ ((ElementCountable) writableVectors[i]).getLen());
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index c89c77603..68225fbd1 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapMapVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;
@@ -134,7 +135,7 @@ public class NestedColumnReader implements
ColumnReader<WritableColumnVector> {
String.format("Row field does not have any children: %s.",
field));
}
- int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
+ int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
boolean[] isNull = new boolean[len];
Arrays.fill(isNull, true);
boolean hasNull = false;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index 7ee33a0bb..7d00ff792 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -495,7 +495,7 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
phiv.vector[i] = ((List<Integer>)
valueList).get(i);
}
}
- return new ParquetDecimalVector(phiv);
+ return new ParquetDecimalVector(phiv, total);
case INT64:
HeapLongVector phlv = new HeapLongVector(total);
for (int i = 0; i < valueList.size(); i++) {
@@ -505,10 +505,10 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
phlv.vector[i] = ((List<Long>)
valueList).get(i);
}
}
- return new ParquetDecimalVector(phlv);
+ return new ParquetDecimalVector(phlv, total);
default:
HeapBytesVector phbv = getHeapBytesVector(total,
valueList);
- return new ParquetDecimalVector(phbv);
+ return new ParquetDecimalVector(phbv, total);
}
default:
throw new RuntimeException("Unsupported type in the list: " +
type);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index 28d308bac..42714ab06 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.columnar.DecimalColumnVector;
import org.apache.paimon.data.columnar.Dictionary;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
@@ -38,12 +39,18 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* {@link DecimalColumnVector} interface.
*/
public class ParquetDecimalVector
- implements DecimalColumnVector, WritableLongVector, WritableIntVector, WritableBytesVector {
+ implements DecimalColumnVector,
+ WritableLongVector,
+ WritableIntVector,
+ WritableBytesVector,
+ ElementCountable {
private final ColumnVector vector;
+ private final int len;
- public ParquetDecimalVector(ColumnVector vector) {
+ public ParquetDecimalVector(ColumnVector vector, int len) {
this.vector = vector;
+ this.len = len;
}
@Override
@@ -225,4 +232,9 @@ public class ParquetDecimalVector
((WritableLongVector) vector).fill(value);
}
}
+
+ @Override
+ public int getLen() {
+ return len;
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
deleted file mode 100644
index fa2da03ef..000000000
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-
-import java.io.IOException;
-import java.util.List;
-
-/** Row {@link ColumnReader}. */
-public class RowColumnReader implements ColumnReader<WritableColumnVector> {
-
- private final List<ColumnReader> fieldReaders;
-
- public RowColumnReader(List<ColumnReader> fieldReaders) {
- this.fieldReaders = fieldReaders;
- }
-
- @Override
- public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
- HeapRowVector rowVector = (HeapRowVector) vector;
- WritableColumnVector[] vectors = rowVector.getFields();
- // row vector null array
- boolean[] isNulls = new boolean[readNumber];
- for (int i = 0; i < vectors.length; i++) {
- fieldReaders.get(i).readToVector(readNumber, vectors[i]);
-
- for (int j = 0; j < readNumber; j++) {
- if (i == 0) {
- isNulls[j] = vectors[i].isNullAt(j);
- } else {
- isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
- }
- if (i == vectors.length - 1 && isNulls[j]) {
- // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
- // null
- rowVector.setNullAt(j);
- }
- }
- }
- }
-}