This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new d2114fe Support insert operations for append table and append queue
(#89)
d2114fe is described below
commit d2114fe49364999f998a037a723e8a81b327e58d
Author: rfyu <[email protected]>
AuthorDate: Tue Nov 26 13:24:58 2024 +0800
Support insert operations for append table and append queue (#89)
---
.../org/apache/paimon/trino/TrinoMetadata.java | 33 ++++++++---
.../trino/TrinoNodePartitioningProvider.java | 19 ++++--
.../apache/paimon/trino/TrinoPageSinkProvider.java | 12 +---
.../paimon/trino/TrinoPartitioningHandle.java | 12 +++-
.../paimon/trino/UnawareTableShuffleFunction.java | 63 ++++++++++++++++++++
.../org/apache/paimon/trino/TestTrinoITCase.java | 68 ++++++++++++++++++++--
.../paimon/trino/TestTrinoPartitioningHandle.java | 4 +-
7 files changed, 184 insertions(+), 27 deletions(-)
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index e896ea0..b987ede 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -92,6 +92,8 @@ import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.table.BucketMode.FIXED;
+import static org.apache.paimon.table.BucketMode.UNAWARE;
import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -126,8 +128,9 @@ public class TrinoMetadata implements ConnectorMetadata {
new ConnectorTableLayout(
new TrinoPartitioningHandle(
InstantiationUtil.serializeObject(
- fileStoreTable.schema())),
- table.primaryKeys(),
+ fileStoreTable.schema()),
+ FIXED),
+ fileStoreTable.schema().bucketKeys(),
false));
} catch (IOException e) {
throw new RuntimeException(e);
@@ -140,11 +143,18 @@ public class TrinoMetadata implements ConnectorMetadata {
}
throw new IllegalArgumentException("Global dynamic bucket mode
are not supported");
case UNAWARE:
- if (!table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only append table can support unaware bucket.");
+ try {
+ return Optional.of(
+ new ConnectorTableLayout(
+ new TrinoPartitioningHandle(
+ InstantiationUtil.serializeObject(
+ fileStoreTable.schema()),
+ UNAWARE),
+ fileStoreTable.schema().partitionKeys(),
+ true));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- throw new IllegalArgumentException("Unaware bucket mode are
not supported");
default:
throw new IllegalArgumentException("Unknown bucket mode");
}
@@ -232,6 +242,14 @@ public class TrinoMetadata implements ConnectorMetadata {
ConnectorSession session, ConnectorTableHandle tableHandle) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Table table = trinoTableHandle.table(catalog);
+ try {
+ if (table.getClass()
+ ==
Class.forName("org.apache.paimon.table.AppendOnlyFileStoreTable")) {
+ throw new IllegalArgumentException("Append-only table does not
support upsert");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
Set<String> pkSet =
table.primaryKeys().stream().collect(Collectors.toSet());
DataField[] row =
table.rowType().getFields().stream()
@@ -255,7 +273,8 @@ public class TrinoMetadata implements ConnectorMetadata {
try {
return Optional.of(
new TrinoPartitioningHandle(
-
InstantiationUtil.serializeObject(fileStoreTable.schema())));
+
InstantiationUtil.serializeObject(fileStoreTable.schema()),
+ FIXED));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
index 419f8a1..c4c5bbb 100644
---
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -40,9 +40,20 @@ public class TrinoNodePartitioningProvider implements
ConnectorNodePartitioningP
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle,
List<Type> partitionChannelTypes,
- int bucketCount) {
- // todo support different types of tables according to different
PartitioningHandle
- return new FixedBucketTableShuffleFunction(
- partitionChannelTypes, (TrinoPartitioningHandle)
partitioningHandle, bucketCount);
+ int workerCount) {
+ // todo support dynamic bucket tables
+ TrinoPartitioningHandle trinoPartitioningHandle =
+ (TrinoPartitioningHandle) partitioningHandle;
+ switch (trinoPartitioningHandle.getBucketMode()) {
+ case FIXED:
+ return new FixedBucketTableShuffleFunction(
+ partitionChannelTypes, trinoPartitioningHandle,
workerCount);
+ case UNAWARE:
+ return new UnawareTableShuffleFunction(
+ partitionChannelTypes, trinoPartitioningHandle,
workerCount);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported bucket mode: " +
trinoPartitioningHandle.getBucketMode());
+ }
}
}
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
index c8e2c7d..1ee2601 100644
---
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
@@ -95,9 +95,7 @@ public class TrinoPageSinkProvider implements
ConnectorPageSinkProvider {
: BucketMode.FIXED;
switch (mode) {
case FIXED:
- if (table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException("Only support
primary-key table.");
- }
+ case UNAWARE:
break;
case DYNAMIC:
case GLOBAL_DYNAMIC:
@@ -106,14 +104,8 @@ public class TrinoPageSinkProvider implements
ConnectorPageSinkProvider {
"Only primary-key table can support dynamic
bucket.");
}
throw new IllegalArgumentException("Global dynamic bucket mode
are not supported");
- case UNAWARE:
- if (!table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only append table can support unaware bucket.");
- }
- throw new IllegalArgumentException("Unaware bucket mode are
not supported");
default:
- throw new IllegalArgumentException("Unknown bucket mode");
+ throw new IllegalArgumentException("Unknown bucket mode: " +
mode);
}
}
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
index 793bf02..0da69d8 100644
---
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
@@ -19,6 +19,7 @@
package org.apache.paimon.trino;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -32,10 +33,14 @@ import java.util.Arrays;
public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
private final byte[] schema;
+ private final BucketMode bucketMode;
@JsonCreator
- public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
+ public TrinoPartitioningHandle(
+ @JsonProperty("schema") byte[] schema,
+ @JsonProperty("bucketMode") BucketMode bucketMode) {
this.schema = schema;
+ this.bucketMode = bucketMode;
}
@JsonProperty
@@ -43,6 +48,11 @@ public class TrinoPartitioningHandle implements
ConnectorPartitioningHandle {
return schema;
}
+ @JsonProperty
+ public BucketMode getBucketMode() {
+ return bucketMode;
+ }
+
public TableSchema getOriginalSchema() {
try {
return InstantiationUtil.deserializeObject(this.schema,
getClass().getClassLoader());
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
new file mode 100644
index 0000000..c9ed4be
--- /dev/null
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowKind;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.type.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Trino {@link BucketFunction} used for tables whose bucket mode is unaware.
+ *
+ * <p>Each row is routed by the hash of its projected partition value, so rows with equal
+ * partition values are assigned the same bucket. When no partition channels are supplied, every
+ * row is routed to bucket 0.
+ */
+public class UnawareTableShuffleFunction implements BucketFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
+    private final int workerCount;
+    private final boolean hasPartitionKeys;
+    private final Projection partitionProjection;
+
+    /**
+     * Creates the shuffle function.
+     *
+     * @param partitionChannelTypes types of the partition channels; an empty list means all rows
+     *     are sent to bucket 0
+     * @param partitioningHandle handle carrying the serialized table schema from which the
+     *     partition projection is built
+     * @param workerCount number of buckets to distribute rows over
+     */
+    public UnawareTableShuffleFunction(
+            List<Type> partitionChannelTypes,
+            TrinoPartitioningHandle partitioningHandle,
+            int workerCount) {
+        this.hasPartitionKeys = !partitionChannelTypes.isEmpty();
+        TableSchema schema = partitioningHandle.getOriginalSchema();
+        this.partitionProjection =
+                CodeGenUtils.newProjection(schema.logicalPartitionType(), schema.partitionKeys());
+        this.workerCount = workerCount;
+    }
+
+    /**
+     * Returns the bucket for the row at {@code position} of {@code page}.
+     *
+     * @return 0 when there are no partition channels, otherwise a value in
+     *     {@code [0, workerCount)} derived from the partition hash
+     */
+    @Override
+    public int getBucket(Page page, int position) {
+        if (!hasPartitionKeys) {
+            return 0;
+        }
+        TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
+        BinaryRow partition = partitionProjection.apply(trinoRow);
+        // hashCode() may be negative and Java's % keeps the dividend's sign, which would yield a
+        // negative (invalid) bucket; floorMod always returns a value in [0, workerCount).
+        return Math.floorMod(partition.hashCode(), workerCount);
+    }
+}
diff --git
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 2c5e966..538b592 100644
---
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -484,7 +484,7 @@ public class TestTrinoITCase extends
AbstractTestQueryFramework {
}
{
- Path tablePath = new Path(warehouse, "default.db/t103");
+ Path tablePath = new Path(warehouse,
"default.db/fixed_bucket_table_wi_pk");
RowType rowType =
new RowType(
Arrays.asList(
@@ -506,6 +506,50 @@ public class TestTrinoITCase extends
AbstractTestQueryFramework {
""));
}
+ {
+ Path tablePath = new Path(warehouse,
"default.db/fixed_bucket_table_wo_pk");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name",
DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>() {
+ {
+ put("file.format", "orc");
+ put("bucket", "2");
+ put("bucket-key", "id");
+ }
+ },
+ ""));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/unaware_table");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name",
DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>() {
+ {
+ put("file.format", "orc");
+ }
+ },
+ ""));
+ }
+
DistributedQueryRunner queryRunner = null;
try {
queryRunner =
@@ -847,13 +891,29 @@ public class TestTrinoITCase extends
AbstractTestQueryFramework {
}
@Test
- public void testInsertInto() {
+ public void testInsertIntoFixedBucketTableWiPk() {
sql(
- "INSERT INTO paimon.default.t103 VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
- assertThat(sql("SELECT * FROM paimon.default.t103 order by id asc"))
+ "INSERT INTO paimon.default.fixed_bucket_table_wi_pk VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
+ assertThat(sql("SELECT * FROM paimon.default.fixed_bucket_table_wi_pk
order by id asc"))
.isEqualTo("[[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6]]");
}
+ @Test
+ public void testInsertIntoFixedBucketTableWoPk() {
+ sql(
+ "INSERT INTO paimon.default.fixed_bucket_table_wo_pk VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(1,'1'),(2,'2'),(3,'3'),(4,'4')");
+ assertThat(sql("SELECT * FROM paimon.default.fixed_bucket_table_wo_pk
order by id asc"))
+ .isEqualTo("[[1, 1], [1, 1], [2, 2], [2, 2], [3, 3], [3, 3],
[4, 4], [4, 4]]");
+ }
+
+ @Test
+ public void testInsertIntoUnawareTable() {
+ sql(
+ "INSERT INTO paimon.default.unaware_table VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(1,'1'),(2,'2'),(3,'3'),(4,'4')");
+ assertThat(sql("SELECT * FROM paimon.default.unaware_table order by id
asc"))
+ .isEqualTo("[[1, 1], [1, 1], [2, 2], [2, 2], [3, 3], [3, 3],
[4, 4], [4, 4]]");
+ }
+
protected String sql(String sql) {
MaterializedResult result = getQueryRunner().execute(sql);
return result.getMaterializedRows().toString();
diff --git
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
index 7d97c7c..d23a21e 100644
---
a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
+++
b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
@@ -18,6 +18,7 @@
package org.apache.paimon.trino;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;
import io.airlift.json.JsonCodec;
@@ -34,7 +35,8 @@ public class TestTrinoPartitioningHandle {
@Test
public void testTrinoPartitioningHandle() throws Exception {
byte[] schemaData = InstantiationUtil.serializeObject("test_schema");
- TrinoPartitioningHandle expected = new
TrinoPartitioningHandle(schemaData);
+ TrinoPartitioningHandle expected =
+ new TrinoPartitioningHandle(schemaData, BucketMode.FIXED);
testRoundTrip(expected);
}