This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c98629  Supports insert operations (#86)
6c98629 is described below

commit 6c98629073c78ca8b00f860de171b956fba30169
Author: rfyu <[email protected]>
AuthorDate: Wed Nov 20 13:23:21 2024 +0800

    Supports insert operations (#86)
---
 .../apache/paimon/trino/fileio/TrinoFileIO.java    |   9 +-
 .../trino/FixedBucketTableShuffleFunction.java     |  87 +++++++++
 .../org/apache/paimon/trino/TrinoColumnHandle.java |   9 +
 .../org/apache/paimon/trino/TrinoConnector.java    |  21 +++
 .../apache/paimon/trino/TrinoConnectorFactory.java |   8 +
 .../paimon/trino/TrinoMergePageSourceWrapper.java  |  96 ++++++++++
 .../org/apache/paimon/trino/TrinoMergeSink.java    | 112 ++++++++++++
 ...TrinoModule.java => TrinoMergeTableHandle.java} |  32 ++--
 .../org/apache/paimon/trino/TrinoMetadata.java     | 202 +++++++++++++++++++++
 .../java/org/apache/paimon/trino/TrinoModule.java  |   2 +
 .../trino/TrinoNodePartitioningProvider.java       |  48 +++++
 .../org/apache/paimon/trino/TrinoPageSink.java     |  96 ++++++++++
 .../apache/paimon/trino/TrinoPageSinkProvider.java | 131 +++++++++++++
 .../paimon/trino/TrinoPageSourceProvider.java      |  45 ++++-
 .../paimon/trino/TrinoPartitioningHandle.java      |  67 +++++++
 .../java/org/apache/paimon/trino/TrinoRow.java     | 181 ++++++++++++++++++
 .../paimon/trino/TrinoSessionProperties.java       |  28 +++
 .../org/apache/paimon/trino/TrinoTableHandle.java  |   5 +-
 .../apache/paimon/trino/fileio/TrinoFileIO.java    |   9 +-
 .../org/apache/paimon/trino/TestTrinoITCase.java   |  43 ++++-
 .../paimon/trino/TestTrinoPartitioningHandle.java  |  47 +++++
 .../java/org/apache/paimon/trino/TestTrinoRow.java | 112 ++++++++++++
 22 files changed, 1353 insertions(+), 37 deletions(-)

diff --git 
a/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
 
b/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
index c9eca78..29b1b56 100644
--- 
a/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
+++ 
b/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
@@ -32,6 +32,7 @@ import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
 
 import javax.annotation.Nullable;
 
@@ -65,9 +66,11 @@ public class TrinoFileIO implements FileIO {
     }
 
     @Override
-    public PositionOutputStream newOutputStream(Path path, boolean b) throws 
IOException {
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) 
throws IOException {
+        TrinoOutputFile trinoOutputFile =
+                trinoFileSystem.newOutputFile(Location.of(path.toString()));
         return new PositionOutputStreamWrapper(
-                
trinoFileSystem.newOutputFile(Location.of(path.toString())).create());
+                overwrite ? trinoOutputFile.createOrOverwrite() : 
trinoOutputFile.create());
     }
 
     @Override
@@ -122,7 +125,7 @@ public class TrinoFileIO implements FileIO {
     }
 
     @Override
