This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 8949e01b68b42a558fd8b60c278ca63f59bcefa8 Author: Zouxxyy <[email protected]> AuthorDate: Fri Apr 18 10:23:35 2025 +0800 [core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483) --- .../data/serializer/InternalRowSerializer.java | 16 +++++++++- .../table/sink/FixedBucketRowKeyExtractorTest.java | 36 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index ac8cc34e0c..6f02b95a8f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -24,14 +24,20 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryWriter; import org.apache.paimon.data.BinaryWriter.ValueSetter; +import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalRow.FieldGetter; import org.apache.paimon.data.NestedRow; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeChecks; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; import java.io.IOException; import java.util.Arrays; @@ -46,6 +52,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow private final Serializer[] fieldSerializers; private final FieldGetter[] fieldGetters; private final ValueSetter[] valueSetters; + private final boolean[] writeNulls; private 
transient BinaryRow reuseRow; private transient BinaryRowWriter reuseWriter; @@ -70,11 +77,18 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow this.binarySerializer = new BinaryRowSerializer(types.length); this.fieldGetters = new FieldGetter[types.length]; this.valueSetters = new ValueSetter[types.length]; + this.writeNulls = new boolean[types.length]; for (int i = 0; i < types.length; i++) { DataType type = types[i]; fieldGetters[i] = InternalRow.createFieldGetter(type, i); // pass serializer to avoid infinite loop valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]); + // see reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull + if (type instanceof DecimalType) { + writeNulls[i] = !Decimal.isCompact(DataTypeChecks.getPrecision(type)); + } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) { + writeNulls[i] = !Timestamp.isCompact(DataTypeChecks.getPrecision(type)); + } } } @@ -157,7 +171,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow reuseWriter.writeRowKind(row.getRowKind()); for (int i = 0; i < types.length; i++) { Object field = fieldGetters[i].getFieldOrNull(row); - if (field == null) { + if (field == null && !writeNulls[i]) { reuseWriter.setNullAt(i); } else { valueSetters[i].setValue(reuseWriter, i, field); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java index f1a4e7d178..01de7d92da 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java @@ -18,12 +18,17 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; 
+import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; import org.junit.jupiter.api.Test; @@ -32,9 +37,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -71,6 +78,30 @@ public class FixedBucketRowKeyExtractorTest { .hasMessageContaining("Num bucket is illegal"); } + @Test + public void testUnCompactDecimalAndTimestampNullValueBucketNumber() { + GenericRow row = GenericRow.of(null, null, null, 1); + int bucketNum = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE); + + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "d", new DecimalType(38, 18)), + new DataField(1, "ltz", new LocalZonedTimestampType()), + new DataField(2, "ntz", new TimestampType()), + new DataField(3, "k", new IntType()))); + + String[] bucketColsToTest = {"d", "ltz", "ntz"}; + for (String bucketCol : bucketColsToTest) { + FixedBucketRowKeyExtractor extractor = extractor(rowType, "", bucketCol, "", bucketNum); + BinaryRow binaryRow = + new InternalRowSerializer(rowType.project(bucketCol)).toBinaryRow(row); + assertThat(bucket(extractor, row)) + .isEqualTo( + KeyAndBucketExtractor.bucket(bucketKeyHashCode(binaryRow), bucketNum)); + } + } + private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) { 
extractor.setRecord(row); return extractor.bucket(); @@ -92,6 +123,11 @@ public class FixedBucketRowKeyExtractorTest { new DataField(0, "a", new IntType()), new DataField(1, "b", new IntType()), new DataField(2, "c", new IntType()))); + return extractor(rowType, partK, bk, pk, numBucket); + } + + private FixedBucketRowKeyExtractor extractor( + RowType rowType, String partK, String bk, String pk, int numBucket) { List<DataField> fields = TableSchema.newFields(rowType); Map<String, String> options = new HashMap<>(); options.put(BUCKET_KEY.key(), bk);
