This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch old_version_master
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/old_version_master by this
push:
new f67712a [388] Cherry-pick incremental read from
https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61,
make it suitable for trino-388
f67712a is described below
commit f67712ac0c5651e11952d51fea5d28f3229f419d
Author: 仟弋 <[email protected]>
AuthorDate: Tue Jul 22 11:25:31 2025 +0800
[388] Cherry-pick incremental read from
https://github.com/apache/paimon-trino/commit/5697da8de19a60e52842fd86d612c99e35aa0877\#diff-e62f488a14d012b280ffd60b07e6a3915757e393f68103b42fd3262adc0b9d61,
make it suitable for trino-388
---
.../apache/paimon/trino/TableChangesFunction.java | 223 ++++++++++
.../paimon/trino/TableChangesFunctionProvider.java | 43 ++
.../org/apache/paimon/trino/TrinoConnector.java | 57 +++
.../apache/paimon/trino/TrinoConnectorBase.java | 96 +++++
.../paimon/trino/TrinoConnectorFactoryBase.java | 97 +++++
.../org/apache/paimon/trino/TrinoMetadataBase.java | 447 +++++++++++++++++++++
.../java/org/apache/paimon/trino/TrinoModule.java | 54 +++
.../apache/paimon/trino/TrinoSplitManagerBase.java | 66 +++
.../org/apache/paimon/trino/TrinoTableHandle.java | 210 ++++++++++
.../apache/paimon/trino/TestTrino388ITCase.java | 26 ++
.../apache/paimon/trino/TrinoTableOptionUtils.java | 2 +-
11 files changed, 1320 insertions(+), 1 deletion(-)
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
new file mode 100644
index 0000000..94c8da9
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunction.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.google.inject.Inject;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.AbstractConnectorTableFunction;
+import io.trino.spi.ptf.Argument;
+import io.trino.spi.ptf.Descriptor;
+import io.trino.spi.ptf.ScalarArgument;
+import io.trino.spi.ptf.ScalarArgumentSpecification;
+import io.trino.spi.ptf.TableFunctionAnalysis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static
io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunction. */
+public class TableChangesFunction extends AbstractConnectorTableFunction {
+
+ private static final Slice INVALID_VALUE = Slices.utf8Slice("invalid");
+ private static final String FUNCTION_NAME = "table_changes";
+ private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME";
+ private static final String TABLE_NAME_VAR_NAME = "TABLE_NAME";
+ private static final String INCREMENTAL_BETWEEN_SCAN_MODE =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE.key())
+ .toUpperCase(ENGLISH);
+ private static final String INCREMENTAL_BETWEEN_TIMESTAMP =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key())
+ .toUpperCase(ENGLISH);
+ private static final String INCREMENTAL_BETWEEN =
+
TrinoTableOptionUtils.convertOptionKey(CoreOptions.INCREMENTAL_BETWEEN.key())
+ .toUpperCase(ENGLISH);
+ private final TrinoMetadata trinoMetadata;
+
+ @Inject
+ public TableChangesFunction(TrinoMetadataFactory trinoMetadataFactory) {
+ super(
+ "system",
+ FUNCTION_NAME,
+ ImmutableList.of(
+ ScalarArgumentSpecification.builder()
+ .name(SCHEMA_NAME_VAR_NAME)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(TABLE_NAME_VAR_NAME)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN_SCAN_MODE)
+ .defaultValue(
+ Slices.utf8Slice(
+
CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE
+ .defaultValue()
+ .toString()))
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN)
+ .defaultValue(INVALID_VALUE)
+ .type(VARCHAR)
+ .build(),
+ ScalarArgumentSpecification.builder()
+ .name(INCREMENTAL_BETWEEN_TIMESTAMP)
+ .defaultValue(INVALID_VALUE)
+ .type(VARCHAR)
+ .build()),
+ GENERIC_TABLE);
+ this.trinoMetadata =
+ requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is
null").create();
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(
+ ConnectorSession session,
+ ConnectorTransactionHandle transaction,
+ Map<String, Argument> arguments) {
+ String schema = getSchemaName(arguments);
+ String table = getTableName(arguments);
+
+ Slice incrementalBetweenValue =
+ (Slice) ((ScalarArgument)
arguments.get(INCREMENTAL_BETWEEN)).getValue();
+ Slice incrementalBetweenTimestamp =
+ (Slice) ((ScalarArgument)
arguments.get(INCREMENTAL_BETWEEN_TIMESTAMP)).getValue();
+ if (incrementalBetweenValue.equals(INVALID_VALUE)
+ && incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT,
+ "Either "
+ + INCREMENTAL_BETWEEN
+ + " or "
+ + INCREMENTAL_BETWEEN_TIMESTAMP
+ + " must be provided");
+ }
+
+ SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+ try {
+ TrinoTableHandle trinoTableHandle =
+ trinoMetadata.getTableHandle(session, schemaTableName);
+ Table paimonTable = trinoTableHandle.table();
+ Map<String, String> options = new HashMap<>(paimonTable.options());
+ if (!incrementalBetweenValue.equals(INVALID_VALUE)) {
+ options.put(
+ CoreOptions.INCREMENTAL_BETWEEN.key(),
+ incrementalBetweenValue.toStringUtf8());
+ }
+ if (!incrementalBetweenTimestamp.equals(INVALID_VALUE)) {
+ options.put(
+ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key(),
+ incrementalBetweenTimestamp.toStringUtf8());
+ }
+ paimonTable = paimonTable.copy(options);
+
+ ImmutableList.Builder<Descriptor.Field> columns =
ImmutableList.builder();
+ List<ColumnHandle> projectedColumns = new ArrayList<>();
+ paimonTable.rowType().getFields().stream()
+ .forEach(
+ column -> {
+ columns.add(
+ new Descriptor.Field(
+ column.name(),
+ Optional.of(
+
TrinoTypeUtils.fromPaimonType(
+
column.type()))));
+ projectedColumns.add(
+ TrinoColumnHandle.of(
+ column.name().toLowerCase(),
column.type()));
+ });
+ return TableFunctionAnalysis.builder()
+ .returnedType(new Descriptor(columns.build()))
+ .handle(
+ new TrinoTableHandle(
+ schema,
+ table,
+
InstantiationUtil.serializeObject(paimonTable),
+ TupleDomain.all(),
+ Optional.of(projectedColumns),
+ OptionalLong.empty()))
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize table", e);
+ }
+ }
+
+ private static String getSchemaName(Map<String, Argument> arguments) {
+ if (argumentExists(arguments, SCHEMA_NAME_VAR_NAME)) {
+ return ((Slice)
+ checkNonNull(
+ ((ScalarArgument)
arguments.get(SCHEMA_NAME_VAR_NAME))
+ .getValue()))
+ .toStringUtf8();
+ }
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, SCHEMA_NAME_VAR_NAME + " argument
not found");
+ }
+
+ private static String getTableName(Map<String, Argument> arguments) {
+ if (argumentExists(arguments, TABLE_NAME_VAR_NAME)) {
+ return ((Slice)
+ checkNonNull(
+ ((ScalarArgument)
arguments.get(TABLE_NAME_VAR_NAME))
+ .getValue()))
+ .toStringUtf8();
+ }
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, TABLE_NAME_VAR_NAME + " argument
not found");
+ }
+
+ private static boolean argumentExists(Map<String, Argument> arguments,
String key) {
+ Argument argument = arguments.get(key);
+ if (argument instanceof ScalarArgument) {
+ return !(((ScalarArgument) argument).getValue() == null);
+ }
+ throw new IllegalArgumentException("Unsupported argument type: " +
argument);
+ }
+
+ private static Object checkNonNull(Object argumentValue) {
+ if (argumentValue == null) {
+ throw new TrinoException(
+ INVALID_FUNCTION_ARGUMENT, FUNCTION_NAME + " arguments may
not be null");
+ }
+ return argumentValue;
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
new file mode 100644
index 0000000..0f9f8a9
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TableChangesFunctionProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.spi.ptf.ConnectorTableFunction;
+
+import static java.util.Objects.requireNonNull;
+
+/** TableChangesFunctionProvider. */
+public class TableChangesFunctionProvider implements
Provider<ConnectorTableFunction> {
+ private final TrinoMetadataFactory trinoMetadataFactory;
+
+ @Inject
+ public TableChangesFunctionProvider(TrinoMetadataFactory
trinoMetadataFactory) {
+ this.trinoMetadataFactory =
+ requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is
null");
+ }
+
+ @Override
+ public ConnectorTableFunction get() {
+ return new ClassLoaderSafeConnectorTableFunction(
+ new TableChangesFunction(trinoMetadataFactory),
getClass().getClassLoader());
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java
new file mode 100644
index 0000000..043616f
--- /dev/null
+++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.Set;
+
+/** Trino {@link Connector}. */
+public class TrinoConnector extends TrinoConnectorBase {
+
+ public TrinoConnector(
+ TrinoMetadataBase trinoMetadata,
+ TrinoSplitManagerBase trinoSplitManager,
+ TrinoPageSourceProvider trinoPageSourceProvider,
+ TrinoTableOptions trinoTableOptions,
+ TrinoSessionProperties trinoSessionProperties,
+ Set<ConnectorTableFunction> connectorTableFunctions) {
+ super(
+ trinoMetadata,
+ trinoSplitManager,
+ trinoPageSourceProvider,
+ trinoTableOptions,
+ trinoSessionProperties,
+ connectorTableFunctions);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(
+ IsolationLevel isolationLevel, boolean readOnly) {
+ return beginTransactionBase(isolationLevel, readOnly);
+ }
+
+ @Override
+ public TrinoMetadataBase getMetadata(ConnectorTransactionHandle
transactionHandle) {
+ return getMetadataBase(transactionHandle);
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
new file mode 100644
index 0000000..24d6aeb
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorBase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Set;
+
+import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/** Trino {@link Connector}. */
+public abstract class TrinoConnectorBase implements Connector {
+ private final TrinoMetadataBase trinoMetadata;
+ private final TrinoSplitManagerBase trinoSplitManager;
+ private final TrinoPageSourceProvider trinoPageSourceProvider;
+ private final List<PropertyMetadata<?>> tableProperties;
+ private final List<PropertyMetadata<?>> sessionProperties;
+ private final Set<ConnectorTableFunction> tableFunctions;
+
+ public TrinoConnectorBase(
+ TrinoMetadataBase trinoMetadata,
+ TrinoSplitManagerBase trinoSplitManager,
+ TrinoPageSourceProvider trinoPageSourceProvider,
+ TrinoTableOptions trinoTableOptions,
+ TrinoSessionProperties trinoSessionProperties,
+ Set<ConnectorTableFunction> tableFunctions) {
+ this.trinoMetadata = requireNonNull(trinoMetadata, "jmxMetadata is
null");
+ this.trinoSplitManager = requireNonNull(trinoSplitManager,
"jmxSplitManager is null");
+ this.trinoPageSourceProvider =
+ requireNonNull(trinoPageSourceProvider, "jmxRecordSetProvider
is null");
+ this.tableProperties = trinoTableOptions.getTableProperties();
+ sessionProperties = trinoSessionProperties.getSessionProperties();
+ this.tableFunctions =
+ ImmutableSet.copyOf(requireNonNull(tableFunctions,
"tableFunctions is null"));
+ }
+
+ protected ConnectorTransactionHandle beginTransactionBase(
+ IsolationLevel isolationLevel, boolean readOnly) {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return TrinoTransactionHandle.INSTANCE;
+ }
+
+ protected TrinoMetadataBase getMetadataBase(ConnectorTransactionHandle
transactionHandle) {
+ return trinoMetadata;
+ }
+
+ @Override
+ public TrinoSplitManagerBase getSplitManager() {
+ return trinoSplitManager;
+ }
+
+ @Override
+ public TrinoPageSourceProvider getPageSourceProvider() {
+ return trinoPageSourceProvider;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties() {
+ return sessionProperties;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getTableProperties() {
+ return tableProperties;
+ }
+
+ @Override
+ public Set<ConnectorTableFunction> getTableFunctions() {
+ return tableFunctions;
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
new file mode 100644
index 0000000..8a54f1d
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoConnectorFactoryBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.trino.plugin.hive.HiveHdfsModule;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.authentication.HdfsAuthenticationModule;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.ptf.ConnectorTableFunction;
+import io.trino.spi.type.TypeManager;
+
+import java.util.Map;
+import java.util.Set;
+
+/** Trino {@link ConnectorFactory}. */
+public abstract class TrinoConnectorFactoryBase implements ConnectorFactory {
+
+ @Override
+ public String getName() {
+ return "paimon";
+ }
+
+ @Override
+ public Connector create(
+ String catalogName, Map<String, String> config, ConnectorContext
context) {
+
+ try (ThreadContextClassLoader ignored =
+ new
ThreadContextClassLoader(TrinoConnectorFactoryBase.class.getClassLoader())) {
+ Bootstrap app = new Bootstrap(modules(catalogName, config,
context));
+
+ Injector injector =
+ app.doNotInitializeLogging()
+ .setRequiredConfigurationProperties(Map.of())
+ .setOptionalConfigurationProperties(config)
+ .initialize();
+
+ TrinoMetadata trinoMetadata =
injector.getInstance(TrinoMetadataFactory.class).create();
+ TrinoSplitManager trinoSplitManager =
injector.getInstance(TrinoSplitManager.class);
+ TrinoPageSourceProvider trinoPageSourceProvider =
+ injector.getInstance(TrinoPageSourceProvider.class);
+ TrinoSessionProperties trinoSessionProperties =
+ injector.getInstance(TrinoSessionProperties.class);
+ TrinoTableOptions trinoTableOptions =
injector.getInstance(TrinoTableOptions.class);
+ Set<ConnectorTableFunction> connectorTableFunctions =
+ injector.getInstance(new Key<>() {});
+
+ return new TrinoConnector(
+ trinoMetadata,
+ trinoSplitManager,
+ trinoPageSourceProvider,
+ trinoTableOptions,
+ trinoSessionProperties,
+ connectorTableFunctions);
+ }
+ }
+
+ protected Module[] modules(
+ String catalogName, Map<String, String> config, ConnectorContext
context) {
+ return new Module[] {
+ new JsonModule(),
+ new TrinoModule(config),
+ new HiveHdfsModule(),
+ new HdfsAuthenticationModule(),
+ binder -> {
+ binder.bind(NodeVersion.class)
+ .toInstance(
+ new NodeVersion(
+
context.getNodeManager().getCurrentNode().getVersion()));
+
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ }
+ };
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
new file mode 100644
index 0000000..dfcc435
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableFunctionApplicationResult;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.ConnectorTableFunctionHandle;
+import io.trino.spi.security.TrinoPrincipal;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Trino {@link ConnectorMetadata}. */
+public abstract class TrinoMetadataBase implements ConnectorMetadata {
+
+ protected final Catalog catalog;
+
+ public TrinoMetadataBase(Catalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public List<String> listSchemaNames(ConnectorSession session) {
+ return listSchemaNames();
+ }
+
+ private List<String> listSchemaNames() {
+ return catalog.listDatabases();
+ }
+
+ @Override
+ public void createSchema(
+ ConnectorSession session,
+ String schemaName,
+ Map<String, Object> properties,
+ TrinoPrincipal owner) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(schemaName),
+ "schemaName cannot be null or empty");
+
+ try {
+ catalog.createDatabase(schemaName, true);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ throw new RuntimeException(format("database already existed:
'%s'", schemaName));
+ }
+ }
+
+ @Override
+ public void dropSchema(ConnectorSession session, String schemaName) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(schemaName),
+ "schemaName cannot be null or empty");
+ try {
+ catalog.dropDatabase(schemaName, false, true);
+ } catch (Catalog.DatabaseNotEmptyException e) {
+ throw new RuntimeException(format("database is not empty: '%s'",
schemaName));
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(format("database not exists: '%s'",
schemaName));
+ }
+ }
+
+ @Override
+ public TrinoTableHandle getTableHandle(ConnectorSession session,
SchemaTableName tableName) {
+ return getTableHandle(tableName, null);
+ }
+
+ @Override
+ public ConnectorTableProperties getTableProperties(
+ ConnectorSession session, ConnectorTableHandle table) {
+ return new ConnectorTableProperties();
+ }
+
+ public TrinoTableHandle getTableHandle(
+ SchemaTableName tableName, Map<String, String> dynamicOptions) {
+ Identifier tablePath = new Identifier(tableName.getSchemaName(),
tableName.getTableName());
+ byte[] serializedTable;
+ try {
+ Table table = catalog.getTable(tablePath);
+ if (dynamicOptions != null && !dynamicOptions.isEmpty()) {
+ table = table.copy(dynamicOptions);
+ }
+ serializedTable = InstantiationUtil.serializeObject(table);
+ } catch (Catalog.TableNotExistException e) {
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return new TrinoTableHandle(
+ tableName.getSchemaName(), tableName.getTableName(),
serializedTable);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ return ((TrinoTableHandle) tableHandle).tableMetadata();
+ }
+
+ @Override
+ public List<SchemaTableName> listTables(ConnectorSession session,
Optional<String> schemaName) {
+ List<SchemaTableName> tables = new ArrayList<>();
+ schemaName
+ .map(Collections::singletonList)
+ .orElseGet(catalog::listDatabases)
+ .forEach(schema -> tables.addAll(listTables(schema)));
+ return tables;
+ }
+
+ private List<SchemaTableName> listTables(String schema) {
+ try {
+ return catalog.listTables(schema).stream()
+ .map(table -> new SchemaTableName(schema, table))
+ .collect(toList());
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void createTable(
+ ConnectorSession session,
+ ConnectorTableMetadata tableMetadata,
+ boolean ignoreExisting) {
+ SchemaTableName table = tableMetadata.getTable();
+ Identifier identifier = Identifier.create(table.getSchemaName(),
table.getTableName());
+
+ try {
+ catalog.createTable(identifier, prepareSchema(tableMetadata),
false);
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(format("database not exists: '%s'",
table.getSchemaName()));
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new RuntimeException(format("table already existed: '%s'",
table.getTableName()));
+ }
+ }
+
+ private Schema prepareSchema(ConnectorTableMetadata tableMetadata) {
+ Map<String, Object> properties = new
HashMap<>(tableMetadata.getProperties());
+ Schema.Builder builder =
+ Schema.newBuilder()
+
.primaryKey(TrinoTableOptions.getPrimaryKeys(properties))
+
.partitionKeys(TrinoTableOptions.getPartitionedKeys(properties));
+
+ for (ColumnMetadata column : tableMetadata.getColumns()) {
+ builder.column(
+ column.getName(),
+ TrinoTypeUtils.toPaimonType(column.getType()),
+ column.getComment());
+ }
+
+ TrinoTableOptionUtils.buildOptions(builder, properties);
+
+ return builder.build();
+ }
+
+ @Override
+ public void renameTable(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ SchemaTableName newTableName) {
+ TrinoTableHandle oldTableHandle = (TrinoTableHandle) tableHandle;
+ try {
+ catalog.renameTable(
+ new Identifier(oldTableHandle.getSchemaName(),
oldTableHandle.getTableName()),
+ new Identifier(newTableName.getSchemaName(),
newTableName.getTableName()),
+ false);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(
+ format("table not exists: '%s'",
oldTableHandle.getTableName()));
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new RuntimeException(
+ format("table already existed: '%s'",
newTableName.getTableName()));
+ }
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, ConnectorTableHandle
tableHandle) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ try {
+ catalog.dropTable(
+ new Identifier(
+ trinoTableHandle.getSchemaName(),
trinoTableHandle.getTableName()),
+ false);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(
+ format("table not exists: '%s'",
trinoTableHandle.getTableName()));
+ }
+ }
+
+ @Override
+ public Map<String, ColumnHandle> getColumnHandles(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ TrinoTableHandle table = (TrinoTableHandle) tableHandle;
+ Map<String, ColumnHandle> handleMap = new HashMap<>();
+ for (ColumnMetadata column : table.columnMetadatas()) {
+ handleMap.put(column.getName(),
table.columnHandle(column.getName()));
+ }
+ return handleMap;
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(
+ ConnectorSession session, ConnectorTableHandle tableHandle,
ColumnHandle columnHandle) {
+ return ((TrinoColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(
+ ConnectorSession session, SchemaTablePrefix prefix) {
+ requireNonNull(prefix, "prefix is null");
+ List<SchemaTableName> tableNames;
+ if (prefix.getTable().isPresent()) {
+ tableNames = Collections.singletonList(prefix.toSchemaTableName());
+ } else {
+ tableNames = listTables(session, prefix.getSchema());
+ }
+
+ return tableNames.stream()
+ .collect(
+ toMap(
+ Function.identity(),
+ table -> getTableHandle(session,
table).columnMetadatas()));
+ }
+
+ @Override
+ public void addColumn(
+ ConnectorSession session, ConnectorTableHandle tableHandle,
ColumnMetadata column) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Identifier identifier =
+ new Identifier(trinoTableHandle.getSchemaName(),
trinoTableHandle.getTableName());
+ List<SchemaChange> changes = new ArrayList<>();
+ changes.add(
+ SchemaChange.addColumn(
+ column.getName(),
TrinoTypeUtils.toPaimonType(column.getType())));
+ try {
+ catalog.alterTable(identifier, changes, false);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ format("failed to alter table: '%s'",
trinoTableHandle.getTableName()), e);
+ }
+ }
+
+ @Override
+ public void renameColumn(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ ColumnHandle source,
+ String target) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Identifier identifier =
+ new Identifier(trinoTableHandle.getSchemaName(),
trinoTableHandle.getTableName());
+ TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) source;
+ List<SchemaChange> changes = new ArrayList<>();
+
changes.add(SchemaChange.renameColumn(trinoColumnHandle.getColumnName(),
target));
+ try {
+ catalog.alterTable(identifier, changes, false);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ format("failed to alter table: '%s'",
trinoTableHandle.getTableName()), e);
+ }
+ }
+
+ @Override
+ public void dropColumn(
+ ConnectorSession session, ConnectorTableHandle tableHandle,
ColumnHandle column) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
+ Identifier identifier =
+ new Identifier(trinoTableHandle.getSchemaName(),
trinoTableHandle.getTableName());
+ TrinoColumnHandle trinoColumnHandle = (TrinoColumnHandle) column;
+ List<SchemaChange> changes = new ArrayList<>();
+
changes.add(SchemaChange.dropColumn(trinoColumnHandle.getColumnName()));
+ try {
+ catalog.alterTable(identifier, changes, false);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ format("failed to alter table: '%s'",
trinoTableHandle.getTableName()), e);
+ }
+ }
+
+ @Override
+ public Optional<ConstraintApplicationResult<ConnectorTableHandle>>
applyFilter(
+ ConnectorSession session, ConnectorTableHandle handle, Constraint
constraint) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) handle;
+ TupleDomain<TrinoColumnHandle> oldFilter =
trinoTableHandle.getFilter();
+ TupleDomain<TrinoColumnHandle> newFilter =
+ constraint
+ .getSummary()
+ .transformKeys(TrinoColumnHandle.class::cast)
+ .intersect(oldFilter);
+ if (oldFilter.equals(newFilter)) {
+ return Optional.empty();
+ }
+
+ LinkedHashMap<TrinoColumnHandle, Domain> acceptedDomains = new
LinkedHashMap<>();
+ LinkedHashMap<TrinoColumnHandle, Domain> unsupportedDomains = new
LinkedHashMap<>();
+ new TrinoFilterConverter(trinoTableHandle.table().rowType())
+ .convert(newFilter, acceptedDomains, unsupportedDomains);
+
+ List<String> partitionKeys = trinoTableHandle.table().partitionKeys();
+ LinkedHashMap<TrinoColumnHandle, Domain> unenforcedDomains = new
LinkedHashMap<>();
+ acceptedDomains.forEach(
+ (columnHandle, domain) -> {
+ if (!partitionKeys.contains(columnHandle.getColumnName()))
{
+ unenforcedDomains.put(columnHandle, domain);
+ }
+ });
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TupleDomain<ColumnHandle> remain =
+ (TupleDomain)
+ TupleDomain.withColumnDomains(unsupportedDomains)
+
.intersect(TupleDomain.withColumnDomains(unenforcedDomains));
+
+ return Optional.of(
+ new
ConstraintApplicationResult<>(trinoTableHandle.copy(newFilter), remain, false));
+ }
+
+ @Override
+ public Optional<ProjectionApplicationResult<ConnectorTableHandle>>
applyProjection(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ List<ConnectorExpression> projections,
+ Map<String, ColumnHandle> assignments) {
+ TrinoTableHandle trinoTableHandle = (TrinoTableHandle) handle;
+ List<ColumnHandle> newColumns = new ArrayList<>(assignments.values());
+
+ if (trinoTableHandle.getProjectedColumns().isPresent()
+ && containSameElements(newColumns,
trinoTableHandle.getProjectedColumns().get())) {
+ return Optional.empty();
+ }
+
+ List<Assignment> assignmentList = new ArrayList<>();
+ assignments.forEach(
+ (name, column) ->
+ assignmentList.add(
+ new Assignment(
+ name,
+ column,
+ ((TrinoColumnHandle)
column).getTrinoType())));
+
+ return Optional.of(
+ new ProjectionApplicationResult<>(
+ trinoTableHandle.copy(Optional.of(newColumns)),
+ projections,
+ assignmentList,
+ false));
+ }
+
+ private static boolean containSameElements(
+ List<? extends ColumnHandle> first, List<? extends ColumnHandle>
second) {
+ return new HashSet<>(first).equals(new HashSet<>(second));
+ }
+
+ @Override
+ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
+ ConnectorSession session, ConnectorTableHandle handle, long limit)
{
+ TrinoTableHandle table = (TrinoTableHandle) handle;
+
+ if (table.getLimit().isPresent() && table.getLimit().getAsLong() <=
limit) {
+ return Optional.empty();
+ }
+
+ if (!table.getFilter().isAll()) {
+ LinkedHashMap<TrinoColumnHandle, Domain> acceptedDomains = new
LinkedHashMap<>();
+ LinkedHashMap<TrinoColumnHandle, Domain> unsupportedDomains = new
LinkedHashMap<>();
+ new TrinoFilterConverter(table.table().rowType())
+ .convert(table.getFilter(), acceptedDomains,
unsupportedDomains);
+ Set<String> acceptedFields =
+ acceptedDomains.keySet().stream()
+ .map(TrinoColumnHandle::getColumnName)
+ .collect(Collectors.toSet());
+ if (unsupportedDomains.size() > 0
+ ||
!table.table().partitionKeys().containsAll(acceptedFields)) {
+ return Optional.empty();
+ }
+ }
+
+ table = table.copy(OptionalLong.of(limit));
+
+ return Optional.of(new LimitApplicationResult<>(table, false, false));
+ }
+
+ @Override
+ public Optional<TableFunctionApplicationResult<ConnectorTableHandle>>
applyTableFunction(
+ ConnectorSession session, ConnectorTableFunctionHandle handle) {
+ return Optional.of(
+ new TableFunctionApplicationResult(
+ handle, ((TrinoTableHandle)
handle).getProjectedColumns().get()));
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java
new file mode 100644
index 0000000..b356b71
--- /dev/null
+++ b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoModule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.options.Options;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+import io.trino.spi.ptf.ConnectorTableFunction;
+
+import java.util.Map;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
+
+/** Module for binding instance. */
+public class TrinoModule implements Module {
+ private Map<String, String> config;
+
+ public TrinoModule(Map<String, String> config) {
+ this.config = config;
+ }
+
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(Options.class).toInstance(new Options(config));
+ binder.bind(TrinoMetadataFactory.class).in(SINGLETON);
+ binder.bind(TrinoSplitManager.class).in(SINGLETON);
+ binder.bind(TrinoPageSourceProvider.class).in(SINGLETON);
+ binder.bind(TrinoSessionProperties.class).in(SINGLETON);
+ binder.bind(TrinoTableOptions.class).in(SINGLETON);
+ newSetBinder(binder, ConnectorTableFunction.class)
+ .addBinding()
+ .toProvider(TableChangesFunctionProvider.class)
+ .in(Scopes.SINGLETON);
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
new file mode 100644
index 0000000..61a26d2
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoSplitManagerBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTableHandle;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Trino {@link ConnectorSplitManager}. */
+public abstract class TrinoSplitManagerBase implements ConnectorSplitManager {
+
+ protected ConnectorSplitSource getSplits(
+ ConnectorTableHandle connectorTableHandle, ConnectorSession
session) {
+ // TODO dynamicFilter?
+ // TODO what is constraint?
+
+ TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle;
+ Table table = tableHandle.tableWithDynamicOptions(session);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ new TrinoFilterConverter(table.rowType())
+ .convert(tableHandle.getFilter())
+ .ifPresent(readBuilder::withFilter);
+ tableHandle.getLimit().ifPresent(limit -> readBuilder.withLimit((int)
limit));
+ List<Split> splits = readBuilder.newScan().plan().splits();
+
+ long maxRowCount =
splits.stream().mapToLong(Split::rowCount).max().orElse(0L);
+ double minimumSplitWeight =
TrinoSessionProperties.getMinimumSplitWeight(session);
+ return new TrinoSplitSource(
+ splits.stream()
+ .map(
+ split ->
+ TrinoSplit.fromSplit(
+ split,
+ Math.min(
+ Math.max(
+ (double)
split.rowCount()
+ /
maxRowCount,
+
minimumSplitWeight),
+ 1.0)))
+ .collect(Collectors.toList()));
+ }
+}
diff --git
a/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
new file mode 100644
index 0000000..a9d0a6d
--- /dev/null
+++
b/paimon-trino-388/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.ptf.ConnectorTableFunctionHandle;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+/** Trino {@link ConnectorTableHandle}. */
+public final class TrinoTableHandle implements ConnectorTableHandle,
ConnectorTableFunctionHandle {
+
+ private final String schemaName;
+ private final String tableName;
+ private final byte[] serializedTable;
+ private final TupleDomain<TrinoColumnHandle> filter;
+ private final Optional<List<ColumnHandle>> projectedColumns;
+ private final OptionalLong limit;
+
+ private Table lazyTable;
+
+ public TrinoTableHandle(String schemaName, String tableName, byte[]
serializedTable) {
+ this(
+ schemaName,
+ tableName,
+ serializedTable,
+ TupleDomain.all(),
+ Optional.empty(),
+ OptionalLong.empty());
+ }
+
+ @JsonCreator
+ public TrinoTableHandle(
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("serializedTable") byte[] serializedTable,
+ @JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
+ @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns,
+ @JsonProperty("limit") OptionalLong limit) {
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.serializedTable = serializedTable;
+ this.filter = filter;
+ this.projectedColumns = projectedColumns;
+ this.limit = limit;
+ }
+
+ @JsonProperty
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ @JsonProperty
+ public String getTableName() {
+ return tableName;
+ }
+
+ @JsonProperty
+ public byte[] getSerializedTable() {
+ return serializedTable;
+ }
+
+ @JsonProperty
+ public TupleDomain<TrinoColumnHandle> getFilter() {
+ return filter;
+ }
+
+ @JsonProperty
+ public Optional<List<ColumnHandle>> getProjectedColumns() {
+ return projectedColumns;
+ }
+
+ public OptionalLong getLimit() {
+ return limit;
+ }
+
+ public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) {
+ return new TrinoTableHandle(
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
+ }
+
+ public TrinoTableHandle copy(Optional<List<ColumnHandle>>
projectedColumns) {
+ return new TrinoTableHandle(
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
+ }
+
+ public TrinoTableHandle copy(OptionalLong limit) {
+ return new TrinoTableHandle(
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
+ }
+
+ public Table tableWithDynamicOptions(ConnectorSession session) {
+ // see TrinoConnector.getSessionProperties
+ Map<String, String> dynamicOptions = new HashMap<>();
+ Long scanTimestampMills =
TrinoSessionProperties.getScanTimestampMillis(session);
+ if (scanTimestampMills != null) {
+ dynamicOptions.put(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
scanTimestampMills.toString());
+ }
+ Long scanSnapshotId =
TrinoSessionProperties.getScanSnapshotId(session);
+ if (scanSnapshotId != null) {
+ dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(),
scanSnapshotId.toString());
+ }
+
+ return dynamicOptions.size() > 0 ? table().copy(dynamicOptions) :
table();
+ }
+
+ public Table table() {
+ if (lazyTable == null) {
+ try {
+ lazyTable =
+ InstantiationUtil.deserializeObject(
+ serializedTable,
this.getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return lazyTable;
+ }
+
+ public ConnectorTableMetadata tableMetadata() {
+ return new ConnectorTableMetadata(
+ SchemaTableName.schemaTableName(schemaName, tableName),
+ columnMetadatas(),
+ Collections.emptyMap(),
+ Optional.empty());
+ }
+
+ public List<ColumnMetadata> columnMetadatas() {
+ return table().rowType().getFields().stream()
+ .map(
+ column ->
+ ColumnMetadata.builder()
+ .setName(column.name())
+
.setType(TrinoTypeUtils.fromPaimonType(column.type()))
+
.setNullable(column.type().isNullable())
+
.setComment(Optional.ofNullable(column.description()))
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ public TrinoColumnHandle columnHandle(String field) {
+ List<String> fieldNames = FieldNameUtils.fieldNames(table().rowType());
+ int index = fieldNames.indexOf(field);
+ if (index == -1) {
+ throw new RuntimeException(
+ String.format("Cannot find field %s in schema %s", field,
fieldNames));
+ }
+ return TrinoColumnHandle.of(field, table().rowType().getTypeAt(index));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TrinoTableHandle that = (TrinoTableHandle) o;
+ return Arrays.equals(serializedTable, that.serializedTable)
+ && Objects.equals(schemaName, that.schemaName)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(filter, that.filter)
+ && Objects.equals(projectedColumns, that.projectedColumns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ schemaName, tableName, filter, projectedColumns,
Arrays.hashCode(serializedTable));
+ }
+}
diff --git
a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
index decefcf..073b4bd 100644
---
a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
+++
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
@@ -18,9 +18,35 @@
package org.apache.paimon.trino;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
/** {@link TestTrinoITCase} for Trino 388. */
public class TestTrino388ITCase extends TestTrinoITCase {
public TestTrino388ITCase() {
super(388);
}
+
+ @Test
+ public void testIncrementalRead() {
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2'))"))
+ .withMessage(
+ "Either INCREMENTAL_BETWEEN or
INCREMENTAL_BETWEEN_TIMESTAMP must be provided");
+ assertThat(
+ sql(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between=>'1,2'))"))
+ .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+ assertThat(
+ sql(
+ String.format(
+ "SELECT * FROM
TABLE(paimon.system.table_changes(schema_name=>'default',table_name=>'t2',incremental_between_timestamp=>'%s,%s'))",
+ t2FirstCommitTimestamp,
System.currentTimeMillis())))
+ .isEqualTo("[[5, 6, 3, 3], [7, 8, 4, 4]]");
+ }
}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
index bcde932..b22e3b1 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
@@ -125,7 +125,7 @@ public class TrinoTableOptionUtils {
}
}
- private static String convertOptionKey(String key) {
+ public static String convertOptionKey(String key) {
String regex = "[.\\-]";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(key);