This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4901361d8a [core] Lookup supports Variant type (#6618)
4901361d8a is described below

commit 4901361d8adc1d27ba02dda0ec3472467480e61e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 18 10:59:15 2025 +0800

    [core] Lookup supports Variant type (#6618)
---
 .../data/serializer/RowCompactedSerializer.java    |  25 +++++
 .../apache/paimon/data/serializer/Serializer.java  |  29 ++++++
 .../serializer/RowCompactedSerializerTest.java     |  20 ++++
 .../flink/lookup/LookupVariantTableTest.java       | 114 +++++++++++++++++++++
 4 files changed, 188 insertions(+)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
index 3529349588..30c93c30b2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.memory.MemorySegment;
@@ -213,6 +214,19 @@ public class RowCompactedSerializer implements Serializer<InternalRow> {
                         (writer, pos, value) ->
                                 writer.writeTimestamp((Timestamp) value, 
timestampPrecision);
                 break;
+            case VARIANT:
+                fieldWriter =
+                        (writer, pos, value) -> {
+                            Variant variant = (Variant) value;
+                            byte[] bytes;
+                            try {
+                                bytes = 
VariantSerializer.INSTANCE.serializeToBytes(variant);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            writer.writeBinary(bytes);
+                        };
+                break;
             case ARRAY:
                 Serializer<InternalArray> arraySerializer = 
InternalSerializers.create(fieldType);
                 fieldWriter =
@@ -301,6 +315,17 @@ public class RowCompactedSerializer implements Serializer<InternalRow> {
                 final int timestampPrecision = getPrecision(fieldType);
                 fieldReader = (reader, pos) -> 
reader.readTimestamp(timestampPrecision);
                 break;
+            case VARIANT:
+                fieldReader =
+                        (reader, pos) -> {
+                            byte[] bytes = reader.readBinary();
+                            try {
+                                return 
VariantSerializer.INSTANCE.deserializeFromBytes(bytes);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        };
+                break;
             case ARRAY:
                 fieldReader = (reader, pos) -> reader.readArray();
                 break;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
index ebcb02d704..5e35569a33 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/Serializer.java
@@ -19,8 +19,12 @@
 package org.apache.paimon.data.serializer;
 
 import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 
@@ -66,6 +70,31 @@ public interface Serializer<T> extends Serializable {
      */
     T deserialize(DataInputView source) throws IOException;
 
+    /**
+     * Serializes the given record into a newly created byte array.
+     *
+     * @param record The record to serialize.
+     * @return The bytes written while serializing the record.
+     */
+    default byte[] serializeToBytes(T record) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        // Hand the in-memory buffer to serialize(...) wrapped as an output view.
+        serialize(record, new DataOutputViewStreamWrapper(buffer));
+        return buffer.toByteArray();
+    }
+
+    /**
+     * De-serializes a record from the content of a byte array.
+     *
+     * @param bytes The byte array to de-serialize.
+     * @return The record read from the given bytes.
+     */
+    default T deserializeFromBytes(byte[] bytes) throws IOException {
+        // Wrap the array so that deserialize(...) can consume it as an input view.
+        ByteArrayInputStream input = new ByteArrayInputStream(bytes);
+        return deserialize(new DataInputViewStreamWrapper(input));
+    }
+
     /**
      * Serializes the given record to string.
      *
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
index 919ff4f9dc..6e03a56999 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/RowCompactedSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.data.serializer;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -178,6 +179,25 @@ abstract class RowCompactedSerializerTest extends SerializerTestInstance<Interna
         }
     }
 
+    static final class VariantTypesTest extends RowCompactedSerializerTest {
+        public VariantTypesTest() {
+            super(getRowSerializer(), getData());
+        }
+
+        private static InternalRow[] getData() {
+            return new GenericRow[] {
+                GenericRow.of(null, null),
+                GenericRow.of(
+                        
GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
+                        
GenericVariant.fromJson("{\"age\":29,\"city\":\"Shanghai\"}"))
+            };
+        }
+
+        private static RowCompactedSerializer getRowSerializer() {
+            return new RowCompactedSerializer(RowType.of(DataTypes.VARIANT(), 
DataTypes.VARIANT()));
+        }
+    }
+
     static final class NestedInternalRowTest extends 
RowCompactedSerializerTest {
 
         private static final RowType NESTED_DATA_TYPE =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java
new file mode 100644
index 0000000000..7dfe6d74a2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupVariantTableTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests {@link LookupTable} support for the variant type. */
+public class LookupVariantTableTest extends TableTestBase {
+
+    private static final String SAMPLE_JSON = "{\"age\":27,\"city\":\"Beijing\"}";
+
+    @TempDir java.nio.file.Path tempDir;
+    private IOManager ioManager;
+
+    @BeforeEach
+    public void before() throws IOException {
+        ioManager = new IOManagerImpl(tempDir.toString());
+    }
+
+    @Test
+    public void testRemoteFile() throws Exception {
+        Options tableOptions = new Options();
+        tableOptions.set(CoreOptions.BUCKET, 1);
+        tableOptions.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        tableOptions.set(CoreOptions.LOOKUP_REMOTE_FILE_ENABLED, true);
+        tableOptions.set(CoreOptions.MERGE_ENGINE, PARTIAL_UPDATE);
+        Identifier tableId = new Identifier("default", "t");
+        Schema schema =
+                new Schema(
+                        RowType.of(new IntType(), new IntType(), new VariantType()).getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        tableOptions.toMap(),
+                        null);
+        catalog.createTable(tableId, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // first write: key 1 with only the variant column set
+        writeAndCommit(writeBuilder, GenericRow.of(1, null, GenericVariant.fromJson(SAMPLE_JSON)));
+
+        // second write to trigger ser and deser: same key, only the second int column set
+        writeAndCommit(writeBuilder, GenericRow.of(1, 1, null));
+
+        // read to assert
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<String> rendered = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(
+                        row ->
+                                rendered.add(
+                                        row.getInt(0)
+                                                + "-"
+                                                + row.getInt(1)
+                                                + "-"
+                                                + row.getVariant(2).toJson()));
+        assertThat(rendered).containsOnly("1-1-" + SAMPLE_JSON);
+    }
+
+    /** Writes one row through a fresh writer and commits it, closing both afterwards. */
+    private void writeAndCommit(BatchWriteBuilder builder, GenericRow row) throws Exception {
+        try (BatchTableWrite write = builder.newWrite().withIOManager(ioManager);
+                BatchTableCommit commit = builder.newCommit()) {
+            write.write(row);
+            commit.commit(write.prepareCommit());
+        }
+    }
+}

Reply via email to