This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 8949e01b68b42a558fd8b60c278ca63f59bcefa8
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Apr 18 10:23:35 2025 +0800

    [core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483)
---
 .../data/serializer/InternalRowSerializer.java     | 16 +++++++++-
 .../table/sink/FixedBucketRowKeyExtractorTest.java | 36 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index ac8cc34e0c..6f02b95a8f 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -24,14 +24,20 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryWriter;
 import org.apache.paimon.data.BinaryWriter.ValueSetter;
+import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.data.NestedRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -46,6 +52,7 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
     private final Serializer[] fieldSerializers;
     private final FieldGetter[] fieldGetters;
     private final ValueSetter[] valueSetters;
+    private final boolean[] writeNulls;
 
     private transient BinaryRow reuseRow;
     private transient BinaryRowWriter reuseWriter;
@@ -70,11 +77,18 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
         this.binarySerializer = new BinaryRowSerializer(types.length);
         this.fieldGetters = new FieldGetter[types.length];
         this.valueSetters = new ValueSetter[types.length];
+        this.writeNulls = new boolean[types.length];
         for (int i = 0; i < types.length; i++) {
             DataType type = types[i];
             fieldGetters[i] = InternalRow.createFieldGetter(type, i);
             // pass serializer to avoid infinite loop
             valueSetters[i] = BinaryWriter.createValueSetter(type, 
fieldSerializers[i]);
+            // see reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull
+            if (type instanceof DecimalType) {
+                writeNulls[i] = 
!Decimal.isCompact(DataTypeChecks.getPrecision(type));
+            } else if (type instanceof TimestampType || type instanceof 
LocalZonedTimestampType) {
+                writeNulls[i] = 
!Timestamp.isCompact(DataTypeChecks.getPrecision(type));
+            }
         }
     }
 
@@ -157,7 +171,7 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
         reuseWriter.writeRowKind(row.getRowKind());
         for (int i = 0; i < types.length; i++) {
             Object field = fieldGetters[i].getFieldOrNull(row);
-            if (field == null) {
+            if (field == null && !writeNulls[i]) {
                 reuseWriter.setNullAt(i);
             } else {
                 valueSetters[i].setValue(reuseWriter, i, field);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index f1a4e7d178..01de7d92da 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -18,12 +18,17 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 
 import org.junit.jupiter.api.Test;
 
@@ -32,9 +37,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static 
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -71,6 +78,30 @@ public class FixedBucketRowKeyExtractorTest {
                 .hasMessageContaining("Num bucket is illegal");
     }
 
+    @Test
+    public void testUnCompactDecimalAndTimestampNullValueBucketNumber() {
+        GenericRow row = GenericRow.of(null, null, null, 1);
+        int bucketNum = ThreadLocalRandom.current().nextInt(1, 
Integer.MAX_VALUE);
+
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "d", new DecimalType(38, 18)),
+                                new DataField(1, "ltz", new 
LocalZonedTimestampType()),
+                                new DataField(2, "ntz", new TimestampType()),
+                                new DataField(3, "k", new IntType())));
+
+        String[] bucketColsToTest = {"d", "ltz", "ntz"};
+        for (String bucketCol : bucketColsToTest) {
+            FixedBucketRowKeyExtractor extractor = extractor(rowType, "", 
bucketCol, "", bucketNum);
+            BinaryRow binaryRow =
+                    new 
InternalRowSerializer(rowType.project(bucketCol)).toBinaryRow(row);
+            assertThat(bucket(extractor, row))
+                    .isEqualTo(
+                            
KeyAndBucketExtractor.bucket(bucketKeyHashCode(binaryRow), bucketNum));
+        }
+    }
+
     private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) {
         extractor.setRecord(row);
         return extractor.bucket();
@@ -92,6 +123,11 @@ public class FixedBucketRowKeyExtractorTest {
                                 new DataField(0, "a", new IntType()),
                                 new DataField(1, "b", new IntType()),
                                 new DataField(2, "c", new IntType())));
+        return extractor(rowType, partK, bk, pk, numBucket);
+    }
+
+    private FixedBucketRowKeyExtractor extractor(
+            RowType rowType, String partK, String bk, String pk, int 
numBucket) {
         List<DataField> fields = TableSchema.newFields(rowType);
         Map<String, String> options = new HashMap<>();
         options.put(BUCKET_KEY.key(), bk);

Reply via email to