This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ae04c24262810646675c490616fd0053d0d0107
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Nov 24 14:51:34 2021 +0100

    [FLINK-24687][parquet] Copied DecimalDataUtils#is32BitDecimal and 
DecimalDataUtils#is64BitDecimal into ParquetSchemaConverter to remove the 
dependency on DecimalDataUtils (from planner)
    
    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../apache/flink/formats/parquet/row/ParquetRowDataWriter.java |  6 +++---
 .../flink/formats/parquet/utils/ParquetSchemaConverter.java    |  9 +++++++++
 .../flink/formats/parquet/vector/ParquetDecimalVector.java     |  6 +++---
 .../flink/formats/parquet/vector/ParquetSplitReaderUtil.java   | 10 +++++-----
 .../parquet/vector/reader/FixedLenBytesColumnReader.java       | 10 +++++-----
 5 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
index ee1556f..63ec0b0 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.formats.parquet.row;
 
-import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -315,8 +315,8 @@ public class ParquetRowDataWriter {
 
         // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
         // optimizer for UnscaledBytesWriter
-        if (DecimalDataUtils.is32BitDecimal(precision)
-                || DecimalDataUtils.is64BitDecimal(precision)) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision)
+                || ParquetSchemaConverter.is64BitDecimal(precision)) {
             return new LongUnscaledBytesWriter();
         }
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index b3a0296..6219439 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -113,4 +113,13 @@ public class ParquetSchemaConverter {
         }
         return numBytes;
     }
+
+    // From DecimalDataUtils
+    public static boolean is32BitDecimal(int precision) {
+        return precision <= 9;
+    }
+
+    public static boolean is64BitDecimal(int precision) {
+        return precision <= 18 && precision > 9;
+    }
 }
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
index 714e597..6ca1d95 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.formats.parquet.vector;
 
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.vector.BytesColumnVector;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.data.vector.DecimalColumnVector;
@@ -40,10 +40,10 @@ public class ParquetDecimalVector implements 
DecimalColumnVector {
 
     @Override
     public DecimalData getDecimal(int i, int precision, int scale) {
-        if (DecimalDataUtils.is32BitDecimal(precision)) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
             return DecimalData.fromUnscaledLong(
                     ((IntColumnVector) vector).getInt(i), precision, scale);
-        } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
             return DecimalData.fromUnscaledLong(
                     ((LongColumnVector) vector).getLong(i), precision, scale);
         } else {
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
index 90f6ee8..8634f5f 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.parquet.vector;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
 import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
@@ -31,7 +32,6 @@ import 
org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.data.vector.VectorizedColumnBatch;
@@ -202,13 +202,13 @@ public class ParquetSplitReaderUtil {
                                         DecimalData.fromBigDecimal(
                                                 (BigDecimal) value, precision, 
scale));
                 ColumnVector internalVector;
-                if (DecimalDataUtils.is32BitDecimal(precision)) {
+                if (ParquetSchemaConverter.is32BitDecimal(precision)) {
                     internalVector =
                             createVectorFromConstant(
                                     new IntType(),
                                     decimal == null ? null : (int) 
decimal.toUnscaledLong(),
                                     batchSize);
-                } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+                } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
                     internalVector =
                             createVectorFromConstant(
                                     new BigIntType(),
@@ -371,7 +371,7 @@ public class ParquetSplitReaderUtil {
                 return new HeapTimestampVector(batchSize);
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) fieldType;
-                if 
(DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
+                if 
(ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
                     checkArgument(
                             (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                                             || typeName == 
PrimitiveType.PrimitiveTypeName.INT32)
@@ -379,7 +379,7 @@ public class ParquetSplitReaderUtil {
                             "Unexpected type: %s",
                             typeName);
                     return new HeapIntVector(batchSize);
-                } else if 
(DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
+                } else if 
(ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
                     checkArgument(
                             (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                                             || typeName == 
PrimitiveType.PrimitiveTypeName.INT64)
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
index aa8d949..9b12067 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
 import org.apache.flink.table.data.vector.writable.WritableBytesVector;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.data.vector.writable.WritableIntVector;
@@ -47,7 +47,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
     @Override
     protected void readBatch(int rowId, int num, VECTOR column) {
         int bytesLen = descriptor.getPrimitiveType().getTypeLength();
-        if (DecimalDataUtils.is32BitDecimal(precision)) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
             WritableIntVector intVector = (WritableIntVector) column;
             for (int i = 0; i < num; i++) {
                 if (runLenDecoder.readInteger() == maxDefLevel) {
@@ -56,7 +56,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
                     intVector.setNullAt(rowId + i);
                 }
             }
-        } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
             WritableLongVector longVector = (WritableLongVector) column;
             for (int i = 0; i < num; i++) {
                 if (runLenDecoder.readInteger() == maxDefLevel) {
@@ -81,7 +81,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, VECTOR column, WritableIntVector 
dictionaryIds) {
-        if (DecimalDataUtils.is32BitDecimal(precision)) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
             WritableIntVector intVector = (WritableIntVector) column;
             for (int i = rowId; i < rowId + num; ++i) {
                 if (!intVector.isNullAt(i)) {
@@ -89,7 +89,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector>
                     intVector.setInt(i, (int) heapBinaryToLong(v));
                 }
             }
-        } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
             WritableLongVector longVector = (WritableLongVector) column;
             for (int i = rowId; i < rowId + num; ++i) {
                 if (!longVector.isNullAt(i)) {

Reply via email to