Copilot commented on code in PR #10500:
URL: https://github.com/apache/gravitino/pull/10500#discussion_r3008269849


##########
trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java:
##########
@@ -52,6 +54,23 @@ public void addColumn(
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, tableExecuteHandle);

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. Given the internal connector runs in a 
separate plugin classloader, this can fail when Trino serializes the execute 
handle for distributed table execute. Consider wrapping the execute handle in a 
Gravitino handle wrapper (and unwrapping it before delegation), consistent with 
how insert/merge handles are handled.
   ```suggestion
       Optional<ConnectorTableExecuteHandle> internalHandle =
           internalMetadata.getTableHandleForExecute(
               session,
               GravitinoHandle.unWrap(tableHandle),
               procedureName,
               executeProperties,
               retryMode);
   
       SchemaTableName tableName = getTableName(tableHandle);
       return internalHandle.map(
           handle ->
               new GravitinoTableExecuteHandle(
                   tableName.getSchemaName(), tableName.getTableName(), handle));
     }
   
     @Override
     public void executeTableExecute(
         ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
       internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
   ```



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java:
##########
@@ -660,6 +664,46 @@ public Optional<ConnectorTableLayout> getInsertLayout(
                     : new ConnectorTableLayout(result.getPartitionColumns()));
   }
 
+  @Override
+  public Optional<ConnectorTableLayout> getLayoutForTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    return internalMetadata
+        .getLayoutForTableExecute(session, tableExecuteHandle)
+        .map(
+            result ->
+                result.getPartitioning().isPresent()
+                    ? new ConnectorTableLayout(
+                        new GravitinoPartitioningHandle(result.getPartitioning().get()),
+                        result.getPartitionColumns(),
+                        result.supportsMultipleWritersPerPartition())
+                    : new ConnectorTableLayout(result.getPartitionColumns()));
+  }
+
+  @Override
+  public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle>
+      beginTableExecute(
+          ConnectorSession session,
+          ConnectorTableExecuteHandle tableExecuteHandle,
+          ConnectorTableHandle updatedSourceTableHandle) {
+    BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> result =
+        internalMetadata.beginTableExecute(
+            session, tableExecuteHandle, GravitinoHandle.unWrap(updatedSourceTableHandle));
+    SchemaTableName tableName = getTableName(updatedSourceTableHandle);
+    return new BeginTableExecuteResult<>(
+        result.getTableExecuteHandle(),
+        new GravitinoTableHandle(
+            tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle()));
+  }

Review Comment:
   `ConnectorTableExecuteHandle` is being passed through and returned as the 
internal connector’s handle (e.g., `result.getTableExecuteHandle()`), unlike 
other handle types (table/insert/merge/partitioning) that are wrapped for 
cross-plugin-classloader serialization. This can break table procedure 
execution in distributed mode when Trino needs to serialize the execute handle. 
Introduce a Gravitino wrapper for `ConnectorTableExecuteHandle` (similar to 
`GravitinoInsertTableHandle`) and unwrap it before delegating to 
`internalMetadata`/`pageSinkProvider`.
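
For illustration only, the wrapper requested above could take roughly the following shape. This is a minimal sketch, not the project's implementation: `ConnectorTableExecuteHandle` here is a local stand-in for the Trino SPI interface, and `InternalExecuteHandle`, `GravitinoTableExecuteHandle`, `wrap`, and `unWrap` are hypothetical names chosen for the example. A real wrapper would also have to be serializable by Trino in the same way as the existing insert/merge wrappers, which this sketch omits.

```java
import java.util.Objects;
import java.util.Optional;

public class ExecuteHandleWrapperSketch {
  /** Local stand-in for the Trino SPI ConnectorTableExecuteHandle interface (assumption). */
  interface ConnectorTableExecuteHandle {}

  /** Hypothetical handle created by the internal connector. */
  static final class InternalExecuteHandle implements ConnectorTableExecuteHandle {
    final String procedureName;

    InternalExecuteHandle(String procedureName) {
      this.procedureName = Objects.requireNonNull(procedureName, "procedureName is null");
    }
  }

  /** Hypothetical Gravitino-side wrapper that carries the internal handle. */
  static final class GravitinoTableExecuteHandle implements ConnectorTableExecuteHandle {
    private final String schemaName;
    private final String tableName;
    private final ConnectorTableExecuteHandle internalHandle;

    GravitinoTableExecuteHandle(
        String schemaName, String tableName, ConnectorTableExecuteHandle internalHandle) {
      this.schemaName = Objects.requireNonNull(schemaName, "schemaName is null");
      this.tableName = Objects.requireNonNull(tableName, "tableName is null");
      this.internalHandle = Objects.requireNonNull(internalHandle, "internalHandle is null");
    }

    String getSchemaName() {
      return schemaName;
    }

    String getTableName() {
      return tableName;
    }

    ConnectorTableExecuteHandle getInternalHandle() {
      return internalHandle;
    }
  }

  /** Wraps the internal handle if present; an empty Optional stays empty. */
  static Optional<ConnectorTableExecuteHandle> wrap(
      String schemaName, String tableName, Optional<ConnectorTableExecuteHandle> internal) {
    return internal.<ConnectorTableExecuteHandle>map(
        handle -> new GravitinoTableExecuteHandle(schemaName, tableName, handle));
  }

  /** Recovers the internal handle before delegating to the internal connector. */
  static ConnectorTableExecuteHandle unWrap(ConnectorTableExecuteHandle handle) {
    if (handle instanceof GravitinoTableExecuteHandle) {
      return ((GravitinoTableExecuteHandle) handle).getInternalHandle();
    }
    throw new IllegalArgumentException("Not a wrapped execute handle: " + handle);
  }

  public static void main(String[] args) {
    ConnectorTableExecuteHandle internal = new InternalExecuteHandle("expire_snapshots");
    ConnectorTableExecuteHandle wrapped =
        wrap("gt_snapshot_test", "maintenance_table", Optional.of(internal)).get();
    // The engine would only ever see the wrapper; delegation unwraps it again.
    System.out.println(unWrap(wrapped) == internal);
  }
}
```

With a wrapper like this, every method that returns an execute handle to the engine would wrap it, and every method that receives one (`executeTableExecute`, `getLayoutForTableExecute`, `beginTableExecute`, `createPageSink`) would unwrap it before calling the internal connector.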



##########
trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java:
##########
@@ -65,6 +66,16 @@ public ConnectorPageSink createPageSink(
         pageSinkId);
   }
 
+  @Override
+  public ConnectorPageSink createPageSink(
+      ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session,
+      ConnectorTableExecuteHandle tableExecuteHandle,
+      ConnectorPageSinkId pageSinkId) {
+    return pageSinkProvider.createPageSink(
+        GravitinoHandle.unWrap(transactionHandle), session, tableExecuteHandle, pageSinkId);

Review Comment:
   `createPageSink(..., ConnectorTableExecuteHandle, ...)` forwards 
`tableExecuteHandle` without unwrapping/wrapping. If the execute handle comes 
from an internal connector classloader, Trino may need it to be serialized 
across nodes, and this connector already wraps other internal handles for that 
reason. Consider adding a Gravitino wrapper for `ConnectorTableExecuteHandle` 
and unwrap it here before delegating (consistent with the insert/merge 
overloads).
   ```suggestion
           GravitinoHandle.unWrap(transactionHandle),
           session,
           GravitinoHandle.unWrap(tableExecuteHandle),
           pageSinkId);
   ```



##########
trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java:
##########
@@ -67,6 +69,23 @@ public Optional<ConnectorOutputMetadata> finishInsert(
         computedStatistics);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+  }

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. Since internal connectors are created 
from separate plugin classloaders, this execute handle may not be safely 
serializable/deserializable by Trino for distributed execution. Wrap the 
execute handle in a Gravitino handle wrapper (and unwrap when delegating) to be 
consistent with `GravitinoInsertTableHandle`/`GravitinoMergeTableHandle`.



##########
trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java:
##########
@@ -66,6 +68,29 @@ public void addColumn(
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn, columnPosition);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorAccessControl accessControl,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session,
+        accessControl,
+        GravitinoHandle.unWrap(tableHandle),
+        procedureName,
+        executeProperties,
+        retryMode);
+  }
+
+  @Override
+  public Map<String, Long> executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    return internalMetadata.executeTableExecute(session, tableExecuteHandle);

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. With internal connectors loaded from 
separate plugin classloaders, this execute handle may not be 
serializable/deserializable by Trino when running table execute across workers. 
Wrap the handle in a Gravitino wrapper (and unwrap it when delegating) 
similarly to how insert/merge handles are wrapped.
   ```suggestion
       return internalMetadata
           .getTableHandleForExecute(
               session,
               accessControl,
               GravitinoHandle.unWrap(tableHandle),
               procedureName,
               executeProperties,
               retryMode)
           .map(GravitinoHandle::wrap);
     }
   
     @Override
     public Map<String, Long> executeTableExecute(
         ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
       return internalMetadata.executeTableExecute(
           session, GravitinoHandle.unWrap(tableExecuteHandle));
   ```



##########
trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql:
##########
@@ -0,0 +1,63 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Test Iceberg snapshot maintenance procedures via Gravitino connector
+
+CREATE SCHEMA IF NOT EXISTS gt_snapshot_test;
+
+CREATE TABLE gt_snapshot_test.maintenance_table (
+    id int,
+    name varchar
+);
+
+-- Insert data to create snapshots
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob');
+
+INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie');
+
+-- Verify we have multiple snapshots
+SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots";
+
+-- Test expire_snapshots procedure (expire snapshots older than a far future timestamp keeps all)
+ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s');

Review Comment:
   The SQL comment claims this `expire_snapshots` call keeps all snapshots
(“a far future timestamp keeps all”), but the statement passes
`retention_threshold => '0s'`, which makes every snapshot older than “now”
eligible for expiration. Either update the comment to describe the behavior
actually being tested, or change the parameter so the SQL matches the stated
intent.
   ```suggestion
   ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '36500d');
   ```



##########
trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java:
##########
@@ -52,6 +54,23 @@ public void addColumn(
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, tableExecuteHandle);

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. If the internal connector is loaded in 
a separate plugin classloader, Trino may be unable to serialize/deserialize 
this handle during distributed execution of table procedures. Wrap it in a 
Gravitino handle wrapper and unwrap it before delegating, similar to the 
existing insert/merge handle pattern.
   ```suggestion
       Optional<ConnectorTableExecuteHandle> internalHandle =
           internalMetadata.getTableHandleForExecute(
               session,
               GravitinoHandle.unWrap(tableHandle),
               procedureName,
               executeProperties,
               retryMode);
   
       SchemaTableName tableName = getTableName(tableHandle);
       return internalHandle.map(
           handle ->
               new GravitinoTableExecuteHandle(
                   tableName.getSchemaName(), tableName.getTableName(), handle));
     }
   
     @Override
     public void executeTableExecute(
         ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
       internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle));
   ```



##########
trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java:
##########
@@ -73,6 +75,29 @@ public Optional<ConnectorOutputMetadata> finishInsert(
         computedStatistics);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorAccessControl accessControl,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session,
+        accessControl,
+        GravitinoHandle.unWrap(tableHandle),
+        procedureName,
+        executeProperties,
+        retryMode);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, tableExecuteHandle);

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. Because the internal connector is 
loaded via a separate plugin classloader, this handle can fail to 
serialize/deserialize when Trino distributes the table execute operation. Wrap 
the returned execute handle in a Gravitino handle wrapper and unwrap it when 
delegating (consistent with the insert/merge handle wrappers).
   ```suggestion
       SchemaTableName tableName = getTableName(tableHandle);
       return internalMetadata
           .getTableHandleForExecute(
               session,
               accessControl,
               GravitinoHandle.unWrap(tableHandle),
               procedureName,
               executeProperties,
               retryMode)
           .map(
               handle ->
                   new GravitinoTableExecuteHandle(
                       tableName.getSchemaName(), tableName.getTableName(), handle));
     }
   
     @Override
     public void executeTableExecute(
         ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
       internalMetadata.executeTableExecute(
           session, GravitinoHandle.unWrap(tableExecuteHandle));
   ```



##########
trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java:
##########
@@ -50,6 +52,23 @@ public void addColumn(
     catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn);
   }
 
+  @Override
+  public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
+      ConnectorSession session,
+      ConnectorTableHandle tableHandle,
+      String procedureName,
+      Map<String, Object> executeProperties,
+      RetryMode retryMode) {
+    return internalMetadata.getTableHandleForExecute(
+        session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode);
+  }
+
+  @Override
+  public void executeTableExecute(
+      ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
+    internalMetadata.executeTableExecute(session, tableExecuteHandle);

Review Comment:
   `getTableHandleForExecute` returns the internal connector’s 
`ConnectorTableExecuteHandle` directly. Since internal connectors are loaded 
via separate `PluginClassLoader`s, this handle may not be 
serializable/deserializable by Trino when distributed execution requires 
shipping the handle to workers. Wrap the returned execute handle in a Gravitino 
handle wrapper (and unwrap it when delegating) to match the existing pattern 
used for insert/merge handles.
   ```suggestion
       Optional<ConnectorTableExecuteHandle> internalHandle =
           internalMetadata.getTableHandleForExecute(
               session,
               GravitinoHandle.unWrap(tableHandle),
               procedureName,
               executeProperties,
               retryMode);
   
       SchemaTableName tableName = getTableName(tableHandle);
       return internalHandle.map(
           handle ->
               new GravitinoTableExecuteHandle(
                   tableName.getSchemaName(), tableName.getTableName(), handle));
     }
   
     @Override
     public void executeTableExecute(
         ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {
       internalMetadata.executeTableExecute(
           session, GravitinoHandle.unWrap(tableExecuteHandle));
   ```


