Copilot commented on code in PR #10500:
URL: https://github.com/apache/gravitino/pull/10500#discussion_r3008269849
##########
trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java:
##########
@@ -52,6 +54,23 @@ public void addColumn(
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, tableExecuteHandle);
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. Given the internal connector runs in a
separate plugin classloader, this can fail when Trino serializes the execute
handle for distributed table execute. Consider wrapping the execute handle in a
Gravitino handle wrapper (and unwrapping it before delegation), consistent with
how insert/merge handles are handled.
```suggestion
Optional<ConnectorTableExecuteHandle> internalHandle =
internalMetadata.getTableHandleForExecute(
session,
GravitinoHandle.unWrap(tableHandle),
procedureName,
executeProperties,
retryMode);
if (!internalHandle.isPresent()) {
return internalHandle;
}
SchemaTableName tableName = getTableName(tableHandle);
return internalHandle.map(
handle ->
new GravitinoTableExecuteHandle(
tableName.getSchemaName(), tableName.getTableName(), handle));
}
@Override
public void executeTableExecute(
ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
```
##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java:
##########
@@ -660,6 +664,46 @@ public Optional<ConnectorTableLayout> getInsertLayout(
: new ConnectorTableLayout(result.getPartitionColumns()));
}
+ @Override
+ public Optional<ConnectorTableLayout> getLayoutForTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ return internalMetadata
+ .getLayoutForTableExecute(session, tableExecuteHandle)
+ .map(
+ result ->
+ result.getPartitioning().isPresent()
+ ? new ConnectorTableLayout(
+ new GravitinoPartitioningHandle(result.getPartitioning().get()),
+ result.getPartitionColumns(),
+ result.supportsMultipleWritersPerPartition())
+ : new ConnectorTableLayout(result.getPartitionColumns()));
+ }
+
+ @Override
+ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle>
+ beginTableExecute(
+ ConnectorSession session,
+ ConnectorTableExecuteHandle tableExecuteHandle,
+ ConnectorTableHandle updatedSourceTableHandle) {
+ BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> result =
+ internalMetadata.beginTableExecute(
+ session, tableExecuteHandle, GravitinoHandle.unWrap(updatedSourceTableHandle));
+ SchemaTableName tableName = getTableName(updatedSourceTableHandle);
+ return new BeginTableExecuteResult<>(
+ result.getTableExecuteHandle(),
+ new GravitinoTableHandle(
+ tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle()));
+ }
Review Comment:
`ConnectorTableExecuteHandle` is being passed through and returned as the
internal connector’s handle (e.g., `result.getTableExecuteHandle()`), unlike
other handle types (table/insert/merge/partitioning) that are wrapped for
cross-plugin-classloader serialization. This can break table procedure
execution in distributed mode when Trino needs to serialize the execute handle.
Introduce a Gravitino wrapper for `ConnectorTableExecuteHandle` (similar to
`GravitinoInsertTableHandle`) and unwrap it before delegating to
`internalMetadata`/`pageSinkProvider`.
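For illustration, a minimal, self-contained sketch of what such a wrapper might look like. The class and method names (`GravitinoTableExecuteHandle`, `unWrap`) are assumptions modeled on the existing `GravitinoInsertTableHandle`/`GravitinoHandle` pattern, not code from this PR; a local stand-in interface replaces Trino's `io.trino.spi.connector.ConnectorTableExecuteHandle` so the sketch compiles on its own (the real wrapper would also need JSON serialization annotations for Trino to ship it between nodes):

```java
import java.util.Optional;

public class TableExecuteHandleSketch {
  // Stand-in for io.trino.spi.connector.ConnectorTableExecuteHandle.
  interface ConnectorTableExecuteHandle {}

  // Models the internal connector's handle, as produced inside a separate
  // plugin classloader.
  static final class InternalExecuteHandle implements ConnectorTableExecuteHandle {}

  // Hypothetical wrapper: records the schema/table name and carries the
  // internal connector's execute handle, so Trino serializes a class from the
  // Gravitino connector's classloader instead of the internal handle directly.
  static final class GravitinoTableExecuteHandle implements ConnectorTableExecuteHandle {
    private final String schemaName;
    private final String tableName;
    private final ConnectorTableExecuteHandle internalHandle;

    GravitinoTableExecuteHandle(
        String schemaName, String tableName, ConnectorTableExecuteHandle internalHandle) {
      this.schemaName = schemaName;
      this.tableName = tableName;
      this.internalHandle = internalHandle;
    }

    String getSchemaName() { return schemaName; }
    String getTableName() { return tableName; }
    ConnectorTableExecuteHandle getInternalHandle() { return internalHandle; }
  }

  // Mirrors GravitinoHandle.unWrap: before delegating to internalMetadata or
  // pageSinkProvider, recover the internal handle from the wrapper.
  static ConnectorTableExecuteHandle unWrap(ConnectorTableExecuteHandle handle) {
    return handle instanceof GravitinoTableExecuteHandle
        ? ((GravitinoTableExecuteHandle) handle).getInternalHandle()
        : handle;
  }

  public static void main(String[] args) {
    ConnectorTableExecuteHandle internal = new InternalExecuteHandle();
    // getTableHandleForExecute would wrap the internal connector's result:
    Optional<ConnectorTableExecuteHandle> wrapped =
        Optional.of(internal)
            .map(h -> (ConnectorTableExecuteHandle)
                new GravitinoTableExecuteHandle("gt_db", "t1", h));
    // executeTableExecute/createPageSink would unwrap before delegating; the
    // round trip must yield the exact internal handle.
    if (unWrap(wrapped.get()) != internal) {
      throw new AssertionError("round trip failed");
    }
    System.out.println("round trip ok");
  }
}
```

The same wrap-on-return / unwrap-on-delegate round trip would apply to every version-specific `GravitinoMetadata4xx` override flagged in this review.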
##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java:
##########
@@ -65,6 +66,16 @@ public ConnectorPageSink createPageSink(
pageSinkId);
}
+ @Override
+ public ConnectorPageSink createPageSink(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorTableExecuteHandle tableExecuteHandle,
+ ConnectorPageSinkId pageSinkId) {
+ return pageSinkProvider.createPageSink(
+ GravitinoHandle.unWrap(transactionHandle), session, tableExecuteHandle, pageSinkId);
Review Comment:
`createPageSink(..., ConnectorTableExecuteHandle, ...)` forwards
`tableExecuteHandle` without unwrapping/wrapping. If the execute handle comes
from an internal connector classloader, Trino may need it to be serialized
across nodes, and this connector already wraps other internal handles for that
reason. Consider adding a Gravitino wrapper for `ConnectorTableExecuteHandle`
and unwrap it here before delegating (consistent with the insert/merge
overloads).
```suggestion
GravitinoHandle.unWrap(transactionHandle),
session,
GravitinoHandle.unWrap(tableExecuteHandle),
pageSinkId);
```
##########
trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java:
##########
@@ -67,6 +69,23 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+ }
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. Since internal connectors are created
from separate plugin classloaders, this execute handle may not be safely
serializable/deserializable by Trino for distributed execution. Wrap the
execute handle in a Gravitino handle wrapper (and unwrap when delegating) to be
consistent with `GravitinoInsertTableHandle`/`GravitinoMergeTableHandle`.
##########
trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java:
##########
@@ -66,6 +68,29 @@ public void addColumn(
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn, columnPosition);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorAccessControl accessControl,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session,
+ accessControl,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode);
+ }
+
+ @Override
+ public Map<String, Long> executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ return internalMetadata.executeTableExecute(session, tableExecuteHandle);
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. With internal connectors loaded from
separate plugin classloaders, this execute handle may not be
serializable/deserializable by Trino when running table execute across workers.
Wrap the handle in a Gravitino wrapper (and unwrap it when delegating)
similarly to how insert/merge handles are wrapped.
```suggestion
return internalMetadata
.getTableHandleForExecute(
session,
accessControl,
GravitinoHandle.unWrap(tableHandle),
procedureName,
executeProperties,
retryMode)
.map(GravitinoHandle::wrap);
}
@Override
public Map<String, Long> executeTableExecute(
ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
return internalMetadata.executeTableExecute(
session, GravitinoHandle.unWrap(tableExecuteHandle));
```
##########
trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql:
##########
@@ -0,0 +1,63 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Test Iceberg snapshot maintenance procedures via Gravitino connector
+
+CREATE SCHEMA IF NOT EXISTS gt_snapshot_test;
+
+CREATE TABLE gt_snapshot_test.maintenance_table (
+ id int,
+ name varchar
+);
+
+-- Insert data to create snapshots
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie');
+
+-- Verify we have multiple snapshots
+SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots";
+
+-- Test expire_snapshots procedure (expire snapshots older than a far future timestamp keeps all)
+ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s');
Review Comment:
The comment says the `expire_snapshots` call uses “a far future timestamp
keeps all”, but the statement uses `retention_threshold => '0s'` (which is
effectively immediate and can expire snapshots older than “now”). Either adjust
the comment to match the behavior being tested, or change the parameters so the
SQL matches the stated intent.
```suggestion
ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '36500d');
```
##########
trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java:
##########
@@ -52,6 +54,23 @@ public void addColumn(
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, tableExecuteHandle);
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. If the internal connector is loaded in
a separate plugin classloader, Trino may be unable to serialize/deserialize
this handle during distributed execution of table procedures. Wrap it in a
Gravitino handle wrapper and unwrap it before delegating, similar to the
existing insert/merge handle pattern.
```suggestion
Optional<ConnectorTableExecuteHandle> internalHandle =
internalMetadata.getTableHandleForExecute(
session,
GravitinoHandle.unWrap(tableHandle),
procedureName,
executeProperties,
retryMode);
SchemaTableName tableName = getTableName(tableHandle);
return internalHandle.map(
handle ->
new GravitinoTableExecuteHandle(
tableName.getSchemaName(), tableName.getTableName(), handle));
}
@Override
public void executeTableExecute(
ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
```
##########
trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java:
##########
@@ -73,6 +75,29 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorAccessControl accessControl,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session,
+ accessControl,
+ GravitinoHandle.unWrap(tableHandle),
+ procedureName,
+ executeProperties,
+ retryMode);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, tableExecuteHandle);
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. Because the internal connector is
loaded via a separate plugin classloader, this handle can fail to
serialize/deserialize when Trino distributes the table execute operation. Wrap
the returned execute handle in a Gravitino handle wrapper and unwrap it when
delegating (consistent with the insert/merge handle wrappers).
```suggestion
SchemaTableName tableName = getTableName(tableHandle);
return internalMetadata
.getTableHandleForExecute(
session,
accessControl,
GravitinoHandle.unWrap(tableHandle),
procedureName,
executeProperties,
retryMode)
.map(
handle ->
new GravitinoTableExecuteHandle(
tableName.getSchemaName(), tableName.getTableName(), handle));
}
@Override
public void executeTableExecute(
ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
internalMetadata.executeTableExecute(
session, GravitinoHandle.unWrap(tableExecuteHandle));
```
##########
trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java:
##########
@@ -50,6 +52,23 @@ public void addColumn(
catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
}
+ @Override
+ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ String procedureName,
+ Map<String, Object> executeProperties,
+ RetryMode retryMode) {
+ return internalMetadata.getTableHandleForExecute(
+ session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+ }
+
+ @Override
+ public void executeTableExecute(
+ ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+ internalMetadata.executeTableExecute(session, tableExecuteHandle);
Review Comment:
`getTableHandleForExecute` returns the internal connector’s
`ConnectorTableExecuteHandle` directly. Since internal connectors are loaded
via separate `PluginClassLoader`s, this handle may not be
serializable/deserializable by Trino when distributed execution requires
shipping the handle to workers. Wrap the returned execute handle in a Gravitino
handle wrapper (and unwrap it when delegating) to match the existing pattern
used for insert/merge handles.
```suggestion
Optional<ConnectorTableExecuteHandle> internalHandle =
internalMetadata.getTableHandleForExecute(
session,
GravitinoHandle.unWrap(tableHandle),
procedureName,
executeProperties,
retryMode);
if (!internalHandle.isPresent()) {
return internalHandle;
}
SchemaTableName tableName = getTableName(tableHandle);
return Optional.of(
new GravitinoTableExecuteHandle(
tableName.getSchemaName(), tableName.getTableName(), internalHandle.get()));
}
@Override
public void executeTableExecute(
ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
internalMetadata.executeTableExecute(
session, GravitinoHandle.unWrap(tableExecuteHandle));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]