This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ae04c24262810646675c490616fd0053d0d0107 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Wed Nov 24 14:51:34 2021 +0100 [FLINK-24687][parquet] Copied DecimalDataUtils#is32BitDecimal and DecimalDataUtils#is64BitDecimal in ParquetSchemaConverter to remove the dependency on DecimalDataUtils (from planner) Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../apache/flink/formats/parquet/row/ParquetRowDataWriter.java | 6 +++--- .../flink/formats/parquet/utils/ParquetSchemaConverter.java | 9 +++++++++ .../flink/formats/parquet/vector/ParquetDecimalVector.java | 6 +++--- .../flink/formats/parquet/vector/ParquetSplitReaderUtil.java | 10 +++++----- .../parquet/vector/reader/FixedLenBytesColumnReader.java | 10 +++++----- 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java index ee1556f..63ec0b0 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java @@ -18,7 +18,7 @@ package org.apache.flink.formats.parquet.row; -import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; @@ -315,8 +315,8 @@ public class ParquetRowDataWriter { // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY // optimizer for UnscaledBytesWriter - if (DecimalDataUtils.is32BitDecimal(precision) - || DecimalDataUtils.is64BitDecimal(precision)) { + if (ParquetSchemaConverter.is32BitDecimal(precision) + || ParquetSchemaConverter.is64BitDecimal(precision)) { return new 
LongUnscaledBytesWriter(); } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java index b3a0296..6219439 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -113,4 +113,13 @@ public class ParquetSchemaConverter { } return numBytes; } + + // From DecimalDataUtils + public static boolean is32BitDecimal(int precision) { + return precision <= 9; + } + + public static boolean is64BitDecimal(int precision) { + return precision <= 18 && precision > 9; + } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java index 714e597..6ca1d95 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.parquet.vector; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.DecimalDataUtils; import org.apache.flink.table.data.vector.BytesColumnVector; import org.apache.flink.table.data.vector.ColumnVector; import org.apache.flink.table.data.vector.DecimalColumnVector; @@ -40,10 +40,10 @@ public class ParquetDecimalVector implements DecimalColumnVector { @Override public DecimalData getDecimal(int i, int precision, int scale) { - if (DecimalDataUtils.is32BitDecimal(precision)) { + if 
(ParquetSchemaConverter.is32BitDecimal(precision)) { return DecimalData.fromUnscaledLong( ((IntColumnVector) vector).getInt(i), precision, scale); - } else if (DecimalDataUtils.is64BitDecimal(precision)) { + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { return DecimalData.fromUnscaledLong( ((LongColumnVector) vector).getLong(i), precision, scale); } else { diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java index 90f6ee8..8634f5f 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.parquet.vector; import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader; import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader; import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader; @@ -31,7 +32,6 @@ import org.apache.flink.formats.parquet.vector.reader.LongColumnReader; import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader; import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.DecimalDataUtils; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.vector.ColumnVector; import org.apache.flink.table.data.vector.VectorizedColumnBatch; @@ -202,13 +202,13 @@ public class ParquetSplitReaderUtil { DecimalData.fromBigDecimal( (BigDecimal) value, precision, scale)); ColumnVector internalVector; - if (DecimalDataUtils.is32BitDecimal(precision)) { + if 
(ParquetSchemaConverter.is32BitDecimal(precision)) { internalVector = createVectorFromConstant( new IntType(), decimal == null ? null : (int) decimal.toUnscaledLong(), batchSize); - } else if (DecimalDataUtils.is64BitDecimal(precision)) { + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { internalVector = createVectorFromConstant( new BigIntType(), @@ -371,7 +371,7 @@ public class ParquetSplitReaderUtil { return new HeapTimestampVector(batchSize); case DECIMAL: DecimalType decimalType = (DecimalType) fieldType; - if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) { + if (ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) { checkArgument( (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || typeName == PrimitiveType.PrimitiveTypeName.INT32) @@ -379,7 +379,7 @@ public class ParquetSplitReaderUtil { "Unexpected type: %s", typeName); return new HeapIntVector(batchSize); - } else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) { + } else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) { checkArgument( (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || typeName == PrimitiveType.PrimitiveTypeName.INT64) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java index aa8d949..9b12067 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java @@ -17,7 +17,7 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; import 
org.apache.flink.table.data.vector.writable.WritableBytesVector; import org.apache.flink.table.data.vector.writable.WritableColumnVector; import org.apache.flink.table.data.vector.writable.WritableIntVector; @@ -47,7 +47,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> @Override protected void readBatch(int rowId, int num, VECTOR column) { int bytesLen = descriptor.getPrimitiveType().getTypeLength(); - if (DecimalDataUtils.is32BitDecimal(precision)) { + if (ParquetSchemaConverter.is32BitDecimal(precision)) { WritableIntVector intVector = (WritableIntVector) column; for (int i = 0; i < num; i++) { if (runLenDecoder.readInteger() == maxDefLevel) { @@ -56,7 +56,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> intVector.setNullAt(rowId + i); } } - } else if (DecimalDataUtils.is64BitDecimal(precision)) { + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { WritableLongVector longVector = (WritableLongVector) column; for (int i = 0; i < num; i++) { if (runLenDecoder.readInteger() == maxDefLevel) { @@ -81,7 +81,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> @Override protected void readBatchFromDictionaryIds( int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) { - if (DecimalDataUtils.is32BitDecimal(precision)) { + if (ParquetSchemaConverter.is32BitDecimal(precision)) { WritableIntVector intVector = (WritableIntVector) column; for (int i = rowId; i < rowId + num; ++i) { if (!intVector.isNullAt(i)) { @@ -89,7 +89,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> intVector.setInt(i, (int) heapBinaryToLong(v)); } } - } else if (DecimalDataUtils.is64BitDecimal(precision)) { + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { WritableLongVector longVector = (WritableLongVector) column; for (int i = rowId; i < rowId + num; ++i) { if (!longVector.isNullAt(i)) {