This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new d2114fe  Support insert operations for append table and append queue (#89)
d2114fe is described below

commit d2114fe49364999f998a037a723e8a81b327e58d
Author: rfyu <[email protected]>
AuthorDate: Tue Nov 26 13:24:58 2024 +0800

    Support insert operations for append table and append queue (#89)
---
 .../org/apache/paimon/trino/TrinoMetadata.java     | 33 ++++++++---
 .../trino/TrinoNodePartitioningProvider.java       | 19 ++++--
 .../apache/paimon/trino/TrinoPageSinkProvider.java | 12 +---
 .../paimon/trino/TrinoPartitioningHandle.java      | 12 +++-
 .../paimon/trino/UnawareTableShuffleFunction.java  | 63 ++++++++++++++++++++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 68 ++++++++++++++++++++--
 .../paimon/trino/TestTrinoPartitioningHandle.java  |  4 +-
 7 files changed, 184 insertions(+), 27 deletions(-)

diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index e896ea0..b987ede 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -92,6 +92,8 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.table.BucketMode.FIXED;
+import static org.apache.paimon.table.BucketMode.UNAWARE;
 import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -126,8 +128,9 @@ public class TrinoMetadata implements ConnectorMetadata {
                             new ConnectorTableLayout(
                                     new TrinoPartitioningHandle(
                                             InstantiationUtil.serializeObject(
-                                                    fileStoreTable.schema())),
-                                    table.primaryKeys(),
+                                                    fileStoreTable.schema()),
+                                            FIXED),
+                                    fileStoreTable.schema().bucketKeys(),
                                     false));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