-    public boolean delete(Path path, boolean b) throws IOException {
+    public boolean delete(Path path, boolean recursive) throws IOException {
         Location location = Location.of(path.toString());
         if (trinoFileSystem.directoryExists(location).orElse(false)) {
             trinoFileSystem.deleteDirectory(location);
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
new file mode 100644
index 0000000..6732bcf
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.types.RowKind;
+
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Trino {@link BucketFunction} for fixed-bucket tables.
+ *
+ * <p>Each row is projected to its primary key, the key hash is mapped to one of the table's
+ * buckets, and that bucket is folded onto the available workers with a modulo.
+ */
+public class FixedBucketTableShuffleFunction implements BucketFunction {
+
+    private final int workerCount;
+    private final int bucketCount;
+    private final boolean isRowId;
+    private final Projection pkProjection;
+
+    /**
+     * Reflective handle on {@code RowBlock#getRawFieldBlocks}, made accessible via {@code
+     * setAccessible}. Resolved once at construction instead of once per row; {@code null} when the
+     * partition channel is not a row id.
+     */
+    private final Method rawFieldBlocksMethod;
+
+    /**
+     * @param partitionChannelTypes types of the channels handed to {@link #getBucket}; exactly one
+     *     channel of {@link RowType} is treated as a row id whose field blocks are unpacked first
+     * @param partitioningHandle handle supplying the table schema (primary keys and options)
+     * @param workerCount number of workers the buckets are distributed over
+     * @throws RuntimeException if the row-id accessor is needed but cannot be resolved
+     */
+    public FixedBucketTableShuffleFunction(
+            List<Type> partitionChannelTypes,
+            TrinoPartitioningHandle partitioningHandle,
+            int workerCount) {
+
+        TableSchema schema = partitioningHandle.getOriginalSchema();
+        this.pkProjection =
+                CodeGenUtils.newProjection(schema.logicalPrimaryKeysType(), schema.primaryKeys());
+        this.bucketCount = new CoreOptions(schema.options()).bucket();
+        this.workerCount = workerCount;
+        this.isRowId =
+                partitionChannelTypes.size() == 1
+                        && partitionChannelTypes.get(0) instanceof RowType;
+        // Only pay for (and only fail on) the reflective lookup when it will actually be used.
+        this.rawFieldBlocksMethod = isRowId ? lookupRawFieldBlocksMethod() : null;
+    }
+
+    /**
+     * Computes the worker slot for the row at {@code position}.
+     *
+     * @param page page holding the partition channels
+     * @param position row index inside {@code page}
+     * @return the row's bucket modulo the worker count
+     */
+    @Override
+    public int getBucket(Page page, int position) {
+        if (isRowId) {
+            // The single channel is a row block; rebuild a page from its field blocks so the
+            // primary-key projection sees plain columns.
+            RowBlock rowBlock = (RowBlock) page.getBlock(0);
+            try {
+                page =
+                        new Page(
+                                rowBlock.getPositionCount(),
+                                (Block[]) rawFieldBlocksMethod.invoke(rowBlock));
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
+        BinaryRow pk = pkProjection.apply(trinoRow);
+        int bucket =
+                KeyAndBucketExtractor.bucket(
+                        KeyAndBucketExtractor.bucketKeyHashCode(pk), bucketCount);
+        return bucket % workerCount;
+    }
+
+    /** Resolves {@code RowBlock#getRawFieldBlocks} and makes it callable. */
+    private static Method lookupRawFieldBlocksMethod() {
+        try {
+            Method method = RowBlock.class.getDeclaredMethod("getRawFieldBlocks");
+            method.setAccessible(true);
+            return method;
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
index 2a80a7e..ec413c0 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
@@ -31,9 +31,12 @@ import static java.util.Objects.requireNonNull;
 
 /** Trino {@link ColumnHandle}. */
 public final class TrinoColumnHandle implements ColumnHandle {
+
+    public static final String TRINO_ROW_ID_NAME = "$row_id";
     private final String columnName;
     private final String typeString;
     private final Type trinoType;
+    private final boolean isRowId;
 
     @JsonCreator
     public TrinoColumnHandle(
@@ -43,6 +46,7 @@ public final class TrinoColumnHandle implements ColumnHandle {
         this.columnName = requireNonNull(columnName, "columnName is null");
         this.typeString = requireNonNull(typeString, "columnType is null");
         this.trinoType = requireNonNull(trinoType, "columnType is null");
+        this.isRowId = TRINO_ROW_ID_NAME.equals(columnName);
     }
 
     public static TrinoColumnHandle of(String columnName, DataType columnType) 
{
@@ -67,6 +71,11 @@ public final class TrinoColumnHandle implements ColumnHandle 
{
         return trinoType;
     }
 
+    @JsonProperty
+    public boolean isRowId() {
+        return isRowId;
+    }
+
     public DataType logicalType() {
         return JsonSerdeUtil.fromJson(typeString, DataType.class);
     }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
index 9097247..dff66e5 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -20,6 +20,8 @@ package org.apache.paimon.trino;
 
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
@@ -38,6 +40,8 @@ public class TrinoConnector implements Connector {
     private final ConnectorMetadata trinoMetadata;
     private final ConnectorSplitManager trinoSplitManager;
     private final ConnectorPageSourceProvider trinoPageSourceProvider;
+    private final ConnectorPageSinkProvider trinoPageSinkProvider;
+    private final ConnectorNodePartitioningProvider 
trinoNodePartitioningProvider;
     private final List<PropertyMetadata<?>> tableProperties;
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -45,12 +49,19 @@ public class TrinoConnector implements Connector {
             ConnectorMetadata trinoMetadata,
             ConnectorSplitManager trinoSplitManager,
             ConnectorPageSourceProvider trinoPageSourceProvider,
+            ConnectorPageSinkProvider trinoPageSinkProvider,
+            ConnectorNodePartitioningProvider trinoNodePartitioningProvider,
             TrinoTableOptions trinoTableOptions,
             TrinoSessionProperties trinoSessionProperties) {
         this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is 
null");
         this.trinoSplitManager = requireNonNull(trinoSplitManager, 
"trinoSplitManager is null");
         this.trinoPageSourceProvider =
                 requireNonNull(trinoPageSourceProvider, 
"trinoRecordSetProvider is null");
+        this.trinoPageSinkProvider =
+                requireNonNull(trinoPageSinkProvider, "trinoPageSinkProvider 
is null");
+        this.trinoNodePartitioningProvider =
+                requireNonNull(
+                        trinoNodePartitioningProvider, 
"trinoNodePartitioningProvider is null");
         this.tableProperties = trinoTableOptions.getTableProperties();
         this.sessionProperties = trinoSessionProperties.getSessionProperties();
     }
@@ -78,6 +89,11 @@ public class TrinoConnector implements Connector {
         return trinoPageSourceProvider;
     }
 
+    @Override
+    public ConnectorPageSinkProvider getPageSinkProvider() {
+        return trinoPageSinkProvider;
+    }
+
     @Override
     public List<PropertyMetadata<?>> getSessionProperties() {
         return sessionProperties;
@@ -87,4 +103,9 @@ public class TrinoConnector implements Connector {
     public List<PropertyMetadata<?>> getTableProperties() {
         return tableProperties;
     }
+
+    @Override
+    public ConnectorNodePartitioningProvider getNodePartitioningProvider() {
+        return trinoNodePartitioningProvider;
+    }
 }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
index ad1d500..4327699 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
@@ -31,6 +31,7 @@ import io.trino.filesystem.manager.FileSystemModule;
 import io.trino.hdfs.HdfsModule;
 import io.trino.hdfs.authentication.HdfsAuthenticationModule;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import 
io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
 import 
io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
 import io.trino.plugin.hive.NodeVersion;
@@ -129,6 +130,10 @@ public class TrinoConnectorFactory implements 
ConnectorFactory {
             TrinoSplitManager trinoSplitManager = 
injector.getInstance(TrinoSplitManager.class);
             TrinoPageSourceProvider trinoPageSourceProvider =
                     injector.getInstance(TrinoPageSourceProvider.class);
+            TrinoPageSinkProvider trinoPageSinkProvider =
+                    injector.getInstance(TrinoPageSinkProvider.class);
+            TrinoNodePartitioningProvider trinoNodePartitioningProvider =
+                    injector.getInstance(TrinoNodePartitioningProvider.class);
             TrinoSessionProperties trinoSessionProperties =
                     injector.getInstance(TrinoSessionProperties.class);
             TrinoTableOptions trinoTableOptions = 
injector.getInstance(TrinoTableOptions.class);
@@ -138,6 +143,9 @@ public class TrinoConnectorFactory implements 
ConnectorFactory {
                     new 
ClassLoaderSafeConnectorSplitManager(trinoSplitManager, classLoader),
                     new ClassLoaderSafeConnectorPageSourceProvider(
                             trinoPageSourceProvider, classLoader),
+                    new ClassLoaderSafeConnectorPageSinkProvider(
+                            trinoPageSinkProvider, classLoader),
+                    trinoNodePartitioningProvider,
                     trinoTableOptions,
                     trinoSessionProperties);
         }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java
new file mode 100644
index 0000000..f29397a
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.connector.ConnectorPageSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+
+/**
+ * Trino {@link ConnectorPageSource} that decorates another page source.
+ *
+ * <p>Every page from the delegate is passed through with one extra trailing channel: a row block
+ * assembled from the channels whose indexes appear as values in {@code fieldToIndex}. All other
+ * calls are forwarded to the delegate unchanged.
+ */
+public class TrinoMergePageSourceWrapper implements ConnectorPageSource {
+
+    private final ConnectorPageSource pageSource;
+    private final HashMap<String, Integer> fieldToIndex;
+
+    /**
+     * @param pageSource delegate that produces the data pages
+     * @param fieldToIndex field name to channel index of each field packed into the appended row
+     */
+    public TrinoMergePageSourceWrapper(
+            ConnectorPageSource pageSource, HashMap<String, Integer> fieldToIndex) {
+        this.pageSource = pageSource;
+        this.fieldToIndex = fieldToIndex;
+    }
+
+    /** Static factory equivalent to calling the constructor. */
+    public static TrinoMergePageSourceWrapper wrap(
+            ConnectorPageSource pageSource, HashMap<String, Integer> fieldToIndex) {
+        return new TrinoMergePageSourceWrapper(pageSource, fieldToIndex);
+    }
+
+    @Override
+    public long getCompletedBytes() {
+        return pageSource.getCompletedBytes();
+    }
+
+    @Override
+    public long getReadTimeNanos() {
+        return pageSource.getReadTimeNanos();
+    }
+
+    @Override
+    public boolean isFinished() {
+        return pageSource.isFinished();
+    }
+
+    /**
+     * Returns the delegate's next page with the extra row channel appended, or {@code null} when
+     * the delegate returns {@code null}.
+     */
+    @Override
+    public Page getNextPage() {
+        Page nextPage = pageSource.getNextPage();
+        if (nextPage == null) {
+            return null;
+        }
+        int rowCount = nextPage.getPositionCount();
+
+        Block[] newBlocks = new Block[nextPage.getChannelCount() + 1];
+        Block[] rowIdBlocks = new Block[fieldToIndex.size()];
+        for (int i = 0, idx = 0; i < nextPage.getChannelCount(); i++) {
+            Block block = nextPage.getBlock(i);
+            newBlocks[i] = block;
+            // Field blocks are collected in channel order, not in map iteration order.
+            if (fieldToIndex.containsValue(i)) {
+                rowIdBlocks[idx] = block;
+                idx++;
+            }
+        }
+        // The null mask handed to RowBlock carries one flag per row, so it has to be sized by
+        // the page's position count rather than by the number of fields. All-false marks every
+        // row as non-null.
+        newBlocks[nextPage.getChannelCount()] =
+                RowBlock.fromFieldBlocks(
+                        rowCount, Optional.of(new boolean[rowCount]), rowIdBlocks);
+
+        return new Page(rowCount, newBlocks);
+    }
+
+    @Override
+    public long getMemoryUsage() {
+        return pageSource.getMemoryUsage();
+    }
+
+    @Override
+    public void close() throws IOException {
+        pageSource.close();
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java
new file mode 100644
index 0000000..597b4e7
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ConnectorMergeSink;
+import io.trino.spi.connector.ConnectorPageSink;
+import io.trino.spi.type.TinyintType;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+/**
+ * Trino {@link ConnectorMergeSink} that splits each merged page into a delete page and an insert
+ * page and forwards both to the wrapped {@link TrinoPageSink}.
+ */
+public class TrinoMergeSink implements ConnectorMergeSink {
+
+    private final TrinoPageSink pageSink;
+    private final int dataColumnCount;
+
+    /**
+     * @param pageSink sink receiving the split pages; must be a {@link TrinoPageSink}
+     * @param dataColumnCount number of leading data channels in every merged page
+     */
+    public TrinoMergeSink(ConnectorPageSink pageSink, int dataColumnCount) {
+        this.pageSink = (TrinoPageSink) pageSink;
+        this.dataColumnCount = dataColumnCount;
+    }
+
+    /**
+     * Routes the rows of {@code page} by the operation code stored in its second-to-last channel.
+     *
+     * @param page merged page with {@code dataColumnCount + 2} channels and at least one row
+     * @throws IllegalArgumentException on a wrong channel count, an empty page, or an operation
+     *     code other than 1, 2, 4 or 5
+     */
+    @Override
+    public void storeMergedRows(Page page) {
+        int channels = page.getChannelCount();
+        if (channels != dataColumnCount + 2) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "inputPage channelCount (%s) == dataColumns size (%s) + 2",
+                            channels, dataColumnCount));
+        }
+
+        int rows = page.getPositionCount();
+        if (rows <= 0) {
+            throw new IllegalArgumentException("positionCount should be > 0, but is " + rows);
+        }
+
+        Block operations = page.getBlock(channels - 2);
+        int[] deletes = new int[rows];
+        int[] inserts = new int[rows];
+        int deleteCount = 0;
+        int insertCount = 0;
+
+        // NOTE(review): codes look like Trino's merge operation numbers (1/4 insert-like,
+        // 2/5 delete-like, 3 a plain update that is rejected) - confirm against the SPI.
+        for (int row = 0; row < rows; row++) {
+            byte operation = TinyintType.TINYINT.getByte(operations, row);
+            if (operation == 1 || operation == 4) {
+                inserts[insertCount++] = row;
+            } else if (operation == 2 || operation == 5) {
+                deletes[deleteCount++] = row;
+            } else {
+                throw new IllegalArgumentException("Invalid merge operation: " + operation);
+            }
+        }
+
+        Optional<Page> deletePage =
+                deleteCount > 0
+                        ? Optional.of(dataColumnsOf(page).getPositions(deletes, 0, deleteCount))
+                        : Optional.empty();
+        Optional<Page> insertPage =
+                insertCount > 0
+                        ? Optional.of(dataColumnsOf(page).getPositions(inserts, 0, insertCount))
+                        : Optional.empty();
+
+        // Deletes are written ahead of inserts.
+        if (deletePage.isPresent()) {
+            pageSink.writePage(deletePage.get(), RowKind.DELETE);
+        }
+        if (insertPage.isPresent()) {
+            pageSink.writePage(insertPage.get(), RowKind.INSERT);
+        }
+    }
+
+    /** Completes the write by delegating to the wrapped page sink. */
+    @Override
+    public CompletableFuture<Collection<Slice>> finish() {
+        return pageSink.finish();
+    }
+
+    /** Returns a page holding only the first {@code dataColumnCount} channels of {@code page}. */
+    private Page dataColumnsOf(Page page) {
+        return page.getColumns(IntStream.range(0, dataColumnCount).toArray());
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
similarity index 51%
copy from 
paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
copy to 
paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
index da22ed6..901d6fa 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
@@ -18,30 +18,24 @@
 
 package org.apache.paimon.trino;
 
-import org.apache.paimon.options.Options;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
+import io.trino.spi.connector.ConnectorTableHandle;
 
-import com.google.inject.Binder;
-import com.google.inject.Module;
+/** Trino {@link ConnectorMergeTableHandle}. */
+public class TrinoMergeTableHandle implements ConnectorMergeTableHandle {
 
-import java.util.Map;
+    private final TrinoTableHandle tableHandle;
 
-import static com.google.inject.Scopes.SINGLETON;
-
-/** Module for binding instance. */
-public class TrinoModule implements Module {
-    private Map<String, String> config;
-
-    public TrinoModule(Map<String, String> config) {
-        this.config = config;
+    @JsonCreator
+    public TrinoMergeTableHandle(@JsonProperty("tableHandle") TrinoTableHandle 
tableHandle) {
+        this.tableHandle = tableHandle;
     }
 
     @Override
-    public void configure(Binder binder) {
-        binder.bind(Options.class).toInstance(new Options(config));
-        binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
-        binder.bind(TrinoSplitManager.class).in(SINGLETON);
-        binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
-        binder.bind(TrinoSessionProperties.class).in(SINGLETON);
-        binder.bind(TrinoTableOptions.class).in(SINGLETON);
+    @JsonProperty
+    public ConnectorTableHandle getTableHandle() {
+        return tableHandle;
     }
 }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index 468d8aa..e896ea0 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -25,8 +25,15 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.trino.catalog.TrinoCatalog;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.InstantiationUtil;
 import org.apache.paimon.utils.StringUtils;
 
 import io.airlift.slice.Slice;
@@ -34,9 +41,15 @@ import io.trino.spi.TrinoException;
 import io.trino.spi.connector.Assignment;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableLayout;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.ConnectorTableProperties;
 import io.trino.spi.connector.ConnectorTableVersion;
@@ -44,11 +57,14 @@ import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RetryMode;
+import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
 import io.trino.spi.expression.ConnectorExpression;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.statistics.ComputedStatistics;
 import io.trino.spi.type.LongTimestampWithTimeZone;
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.Type;
@@ -56,6 +72,7 @@ import io.trino.spi.type.VarcharType;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -69,11 +86,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static 
io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
 import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Trino {@link ConnectorMetadata}. */
@@ -90,6 +109,189 @@ public class TrinoMetadata implements ConnectorMetadata {
         return catalog;
     }
 
+    /**
+     * Describes how rows of an INSERT are distributed before they reach the page sinks.
+     *
+     * <p>Only fixed-bucket {@link FileStoreTable}s are handled: the serialized table schema is
+     * wrapped in a {@link TrinoPartitioningHandle} and the table's primary keys are used as the
+     * partitioning columns. Every other bucket mode is rejected.
+     *
+     * @throws IllegalArgumentException if the table is not a {@link FileStoreTable} or its bucket
+     *     mode is not fixed
+     */
+    // todo support dynamic bucket table
+    @Override
+    public Optional<ConnectorTableLayout> getInsertLayout(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        Table table = ((TrinoTableHandle) tableHandle).table(catalog);
+        if (!(table instanceof FileStoreTable)) {
+            throw new IllegalArgumentException(table.getClass() + " is not supported");
+        }
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        switch (fileStoreTable.bucketMode()) {
+            case FIXED:
+                {
+                    byte[] serializedSchema;
+                    try {
+                        serializedSchema =
+                                InstantiationUtil.serializeObject(fileStoreTable.schema());
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    TrinoPartitioningHandle partitioning =
+                            new TrinoPartitioningHandle(serializedSchema);
+                    return Optional.of(
+                            new ConnectorTableLayout(partitioning, table.primaryKeys(), false));
+                }
+            case DYNAMIC:
+            case GLOBAL_DYNAMIC:
+                throw new IllegalArgumentException(
+                        table.primaryKeys().isEmpty()
+                                ? "Only primary-key table can support dynamic bucket."
+                                : "Global dynamic bucket mode are not supported");
+            case UNAWARE:
+                throw new IllegalArgumentException(
+                        table.primaryKeys().isEmpty()
+                                ? "Unaware bucket mode are not supported"
+                                : "Only append table can support unaware bucket.");
+            default:
+                throw new IllegalArgumentException("Unknown bucket mode");
+        }
+    }
+
+    /**
+     * Starts a CREATE TABLE AS: the table is created right away and its handle is returned as the
+     * output handle that the page sinks will write to.
+     */
+    @Override
+    public ConnectorOutputTableHandle beginCreateTable(
+            ConnectorSession session,
+            ConnectorTableMetadata tableMetadata,
+            Optional<ConnectorTableLayout> layout,
+            RetryMode retryMode) {
+        createTable(session, tableMetadata, false);
+        SchemaTableName createdTable = tableMetadata.getTable();
+        return getTableHandle(session, createdTable, Collections.emptyMap());
+    }
+
+    /**
+     * Finishes a CREATE TABLE AS by committing the fragments produced by the page sinks; nothing
+     * is committed when no fragments were produced.
+     */
+    @Override
+    public Optional<ConnectorOutputMetadata> finishCreateTable(
+            ConnectorSession session,
+            ConnectorOutputTableHandle tableHandle,
+            Collection<Slice> fragments,
+            Collection<ComputedStatistics> computedStatistics) {
+        return fragments.isEmpty()
+                ? Optional.empty()
+                : commit(session, (TrinoTableHandle) tableHandle, fragments);
+    }
+
+    /** Starts an INSERT; the table handle itself is reused as the insert handle. */
+    @Override
+    public ConnectorInsertTableHandle beginInsert(
+            ConnectorSession session,
+            ConnectorTableHandle tableHandle,
+            List<ColumnHandle> columns,
+            RetryMode retryMode) {
+        ConnectorInsertTableHandle insertHandle = (ConnectorInsertTableHandle) tableHandle;
+        return insertHandle;
+    }
+
+    /** Finishes an INSERT by committing the fragments produced by the page sinks. */
+    @Override
+    public Optional<ConnectorOutputMetadata> finishInsert(
+            ConnectorSession session,
+            ConnectorInsertTableHandle insertHandle,
+            Collection<Slice> fragments,
+            Collection<ComputedStatistics> computedStatistics) {
+        TrinoTableHandle paimonHandle = (TrinoTableHandle) insertHandle;
+        return commit(session, paimonHandle, fragments);
+    }
+
+    /**
+     * Deserializes the commit messages carried by {@code fragments} and commits them to the table
+     * in a single batch commit.
+     *
+     * <p>The commit is issued in overwrite mode when {@link
+     * TrinoSessionProperties#enableInsertOverwrite} is true for the session. Nothing is committed
+     * when there are no fragments.
+     *
+     * @param session the session the write ran in
+     * @param insertHandle handle of the table being written
+     * @param fragments serialized commit messages, one per slice
+     * @return always {@link Optional#empty()}
+     * @throws RuntimeException if a fragment cannot be deserialized or the committer fails to
+     *     close
+     */
+    private Optional<ConnectorOutputMetadata> commit(
+            ConnectorSession session, TrinoTableHandle insertHandle, Collection<Slice> fragments) {
+        CommitMessageSerializer serializer = new CommitMessageSerializer();
+        List<CommitMessage> commitMessages = new ArrayList<>(fragments.size());
+        try {
+            for (Slice fragment : fragments) {
+                commitMessages.add(
+                        serializer.deserialize(serializer.getVersion(), fragment.getBytes()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (commitMessages.isEmpty()) {
+            return Optional.empty();
+        }
+
+        BatchWriteBuilder batchWriteBuilder =
+                insertHandle.tableWithDynamicOptions(catalog, session).newBatchWriteBuilder();
+        if (TrinoSessionProperties.enableInsertOverwrite(session)) {
+            batchWriteBuilder.withOverwrite();
+        }
+        // The committer is AutoCloseable; close it on every path so its resources are released.
+        try (org.apache.paimon.table.sink.BatchTableCommit tableCommit =
+                batchWriteBuilder.newCommit()) {
+            tableCommit.commit(commitMessages);
+        } catch (RuntimeException e) {
+            // Keep commit failures unwrapped, exactly as they surfaced before.
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return Optional.empty();
+    }
+
+    /** Row changes are expressed as a delete of the old row plus an insert of the new row. */
+    @Override
+    public RowChangeParadigm getRowChangeParadigm(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        return RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
+    }
+
+    /**
+     * Builds the synthetic row-id column used by MERGE: a ROW made of the table's primary-key
+     * fields, kept in the order in which they appear in the table's row type.
+     */
+    // todo support dynamic bucket table
+    @Override
+    public ColumnHandle getMergeRowIdColumnHandle(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        Table table = ((TrinoTableHandle) tableHandle).table(catalog);
+        Set<String> primaryKeyNames = new HashSet<>(table.primaryKeys());
+        List<DataField> keyFields = new ArrayList<>();
+        for (DataField field : table.rowType().getFields()) {
+            if (primaryKeyNames.contains(field.name())) {
+                keyFields.add(field);
+            }
+        }
+        return TrinoColumnHandle.of(
+                TRINO_ROW_ID_NAME, DataTypes.ROW(keyFields.toArray(new DataField[0])));
+    }
+
+    /**
+     * Describes how rows of an UPDATE/DELETE/MERGE are distributed before they reach the merge
+     * sinks.
+     *
+     * <p>Only fixed-bucket {@link FileStoreTable}s are handled: the serialized table schema is
+     * wrapped in a {@link TrinoPartitioningHandle}. Every other bucket mode is rejected.
+     *
+     * @throws IllegalArgumentException if the table is not a {@link FileStoreTable} or its bucket
+     *     mode is not fixed
+     */
+    // todo support dynamic bucket table
+    @Override
+    public Optional<ConnectorPartitioningHandle> getUpdateLayout(
+            ConnectorSession session, ConnectorTableHandle tableHandle) {
+        Table table = ((TrinoTableHandle) tableHandle).table(catalog);
+        if (!(table instanceof FileStoreTable)) {
+            throw new IllegalArgumentException(table.getClass() + " is not supported");
+        }
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        switch (fileStoreTable.bucketMode()) {
+            case FIXED:
+                {
+                    byte[] serializedSchema;
+                    try {
+                        serializedSchema =
+                                InstantiationUtil.serializeObject(fileStoreTable.schema());
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    return Optional.of(new TrinoPartitioningHandle(serializedSchema));
+                }
+            case DYNAMIC:
+            case GLOBAL_DYNAMIC:
+                throw new IllegalArgumentException(
+                        table.primaryKeys().isEmpty()
+                                ? "Only primary-key table can support dynamic bucket."
+                                : "Global dynamic bucket mode are not supported");
+            case UNAWARE:
+                throw new IllegalArgumentException(
+                        table.primaryKeys().isEmpty()
+                                ? "Unaware bucket mode are not supported"
+                                : "Only append table can support unaware bucket.");
+            default:
+                throw new IllegalArgumentException("Unknown bucket mode");
+        }
+    }
+
+    /** Starts a MERGE by wrapping the target table handle in a {@link TrinoMergeTableHandle}. */
+    @Override
+    public ConnectorMergeTableHandle beginMerge(
+            ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) {
+        TrinoTableHandle paimonHandle = (TrinoTableHandle) tableHandle;
+        return new TrinoMergeTableHandle(paimonHandle);
+    }
+
+    /** Finishes a MERGE by committing the fragments produced by the merge sinks. */
+    @Override
+    public void finishMerge(
+            ConnectorSession session,
+            ConnectorMergeTableHandle mergeTableHandle,
+            Collection<Slice> fragments,
+            Collection<ComputedStatistics> computedStatistics) {
+        ConnectorTableHandle target = mergeTableHandle.getTableHandle();
+        commit(session, (TrinoTableHandle) target, fragments);
+    }
+
     @Override
     public boolean schemaExists(ConnectorSession session, String schemaName) {
         catalog.initSession(session);
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
index da22ed6..f629736 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -41,6 +41,8 @@ public class TrinoModule implements Module {
         binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
         binder.bind(TrinoSplitManager.class).in(SINGLETON);
         binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
+        binder.bind(TrinoPageSinkProvider.class).in(SINGLETON);
+        binder.bind(TrinoNodePartitioningProvider.class).in(SINGLETON);
         binder.bind(TrinoSessionProperties.class).in(SINGLETON);
         binder.bind(TrinoTableOptions.class).in(SINGLETON);
     }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
new file mode 100644
index 0000000..419f8a1
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Inject;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.type.Type;
+
+import java.util.List;
+
+/**
+ * Trino {@link ConnectorNodePartitioningProvider} that supplies the bucket function used to
+ * shuffle rows for writes.
+ */
+public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningProvider {
+
+    @Inject
+    public TrinoNodePartitioningProvider() {}
+
+    /**
+     * Always returns a {@link FixedBucketTableShuffleFunction} built from the given handle, which
+     * must be a {@link TrinoPartitioningHandle}.
+     */
+    @Override
+    public BucketFunction getBucketFunction(
+            ConnectorTransactionHandle transactionHandle,
+            ConnectorSession session,
+            ConnectorPartitioningHandle partitioningHandle,
+            List<Type> partitionChannelTypes,
+            int bucketCount) {
+        // todo support different types of tables according to different PartitioningHandle
+        TrinoPartitioningHandle paimonHandle = (TrinoPartitioningHandle) partitioningHandle;
+        return new FixedBucketTableShuffleFunction(
+                partitionChannelTypes, paimonHandle, bucketCount);
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java
new file mode 100644
index 0000000..bbd94e5
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static io.airlift.slice.Slices.wrappedBuffer;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+/**
+ * {@link ConnectorPageSink} that writes Trino pages into a Paimon table through a {@link
+ * BatchTableWrite} and hands the resulting commit messages back to the engine as fragments.
+ */
+public class TrinoPageSink implements ConnectorPageSink {
+
+    private final BatchTableWrite writer;
+
+    public TrinoPageSink(BatchTableWrite writer) {
+        this.writer = writer;
+    }
+
+    /** Writes every row of {@code page} as an {@link RowKind#INSERT} row. */
+    @Override
+    public CompletableFuture<?> appendPage(Page page) {
+        // writePage already converts any failure into a RuntimeException; wrapping it a second
+        // time would only bury the real cause one level deeper.
+        writePage(page, RowKind.INSERT);
+        return NOT_BLOCKED;
+    }
+
+    /**
+     * Writes every row of {@code page} to the table with the given row kind.
+     *
+     * @param page the rows to write
+     * @param rowKind the kind assigned to each written row
+     * @throws RuntimeException wrapping any failure raised by the underlying writer
+     */
+    public void writePage(Page page, RowKind rowKind) {
+        try {
+            for (int i = 0; i < page.getPositionCount(); i++) {
+                writer.write(new TrinoRow(page.getSingleValuePage(i), rowKind));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Prepares the commit, serializes each commit message into a fragment and closes the writer.
+     *
+     * @return an already completed future holding one slice per commit message
+     * @throws RuntimeException wrapping any failure while preparing, serializing or closing
+     */
+    @Override
+    public CompletableFuture<Collection<Slice>> finish() {
+        Collection<Slice> commitTasks = new ArrayList<>();
+        // try-with-resources closes the writer on every path. Unlike a throwing finally block, a
+        // close failure that follows an earlier failure is attached as suppressed instead of
+        // replacing the original exception.
+        try (BatchTableWrite closingWriter = writer) {
+            List<CommitMessage> commitMessages = closingWriter.prepareCommit();
+            CommitMessageSerializer serializer = new CommitMessageSerializer();
+            for (CommitMessage commitMessage : commitMessages) {
+                commitTasks.add(wrappedBuffer(serializer.serialize(commitMessage)));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return completedFuture(commitTasks);
+    }
+
+    /** Closes the writer without preparing a commit. */
+    @Override
+    public void abort() {
+        try {
+            writer.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
new file mode 100644
index 0000000..c8e2c7d
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.trino.catalog.TrinoCatalog;
+
+import com.google.inject.Inject;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorMergeSink;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorPageSink;
+import io.trino.spi.connector.ConnectorPageSinkId;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
+
+/** Trino {@link ConnectorPageSinkProvider} that creates the sinks writing into Paimon tables. */
+public class TrinoPageSinkProvider implements ConnectorPageSinkProvider {
+
+    private final TrinoCatalog trinoCatalog;
+
+    @Inject
+    public TrinoPageSinkProvider(TrinoMetadataFactory trinoMetadataFactory) {
+        this.trinoCatalog =
+                requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null")
+                        .create()
+                        .catalog();
+    }
+
+    /** Creates the sink used by CREATE TABLE AS. */
+    @Override
+    public ConnectorPageSink createPageSink(
+            ConnectorTransactionHandle transactionHandle,
+            ConnectorSession session,
+            ConnectorOutputTableHandle outputTableHandle,
+            ConnectorPageSinkId pageSinkId) {
+        return createPageSink((TrinoTableHandle) outputTableHandle, session);
+    }
+
+    /** Creates the sink used by INSERT. */
+    @Override
+    public ConnectorPageSink createPageSink(
+            ConnectorTransactionHandle transactionHandle,
+            ConnectorSession session,
+            ConnectorInsertTableHandle insertTableHandle,
+            ConnectorPageSinkId pageSinkId) {
+        return createPageSink((TrinoTableHandle) insertTableHandle, session);
+    }
+
+    /**
+     * Loads the table with the session's dynamic options, checks that its bucket mode is
+     * supported and opens a batch writer for it (in overwrite mode when the session asks for it).
+     */
+    private ConnectorPageSink createPageSink(
+            TrinoTableHandle tableHandle, ConnectorSession session) {
+        trinoCatalog.initSession(session);
+        Table table = tableHandle.tableWithDynamicOptions(trinoCatalog, session);
+        validateBucketMode(table);
+
+        return runWithContextClassLoader(
+                () -> {
+                    BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+                    if (TrinoSessionProperties.enableInsertOverwrite(session)) {
+                        batchWriteBuilder.withOverwrite();
+                    }
+                    BatchTableWrite write = batchWriteBuilder.newWrite();
+                    return new TrinoPageSink(write);
+                },
+                TrinoPageSinkProvider.class.getClassLoader());
+    }
+
+    /**
+     * Rejects every table that cannot be written yet: only fixed-bucket tables with primary keys
+     * pass. A table that is not a {@link FileStoreTable} is treated as fixed-bucket.
+     *
+     * @throws IllegalArgumentException if the table's bucket mode or key layout is unsupported
+     */
+    private static void validateBucketMode(Table table) {
+        BucketMode mode =
+                table instanceof FileStoreTable
+                        ? ((FileStoreTable) table).bucketMode()
+                        : BucketMode.FIXED;
+        switch (mode) {
+            case FIXED:
+                if (table.primaryKeys().isEmpty()) {
+                    throw new IllegalArgumentException("Only support primary-key table.");
+                }
+                break;
+            case DYNAMIC:
+            case GLOBAL_DYNAMIC:
+                if (table.primaryKeys().isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Only primary-key table can support dynamic bucket.");
+                }
+                throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
+            case UNAWARE:
+                if (!table.primaryKeys().isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Only append table can support unaware bucket.");
+                }
+                throw new IllegalArgumentException("Unaware bucket mode are not supported");
+            default:
+                throw new IllegalArgumentException("Unknown bucket mode");
+        }
+    }
+
+    /**
+     * Creates the sink used by MERGE: a regular page sink wrapped in a {@link TrinoMergeSink} that
+     * is told how many fields the table's row type has.
+     */
+    @Override
+    public ConnectorMergeSink createMergeSink(
+            ConnectorTransactionHandle transactionHandle,
+            ConnectorSession session,
+            ConnectorMergeTableHandle mergeHandle,
+            ConnectorPageSinkId pageSinkId) {
+        TrinoTableHandle trinoTableHandle = (TrinoTableHandle) mergeHandle.getTableHandle();
+        // Initialize the catalog for this session before the table is loaded, in the same order
+        // createPageSink uses.
+        trinoCatalog.initSession(session);
+        Table table = trinoTableHandle.tableWithDynamicOptions(trinoCatalog, session);
+        return new TrinoMergeSink(
+                createPageSink(trinoTableHandle, session), table.rowType().getFields().size());
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index a380e42..41bb874 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -71,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
@@ -106,14 +107,52 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
         TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
         Table table = trinoTableHandle.tableWithDynamicOptions(trinoCatalog, 
session);
         return runWithContextClassLoader(
-                () ->
-                        createPageSource(
+                () -> {
+                    Optional<TrinoColumnHandle> rowId =
+                            columns.stream()
+                                    .map(TrinoColumnHandle.class::cast)
+                                    .filter(column -> column.isRowId())
+                                    .findFirst();
+                    if (rowId.isPresent()) {
+                        List<ColumnHandle> dataColumns =
+                                columns.stream()
+                                        .map(TrinoColumnHandle.class::cast)
+                                        .filter(column -> !column.isRowId())
+                                        .collect(Collectors.toList());
+                        Set<String> rowIdFileds =
+                                ((io.trino.spi.type.RowType) 
rowId.get().getTrinoType())
+                                        .getFields().stream()
+                                                
.map(io.trino.spi.type.RowType.Field::getName)
+                                                .map(Optional::get)
+                                                .collect(Collectors.toSet());
+
+                        HashMap<String, Integer> fieldToIndex = new 
HashMap<>();
+                        for (int i = 0; i < dataColumns.size(); i++) {
+                            TrinoColumnHandle trinoColumnHandle =
+                                    (TrinoColumnHandle) dataColumns.get(i);
+                            if 
(rowIdFileds.contains(trinoColumnHandle.getColumnName())) {
+                                
fieldToIndex.put(trinoColumnHandle.getColumnName(), i);
+                            }
+                        }
+                        return TrinoMergePageSourceWrapper.wrap(
+                                createPageSource(
+                                        session,
+                                        table,
+                                        trinoTableHandle.getFilter(),
+                                        (TrinoSplit) split,
+                                        dataColumns,
+                                        trinoTableHandle.getLimit()),
+                                fieldToIndex);
+                    } else {
+                        return createPageSource(
                                 session,
                                 table,
                                 trinoTableHandle.getFilter(),
                                 (TrinoSplit) split,
                                 columns,
-                                trinoTableHandle.getLimit()),
+                                trinoTableHandle.getLimit());
+                    }
+                },
                 TrinoPageSourceProvider.class.getClassLoader());
     }
 
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
new file mode 100644
index 0000000..793bf02
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Trino {@link ConnectorPartitioningHandle} that carries a Java-serialized Paimon {@link
+ * TableSchema}. Two handles are equal when their serialized schemas are byte-for-byte equal.
+ */
+public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
+
+    private final byte[] schema;
+
+    @JsonCreator
+    public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
+        this.schema = schema;
+    }
+
+    /** Returns the serialized schema exactly as it was passed to the constructor. */
+    @JsonProperty
+    public byte[] getSchema() {
+        return schema;
+    }
+
+    /**
+     * Deserializes the carried bytes back into a {@link TableSchema}, using this class's class
+     * loader.
+     *
+     * @throws RuntimeException if the bytes cannot be read or a required class is missing
+     */
+    public TableSchema getOriginalSchema() {
+        try {
+            return InstantiationUtil.deserializeObject(this.schema, getClass().getClassLoader());
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TrinoPartitioningHandle that = (TrinoPartitioningHandle) o;
+        return Arrays.equals(schema, that.getSchema());
+    }
+
+    /** Content-based hash so that equal handles also hash equally, as {@link #equals} requires. */
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(schema);
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java
new file mode 100644
index 0000000..0d4823d
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Int128;
+import io.trino.spi.type.TypeUtils;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.Decimals.MAX_SHORT_PRECISION;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static org.apache.paimon.shade.guava30.com.google.common.base.Verify.verify;
+
+/**
+ * {@link InternalRow} view over a single-position Trino {@link Page}: field {@code i} of the row
+ * is read from position 0 of the page's channel {@code i}.
+ *
+ * <p>Nested values (arrays, maps, rows) are not supported yet.
+ */
+public class TrinoRow implements InternalRow, Serializable {
+
+    private final RowKind rowKind;
+    private final Page singlePage;
+
+    /**
+     * @param singlePage a page holding exactly one position
+     * @param rowKind the kind reported by {@link #getRowKind()}
+     */
+    public TrinoRow(Page singlePage, RowKind rowKind) {
+        verify(singlePage.getPositionCount() == 1, "singlePage must have only one row");
+        this.singlePage = singlePage;
+        this.rowKind = rowKind;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return singlePage.getChannelCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return rowKind;
+    }
+
+    /** The row kind is fixed at construction time and cannot be changed. */
+    @Override
+    public void setRowKind(RowKind rowKind) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return singlePage.getBlock(i).isNull(0);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+        return (boolean) TypeUtils.readNativeValue(BOOLEAN, singlePage.getBlock(i), 0);
+    }
+
+    @Override
+    public byte getByte(int i) {
+        // A byte field is read as a TINYINT value, not as the first byte of a VARBINARY slice.
+        long value = (long) TypeUtils.readNativeValue(TINYINT, singlePage.getBlock(i), 0);
+        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
+            throw new IllegalArgumentException("Value out of range for byte: " + value);
+        }
+        return (byte) value;
+    }
+
+    @Override
+    public short getShort(int i) {
+        long value = (long) TypeUtils.readNativeValue(SMALLINT, singlePage.getBlock(i), 0);
+        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
+            throw new IllegalArgumentException("Value out of range for short: " + value);
+        }
+        return (short) value;
+    }
+
+    @Override
+    public int getInt(int i) {
+        long value = (long) TypeUtils.readNativeValue(INTEGER, singlePage.getBlock(i), 0);
+        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("Value out of range for int: " + value);
+        }
+        return (int) value;
+    }
+
+    @Override
+    public long getLong(int i) {
+        return (long) TypeUtils.readNativeValue(BIGINT, singlePage.getBlock(i), 0);
+    }
+
+    @Override
+    public float getFloat(int i) {
+        // The REAL value is read as a long holding the float's int bits.
+        return Float.intBitsToFloat(
+                Math.toIntExact((long) TypeUtils.readNativeValue(REAL, singlePage.getBlock(i), 0)));
+    }
+
+    @Override
+    public double getDouble(int i) {
+        return (double) TypeUtils.readNativeValue(DOUBLE, singlePage.getBlock(i), 0);
+    }
+
+    @Override
+    public BinaryString getString(int i) {
+        return BinaryString.fromBytes(getBinary(i));
+    }
+
+    @Override
+    public Decimal getDecimal(int i, int decimalPrecision, int decimalScale) {
+        Object value =
+                TypeUtils.readNativeValue(
+                        DecimalType.createDecimalType(decimalPrecision, decimalScale),
+                        singlePage.getBlock(i),
+                        0);
+        if (decimalPrecision <= MAX_SHORT_PRECISION) {
+            return Decimal.fromUnscaledLong((Long) value, decimalPrecision, decimalScale);
+        }
+        // Int128 stores a two's-complement 128-bit value whose low word is unsigned. Combining
+        // the words with BigInteger.valueOf(low) sign-extends that word and corrupts every value
+        // whose low word has its top bit set, so let Int128 do the conversion.
+        BigInteger unscaledValue = ((Int128) value).toBigInteger();
+        return Decimal.fromBigDecimal(
+                new BigDecimal(unscaledValue, decimalScale), decimalPrecision, decimalScale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int i, int timestampPrecision) {
+        // NOTE(review): timestampPrecision is ignored and the value is always read as epoch
+        // microseconds - confirm this matches the Trino type chosen for every precision.
+        long value = (long) TypeUtils.readNativeValue(TIMESTAMP_MICROS, singlePage.getBlock(i), 0);
+        return Timestamp.fromMicros(value);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        Slice slice = (Slice) TypeUtils.readNativeValue(VARBINARY, singlePage.getBlock(i), 0);
+        return slice.getBytes();
+    }
+
+    @Override
+    public InternalArray getArray(int i) {
+        // todo: fail loudly instead of handing a null to the caller for a non-null value
+        throw new UnsupportedOperationException("ARRAY values are not supported yet");
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        // todo: fail loudly instead of handing a null to the caller for a non-null value
+        throw new UnsupportedOperationException("MAP values are not supported yet");
+    }
+
+    @Override
+    public InternalRow getRow(int i, int i1) {
+        // todo: fail loudly instead of handing a null to the caller for a non-null value
+        throw new UnsupportedOperationException("ROW values are not supported yet");
+    }
+}
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
index 825d34c..e929bfc 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
 import static io.trino.spi.session.PropertyMetadata.longProperty;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 
@@ -36,6 +37,8 @@ public class TrinoSessionProperties {
     public static final String SCAN_TIMESTAMP = "scan_timestamp_millis";
     public static final String SCAN_SNAPSHOT = "scan_snapshot_id";
     public static final String MINIMUM_SPLIT_WEIGHT = "minimum_split_weight";
+    public static final String INSERT_EXISTING_PARTITIONS_BEHAVIOR =
+            "insert_existing_partitions_behavior";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -57,6 +60,18 @@ public class TrinoSessionProperties {
                         .add(
                                 doubleProperty(
                                         MINIMUM_SPLIT_WEIGHT, "Minimum split 
weight", 0.05, false))
+                        .add(
+                                new PropertyMetadata<>(
+                                        INSERT_EXISTING_PARTITIONS_BEHAVIOR,
+                                        "Behavior on insert existing 
partitions",
+                                        VARCHAR,
+                                        InsertExistingPartitionsBehavior.class,
+                                        
InsertExistingPartitionsBehavior.APPEND,
+                                        false,
+                                        value ->
+                                                
InsertExistingPartitionsBehavior.valueOf(
+                                                        (String) value),
+                                        
InsertExistingPartitionsBehavior::toString))
                         .build();
     }
 
@@ -75,4 +90,17 @@ public class TrinoSessionProperties {
     public static Double getMinimumSplitWeight(ConnectorSession session) {
         return session.getProperty(MINIMUM_SPLIT_WEIGHT, Double.class);
     }
+
+    public static boolean enableInsertOverwrite(ConnectorSession session) {
+        return session.getProperty(
+                        INSERT_EXISTING_PARTITIONS_BEHAVIOR, 
InsertExistingPartitionsBehavior.class)
+                == InsertExistingPartitionsBehavior.OVERWRITE;
+    }
+
+    /**
+     * Values accepted by the {@code insert_existing_partitions_behavior} session property.
+     *
+     * <p>The property is decoded with {@code valueOf} on the raw string, so the text must match a
+     * constant name exactly. {@code APPEND} is the registered default and {@code OVERWRITE} is
+     * the value {@code enableInsertOverwrite} tests for.
+     *
+     * <p>NOTE(review): nothing in this class treats {@code ERROR} specially; confirm where, or
+     * whether, it is enforced.
+     */
+    public enum InsertExistingPartitionsBehavior {
+        ERROR,
+        APPEND,
+        OVERWRITE,
+    }
 }
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index bc5d025..d5dbf44 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -28,6 +28,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableMetadata;
@@ -44,7 +46,8 @@ import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 /** Trino {@link ConnectorTableHandle}. */
-public class TrinoTableHandle implements ConnectorTableHandle {
+public class TrinoTableHandle
+        implements ConnectorTableHandle, ConnectorInsertTableHandle, 
ConnectorOutputTableHandle {
 
     private final String schemaName;
     private final String tableName;
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
index 6b74596..0592751 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
@@ -32,6 +32,7 @@ import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
 
 import javax.annotation.Nullable;
 
@@ -65,9 +66,11 @@ public class TrinoFileIO implements FileIO {
     }
 
     @Override
-    public PositionOutputStream newOutputStream(Path path, boolean b) throws 
IOException {
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) 
throws IOException {
+        TrinoOutputFile trinoOutputFile =
+                trinoFileSystem.newOutputFile(Location.of(path.toString()));
         return new PositionOutputStreamWrapper(
-                
trinoFileSystem.newOutputFile(Location.of(path.toString())).create());
+                overwrite ? trinoOutputFile.createOrOverwrite() : 
trinoOutputFile.create());
     }
 
     @Override
@@ -132,7 +135,7 @@ public class TrinoFileIO implements FileIO {
     }
 
     @Override
-    public boolean delete(Path path, boolean b) throws IOException {
+    public boolean delete(Path path, boolean recursive) throws IOException {
         Location location = Location.of(path.toString());
         if (trinoFileSystem.directoryExists(location).orElse(false)) {
             trinoFileSystem.deleteDirectory(location);
diff --git 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 0258ff8..dda03b3 100644
--- 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -489,6 +489,29 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
             commit.commit(2, writer.prepareCommit(true, 2));
         }
 
+        {
+            Path tablePath = new Path(warehouse, "default.db/t103");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "id", DataTypes.INT()),
+                                    new DataField(1, "name", 
DataTypes.STRING())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new HashMap<>() {
+                                        {
+                                            put("file.format", "orc");
+                                            put("primary-key", "id");
+                                            put("bucket", "2");
+                                        }
+                                    },
+                                    ""));
+        }
+
         DistributedQueryRunner queryRunner = null;
         try {
             queryRunner =
@@ -620,9 +643,7 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + "bucket_key = 'order_key',"
                         + "changelog_producer = 'input'"
                         + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo(
-                        "[[empty_t], [orders], [t1], [t100], [t101], [t102], 
[t2], [t3], [t4], [t99]]");
+        assertThat(sql("SHOW TABLES FROM paimon.default")).contains("orders");
         sql("DROP TABLE IF EXISTS paimon.default.orders");
     }
 
@@ -644,9 +665,8 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + "changelog_producer = 'input'"
                         + ")");
         sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo(
-                        "[[empty_t], [t1], [t100], [t101], [t102], [t2], [t3], 
[t4], [t6], [t99]]");
+        String result = sql("SHOW TABLES FROM paimon.default");
+        assertThat(result).doesNotContain("t5").contains("t6");
         sql("DROP TABLE IF EXISTS paimon.default.t6");
     }
 
@@ -668,8 +688,7 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + "changelog_producer = 'input'"
                         + ")");
         sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t102], [t2], 
[t3], [t4], [t99]]");
+        assertThat(sql("SHOW TABLES FROM 
paimon.default")).doesNotContain("t5");
     }
 
     @Test
@@ -833,6 +852,14 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
         assertThat(sql("SELECT * FROM paimon.default.t102 where c = 
2")).isEqualTo("[[a2, 2, 2]]");
     }
 
+    @Test
+    public void testInsertInto() {
+        sql(
+                "INSERT INTO paimon.default.t103 VALUES 
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
+        assertThat(sql("SELECT * FROM paimon.default.t103 order by id asc"))
+                .isEqualTo("[[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6]]");
+    }
+
     protected String sql(String sql) {
         MaterializedResult result = getQueryRunner().execute(sql);
         return result.getMaterializedRows().toString();
diff --git 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
new file mode 100644
index 0000000..7d97c7c
--- /dev/null
+++ 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.utils.InstantiationUtil;
+
+import io.airlift.json.JsonCodec;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TrinoPartitioningHandle}. */
+public class TestTrinoPartitioningHandle {
+
+    private final JsonCodec<TrinoPartitioningHandle> codec =
+            JsonCodec.jsonCodec(TrinoPartitioningHandle.class);
+
+    /** A handle built from serialized bytes must survive a JSON encode/decode cycle. */
+    @Test
+    public void testTrinoPartitioningHandle() throws Exception {
+        TrinoPartitioningHandle original =
+                new TrinoPartitioningHandle(InstantiationUtil.serializeObject("test_schema"));
+
+        String encoded = codec.toJson(original);
+        TrinoPartitioningHandle decoded = codec.fromJson(encoded);
+
+        assertThat(decoded).isEqualTo(original);
+        assertThat(decoded.getSchema()).isEqualTo(original.getSchema());
+    }
+}
diff --git 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java
new file mode 100644
index 0000000..580a93d
--- /dev/null
+++ 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.type.DecimalType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.Decimals.encodeScaledValue;
+import static io.trino.spi.type.Decimals.encodeShortScaledValue;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TypeUtils.writeNativeValue;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.type.DateTimes.MICROSECONDS_PER_MILLISECOND;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link TrinoRow}. */
+public class TestTrinoRow {
+
+    @Test
+    void test() {
+        Page singlePage =
+                new Page(
+                        1,
+                        writeNativeValue(BOOLEAN, null),
+                        writeNativeValue(BOOLEAN, false),
+                        writeNativeValue(VARBINARY, 
Slices.wrappedBuffer((byte) 22)),
+                        writeNativeValue(SMALLINT, 356L),
+                        writeNativeValue(INTEGER, 4L),
+                        writeNativeValue(BIGINT, 23567222L),
+                        writeNativeValue(REAL, (long) 
Float.floatToIntBits(1213.33f)),
+                        writeNativeValue(DOUBLE, 121.3d),
+                        writeNativeValue(
+                                VARCHAR,
+                                Slices.wrappedBuffer(
+                                        new 
String("rfyu").getBytes(StandardCharsets.UTF_8))),
+                        writeNativeValue(
+                                DecimalType.createDecimalType(2, 2),
+                                
encodeShortScaledValue(BigDecimal.valueOf(0.21), 2)),
+                        writeNativeValue(
+                                DecimalType.createDecimalType(38, 2),
+                                
encodeScaledValue(BigDecimal.valueOf(65782123123.01), 2)),
+                        writeNativeValue(
+                                DecimalType.createDecimalType(10, 1),
+                                
encodeShortScaledValue(BigDecimal.valueOf(62123123.5), 1)),
+                        writeNativeValue(
+                                TIMESTAMP_MICROS,
+                                Timestamp.fromLocalDateTime(
+                                                        
LocalDateTime.parse("2007-12-03T10:15:30"))
+                                                .getMillisecond()
+                                        * MICROSECONDS_PER_MILLISECOND),
+                        writeNativeValue(
+                                VARBINARY,
+                                Slices.wrappedBuffer(
+                                        
"varbinary_v".getBytes(StandardCharsets.UTF_8))));
+        TrinoRow trinoRow = new TrinoRow(singlePage, RowKind.INSERT);
+
+        assertThat(trinoRow.getRowKind()).isEqualTo(RowKind.INSERT);
+        assertThat(trinoRow.isNullAt(0)).isEqualTo(true);
+        assertThat(trinoRow.getBoolean(1)).isEqualTo(false);
+        assertThat(trinoRow.getByte(2)).isEqualTo((byte) 22);
+        assertThat(trinoRow.getShort(3)).isEqualTo((short) 356);
+        assertThat(trinoRow.getInt(4)).isEqualTo(4);
+        assertThat(trinoRow.getLong(5)).isEqualTo(23567222L);
+        assertThat(trinoRow.getFloat(6)).isEqualTo(1213.33f);
+        assertThat(trinoRow.getDouble(7)).isEqualTo(121.3d);
+        
assertThat(trinoRow.getString(8)).isEqualTo(BinaryString.fromString("rfyu"));
+        assertThat(trinoRow.getDecimal(9, 2, 2))
+                .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 
2));
+        assertThat(trinoRow.getDecimal(10, 38, 2))
+                
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2));
+        assertThat(trinoRow.getDecimal(11, 10, 1))
+                
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(62123123.5), 10, 1));
+        assertThat(trinoRow.getTimestamp(12, 6))
+                
.isEqualTo(Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")));
+        assertThat(trinoRow.getBinary(13))
+                .isEqualTo("varbinary_v".getBytes(StandardCharsets.UTF_8));
+    }
+}

Reply via email to