This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e29bf6553 feat: pass vended credentials to Iceberg native scan (#3523)
e29bf6553 is described below

commit e29bf65535631778dcc73cc8ea31b8e2f5e1c789
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Tue Feb 17 03:31:54 2026 +0400

    feat: pass vended credentials to Iceberg native scan (#3523)
    
    * feat: pass vended credentials to Iceberg native scan
    
    * test: add negative test for credential vending
    
    * fix: address comments
    
    ---------
    
    Co-authored-by: Matt Butrovich <[email protected]>
---
 pom.xml                                            | 31 +++++++++
 spark/pom.xml                                      | 17 +++++
 .../apache/comet/iceberg/IcebergReflection.scala   | 26 ++++++++
 .../org/apache/comet/rules/CometScanRule.scala     | 17 +++--
 .../serde/operator/CometIcebergNativeScan.scala    | 15 +++++
 .../apache/iceberg/rest/RESTCatalogAdapter.java    | 28 ++++++++
 .../org/apache/comet/IcebergReadFromS3Suite.scala  | 74 +++++++++++++++++++++-
 .../apache/comet/iceberg/RESTCatalogHelper.scala   | 25 +++++++-
 .../apache/comet/iceberg/RESTCatalogHelper.scala   | 25 +++++++-
 9 files changed, 249 insertions(+), 9 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1b33fc475..e647cbdcc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -477,12 +477,43 @@ under the License.
         <version>${testcontainers.version}</version>
         <scope>test</scope>
       </dependency>
+      <!--
+        AWS SDK modules for Iceberg REST catalog + S3 tests.
+        iceberg-spark-runtime treats the AWS SDK as provided scope, so tests
+        that exercise Iceberg's S3FileIO (via ResolvingFileIO) must supply these.
+        AwsProperties references all service client types in method signatures,
+        and Java serialization introspection resolves them at class-load time.
+      -->
       <dependency>
         <groupId>software.amazon.awssdk</groupId>
         <artifactId>s3</artifactId>
         <version>${amazon-awssdk-v2.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sts</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>dynamodb</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>glue</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>kms</artifactId>
+        <version>${amazon-awssdk-v2.version}</version>
+        <scope>test</scope>
+      </dependency>
 
       <dependency>
         <groupId>org.codehaus.jackson</groupId>
diff --git a/spark/pom.xml b/spark/pom.xml
index a9cd72f51..1b207288c 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -169,10 +169,27 @@ under the License.
       <groupId>org.testcontainers</groupId>
       <artifactId>minio</artifactId>
     </dependency>
+    <!-- AWS SDK modules required by Iceberg's S3FileIO (see parent pom for details) -->
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>s3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sts</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>dynamodb</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>glue</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kms</artifactId>
+    </dependency>
     <!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
     <!-- Note: The specific versions are defined in profiles below based on Spark version -->
   </dependencies>
diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 7642749ad..d77821239 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -237,6 +237,32 @@ object IcebergReflection extends Logging {
     }
   }
 
+  /**
+   * Gets storage properties from an Iceberg table's FileIO.
+   *
+   * This extracts credentials from the FileIO implementation, which is critical for REST catalog
+   * credential vending. The REST catalog returns temporary S3 credentials per-table via the
+   * loadTable response, stored in the table's FileIO (typically ResolvingFileIO).
+   *
+   * The properties() method is not on the FileIO interface -- it exists on specific
+   * implementations like ResolvingFileIO and S3FileIO. Returns None gracefully when unavailable.
+   */
+  def getFileIOProperties(table: Any): Option[Map[String, String]] = {
+    import scala.jdk.CollectionConverters._
+    getFileIO(table).flatMap { fileIO =>
+      findMethodInHierarchy(fileIO.getClass, "properties").flatMap { propsMethod =>
+        propsMethod.invoke(fileIO) match {
+          case javaMap: java.util.Map[_, _] =>
+            val scalaMap = javaMap.asScala.collect { case (k: String, v: String) =>
+              k -> v
+            }.toMap
+            if (scalaMap.nonEmpty) Some(scalaMap) else None
+          case _ => None
+        }
+      }
+    }
+  }
+
   /**
    * Gets the schema from an Iceberg table.
    */
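
For reference, a self-contained sketch of the reflection pattern the new getFileIOProperties helper relies on: look up a zero-arg properties() method on a concrete FileIO object (S3FileIO and ResolvingFileIO expose one even though the FileIO interface does not) and convert its java.util.Map result to a Scala Map. FakeFileIO and propertiesOf below are illustrative stand-ins, not Comet or Iceberg APIs.

    import scala.jdk.CollectionConverters._

    object FileIOPropertiesSketch {
      // Find a zero-arg properties() method via reflection and read it as Map[String, String].
      def propertiesOf(fileIO: AnyRef): Option[Map[String, String]] = {
        val method = fileIO.getClass.getMethods
          .find(m => m.getName == "properties" && m.getParameterCount == 0)
        method.flatMap { m =>
          m.invoke(fileIO) match {
            case javaMap: java.util.Map[_, _] =>
              val scalaMap = javaMap.asScala.collect { case (k: String, v: String) => k -> v }.toMap
              if (scalaMap.nonEmpty) Some(scalaMap) else None
            case _ => None
          }
        }
      }

      def main(args: Array[String]): Unit = {
        // Stand-in for a FileIO carrying vended credentials (values are illustrative).
        class FakeFileIO {
          def properties(): java.util.Map[String, String] =
            java.util.Map.of("s3.access-key-id", "VENDED_KEY", "s3.secret-access-key", "VENDED_SECRET")
        }
        println(propertiesOf(new FakeFileIO())) // Some(Map(s3.access-key-id -> VENDED_KEY, ...))
      }
    }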
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bb37515ab..404d209b4 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -49,7 +49,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
 import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
 
 /**
@@ -387,9 +387,18 @@ case class CometScanRule(session: SparkSession)
 
             val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
 
-            val catalogProperties =
-              org.apache.comet.serde.operator.CometIcebergNativeScan
-                .hadoopToIcebergS3Properties(hadoopS3Options)
+            val hadoopDerivedProperties =
+              CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)
+
+            // Extract vended credentials from FileIO (REST catalog credential vending).
+            // FileIO properties take precedence over Hadoop-derived properties because
+            // they contain per-table credentials vended by the REST catalog.
+            val fileIOProperties = tableOpt
+              .flatMap(IcebergReflection.getFileIOProperties)
+              .map(CometIcebergNativeScan.filterStorageProperties)
+              .getOrElse(Map.empty)
+
+            val catalogProperties = hadoopDerivedProperties ++ fileIOProperties
 
             val result = CometIcebergNativeScanMetadata
               .extract(scanExec.scan, effectiveLocation, catalogProperties)
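
For reference, a minimal self-contained sketch of the merge precedence above: Scala's ++ keeps the right-hand value on key collisions, so FileIO-vended credentials override Hadoop-derived ones while non-conflicting entries survive. The keys and values are illustrative, not taken from a real configuration.

    object MergePrecedenceSketch extends App {
      // Values that would be derived from the Hadoop/S3A configuration.
      val hadoopDerived = Map(
        "s3.endpoint" -> "http://localhost:9000",
        "s3.access-key-id" -> "key-from-hadoop-conf")
      // Per-table credentials vended by the REST catalog via the table's FileIO.
      val fileIOVended = Map(
        "s3.access-key-id" -> "vended-temp-key",
        "s3.secret-access-key" -> "vended-temp-secret")

      // Right-hand operand wins on duplicate keys.
      val merged = hadoopDerived ++ fileIOVended
      println(merged("s3.access-key-id")) // vended-temp-key
      println(merged("s3.endpoint"))      // http://localhost:9000
    }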
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 957f62103..c86b2a51b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -488,6 +488,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     }
   }
 
+  /** Storage-related property prefixes passed through to native FileIO. */
+  private val storagePropertyPrefixes =
+    Seq("s3.", "gcs.", "adls.", "client.")
+
+  /**
+   * Filters a properties map to only include storage-related keys. FileIO.properties() may
+   * contain catalog URIs, bearer tokens, and other non-storage settings that should not be passed
+   * to the native FileIO builder.
+   */
+  def filterStorageProperties(props: Map[String, String]): Map[String, String] = {
+    props.filter { case (key, _) =>
+      storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
+    }
+  }
+
   /**
    * Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
    *
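
As a companion to filterStorageProperties, a self-contained sketch with illustrative keys: catalog-level entries such as a REST URI or bearer token are dropped, while s3./gcs./adls./client. keys pass through to the native FileIO builder. The example map and its values are hypothetical.

    object FilterStoragePropertiesSketch extends App {
      private val storagePropertyPrefixes = Seq("s3.", "gcs.", "adls.", "client.")

      // Same shape as the helper above: keep only keys with a storage prefix.
      def filterStorageProperties(props: Map[String, String]): Map[String, String] =
        props.filter { case (key, _) =>
          storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
        }

      // Example FileIO.properties() content (illustrative values).
      val fromFileIO = Map(
        "uri" -> "http://localhost:8181",      // REST catalog URI: dropped
        "token" -> "bearer-token",              // catalog auth token: dropped
        "s3.access-key-id" -> "vended-key",     // storage credential: kept
        "client.region" -> "us-east-1")         // client setting: kept
      println(filterStorageProperties(fromFileIO))
      // Map(s3.access-key-id -> vended-key, client.region -> us-east-1)
    }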
diff --git a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 7d5d6ce6b..7b04110d3 100644
--- a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -98,6 +98,14 @@ public class RESTCatalogAdapter implements RESTClient {
   private final SupportsNamespaces asNamespaceCatalog;
   private final ViewCatalog asViewCatalog;
 
+  // Optional credentials to inject into loadTable responses, simulating REST catalog
+  // credential vending. When non-empty, these are added to LoadTableResponse.config().
+  private Map<String, String> vendedCredentials = ImmutableMap.of();
+
+  public void setVendedCredentials(Map<String, String> credentials) {
+    this.vendedCredentials = credentials;
+  }
+
   public RESTCatalogAdapter(Catalog catalog) {
     this.catalog = catalog;
     this.asNamespaceCatalog =
@@ -279,6 +287,26 @@ public class RESTCatalogAdapter implements RESTClient {
   @SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
   public <T extends RESTResponse> T handleRequest(
       Route route, Map<String, String> vars, Object body, Class<T> responseType) {
+    T response = doHandleRequest(route, vars, body, responseType);
+    // Inject vended credentials into any LoadTableResponse, simulating REST catalog
+    // credential vending. This covers CREATE_TABLE, LOAD_TABLE, UPDATE_TABLE, etc.
+    if (!vendedCredentials.isEmpty() && response instanceof LoadTableResponse) {
+      LoadTableResponse original = (LoadTableResponse) response;
+      @SuppressWarnings("unchecked")
+      T withCreds =
+          (T)
+              LoadTableResponse.builder()
+                  .withTableMetadata(original.tableMetadata())
+                  .addAllConfig(original.config())
+                  .addAllConfig(vendedCredentials)
+                  .build();
+      return withCreds;
+    }
+    return response;
+  }
+
+  private <T extends RESTResponse> T doHandleRequest(
+      Route route, Map<String, String> vars, Object body, Class<T> responseType) {
     switch (route) {
       case TOKENS:
         return castResponse(responseType, handleOAuthRequest(body));
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index 00955e629..c1c90adfa 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -23,7 +23,9 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.comet.CometIcebergNativeScanExec
 import org.apache.spark.sql.execution.SparkPlan
 
-class IcebergReadFromS3Suite extends CometS3TestBase {
+import org.apache.comet.iceberg.RESTCatalogHelper
+
+class IcebergReadFromS3Suite extends CometS3TestBase with RESTCatalogHelper {
 
   override protected val testBucketName = "test-iceberg-bucket"
 
@@ -227,4 +229,74 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
 
     spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
   }
+
+  test("REST catalog credential vending rejects wrong credentials") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val wrongCreds = Map(
+      "s3.access-key-id" -> "WRONG_ACCESS_KEY",
+      "s3.secret-access-key" -> "WRONG_SECRET_KEY",
+      "s3.endpoint" -> minioContainer.getS3URL,
+      "s3.path-style-access" -> "true")
+    val warehouse = s"s3a://$testBucketName/warehouse-bad-creds"
+
+    withRESTCatalog(vendedCredentials = wrongCreds, warehouseLocation = Some(warehouse)) {
+      (restUri, _, _) =>
+        withSQLConf(
+          "spark.sql.catalog.bad_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+          "spark.sql.catalog.bad_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
+          "spark.sql.catalog.bad_cat.uri" -> restUri,
+          "spark.sql.catalog.bad_cat.warehouse" -> warehouse) {
+
+          spark.sql("CREATE NAMESPACE bad_cat.db")
+
+          // CREATE TABLE succeeds (metadata only, no S3 access needed)
+          spark.sql("CREATE TABLE bad_cat.db.test (id INT) USING iceberg")
+
+          // INSERT fails because S3FileIO uses the wrong vended credentials
+          val e = intercept[Exception] {
+            spark.sql("INSERT INTO bad_cat.db.test VALUES (1)")
+          }
+          assert(e.getMessage.contains("403"), s"Expected S3 403 error but got: ${e.getMessage}")
+        }
+    }
+  }
+
+  test("REST catalog credential vending with native Iceberg scan on S3") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val vendedCreds = Map(
+      "s3.access-key-id" -> userName,
+      "s3.secret-access-key" -> password,
+      "s3.endpoint" -> minioContainer.getS3URL,
+      "s3.path-style-access" -> "true")
+    val warehouse = s"s3a://$testBucketName/warehouse-vending"
+
+    withRESTCatalog(vendedCredentials = vendedCreds, warehouseLocation = Some(warehouse)) {
+      (restUri, _, _) =>
+        withSQLConf(
+          "spark.sql.catalog.vend_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+          "spark.sql.catalog.vend_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
+          "spark.sql.catalog.vend_cat.uri" -> restUri,
+          "spark.sql.catalog.vend_cat.warehouse" -> warehouse,
+          CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+          spark.sql("CREATE NAMESPACE vend_cat.db")
+
+          spark.sql("""
+            CREATE TABLE vend_cat.db.simple (
+              id INT, name STRING, value DOUBLE
+            ) USING iceberg
+          """)
+          spark.sql("""
+            INSERT INTO vend_cat.db.simple
+            VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
+          """)
+          checkIcebergNativeScan("SELECT * FROM vend_cat.db.simple ORDER BY id")
+
+          spark.sql("DROP TABLE vend_cat.db.simple")
+          spark.sql("DROP NAMESPACE vend_cat.db")
+        }
+    }
+  }
 }
diff --git a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
index 6230ee33e..856700c2c 100644
--- a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
@@ -26,7 +26,22 @@ import java.nio.file.Files
 trait RESTCatalogHelper {
 
   /** Helper to set up REST catalog with embedded Jetty server (Spark 3.x / Jetty 9.4) */
-  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
+  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
+    withRESTCatalog()(f)
+
+  /**
+   * Helper to set up REST catalog with optional credential vending.
+   *
+   * @param vendedCredentials
+   *   Storage credentials to inject into loadTable responses, simulating REST catalog credential
+   *   vending. When non-empty, these are added to every LoadTableResponse.config().
+   * @param warehouseLocation
+   *   Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
+   */
+  def withRESTCatalog(
+      vendedCredentials: Map[String, String] = Map.empty,
+      warehouseLocation: Option[String] = None)(
+      f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
     import org.apache.iceberg.inmemory.InMemoryCatalog
     import org.apache.iceberg.CatalogProperties
     import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
     import org.eclipse.jetty.server.handler.gzip.GzipHandler
 
     val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+    val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
     val backendCatalog = new InMemoryCatalog()
     backendCatalog.initialize(
       "in-memory",
-      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
 
     val adapter = new RESTCatalogAdapter(backendCatalog)
+    if (vendedCredentials.nonEmpty) {
+      import scala.jdk.CollectionConverters._
+      adapter.setVendedCredentials(vendedCredentials.asJava)
+    }
     val servlet = new RESTCatalogServlet(adapter)
 
     val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
diff --git a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
index ccd03c544..bd53804b8 100644
--- a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
@@ -26,7 +26,22 @@ import java.nio.file.Files
 trait RESTCatalogHelper {
 
   /** Helper to set up REST catalog with embedded Jetty server (Spark 4.0 / Jetty 11) */
-  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
+  def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
+    withRESTCatalog()(f)
+
+  /**
+   * Helper to set up REST catalog with optional credential vending.
+   *
+   * @param vendedCredentials
+   *   Storage credentials to inject into loadTable responses, simulating REST catalog credential
+   *   vending. When non-empty, these are added to every LoadTableResponse.config().
+   * @param warehouseLocation
+   *   Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
+   */
+  def withRESTCatalog(
+      vendedCredentials: Map[String, String] = Map.empty,
+      warehouseLocation: Option[String] = None)(
+      f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
     import org.apache.iceberg.inmemory.InMemoryCatalog
     import org.apache.iceberg.CatalogProperties
     import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
     import org.eclipse.jetty.server.handler.gzip.GzipHandler
 
     val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+    val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
     val backendCatalog = new InMemoryCatalog()
     backendCatalog.initialize(
       "in-memory",
-      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+      java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
 
     val adapter = new RESTCatalogAdapter(backendCatalog)
+    if (vendedCredentials.nonEmpty) {
+      import scala.jdk.CollectionConverters._
+      adapter.setVendedCredentials(vendedCredentials.asJava)
+    }
     val servlet = new RESTCatalogServlet(adapter)
 
     val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
