This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4901361d8a [core] Lookup supports Variant type (#6618)
4901361d8a is described below
commit 4901361d8adc1d27ba02dda0ec3472467480e61e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 18 10:59:15 2025 +0800
[core] Lookup supports Variant type (#6618)
---
.../data/serializer/RowCompactedSerializer.java | 25 +++++
.../apache/paimon/data/serializer/Serializer.java | 29 ++++++
.../serializer/RowCompactedSerializerTest.java | 20 ++++
.../flink/lookup/LookupVariantTableTest.java | 114 +++++++++++++++++++++
4 files changed, 188 insertions(+)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
index 3529349588..30c93c30b2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.MemorySegment;
@@ -213,6 +214,19 @@ public class RowCompactedSerializer implements Serializer<InternalRow> {
(writer, pos, value) ->
writer.writeTimestamp((Timestamp) value,
timestampPrecision);
break;
+ case VARIANT:
+ fieldWriter =
+ (writer, pos, value) -> {
+ Variant variant = (Variant) value;
+ byte[] bytes;
+ try {
+ bytes =
VariantSerializer.INSTANCE.serializeToBytes(variant);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ writer.writeBinary(bytes);
+ };
+ break;
case ARRAY:
Serializer<InternalArray> arraySerializer =
InternalSerializers.create(fieldType);
fieldWriter =
@@ -301,6 +315,17 @@ public class RowCompactedSerializer implements Serializer<InternalRow> {
final int timestampPrecision = getPrecision(fieldType);
fieldReader = (reader, pos) ->
reader.readTimestamp(timestampPrecision);
break;
+ case VARIANT:
+ fieldReader =
+ (reader, pos) -> {
+ byte[] bytes = reader.readBinary();
+ try {
+ return
VariantSerializer.INSTANCE.deserializeFromBytes(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ break;
case ARRAY:
fieldReader = (reader, pos) -> reader.readArray();
break;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
index ebcb02d704..5e35569a33 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
@@ -19,8 +19,12 @@
package org.apache.paimon.data.serializer;
import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
@@ -66,6 +70,31 @@ public interface Serializer<T> extends Serializable {
*/
T deserialize(DataInputView source) throws IOException;
+    /**
+     * Serializes the given record into a newly allocated byte array, using {@link #serialize}
+     * as the encoding.
+     *
+     * @param record The record to serialize.
+     * @return The bytes written by {@link #serialize} for this record.
+     * @throws IOException if {@link #serialize} fails.
+     */
+    default byte[] serializeToBytes(T record) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serialize(record, new DataOutputViewStreamWrapper(buffer));
+        return buffer.toByteArray();
+    }
+
+    /**
+     * De-serializes a single record from the given byte array by delegating to {@link
+     * #deserialize}. The input is typically the output of {@link #serializeToBytes}; any bytes
+     * left over after the record is read are not checked.
+     *
+     * @param bytes The byte array to de-serialize.
+     * @return The deserialized element.
+     * @throws IOException if {@link #deserialize} fails.
+     */
+    default T deserializeFromBytes(byte[] bytes) throws IOException {
+        return deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
+    }
+
/**
* Serializes the given record to string.
*
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
index 919ff4f9dc..6e03a56999 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.serializer;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -178,6 +179,25 @@ abstract class RowCompactedSerializerTest extends SerializerTestInstance<Interna
}
}
+    /**
+     * Exercises {@link RowCompactedSerializer} on rows with two {@code VARIANT} columns: all-null
+     * rows, fully populated rows, and rows that mix null and non-null variant fields.
+     */
+    static final class VariantTypesTest extends RowCompactedSerializerTest {
+        public VariantTypesTest() {
+            super(getRowSerializer(), getData());
+        }
+
+        private static InternalRow[] getData() {
+            return new GenericRow[] {
+                GenericRow.of(null, null),
+                GenericRow.of(
+                        GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
+                        GenericVariant.fromJson("{\"age\":29,\"city\":\"Shanghai\"}")),
+                // Rows mixing null and non-null fields check that null markers and the
+                // variable-length variant payloads stay aligned per position.
+                GenericRow.of(GenericVariant.fromJson("[1,2,3]"), null),
+                GenericRow.of(
+                        null, GenericVariant.fromJson("{\"nested\":{\"city\":\"Hangzhou\"}}"))
+            };
+        }
+
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(RowType.of(DataTypes.VARIANT(), DataTypes.VARIANT()));
+        }
+    }
+
static final class NestedInternalRowTest extends
RowCompactedSerializerTest {
private static final RowType NESTED_DATA_TYPE =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java
new file mode 100644
index 0000000000..7dfe6d74a2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link LookupTable} with variant type.
+ *
+ * <p>Writes the same primary key twice into a table with a {@link VariantType} column, with
+ * deletion vectors, remote lookup files and the partial-update merge engine enabled. The second
+ * write is meant to force the stored variant to be serialized and deserialized again; the final
+ * read checks that the variant from the first write survives the merge.
+ */
+public class LookupVariantTableTest extends TableTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+    private IOManager ioManager;
+
+    @BeforeEach
+    public void before() throws IOException {
+        this.ioManager = new IOManagerImpl(tempDir.toString());
+    }
+
+    /** Closes the {@link IOManager} opened in {@link #before()} so it does not leak across tests. */
+    @org.junit.jupiter.api.AfterEach
+    public void closeIoManager() throws Exception {
+        if (ioManager != null) {
+            ioManager.close();
+            ioManager = null;
+        }
+    }
+
+    @Test
+    public void testRemoteFile() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        options.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+        options.set(CoreOptions.MERGE_ENGINE, PARTIAL_UPDATE);
+        Identifier identifier = new Identifier("default", "t");
+        Schema schema =
+                new Schema(
+                        RowType.of(new IntType(), new IntType(), new VariantType()).getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        options.toMap(),
+                        null);
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // first write: second column is null, third column carries the variant
+        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(
+                    GenericRow.of(
+                            1, null, GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}")));
+            commit.commit(write.prepareCommit());
+        }
+
+        // second write to the same key to trigger ser and deser of the stored variant;
+        // the variant column is null here, so partial-update should keep the first value
+        try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 1, null));
+            commit.commit(write.prepareCommit());
+        }
+
+        // read to assert
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        row.getInt(0)
+                                                + "-"
+                                                + row.getInt(1)
+                                                + "-"
+                                                + row.getVariant(2).toJson()));
+        assertThat(result).containsOnly("1-1-{\"age\":27,\"city\":\"Beijing\"}");
+    }
+}