This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 6c98629 Supports insert operations (#86)
6c98629 is described below
commit 6c98629073c78ca8b00f860de171b956fba30169
Author: rfyu <[email protected]>
AuthorDate: Wed Nov 20 13:23:21 2024 +0800
Supports insert operations (#86)
---
.../apache/paimon/trino/fileio/TrinoFileIO.java | 9 +-
.../trino/FixedBucketTableShuffleFunction.java | 87 +++++++++
.../org/apache/paimon/trino/TrinoColumnHandle.java | 9 +
.../org/apache/paimon/trino/TrinoConnector.java | 21 +++
.../apache/paimon/trino/TrinoConnectorFactory.java | 8 +
.../paimon/trino/TrinoMergePageSourceWrapper.java | 96 ++++++++++
.../org/apache/paimon/trino/TrinoMergeSink.java | 112 ++++++++++++
...TrinoModule.java => TrinoMergeTableHandle.java} | 32 ++--
.../org/apache/paimon/trino/TrinoMetadata.java | 202 +++++++++++++++++++++
.../java/org/apache/paimon/trino/TrinoModule.java | 2 +
.../trino/TrinoNodePartitioningProvider.java | 48 +++++
.../org/apache/paimon/trino/TrinoPageSink.java | 96 ++++++++++
.../apache/paimon/trino/TrinoPageSinkProvider.java | 131 +++++++++++++
.../paimon/trino/TrinoPageSourceProvider.java | 45 ++++-
.../paimon/trino/TrinoPartitioningHandle.java | 67 +++++++
.../java/org/apache/paimon/trino/TrinoRow.java | 181 ++++++++++++++++++
.../paimon/trino/TrinoSessionProperties.java | 28 +++
.../org/apache/paimon/trino/TrinoTableHandle.java | 5 +-
.../apache/paimon/trino/fileio/TrinoFileIO.java | 9 +-
.../org/apache/paimon/trino/TestTrinoITCase.java | 43 ++++-
.../paimon/trino/TestTrinoPartitioningHandle.java | 47 +++++
.../java/org/apache/paimon/trino/TestTrinoRow.java | 112 ++++++++++++
22 files changed, 1353 insertions(+), 37 deletions(-)
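[Note] For context, the write path added by this commit drives Paimon's batch write API end to end: the page sink added below converts Trino pages into rows and emits serialized CommitMessages, which TrinoMetadata later deserializes and commits on the coordinator. A minimal standalone sketch of that cycle (assuming an already-loaded Paimon Table with an INT/STRING row type; class and method names in the sketch are illustrative, not part of this patch):

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

public final class BatchWriteSketch {
    static void insertOneRow(Table table) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        // Worker side: TrinoPageSink wraps a BatchTableWrite and feeds it one row per page position.
        try (BatchTableWrite write = builder.newWrite()) {
            write.write(GenericRow.of(1, BinaryString.fromString("a")));
            List<CommitMessage> messages = write.prepareCommit();
            // Coordinator side: TrinoMetadata#commit gathers the messages from all sinks and commits once.
            try (BatchTableCommit commit = builder.newCommit()) {
                commit.commit(messages);
            }
        }
    }
}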
diff --git a/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java b/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
index c9eca78..29b1b56 100644
--- a/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
+++ b/paimon-trino-420/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
@@ -32,6 +32,7 @@ import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
import javax.annotation.Nullable;
@@ -65,9 +66,11 @@ public class TrinoFileIO implements FileIO {
}
@Override
- public PositionOutputStream newOutputStream(Path path, boolean b) throws IOException {
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
+ TrinoOutputFile trinoOutputFile =
+ trinoFileSystem.newOutputFile(Location.of(path.toString()));
return new PositionOutputStreamWrapper(
- trinoFileSystem.newOutputFile(Location.of(path.toString())).create());
+ overwrite ? trinoOutputFile.createOrOverwrite() : trinoOutputFile.create());
}
@Override
@@ -122,7 +125,7 @@ public class TrinoFileIO implements FileIO {
}
@Override
- public boolean delete(Path path, boolean b) throws IOException {
+ public boolean delete(Path path, boolean recursive) throws IOException {
Location location = Location.of(path.toString());
if (trinoFileSystem.directoryExists(location).orElse(false)) {
trinoFileSystem.deleteDirectory(location);
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
new file mode 100644
index 0000000..6732bcf
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.types.RowKind;
+
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Trino {@link BucketFunction}. */
+public class FixedBucketTableShuffleFunction implements BucketFunction {
+
+ private final int workerCount;
+ private final int bucketCount;
+ private final boolean isRowId;
+ private final Projection pkProjection;
+
+ public FixedBucketTableShuffleFunction(
+ List<Type> partitionChannelTypes,
+ TrinoPartitioningHandle partitioningHandle,
+ int workerCount) {
+
+ TableSchema schema = partitioningHandle.getOriginalSchema();
+ this.pkProjection =
+ CodeGenUtils.newProjection(schema.logicalPrimaryKeysType(), schema.primaryKeys());
+ this.bucketCount = new CoreOptions(schema.options()).bucket();
+ this.workerCount = workerCount;
+ this.isRowId =
+ partitionChannelTypes.size() == 1
+ && partitionChannelTypes.get(0) instanceof RowType;
+ }
+
+ @Override
+ public int getBucket(Page page, int position) {
+ if (isRowId) {
+ RowBlock rowBlock = (RowBlock) page.getBlock(0);
+ try {
+ Method method = RowBlock.class.getDeclaredMethod("getRawFieldBlocks");
+ method.setAccessible(true);
+ page = new Page(rowBlock.getPositionCount(), (Block[]) method.invoke(rowBlock));
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
+ BinaryRow pk = pkProjection.apply(trinoRow);
+ int bucket =
+ KeyAndBucketExtractor.bucket(
+ KeyAndBucketExtractor.bucketKeyHashCode(pk), bucketCount);
+ return bucket % workerCount;
+ }
+}
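[Note] The shuffle function above reduces to hashing the projected primary key, mapping the hash onto one of the table's fixed buckets, and folding that bucket onto the available workers. A stripped-down sketch of just that routing step (class and method names here are illustrative, not part of this patch):

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

public final class BucketRoutingSketch {
    // primaryKey is the projected primary-key row, i.e. what pkProjection.apply(...) returns above.
    static int route(BinaryRow primaryKey, int bucketCount, int workerCount) {
        int bucket =
                KeyAndBucketExtractor.bucket(
                        KeyAndBucketExtractor.bucketKeyHashCode(primaryKey), bucketCount);
        return bucket % workerCount;
    }
}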
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
index 2a80a7e..ec413c0 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoColumnHandle.java
@@ -31,9 +31,12 @@ import static java.util.Objects.requireNonNull;
/** Trino {@link ColumnHandle}. */
public final class TrinoColumnHandle implements ColumnHandle {
+
+ public static final String TRINO_ROW_ID_NAME = "$row_id";
private final String columnName;
private final String typeString;
private final Type trinoType;
+ private final boolean isRowId;
@JsonCreator
public TrinoColumnHandle(
@@ -43,6 +46,7 @@ public final class TrinoColumnHandle implements ColumnHandle {
this.columnName = requireNonNull(columnName, "columnName is null");
this.typeString = requireNonNull(typeString, "columnType is null");
this.trinoType = requireNonNull(trinoType, "columnType is null");
+ this.isRowId = TRINO_ROW_ID_NAME.equals(columnName);
}
public static TrinoColumnHandle of(String columnName, DataType columnType) {
@@ -67,6 +71,11 @@ public final class TrinoColumnHandle implements ColumnHandle {
return trinoType;
}
+ @JsonProperty
+ public boolean isRowId() {
+ return isRowId;
+ }
+
public DataType logicalType() {
return JsonSerdeUtil.fromJson(typeString, DataType.class);
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
index 9097247..dff66e5 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -20,6 +20,8 @@ package org.apache.paimon.trino;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
@@ -38,6 +40,8 @@ public class TrinoConnector implements Connector {
private final ConnectorMetadata trinoMetadata;
private final ConnectorSplitManager trinoSplitManager;
private final ConnectorPageSourceProvider trinoPageSourceProvider;
+ private final ConnectorPageSinkProvider trinoPageSinkProvider;
+ private final ConnectorNodePartitioningProvider trinoNodePartitioningProvider;
private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> sessionProperties;
@@ -45,12 +49,19 @@ public class TrinoConnector implements Connector {
ConnectorMetadata trinoMetadata,
ConnectorSplitManager trinoSplitManager,
ConnectorPageSourceProvider trinoPageSourceProvider,
+ ConnectorPageSinkProvider trinoPageSinkProvider,
+ ConnectorNodePartitioningProvider trinoNodePartitioningProvider,
TrinoTableOptions trinoTableOptions,
TrinoSessionProperties trinoSessionProperties) {
this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is null");
this.trinoSplitManager = requireNonNull(trinoSplitManager, "trinoSplitManager is null");
this.trinoPageSourceProvider =
requireNonNull(trinoPageSourceProvider, "trinoRecordSetProvider is null");
+ this.trinoPageSinkProvider =
+ requireNonNull(trinoPageSinkProvider, "trinoPageSinkProvider is null");
+ this.trinoNodePartitioningProvider =
+ requireNonNull(
+ trinoNodePartitioningProvider, "trinoNodePartitioningProvider is null");
this.tableProperties = trinoTableOptions.getTableProperties();
this.sessionProperties = trinoSessionProperties.getSessionProperties();
}
@@ -78,6 +89,11 @@ public class TrinoConnector implements Connector {
return trinoPageSourceProvider;
}
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider() {
+ return trinoPageSinkProvider;
+ }
+
@Override
public List<PropertyMetadata<?>> getSessionProperties() {
return sessionProperties;
@@ -87,4 +103,9 @@ public class TrinoConnector implements Connector {
public List<PropertyMetadata<?>> getTableProperties() {
return tableProperties;
}
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider() {
+ return trinoNodePartitioningProvider;
+ }
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
index ad1d500..4327699 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java
@@ -31,6 +31,7 @@ import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
import io.trino.plugin.hive.NodeVersion;
@@ -129,6 +130,10 @@ public class TrinoConnectorFactory implements ConnectorFactory {
TrinoSplitManager trinoSplitManager = injector.getInstance(TrinoSplitManager.class);
TrinoPageSourceProvider trinoPageSourceProvider =
injector.getInstance(TrinoPageSourceProvider.class);
+ TrinoPageSinkProvider trinoPageSinkProvider =
+ injector.getInstance(TrinoPageSinkProvider.class);
+ TrinoNodePartitioningProvider trinoNodePartitioningProvider =
+ injector.getInstance(TrinoNodePartitioningProvider.class);
TrinoSessionProperties trinoSessionProperties =
injector.getInstance(TrinoSessionProperties.class);
TrinoTableOptions trinoTableOptions = injector.getInstance(TrinoTableOptions.class);
@@ -138,6 +143,9 @@ public class TrinoConnectorFactory implements ConnectorFactory {
new ClassLoaderSafeConnectorSplitManager(trinoSplitManager, classLoader),
new ClassLoaderSafeConnectorPageSourceProvider(
trinoPageSourceProvider, classLoader),
+ new ClassLoaderSafeConnectorPageSinkProvider(
+ trinoPageSinkProvider, classLoader),
+ trinoNodePartitioningProvider,
trinoTableOptions,
trinoSessionProperties);
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java
new file mode 100644
index 0000000..f29397a
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergePageSourceWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.connector.ConnectorPageSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+
+/** Trino {@link ConnectorPageSource}. */
+public class TrinoMergePageSourceWrapper implements ConnectorPageSource {
+
+ private final ConnectorPageSource pageSource;
+ private final HashMap<String, Integer> fieldToIndex;
+
+ public TrinoMergePageSourceWrapper(
+ ConnectorPageSource pageSource, HashMap<String, Integer> fieldToIndex) {
+ this.pageSource = pageSource;
+ this.fieldToIndex = fieldToIndex;
+ }
+
+ public static TrinoMergePageSourceWrapper wrap(
+ ConnectorPageSource pageSource, HashMap<String, Integer> fieldToIndex) {
+ return new TrinoMergePageSourceWrapper(pageSource, fieldToIndex);
+ }
+
+ @Override
+ public long getCompletedBytes() {
+ return pageSource.getCompletedBytes();
+ }
+
+ @Override
+ public long getReadTimeNanos() {
+ return pageSource.getReadTimeNanos();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return pageSource.isFinished();
+ }
+
+ @Override
+ public Page getNextPage() {
+ Page nextPage = pageSource.getNextPage();
+ if (nextPage == null) {
+ return null;
+ }
+ int rowCount = nextPage.getPositionCount();
+
+ Block[] newBlocks = new Block[nextPage.getChannelCount() + 1];
+ Block[] rowIdBlocks = new Block[fieldToIndex.size()];
+ for (int i = 0, idx = 0; i < nextPage.getChannelCount(); i++) {
+ Block block = nextPage.getBlock(i);
+ newBlocks[i] = block;
+ if (fieldToIndex.containsValue(i)) {
+ rowIdBlocks[idx] = block;
+ idx++;
+ }
+ }
+ newBlocks[nextPage.getChannelCount()] =
+ RowBlock.fromFieldBlocks(
+ rowCount, Optional.of(new boolean[fieldToIndex.size()]), rowIdBlocks);
+
+ return new Page(rowCount, newBlocks);
+ }
+
+ @Override
+ public long getMemoryUsage() {
+ return pageSource.getMemoryUsage();
+ }
+
+ @Override
+ public void close() throws IOException {
+ pageSource.close();
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java
new file mode 100644
index 0000000..597b4e7
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeSink.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ConnectorMergeSink;
+import io.trino.spi.connector.ConnectorPageSink;
+import io.trino.spi.type.TinyintType;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+/** Trino {@link ConnectorMergeSink}. */
+public class TrinoMergeSink implements ConnectorMergeSink {
+
+ private final TrinoPageSink pageSink;
+ private final int dataColumnCount;
+
+ public TrinoMergeSink(ConnectorPageSink pageSink, int dataColumnCount) {
+ this.pageSink = (TrinoPageSink) pageSink;
+ this.dataColumnCount = dataColumnCount;
+ }
+
+ @Override
+ public void storeMergedRows(Page page) {
+ int inputChannelCount = page.getChannelCount();
+ if (inputChannelCount != dataColumnCount + 2) {
+ throw new IllegalArgumentException(
+ String.format(
+ "inputPage channelCount (%s) == dataColumns size (%s) + 2",
+ inputChannelCount, dataColumnCount));
+ } else {
+ int positionCount = page.getPositionCount();
+ if (positionCount <= 0) {
+ throw new IllegalArgumentException(
+ "positionCount should be > 0, but is " + positionCount);
+ } else {
+ Block operationBlock = page.getBlock(inputChannelCount - 2);
+ int[] deletePositions = new int[positionCount];
+ int[] insertPositions = new int[positionCount];
+ int deletePositionCount = 0;
+ int insertPositionCount = 0;
+
+ for (int position = 0; position < positionCount; ++position) {
+ byte operation = TinyintType.TINYINT.getByte(operationBlock, position);
+ switch (operation) {
+ case 1:
+ case 4:
+ insertPositions[insertPositionCount] = position;
+ ++insertPositionCount;
+ break;
+ case 2:
+ case 5:
+ deletePositions[deletePositionCount] = position;
+ ++deletePositionCount;
+ break;
+ case 3:
+ default:
+ throw new IllegalArgumentException(
+ "Invalid merge operation: " + operation);
+ }
+ }
+
+ Optional<Page> deletePage = Optional.empty();
+ if (deletePositionCount > 0) {
+ deletePage =
+ Optional.of(
+ page.getColumns(IntStream.range(0, dataColumnCount).toArray())
+ .getPositions(deletePositions, 0, deletePositionCount));
+ }
+
+ Optional<Page> insertPage = Optional.empty();
+ if (insertPositionCount > 0) {
+ insertPage =
+ Optional.of(
+ page.getColumns(IntStream.range(0, dataColumnCount).toArray())
+ .getPositions(insertPositions, 0, insertPositionCount));
+ }
+
+ deletePage.ifPresent(delete -> pageSink.writePage(delete, RowKind.DELETE));
+ insertPage.ifPresent(insert -> pageSink.writePage(insert, RowKind.INSERT));
+ }
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<Slice>> finish() {
+ return pageSink.finish();
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
similarity index 51%
copy from paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
copy to paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
index da22ed6..901d6fa 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMergeTableHandle.java
@@ -18,30 +18,24 @@
package org.apache.paimon.trino;
-import org.apache.paimon.options.Options;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
+import io.trino.spi.connector.ConnectorTableHandle;
-import com.google.inject.Binder;
-import com.google.inject.Module;
+/** Trino {@link ConnectorMergeTableHandle}. */
+public class TrinoMergeTableHandle implements ConnectorMergeTableHandle {
-import java.util.Map;
+ private final TrinoTableHandle tableHandle;
-import static com.google.inject.Scopes.SINGLETON;
-
-/** Module for binding instance. */
-public class TrinoModule implements Module {
- private Map<String, String> config;
-
- public TrinoModule(Map<String, String> config) {
- this.config = config;
+ @JsonCreator
+ public TrinoMergeTableHandle(@JsonProperty("tableHandle") TrinoTableHandle tableHandle) {
+ this.tableHandle = tableHandle;
}
@Override
- public void configure(Binder binder) {
- binder.bind(Options.class).toInstance(new Options(config));
- binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
- binder.bind(TrinoSplitManager.class).in(SINGLETON);
- binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
- binder.bind(TrinoSessionProperties.class).in(SINGLETON);
- binder.bind(TrinoTableOptions.class).in(SINGLETON);
+ @JsonProperty
+ public ConnectorTableHandle getTableHandle() {
+ return tableHandle;
}
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index 468d8aa..e896ea0 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -25,8 +25,15 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.trino.catalog.TrinoCatalog;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.StringUtils;
import io.airlift.slice.Slice;
@@ -34,9 +41,15 @@ import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorOutputMetadata;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableVersion;
@@ -44,11 +57,14 @@ import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RetryMode;
+import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.predicate.Domain;
import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
@@ -56,6 +72,7 @@ import io.trino.spi.type.VarcharType;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -69,11 +86,13 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Trino {@link ConnectorMetadata}. */
@@ -90,6 +109,189 @@ public class TrinoMetadata implements ConnectorMetadata {
return catalog;
}
+ // todo support dynamic bucket table
+ @Override
+ public Optional<ConnectorTableLayout> getInsertLayout(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Table table = trinoTableHandle.table(catalog);
+ if (!(table instanceof FileStoreTable)) {
+ throw new IllegalArgumentException(table.getClass() + " is not supported");
+ }
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ switch (fileStoreTable.bucketMode()) {
+ case FIXED:
+ try {
+ return Optional.of(
+ new ConnectorTableLayout(
+ new TrinoPartitioningHandle(
+ InstantiationUtil.serializeObject(
+ fileStoreTable.schema())),
+ table.primaryKeys(),
+ false));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ case DYNAMIC:
+ case GLOBAL_DYNAMIC:
+ if (table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only primary-key table can support dynamic bucket.");
+ }
+ throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
+ case UNAWARE:
+ if (!table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only append table can support unaware bucket.");
+ }
+ throw new IllegalArgumentException("Unaware bucket mode are not supported");
+ default:
+ throw new IllegalArgumentException("Unknown bucket mode");
+ }
+ }
+
+ @Override
+ public ConnectorOutputTableHandle beginCreateTable(
+ ConnectorSession session,
+ ConnectorTableMetadata tableMetadata,
+ Optional<ConnectorTableLayout> layout,
+ RetryMode retryMode) {
+ createTable(session, tableMetadata, false);
+ return getTableHandle(session, tableMetadata.getTable(), Collections.emptyMap());
+ }
+
+ @Override
+ public Optional<ConnectorOutputMetadata> finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ if (fragments.isEmpty()) {
+ return Optional.empty();
+ }
+ return commit(session, (TrinoTableHandle) tableHandle, fragments);
+ }
+
+ @Override
+ public ConnectorInsertTableHandle beginInsert(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ List<ColumnHandle> columns,
+ RetryMode retryMode) {
+ return (ConnectorInsertTableHandle) tableHandle;
+ }
+
+ @Override
+ public Optional<ConnectorOutputMetadata> finishInsert(
+ ConnectorSession session,
+ ConnectorInsertTableHandle insertHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ return commit(session, (TrinoTableHandle) insertHandle, fragments);
+ }
+
+ private Optional<ConnectorOutputMetadata> commit(
+ ConnectorSession session, TrinoTableHandle insertHandle, Collection<Slice> fragments) {
+ CommitMessageSerializer serializer = new CommitMessageSerializer();
+ List<CommitMessage> commitMessages =
+ fragments.stream()
+ .map(
+ slice -> {
+ try {
+ return serializer.deserialize(
+ serializer.getVersion(), slice.getBytes());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(toList());
+
+ if (commitMessages.isEmpty()) {
+ return Optional.empty();
+ }
+
+ TrinoTableHandle table = insertHandle;
+ BatchWriteBuilder batchWriteBuilder =
+ table.tableWithDynamicOptions(catalog, session).newBatchWriteBuilder();
+ if (TrinoSessionProperties.enableInsertOverwrite(session)) {
+ batchWriteBuilder.withOverwrite();
+ }
+ batchWriteBuilder.newCommit().commit(commitMessages);
+ return Optional.empty();
+ }
+
+ @Override
+ public RowChangeParadigm getRowChangeParadigm(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ return DELETE_ROW_AND_INSERT_ROW;
+ }
+
+ // todo support dynamic bucket table
+ @Override
+ public ColumnHandle getMergeRowIdColumnHandle(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Table table = trinoTableHandle.table(catalog);
+ Set<String> pkSet = table.primaryKeys().stream().collect(Collectors.toSet());
+ DataField[] row =
+ table.rowType().getFields().stream()
+ .filter(dataField -> pkSet.contains(dataField.name()))
+ .toArray(DataField[]::new);
+ return TrinoColumnHandle.of(TRINO_ROW_ID_NAME, DataTypes.ROW(row));
+ }
+
+ // todo support dynamic bucket table
+ @Override
+ public Optional<ConnectorPartitioningHandle> getUpdateLayout(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Table table = trinoTableHandle.table(catalog);
+ if (!(table instanceof FileStoreTable)) {
+ throw new IllegalArgumentException(table.getClass() + " is not supported");
+ }
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ switch (fileStoreTable.bucketMode()) {
+ case FIXED:
+ try {
+ return Optional.of(
+ new TrinoPartitioningHandle(
+ InstantiationUtil.serializeObject(fileStoreTable.schema())));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ case DYNAMIC:
+ case GLOBAL_DYNAMIC:
+ if (table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only primary-key table can support dynamic bucket.");
+ }
+ throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
+ case UNAWARE:
+ if (!table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only append table can support unaware bucket.");
+ }
+ throw new IllegalArgumentException("Unaware bucket mode are not supported");
+ default:
+ throw new IllegalArgumentException("Unknown bucket mode");
+ }
+ }
+
+ @Override
+ public ConnectorMergeTableHandle beginMerge(
+ ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) {
+ return new TrinoMergeTableHandle((TrinoTableHandle) tableHandle);
+ }
+
+ @Override
+ public void finishMerge(
+ ConnectorSession session,
+ ConnectorMergeTableHandle mergeTableHandle,
+ Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ commit(session, (TrinoTableHandle) mergeTableHandle.getTableHandle(), fragments);
+ }
+
@Override
public boolean schemaExists(ConnectorSession session, String schemaName) {
catalog.initSession(session);
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
index da22ed6..f629736 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -41,6 +41,8 @@ public class TrinoModule implements Module {
binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
binder.bind(TrinoSplitManager.class).in(SINGLETON);
binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
+ binder.bind(TrinoPageSinkProvider.class).in(SINGLETON);
+ binder.bind(TrinoNodePartitioningProvider.class).in(SINGLETON);
binder.bind(TrinoSessionProperties.class).in(SINGLETON);
binder.bind(TrinoTableOptions.class).in(SINGLETON);
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
new file mode 100644
index 0000000..419f8a1
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Inject;
+import io.trino.spi.connector.BucketFunction;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.type.Type;
+
+import java.util.List;
+
+/** Trino {@link ConnectorNodePartitioningProvider}. */
+public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningProvider {
+
+ @Inject
+ public TrinoNodePartitioningProvider() {}
+
+ @Override
+ public BucketFunction getBucketFunction(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorPartitioningHandle partitioningHandle,
+ List<Type> partitionChannelTypes,
+ int bucketCount) {
+ // todo support different types of tables according to different PartitioningHandle
+ return new FixedBucketTableShuffleFunction(
+ partitionChannelTypes, (TrinoPartitioningHandle) partitioningHandle, bucketCount);
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java
new file mode 100644
index 0000000..bbd94e5
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static io.airlift.slice.Slices.wrappedBuffer;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+/** TrinoPageSink. */
+public class TrinoPageSink implements ConnectorPageSink {
+
+ private final BatchTableWrite writer;
+
+ public TrinoPageSink(BatchTableWrite writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public CompletableFuture<?> appendPage(Page page) {
+ try {
+ writePage(page, RowKind.INSERT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return NOT_BLOCKED;
+ }
+
+ public void writePage(Page page, RowKind rowKind) {
+ try {
+ for (int i = 0; i < page.getPositionCount(); i++) {
+ writer.write(new TrinoRow(page.getSingleValuePage(i), rowKind));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<Slice>> finish() {
+ Collection<Slice> commitTasks = new ArrayList<>();
+ try {
+ List<CommitMessage> commitMessages = writer.prepareCommit();
+ CommitMessageSerializer serializer = new CommitMessageSerializer();
+ for (CommitMessage commitMessage : commitMessages) {
+ commitTasks.add(wrappedBuffer(serializer.serialize(commitMessage)));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return completedFuture(commitTasks);
+ }
+
+ @Override
+ public void abort() {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
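[Note] The Slices returned by finish() above are simply serialized Paimon CommitMessages; TrinoMetadata#commit reverses the encoding on the coordinator. A small sketch of that round trip (helper names are illustrative, not part of this patch):

import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.io.IOException;

final class CommitMessageRoundTrip {
    // Worker side: wrap the serialized message in a Slice, as TrinoPageSink#finish does.
    static Slice toSlice(CommitMessage message) throws IOException {
        return Slices.wrappedBuffer(new CommitMessageSerializer().serialize(message));
    }

    // Coordinator side: decode the fragment again, as TrinoMetadata#commit does.
    static CommitMessage fromSlice(Slice slice) throws IOException {
        CommitMessageSerializer serializer = new CommitMessageSerializer();
        return serializer.deserialize(serializer.getVersion(), slice.getBytes());
    }
}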
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
new file mode 100644
index 0000000..c8e2c7d
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.trino.catalog.TrinoCatalog;
+
+import com.google.inject.Inject;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorMergeSink;
+import io.trino.spi.connector.ConnectorMergeTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorPageSink;
+import io.trino.spi.connector.ConnectorPageSinkId;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
+
+/** Trino {@link ConnectorPageSinkProvider}. */
+public class TrinoPageSinkProvider implements ConnectorPageSinkProvider {
+
+ private final TrinoCatalog trinoCatalog;
+
+ @Inject
+ public TrinoPageSinkProvider(TrinoMetadataFactory trinoMetadataFactory) {
+ this.trinoCatalog =
+ requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null")
+ .create()
+ .catalog();
+ }
+
+ @Override
+ public ConnectorPageSink createPageSink(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorOutputTableHandle outputTableHandle,
+ ConnectorPageSinkId pageSinkId) {
+ return createPageSink((TrinoTableHandle) outputTableHandle, session);
+ }
+
+ @Override
+ public ConnectorPageSink createPageSink(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorInsertTableHandle insertTableHandle,
+ ConnectorPageSinkId pageSinkId) {
+ return createPageSink((TrinoTableHandle) insertTableHandle, session);
+ }
+
+ private ConnectorPageSink createPageSink(
+ TrinoTableHandle tableHandle, ConnectorSession session) {
+ trinoCatalog.initSession(session);
+ Table table = tableHandle.tableWithDynamicOptions(trinoCatalog, session);
+ validataBucketMode(table);
+
+ return runWithContextClassLoader(
+ () -> {
+ BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+ if (TrinoSessionProperties.enableInsertOverwrite(session)) {
+ batchWriteBuilder.withOverwrite();
+ }
+ BatchTableWrite write = batchWriteBuilder.newWrite();
+ return new TrinoPageSink(write);
+ },
+ TrinoPageSinkProvider.class.getClassLoader());
+ }
+
+ private static void validataBucketMode(Table table) {
+ BucketMode mode =
+ table instanceof FileStoreTable
+ ? ((FileStoreTable) table).bucketMode()
+ : BucketMode.FIXED;
+ switch (mode) {
+ case FIXED:
+ if (table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException("Only support primary-key table.");
+ }
+ break;
+ case DYNAMIC:
+ case GLOBAL_DYNAMIC:
+ if (table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only primary-key table can support dynamic bucket.");
+ }
+ throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
+ case UNAWARE:
+ if (!table.primaryKeys().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Only append table can support unaware bucket.");
+ }
+ throw new IllegalArgumentException("Unaware bucket mode are not supported");
+ default:
+ throw new IllegalArgumentException("Unknown bucket mode");
+ }
+ }
+
+ @Override
+ public ConnectorMergeSink createMergeSink(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorMergeTableHandle mergeHandle,
+ ConnectorPageSinkId pageSinkId) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) mergeHandle.getTableHandle();
+ Table table = trinoTableHandle.tableWithDynamicOptions(trinoCatalog, session);
+ return new TrinoMergeSink(
+ createPageSink(trinoTableHandle, session), table.rowType().getFields().size());
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index a380e42..41bb874 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -71,6 +71,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.stream.Collectors;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
@@ -106,14 +107,52 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Table table = trinoTableHandle.tableWithDynamicOptions(trinoCatalog, session);
return runWithContextClassLoader(
- () ->
- createPageSource(
+ () -> {
+ Optional<TrinoColumnHandle> rowId =
+ columns.stream()
+ .map(TrinoColumnHandle.class::cast)
+ .filter(column -> column.isRowId())
+ .findFirst();
+ if (rowId.isPresent()) {
+ List<ColumnHandle> dataColumns =
+ columns.stream()
+ .map(TrinoColumnHandle.class::cast)
+ .filter(column -> !column.isRowId())
+ .collect(Collectors.toList());
+ Set<String> rowIdFileds =
+ ((io.trino.spi.type.RowType) rowId.get().getTrinoType())
+ .getFields().stream()
+ .map(io.trino.spi.type.RowType.Field::getName)
+ .map(Optional::get)
+ .collect(Collectors.toSet());
+
+ HashMap<String, Integer> fieldToIndex = new HashMap<>();
+ for (int i = 0; i < dataColumns.size(); i++) {
+ TrinoColumnHandle trinoColumnHandle =
+ (TrinoColumnHandle) dataColumns.get(i);
+ if (rowIdFileds.contains(trinoColumnHandle.getColumnName())) {
+ fieldToIndex.put(trinoColumnHandle.getColumnName(), i);
+ }
+ }
+ return TrinoMergePageSourceWrapper.wrap(
+ createPageSource(
+ session,
+ table,
+ trinoTableHandle.getFilter(),
+ (TrinoSplit) split,
+ dataColumns,
+ trinoTableHandle.getLimit()),
+ fieldToIndex);
+ } else {
+ return createPageSource(
session,
table,
trinoTableHandle.getFilter(),
(TrinoSplit) split,
columns,
- trinoTableHandle.getLimit()),
+ trinoTableHandle.getLimit());
+ }
+ },
TrinoPageSourceProvider.class.getClassLoader());
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
new file mode 100644
index 0000000..793bf02
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ConnectorPartitioningHandle;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Trino {@link ConnectorPartitioningHandle}. */
+public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
+
+ private final byte[] schema;
+
+ @JsonCreator
+ public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
+ this.schema = schema;
+ }
+
+ @JsonProperty
+ public byte[] getSchema() {
+ return schema;
+ }
+
+ public TableSchema getOriginalSchema() {
+ try {
+ return InstantiationUtil.deserializeObject(this.schema, getClass().getClassLoader());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TrinoPartitioningHandle that = (TrinoPartitioningHandle) o;
+ return Arrays.equals(schema, that.getSchema());
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java
new file mode 100644
index 0000000..0d4823d
--- /dev/null
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoRow.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.Page;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Int128;
+import io.trino.spi.type.TypeUtils;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.Decimals.MAX_SHORT_PRECISION;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static org.apache.paimon.shade.guava30.com.google.common.base.Verify.verify;
+
+/** TrinoRow {@link InternalRow}. */
+public class TrinoRow implements InternalRow, Serializable {
+
+ private final RowKind rowKind;
+ private final Page singlePage;
+
+ public TrinoRow(Page singlePage, RowKind rowKind) {
+ verify(singlePage.getPositionCount() == 1, "singlePage must have only one row");
+ this.singlePage = singlePage;
+ this.rowKind = rowKind;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return singlePage.getChannelCount();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return rowKind;
+ }
+
+ @Override
+ public void setRowKind(RowKind rowKind) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return singlePage.getBlock(i).isNull(0);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return (boolean) TypeUtils.readNativeValue(BOOLEAN, singlePage.getBlock(i), 0);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ Slice slice = (Slice) TypeUtils.readNativeValue(VARBINARY, singlePage.getBlock(i), 0);
+ return slice.getByte(0);
+ }
+
+ @Override
+ public short getShort(int i) {
+ long value = (long) TypeUtils.readNativeValue(SMALLINT, singlePage.getBlock(i), 0);
+ if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
+ throw new IllegalArgumentException("Value out of range for short: " + value);
+ }
+ return (short) value;
+ }
+
+ @Override
+ public int getInt(int i) {
+ long value = (long) TypeUtils.readNativeValue(INTEGER, singlePage.getBlock(i), 0);
+ if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Value out of range for int: " + value);
+ }
+ return (int) value;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return (long) TypeUtils.readNativeValue(BIGINT, singlePage.getBlock(i), 0);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return Float.intBitsToFloat(
+ Math.toIntExact((long) TypeUtils.readNativeValue(REAL, singlePage.getBlock(i), 0)));
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return (double) TypeUtils.readNativeValue(DOUBLE, singlePage.getBlock(i), 0);
+ }
+
+ @Override
+ public BinaryString getString(int i) {
+ return BinaryString.fromBytes(getBinary(i));
+ }
+
+ @Override
+ public Decimal getDecimal(int i, int decimalPrecision, int decimalScale) {
+ Object value =
+ TypeUtils.readNativeValue(
+ DecimalType.createDecimalType(decimalPrecision, decimalScale),
+ singlePage.getBlock(i),
+ 0);
+ if (decimalPrecision <= MAX_SHORT_PRECISION) {
+ return Decimal.fromUnscaledLong((Long) value, decimalPrecision, decimalScale);
+ } else {
+ long high = ((Int128) value).getHigh();
+ long low = ((Int128) value).getLow();
+ BigInteger bigIntegerValue =
+ BigInteger.valueOf(high).shiftLeft(64).add(BigInteger.valueOf(low));
+ BigDecimal bigDecimalValue = new BigDecimal(bigIntegerValue, decimalScale);
+ return Decimal.fromBigDecimal(bigDecimalValue, decimalPrecision, decimalScale);
+ }
+ }
+
+ @Override
+ public Timestamp getTimestamp(int i, int timestampPrecision) {
+ long value = (long) TypeUtils.readNativeValue(TIMESTAMP_MICROS, singlePage.getBlock(i), 0);
+ return Timestamp.fromMicros(value);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ Slice slice = (Slice) TypeUtils.readNativeValue(VARBINARY, singlePage.getBlock(i), 0);
+ return slice.getBytes();
+ }
+
+ @Override
+ public InternalArray getArray(int i) {
+ // todo
+ // singlePage.getBlock(i).getChildren()
+ return null;
+ }
+
+ @Override
+ public InternalMap getMap(int i) {
+ // todo
+ return null;
+ }
+
+ @Override
+ public InternalRow getRow(int i, int i1) {
+ // todo
+ return null;
+ }
+}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
index 825d34c..e929bfc 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoSessionProperties.java
@@ -27,6 +27,7 @@ import java.util.List;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.longProperty;
+import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
@@ -36,6 +37,8 @@ public class TrinoSessionProperties {
public static final String SCAN_TIMESTAMP = "scan_timestamp_millis";
public static final String SCAN_SNAPSHOT = "scan_snapshot_id";
public static final String MINIMUM_SPLIT_WEIGHT = "minimum_split_weight";
+ public static final String INSERT_EXISTING_PARTITIONS_BEHAVIOR =
+ "insert_existing_partitions_behavior";
private final List<PropertyMetadata<?>> sessionProperties;
@@ -57,6 +60,18 @@ public class TrinoSessionProperties {
.add(
doubleProperty(
MINIMUM_SPLIT_WEIGHT, "Minimum split weight", 0.05, false))
+ .add(
+ new PropertyMetadata<>(
+ INSERT_EXISTING_PARTITIONS_BEHAVIOR,
+ "Behavior on insert existing partitions",
+ VARCHAR,
+ InsertExistingPartitionsBehavior.class,
+ InsertExistingPartitionsBehavior.APPEND,
+ false,
+ value ->
+ InsertExistingPartitionsBehavior.valueOf(
+ (String) value),
+ InsertExistingPartitionsBehavior::toString))
.build();
}
@@ -75,4 +90,17 @@ public class TrinoSessionProperties {
public static Double getMinimumSplitWeight(ConnectorSession session) {
return session.getProperty(MINIMUM_SPLIT_WEIGHT, Double.class);
}
+
+ public static boolean enableInsertOverwrite(ConnectorSession session) {
+ return session.getProperty(
+ INSERT_EXISTING_PARTITIONS_BEHAVIOR, InsertExistingPartitionsBehavior.class)
+ == InsertExistingPartitionsBehavior.OVERWRITE;
+ }
+
+ /** Insert existing partitions behavior. */
+ public enum InsertExistingPartitionsBehavior {
+ ERROR,
+ APPEND,
+ OVERWRITE,
+ }
}
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index bc5d025..d5dbf44 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -28,6 +28,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
@@ -44,7 +46,8 @@ import java.util.OptionalLong;
import java.util.stream.Collectors;
/** Trino {@link ConnectorTableHandle}. */
-public class TrinoTableHandle implements ConnectorTableHandle {
+public class TrinoTableHandle
+ implements ConnectorTableHandle, ConnectorInsertTableHandle, ConnectorOutputTableHandle {
private final String schemaName;
private final String tableName;
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
index 6b74596..0592751 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/fileio/TrinoFileIO.java
@@ -32,6 +32,7 @@ import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
import javax.annotation.Nullable;
@@ -65,9 +66,11 @@ public class TrinoFileIO implements FileIO {
}
@Override
- public PositionOutputStream newOutputStream(Path path, boolean b) throws IOException {
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
+ TrinoOutputFile trinoOutputFile =
+ trinoFileSystem.newOutputFile(Location.of(path.toString()));
return new PositionOutputStreamWrapper(
- trinoFileSystem.newOutputFile(Location.of(path.toString())).create());
+ overwrite ? trinoOutputFile.createOrOverwrite() : trinoOutputFile.create());
}
@Override
@@ -132,7 +135,7 @@ public class TrinoFileIO implements FileIO {
}
@Override
- public boolean delete(Path path, boolean b) throws IOException {
+ public boolean delete(Path path, boolean recursive) throws IOException {
Location location = Location.of(path.toString());
if (trinoFileSystem.directoryExists(location).orElse(false)) {
trinoFileSystem.deleteDirectory(location);
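The overwrite flag handled above decides between TrinoOutputFile.createOrOverwrite() and create(). As a rough standalone analogy using java.nio rather than Trino's filesystem API (file name and bytes are made up):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class NewOutputStreamDemo {
    // overwrite -> replace/truncate an existing file; otherwise fail if it already exists
    static OutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
        return overwrite
                ? Files.newOutputStream(path, StandardOpenOption.CREATE,
                        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)
                : Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("demo").resolve("part-0");
        try (OutputStream out = newOutputStream(file, false)) {
            out.write(new byte[] {1, 2, 3});
        }
        // without overwrite this second call would throw FileAlreadyExistsException
        try (OutputStream out = newOutputStream(file, true)) {
            out.write(new byte[] {4, 5});
        }
        System.out.println(Files.size(file)); // prints 2
    }
}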
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 0258ff8..dda03b3 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -489,6 +489,29 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
commit.commit(2, writer.prepareCommit(true, 2));
}
+ {
+ Path tablePath = new Path(warehouse, "default.db/t103");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name",
DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>() {
+ {
+ put("file.format", "orc");
+ put("primary-key", "id");
+ put("bucket", "2");
+ }
+ },
+ ""));
+ }
+
DistributedQueryRunner queryRunner = null;
try {
queryRunner =
@@ -620,9 +643,7 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ "bucket_key = 'order_key',"
+ "changelog_producer = 'input'"
+ ")");
- assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo(
- "[[empty_t], [orders], [t1], [t100], [t101], [t102],
[t2], [t3], [t4], [t99]]");
+ assertThat(sql("SHOW TABLES FROM paimon.default")).contains("orders");
sql("DROP TABLE IF EXISTS paimon.default.orders");
}
@@ -644,9 +665,8 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ "changelog_producer = 'input'"
+ ")");
sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
- assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo(
- "[[empty_t], [t1], [t100], [t101], [t102], [t2], [t3],
[t4], [t6], [t99]]");
+ String result = sql("SHOW TABLES FROM paimon.default");
+ assertThat(result).doesNotContain("t5").contains("t6");
sql("DROP TABLE IF EXISTS paimon.default.t6");
}
@@ -668,8 +688,7 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ "changelog_producer = 'input'"
+ ")");
sql("DROP TABLE IF EXISTS paimon.default.t5");
- assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [t1], [t100], [t101], [t102], [t2],
[t3], [t4], [t99]]");
+ assertThat(sql("SHOW TABLES FROM
paimon.default")).doesNotContain("t5");
}
@Test
@@ -833,6 +852,14 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
assertThat(sql("SELECT * FROM paimon.default.t102 where c = 2")).isEqualTo("[[a2, 2, 2]]");
}
+ @Test
+ public void testInsertInto() {
+ sql(
+ "INSERT INTO paimon.default.t103 VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6')");
+ assertThat(sql("SELECT * FROM paimon.default.t103 order by id asc"))
+ .isEqualTo("[[1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6]]");
+ }
+
protected String sql(String sql) {
MaterializedResult result = getQueryRunner().execute(sql);
return result.getMaterializedRows().toString();
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
new file mode 100644
index 0000000..7d97c7c
--- /dev/null
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.utils.InstantiationUtil;
+
+import io.airlift.json.JsonCodec;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TrinoPartitioningHandle}. */
+public class TestTrinoPartitioningHandle {
+
+ private final JsonCodec<TrinoPartitioningHandle> codec =
+ JsonCodec.jsonCodec(TrinoPartitioningHandle.class);
+
+ @Test
+ public void testTrinoPartitioningHandle() throws Exception {
+ byte[] schemaData = InstantiationUtil.serializeObject("test_schema");
+ TrinoPartitioningHandle expected = new TrinoPartitioningHandle(schemaData);
+ testRoundTrip(expected);
+ }
+
+ private void testRoundTrip(TrinoPartitioningHandle expected) {
+ String json = codec.toJson(expected);
+ TrinoPartitioningHandle actual = codec.fromJson(json);
+ assertThat(actual).isEqualTo(expected);
+ assertThat(actual.getSchema()).isEqualTo(expected.getSchema());
+ }
+}
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java
new file mode 100644
index 0000000..580a93d
--- /dev/null
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoRow.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.type.DecimalType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.Decimals.encodeScaledValue;
+import static io.trino.spi.type.Decimals.encodeShortScaledValue;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TypeUtils.writeNativeValue;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.type.DateTimes.MICROSECONDS_PER_MILLISECOND;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link TrinoRow}. */
+public class TestTrinoRow {
+
+ @Test
+ void test() {
+ Page singlePage =
+ new Page(
+ 1,
+ writeNativeValue(BOOLEAN, null),
+ writeNativeValue(BOOLEAN, false),
+ writeNativeValue(VARBINARY, Slices.wrappedBuffer((byte) 22)),
+ writeNativeValue(SMALLINT, 356L),
+ writeNativeValue(INTEGER, 4L),
+ writeNativeValue(BIGINT, 23567222L),
+ writeNativeValue(REAL, (long) Float.floatToIntBits(1213.33f)),
+ writeNativeValue(DOUBLE, 121.3d),
+ writeNativeValue(
+ VARCHAR,
+ Slices.wrappedBuffer(
+ new String("rfyu").getBytes(StandardCharsets.UTF_8))),
+ writeNativeValue(
+ DecimalType.createDecimalType(2, 2),
+ encodeShortScaledValue(BigDecimal.valueOf(0.21), 2)),
+ writeNativeValue(
+ DecimalType.createDecimalType(38, 2),
+ encodeScaledValue(BigDecimal.valueOf(65782123123.01), 2)),
+ writeNativeValue(
+ DecimalType.createDecimalType(10, 1),
+ encodeShortScaledValue(BigDecimal.valueOf(62123123.5), 1)),
+ writeNativeValue(
+ TIMESTAMP_MICROS,
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.parse("2007-12-03T10:15:30"))
+ .getMillisecond()
+ * MICROSECONDS_PER_MILLISECOND),
+ writeNativeValue(
+ VARBINARY,
+ Slices.wrappedBuffer(
+ "varbinary_v".getBytes(StandardCharsets.UTF_8))));
+ TrinoRow trinoRow = new TrinoRow(singlePage, RowKind.INSERT);
+
+ assertThat(trinoRow.getRowKind()).isEqualTo(RowKind.INSERT);
+ assertThat(trinoRow.isNullAt(0)).isEqualTo(true);
+ assertThat(trinoRow.getBoolean(1)).isEqualTo(false);
+ assertThat(trinoRow.getByte(2)).isEqualTo((byte) 22);
+ assertThat(trinoRow.getShort(3)).isEqualTo((short) 356);
+ assertThat(trinoRow.getInt(4)).isEqualTo(4);
+ assertThat(trinoRow.getLong(5)).isEqualTo(23567222L);
+ assertThat(trinoRow.getFloat(6)).isEqualTo(1213.33f);
+ assertThat(trinoRow.getDouble(7)).isEqualTo(121.3d);
+ assertThat(trinoRow.getString(8)).isEqualTo(BinaryString.fromString("rfyu"));
+ assertThat(trinoRow.getDecimal(9, 2, 2))
+ .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2));
+ assertThat(trinoRow.getDecimal(10, 38, 2))
+ .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2));
+ assertThat(trinoRow.getDecimal(11, 10, 1))
+ .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(62123123.5), 10, 1));
+ assertThat(trinoRow.getTimestamp(12, 6))
+ .isEqualTo(Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")));
+ assertThat(trinoRow.getBinary(13))
+ .isEqualTo("varbinary_v".getBytes(StandardCharsets.UTF_8));
+ }
+}
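One detail worth calling out in the test above: the REAL column is written as (long) Float.floatToIntBits(1213.33f) because Trino's REAL type carries a float's raw IEEE-754 int bits widened to a long. A minimal standalone round-trip (plain Java, value made up):

public class RealEncodingDemo {
    public static void main(String[] args) {
        float original = 1213.33f;
        long encoded = (long) Float.floatToIntBits(original); // how the test writes the REAL column
        float decoded = Float.intBitsToFloat((int) encoded);  // how getFloat recovers it
        System.out.println(decoded); // prints 1213.33
    }
}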