This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 50d658d710 [iceberg] Improved ByteBuffer string conversion for Iceberg
manifests (#5008)
50d658d710 is described below
commit 50d658d7108ae2e7b3ed460ac1129aa4a2294d26
Author: 0dunay0 <[email protected]>
AuthorDate: Mon Feb 10 05:28:10 2025 +0000
[iceberg] Improved ByteBuffer string conversion for Iceberg manifests
(#5008)
---
.../iceberg/manifest/IcebergConversions.java | 14 +-
.../paimon/iceberg/IcebergCompatibilityTest.java | 105 ++++++++++++
.../manifest/IcebergConversionsVarcharTest.java | 181 +++++++++++++++++++++
3 files changed, 299 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
index 9048d46e44..b20c55ee67 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -73,7 +73,19 @@ public class IcebergConversions {
case VARCHAR:
CharBuffer buffer = CharBuffer.wrap(value.toString());
try {
- return ENCODER.get().encode(buffer);
+ ByteBuffer encoded = ENCODER.get().encode(buffer);
+ // CharsetEncoder.encode() may return a buffer whose capacity is larger
+ // than the encoded content (its limit). Callers that read the whole
+ // backing array() would then see trailing NUL (0x00) padding, so copy
+ // the content into a buffer of exactly limit() bytes.
+ if (encoded.limit() != encoded.capacity()) {
+ ByteBuffer exact =
ByteBuffer.allocate(encoded.limit());
+ encoded.position(0);
+ exact.put(encoded);
+ exact.flip();
+ return exact;
+ }
+ return encoded;
} catch (CharacterCodingException e) {
throw new RuntimeException("Failed to encode value as
UTF-8: " + value, e);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 7258a1dd41..f29eb55113 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -53,6 +53,11 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -63,7 +68,10 @@ import org.apache.iceberg.io.CloseableIterable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -595,6 +603,103 @@ public class IcebergCompatibilityTest {
"Record(2, {20=[Record(cherry, 200), Record(pear,
201)]})");
}
+ @Test
+ public void testStringPartitionNullPadding() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(),
DataTypes.VARCHAR(20)},
+ new String[] {"k", "country"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.singletonList("country"),
+ Collections.singletonList("k"),
+ -1);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, BinaryString.fromString("Switzerland")),
1);
+ write.write(GenericRow.of(2, BinaryString.fromString("Australia")), 1);
+ write.write(GenericRow.of(3, BinaryString.fromString("Brazil")), 1);
+ write.write(GenericRow.of(4, BinaryString.fromString("Grand Duchy of
Luxembourg")), 1);
+ commit.commit(1, write.prepareCommit(false, 1));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, Switzerland)",
+ "Record(2, Australia)",
+ "Record(3, Brazil)",
+ "Record(4, Grand Duchy of Luxembourg)");
+
+ FileIO fileIO = table.fileIO();
+ IcebergMetadata metadata =
+ IcebergMetadata.fromPath(
+ fileIO, new Path(table.location(),
"metadata/v1.metadata.json"));
+
+ IcebergPathFactory pathFactory =
+ new IcebergPathFactory(new Path(table.location(), "metadata"));
+ IcebergManifestList manifestList = IcebergManifestList.create(table,
pathFactory);
+ String currentSnapshotManifest =
metadata.currentSnapshot().manifestList();
+
+ File snapShotAvroFile = new File(currentSnapshotManifest);
+ String expectedPartitionSummary =
+ "[{\"contains_null\": false, \"contains_nan\": false,
\"lower_bound\": \"Australia\", \"upper_bound\": \"Switzerland\"}]";
+ try (DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader<>(
+ new SeekableFileInput(snapShotAvroFile), new
GenericDatumReader<>())) {
+ while (dataFileReader.hasNext()) {
+ GenericRecord record = dataFileReader.next();
+ String partitionSummary = record.get("partitions").toString();
+ assertThat(partitionSummary).doesNotContain("\\u0000");
+
assertThat(partitionSummary).isEqualTo(expectedPartitionSummary);
+ }
+ }
+
+ String tableManifest =
manifestList.read(snapShotAvroFile.getName()).get(0).manifestPath();
+
+ try (DataFileReader<GenericRecord> dataFileReader =
+ new DataFileReader<>(
+ new SeekableFileInput(new File(tableManifest)),
+ new GenericDatumReader<>())) {
+
+ while (dataFileReader.hasNext()) {
+ GenericRecord record = dataFileReader.next();
+ GenericRecord dataFile = (GenericRecord)
record.get("data_file");
+
+ // Check lower bounds
+ GenericData.Array<?> lowerBounds =
+ (GenericData.Array<?>) dataFile.get("lower_bounds");
+ if (lowerBounds != null) {
+ for (Object bound : lowerBounds) {
+ GenericRecord boundRecord = (GenericRecord) bound;
+ int key = (Integer) boundRecord.get("key");
+ if (key == 1) { // key = 1 is the partition key
+ ByteBuffer value = (ByteBuffer)
boundRecord.get("value");
+ String boundValue = new String(value.array(),
StandardCharsets.UTF_8);
+ assertThat(boundValue).doesNotContain("\u0000");
+ }
+ }
+ }
+
+ // Check upper bounds
+ GenericData.Array<?> upperBounds =
+ (GenericData.Array<?>) dataFile.get("upper_bounds");
+ if (upperBounds != null) {
+ for (Object bound : upperBounds) {
+ GenericRecord boundRecord = (GenericRecord) bound;
+ int key = (Integer) boundRecord.get("key");
+ if (key == 1) { // key = 1 is the partition key
+ ByteBuffer value = (ByteBuffer)
boundRecord.get("value");
+ String boundValue = new String(value.array(),
StandardCharsets.UTF_8);
+ assertThat(boundValue).doesNotContain("\u0000");
+ }
+ }
+ }
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java
new file mode 100644
index 0000000000..c53e6bb0d0
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/manifest/IcebergConversionsVarcharTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.manifest;
+
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link IcebergConversions#toByteBuffer} on VARCHAR values: round-tripping through
+ * UTF-8 and returning buffers whose backing array holds exactly the encoded bytes (no padding).
+ */
+class IcebergConversionsVarcharTest {
+
+    @Test
+    void testEmptyString() {
+        ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), "");
+        assertThat(result.array()).isEmpty();
+        assertThat(decode(result)).isEmpty();
+    }
+
+    @Test
+    void testNullHandling() {
+        assertThatThrownBy(() -> IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), null))
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideSpecialStrings")
+    @DisplayName("Test special string cases")
+    void testSpecialStrings(String input) {
+        ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(100), input);
+        assertThat(decode(result)).isEqualTo(input);
+    }
+
+    private static Stream<Arguments> provideSpecialStrings() {
+        return Stream.of(
+                Arguments.of("Hello\u0000World"), // Embedded null
+                Arguments.of("\n\r\t"), // Control characters
+                Arguments.of(" "), // Single space
+                Arguments.of("   "), // Multiple spaces
+                Arguments.of("①②③"), // Unicode numbers
+                Arguments.of("🌟🌞🌝"), // Emojis
+                Arguments.of("Hello\uD83D\uDE00World"), // Surrogate pairs
+                Arguments.of("\uFEFF"), // Byte Order Mark
+                Arguments.of("Hello\\World"), // Backslashes
+                Arguments.of("Hello\"World"), // Quotes
+                Arguments.of("Hello'World"), // Single quotes
+                Arguments.of("Hello\bWorld"), // Backspace
+                Arguments.of("Hello\fWorld") // Form feed
+                );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideLongStrings")
+    void testLongStrings(String input) {
+        ByteBuffer result =
+                IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
+        // ASCII input: one byte per character.
+        assertThat(result.remaining()).isEqualTo(input.length());
+        assertThat(decode(result)).isEqualTo(input);
+    }
+
+    private static Stream<Arguments> provideLongStrings() {
+        return Stream.of(
+                Arguments.of(createString(1)),
+                Arguments.of(createString(10)),
+                Arguments.of(createString(100)),
+                Arguments.of(createString(1000)),
+                Arguments.of(createString(10000)));
+    }
+
+    /** Returns a string of {@code length} 'a' characters. */
+    private static String createString(int length) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append('a');
+        }
+        return sb.toString();
+    }
+
+    @Test
+    void testMultiByteCharacters() {
+        String[] inputs = {
+            "中文", // Chinese
+            "한글", // Korean
+            "日本語", // Japanese
+            "🌟", // Emoji (4 bytes)
+            "Café", // Latin-1 Supplement
+            "Привет", // Cyrillic
+            "שָׁלוֹם", // Hebrew with combining marks
+            "ᄀᄁᄂᄃᄄ", // Hangul Jamo
+            "बहुत बढ़िया", // Devanagari
+            "العربية" // Arabic
+        };
+
+        for (String input : inputs) {
+            ByteBuffer result =
+                    IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length() * 4), input);
+            assertThat(decode(result)).isEqualTo(input);
+            // The buffer must hold exactly the UTF-8 encoding, not an over-allocated estimate.
+            assertThat(result.remaining())
+                    .isEqualTo(input.getBytes(StandardCharsets.UTF_8).length);
+        }
+    }
+
+    @Test
+    void testBufferProperties() {
+        String input = "Hello, World!";
+        ByteBuffer result =
+                IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
+
+        assertThat(result.limit()).isEqualTo(result.array().length);
+        assertThat(containsTrailingZeros(result)).isFalse();
+    }
+
+    @Test
+    void testConcurrentAccess() throws InterruptedException {
+        int threadCount = 10;
+        Thread[] threads = new Thread[threadCount];
+        String[] inputs = new String[threadCount];
+        ByteBuffer[] results = new ByteBuffer[threadCount];
+        // An exception on a worker thread does not fail the test by itself; capture it so it
+        // can be rethrown on the test thread with its original cause.
+        Throwable[] failures = new Throwable[threadCount];
+
+        for (int i = 0; i < threadCount; i++) {
+            final int index = i;
+            inputs[index] = "Thread" + index;
+            threads[index] =
+                    new Thread(
+                            () -> {
+                                try {
+                                    results[index] =
+                                            IcebergConversions.toByteBuffer(
+                                                    DataTypes.VARCHAR(inputs[index].length()),
+                                                    inputs[index]);
+                                } catch (Throwable t) {
+                                    failures[index] = t;
+                                }
+                            });
+            threads[index].start();
+        }
+
+        // join() makes each worker's writes to results/failures visible to this thread.
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        for (int i = 0; i < threadCount; i++) {
+            if (failures[i] != null) {
+                throw new AssertionError("Conversion failed on worker thread " + i, failures[i]);
+            }
+            assertThat(decode(results[i])).isEqualTo(inputs[i]);
+        }
+    }
+
+    /**
+     * Decodes the buffer's entire backing array as UTF-8 after asserting that the array holds
+     * exactly the encoded bytes. Consumers calling {@link ByteBuffer#array()} see the whole
+     * array, so any slack beyond the limit would be exposed to them.
+     */
+    private static String decode(ByteBuffer buffer) {
+        assertThat(buffer.hasArray()).isTrue();
+        assertThat(buffer.arrayOffset()).isZero();
+        assertThat(buffer.position()).isZero();
+        assertThat(buffer.limit()).isEqualTo(buffer.array().length);
+        return new String(buffer.array(), StandardCharsets.UTF_8);
+    }
+
+    /** Returns whether the backing array holds NUL (0x00) bytes past the buffer's limit. */
+    private static boolean containsTrailingZeros(ByteBuffer buffer) {
+        byte[] array = buffer.array();
+        for (int i = buffer.arrayOffset() + buffer.limit(); i < array.length; i++) {
+            if (array[i] == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+}