@@ -140,11 +143,18 @@ public class TrinoMetadata implements ConnectorMetadata {
                 }
                 throw new IllegalArgumentException("Global dynamic bucket mode 
are not supported");
             case UNAWARE:
-                if (!table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Only append table can support unaware bucket.");
+                try {
+                    return Optional.of(
+                            new ConnectorTableLayout(
+                                    new TrinoPartitioningHandle(
+                                            InstantiationUtil.serializeObject(
+                                                    fileStoreTable.schema()),
+                                            UNAWARE),
+                                    fileStoreTable.schema().partitionKeys(),
+                                    true));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 }
-                throw new IllegalArgumentException("Unaware bucket mode are 
not supported");
             default:
                 throw new IllegalArgumentException("Unknown bucket mode");
         }
@@ -232,6 +242,14 @@ public class TrinoMetadata implements ConnectorMetadata {
             ConnectorSession session, ConnectorTableHandle tableHandle) {
         TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
         Table table = trinoTableHandle.table(catalog);
+        try {
+            if (table.getClass()
+                    == 
Class.forName("org.apache.paimon.table.AppendOnlyFileStoreTable")) {
+                throw new IllegalArgumentException("Append-only table does not 
support upsert");
+            }
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         Set<String> pkSet = 
table.primaryKeys().stream().collect(Collectors.toSet());
         DataField[] row =
                 table.rowType().getFields().stream()
@@ -255,7 +273,8 @@ public class TrinoMetadata implements ConnectorMetadata {
                 try {
                     return Optional.of(
                             new TrinoPartitioningHandle(
-                                    
InstantiationUtil.serializeObject(fileStoreTable.schema())));
+                                    
InstantiationUtil.serializeObject(fileStoreTable.schema()),
+                                    FIXED));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
index 419f8a1..c4c5bbb 100644
--- 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -40,9 +40,20 @@ public class TrinoNodePartitioningProvider implements 
ConnectorNodePartitioningP
             ConnectorSession session,
             ConnectorPartitioningHandle partitioningHandle,
             List<Type> partitionChannelTypes,
-            int bucketCount) {
-        // todo support different types of tables according to different 
PartitioningHandle
-        return new FixedBucketTableShuffleFunction(
-                partitionChannelTypes, (TrinoPartitioningHandle) 
partitioningHandle, bucketCount);
+            int workerCount) {
+        // todo support dynamic bucket tables
+        TrinoPartitioningHandle trinoPartitioningHandle =
+                (TrinoPartitioningHandle) partitioningHandle;
+        switch (trinoPartitioningHandle.getBucketMode()) {
+            case FIXED:
+                return new FixedBucketTableShuffleFunction(
+                        partitionChannelTypes, trinoPartitioningHandle, 
workerCount);
+            case UNAWARE:
+                return new UnawareTableShuffleFunction(
+                        partitionChannelTypes, trinoPartitioningHandle, 
workerCount);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported bucket mode: " + 
trinoPartitioningHandle.getBucketMode());
+        }
     }
 }
diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
index c8e2c7d..1ee2601 100644
--- 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
@@ -95,9 +95,7 @@ public class TrinoPageSinkProvider implements 
ConnectorPageSinkProvider {
                         : BucketMode.FIXED;
         switch (mode) {
             case FIXED:
-                if (table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException("Only support 
primary-key table.");
-                }
+            case UNAWARE:
                 break;
             case DYNAMIC:
             case GLOBAL_DYNAMIC:
@@ -106,14 +104,8 @@ public class TrinoPageSinkProvider implements 
ConnectorPageSinkProvider {
                             "Only primary-key table can support dynamic 
bucket.");
                 }
                 throw new IllegalArgumentException("Global dynamic bucket mode 
are not supported");
-            case UNAWARE:
-                if (!table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Only append table can support unaware bucket.");
-                }
-                throw new IllegalArgumentException("Unaware bucket mode are 
not supported");
             default:
-                throw new IllegalArgumentException("Unknown bucket mode");
+                throw new IllegalArgumentException("Unknown bucket mode: " + 
mode);
         }
     }
 
diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
index 793bf02..0da69d8 100644
--- 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.trino;
 
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.InstantiationUtil;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -32,10 +33,14 @@ import java.util.Arrays;
 public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
 
     private final byte[] schema;
+    private final BucketMode bucketMode;
 
     @JsonCreator
-    public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
+    public TrinoPartitioningHandle(
+            @JsonProperty("schema") byte[] schema,
+            @JsonProperty("bucketMode") BucketMode bucketMode) {
         this.schema = schema;
+        this.bucketMode = bucketMode;
     }
 
     @JsonProperty
@@ -43,6 +48,11 @@ public class TrinoPartitioningHandle implements 
ConnectorPartitioningHandle {
         return schema;
     }
 
+    /** Returns the Paimon bucket mode of the table this handle partitions (JSON: "bucketMode"). */
+    @JsonProperty
+    public BucketMode getBucketMode() {
+        return this.bucketMode;
+    }
+
     public TableSchema getOriginalSchema() {
         try {
             return InstantiationUtil.deserializeObject(this.schema, 
getClass().getClassLoader());
diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
new file mode 100644
index 0000000..c9ed4be
--- /dev/null
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowKind;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.type.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Trino {@link BucketFunction} used when writing to Paimon tables in {@code UNAWARE} bucket mode.
+ *
+ * <p>For partitioned tables each row is routed by hashing its projected partition values, so rows
+ * with equal partition values always map to the same bucket. When there are no partition
+ * channels, every row maps to bucket 0.
+ */
+public class UnawareTableShuffleFunction implements BucketFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
+
+    private final int workerCount;
+    private final boolean hasPartitionKeys;
+    private final Projection partitionProjection;
+
+    /**
+     * Creates the shuffle function.
+     *
+     * @param partitionChannelTypes types of the channels Trino partitions on; empty when the table
+     *     layout declares no partitioning columns
+     * @param partitioningHandle handle carrying the serialized Paimon {@link TableSchema}
+     * @param workerCount number of buckets to spread rows over
+     */
+    public UnawareTableShuffleFunction(
+            List<Type> partitionChannelTypes,
+            TrinoPartitioningHandle partitioningHandle,
+            int workerCount) {
+        this.hasPartitionKeys = !partitionChannelTypes.isEmpty();
+        TableSchema schema = partitioningHandle.getOriginalSchema();
+        // Input rows are read as the logical partition type and projected to a BinaryRow of the
+        // partition keys. NOTE(review): assumes the page holds exactly the partition channels.
+        this.partitionProjection =
+                CodeGenUtils.newProjection(schema.logicalPartitionType(), schema.partitionKeys());
+        this.workerCount = workerCount;
+    }
+
+    /**
+     * Returns the bucket for the row at {@code position}.
+     *
+     * @return 0 when there are no partition keys, otherwise a value in {@code [0, workerCount)}
+     *     derived from the hash of the row's partition values
+     */
+    @Override
+    public int getBucket(Page page, int position) {
+        if (!hasPartitionKeys) {
+            return 0;
+        }
+        TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
+        BinaryRow partition = partitionProjection.apply(trinoRow);
+        // hashCode() may be negative and Java's % keeps the dividend's sign; floorMod keeps the
+        // bucket inside [0, workerCount).
+        return Math.floorMod(partition.hashCode(), workerCount);
+    }
+}
diff --git 
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 2c5e966..538b592 100644
--- 
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ 
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -484,7 +484,7 @@ public class TestTrinoITCase extends 
AbstractTestQueryFramework {
         }
 
         {
-            Path tablePath = new Path(warehouse, "default.db/t103");
+            Path tablePath = new Path(warehouse, 
"default.db/fixed_bucket_table_wi_pk");
             RowType rowType =
                     new RowType(
                             Arrays.asList(
@@ -506,6 +506,50 @@ public class TestTrinoITCase extends 
AbstractTestQueryFramework {
                                     ""));
         }
 
+        {
+            Path tablePath = new Path(warehouse, 
"default.db/fixed_bucket_table_wo_pk");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "id", DataTypes.INT()),
+                                    new DataField(1, "name", 
DataTypes.STRING())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new HashMap<>() {
+                                        {
+                                            put("file.format", "orc");
+                                            put("bucket", "2");
+                                            put("bucket-key", "id");
+                                        }
+                                    },
+                                    ""));
+        }
+
+        {
+            Path tablePath = new Path(warehouse, "default.db/unaware_table");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "id", DataTypes.INT()),
+                                    new DataField(1, "name", 
DataTypes.STRING())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new HashMap<>() {
+                                        {
+                                            put("file.format", "orc");
+                                        }
+                                    },
+                                    ""));
+        }
+
         DistributedQueryRunner queryRunner = null;
         try {
             queryRunner =
@@ -847,13 +891,29 @@ public class TestTrinoITCase extends 
AbstractTestQueryFramework {
     }
 
     @Test
-    public void testInsertInto() {
+    public void testInsertIntoFixedBucketTableWiPk() {
         sql(
-                "INSERT INTO paimon.default.t103 VALUES 
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
-        assertThat(sql("SELECT * FROM paimon.default.t103 order by id asc"))
+                "INSERT INTO paimon.default.fixed_bucket_table_wi_pk VALUES 
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
+        assertThat(sql("SELECT * FROM paimon.default.fixed_bucket_table_wi_pk 
order by id asc"))
                 .isEqualTo("[[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6]]");
     }
 
+    @Test
+    public void testInsertIntoFixedBucketTableWoPk() {
+        sql(
+                "INSERT INTO paimon.default.fixed_bucket_table_wo_pk VALUES 
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(1,'1'),(2,'2'),(3,'3'),(4,'4')");
+        assertThat(sql("SELECT * FROM paimon.default.fixed_bucket_table_wo_pk 
order by id asc"))
+                .isEqualTo("[[1, 1], [1, 1], [2, 2], [2, 2], [3, 3], [3, 3], 
[4, 4], [4, 4]]");
+    }
+
+    @Test
+    public void testInsertIntoUnawareTable() {
+        sql(
+                "INSERT INTO paimon.default.unaware_table VALUES 
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(1,'1'),(2,'2'),(3,'3'),(4,'4')");
+        assertThat(sql("SELECT * FROM paimon.default.unaware_table order by id 
asc"))
+                .isEqualTo("[[1, 1], [1, 1], [2, 2], [2, 2], [3, 3], [3, 3], 
[4, 4], [4, 4]]");
+    }
+
     protected String sql(String sql) {
         MaterializedResult result = getQueryRunner().execute(sql);
         return result.getMaterializedRows().toString();
diff --git 
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
 
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
index 7d97c7c..d23a21e 100644
--- 
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
+++ 
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.trino;
 
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.InstantiationUtil;
 
 import io.airlift.json.JsonCodec;
@@ -34,7 +35,8 @@ public class TestTrinoPartitioningHandle {
     @Test
     public void testTrinoPartitioningHandle() throws Exception {
         byte[] schemaData = InstantiationUtil.serializeObject("test_schema");
-        TrinoPartitioningHandle expected = new 
TrinoPartitioningHandle(schemaData);
+        TrinoPartitioningHandle expected =
+                new TrinoPartitioningHandle(schemaData, BucketMode.FIXED);
         testRoundTrip(expected);
     }
 

Reply via email to