This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e29bf6553 feat: pass vended credentials to Iceberg native scan (#3523)
e29bf6553 is described below
commit e29bf65535631778dcc73cc8ea31b8e2f5e1c789
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Tue Feb 17 03:31:54 2026 +0400
feat: pass vended credentials to Iceberg native scan (#3523)
* feat: pass vended credentials to Iceberg native scan
* test: add negative test for credential vending
* fix: address comments
---------
Co-authored-by: Matt Butrovich <[email protected]>
---
pom.xml | 31 +++++++++
spark/pom.xml | 17 +++++
.../apache/comet/iceberg/IcebergReflection.scala | 26 ++++++++
.../org/apache/comet/rules/CometScanRule.scala | 17 +++--
.../serde/operator/CometIcebergNativeScan.scala | 15 +++++
.../apache/iceberg/rest/RESTCatalogAdapter.java | 28 ++++++++
.../org/apache/comet/IcebergReadFromS3Suite.scala | 74 +++++++++++++++++++++-
.../apache/comet/iceberg/RESTCatalogHelper.scala | 25 +++++++-
.../apache/comet/iceberg/RESTCatalogHelper.scala | 25 +++++++-
9 files changed, 249 insertions(+), 9 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1b33fc475..e647cbdcc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -477,12 +477,43 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+ <!--
+ AWS SDK modules for Iceberg REST catalog + S3 tests.
+ iceberg-spark-runtime treats the AWS SDK as provided scope, so tests
+ that exercise Iceberg's S3FileIO (via ResolvingFileIO) must supply these.
+ AwsProperties references all service client types in method signatures,
+ and Java serialization introspection resolves them at class-load time.
+ -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${amazon-awssdk-v2.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${amazon-awssdk-v2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dynamodb</artifactId>
+ <version>${amazon-awssdk-v2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>glue</artifactId>
+ <version>${amazon-awssdk-v2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kms</artifactId>
+ <version>${amazon-awssdk-v2.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
diff --git a/spark/pom.xml b/spark/pom.xml
index a9cd72f51..1b207288c 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -169,10 +169,27 @@ under the License.
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
</dependency>
+ <!-- AWS SDK modules required by Iceberg's S3FileIO (see parent pom for details) -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dynamodb</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>glue</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kms</artifactId>
+ </dependency>
<!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
<!-- Note: The specific versions are defined in profiles below based on Spark version -->
</dependencies>
diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 7642749ad..d77821239 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -237,6 +237,32 @@ object IcebergReflection extends Logging {
}
}
+ /**
+ * Gets storage properties from an Iceberg table's FileIO.
+ *
+ * This extracts credentials from the FileIO implementation, which is critical for REST catalog
+ * credential vending. The REST catalog returns temporary S3 credentials per-table via the
+ * loadTable response, stored in the table's FileIO (typically ResolvingFileIO).
+ *
+ * The properties() method is not on the FileIO interface -- it exists on specific
+ * implementations like ResolvingFileIO and S3FileIO. Returns None gracefully when unavailable.
+ */
+ def getFileIOProperties(table: Any): Option[Map[String, String]] = {
+ import scala.jdk.CollectionConverters._
+ getFileIO(table).flatMap { fileIO =>
+ findMethodInHierarchy(fileIO.getClass, "properties").flatMap { propsMethod =>
+ propsMethod.invoke(fileIO) match {
+ case javaMap: java.util.Map[_, _] =>
+ val scalaMap = javaMap.asScala.collect { case (k: String, v: String) =>
+ k -> v
+ }.toMap
+ if (scalaMap.nonEmpty) Some(scalaMap) else None
+ case _ => None
+ }
+ }
+ }
+ }
+
/**
* Gets the schema from an Iceberg table.
*/
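For context on the hunk above: properties() is looked up reflectively precisely because it is not part of the FileIO interface. A minimal standalone Scala sketch of that pattern follows; the names FileIOPropsSketch and propertiesOf are illustrative and not part of this patch.

    import scala.jdk.CollectionConverters._

    object FileIOPropsSketch {
      // Walk the class hierarchy for a zero-arg `properties` method.
      @annotation.tailrec
      private def findProperties(cls: Class[_]): Option[java.lang.reflect.Method] =
        if (cls == null) None
        else cls.getDeclaredMethods.find(m => m.getName == "properties" && m.getParameterCount == 0) match {
          case Some(m) => Some(m)
          case None => findProperties(cls.getSuperclass)
        }

      // Invoke it if present and convert the java.util.Map result into an immutable Scala Map.
      def propertiesOf(obj: AnyRef): Option[Map[String, String]] =
        findProperties(obj.getClass).flatMap { m =>
          m.setAccessible(true)
          m.invoke(obj) match {
            case javaMap: java.util.Map[_, _] =>
              val props = javaMap.asScala.collect { case (k: String, v: String) => k -> v }.toMap
              if (props.nonEmpty) Some(props) else None
            case _ => None
          }
        }
    }

In the patch itself this logic is layered on the existing getFileIO and findMethodInHierarchy helpers already present in IcebergReflection.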
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bb37515ab..404d209b4 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -49,7 +49,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.{Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
-import org.apache.comet.serde.operator.CometNativeScan
+import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
/**
@@ -387,9 +387,18 @@ case class CometScanRule(session: SparkSession)
val hadoopS3Options =
NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
- val catalogProperties =
- org.apache.comet.serde.operator.CometIcebergNativeScan
- .hadoopToIcebergS3Properties(hadoopS3Options)
+ val hadoopDerivedProperties =
+ CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)
+
+ // Extract vended credentials from FileIO (REST catalog credential vending).
+ // FileIO properties take precedence over Hadoop-derived properties because
+ // they contain per-table credentials vended by the REST catalog.
+ val fileIOProperties = tableOpt
+ .flatMap(IcebergReflection.getFileIOProperties)
+ .map(CometIcebergNativeScan.filterStorageProperties)
+ .getOrElse(Map.empty)
+
+ val catalogProperties = hadoopDerivedProperties ++ fileIOProperties
val result = CometIcebergNativeScanMetadata
.extract(scanExec.scan, effectiveLocation, catalogProperties)
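The precedence comment above relies on Scala's map concatenation semantics: with ++ the right-hand operand wins on duplicate keys, so FileIO-vended credentials override anything derived from the Hadoop S3A configuration. A tiny self-contained illustration (the values are made up):

    object MergePrecedenceSketch extends App {
      val hadoopDerived = Map(
        "s3.access-key-id" -> "static-key-from-hadoop-conf",
        "s3.endpoint" -> "http://localhost:9000")
      val vendedByCatalog = Map(
        "s3.access-key-id" -> "temporary-vended-key",
        "s3.secret-access-key" -> "temporary-vended-secret")

      // Right-hand side wins: the vended key replaces the Hadoop-derived one,
      // while keys present only on the left (s3.endpoint) are preserved.
      val merged = hadoopDerived ++ vendedByCatalog
      println(merged("s3.access-key-id")) // temporary-vended-key
      println(merged("s3.endpoint"))      // http://localhost:9000
    }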
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 957f62103..c86b2a51b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -488,6 +488,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
}
+ /** Storage-related property prefixes passed through to native FileIO. */
+ private val storagePropertyPrefixes =
+ Seq("s3.", "gcs.", "adls.", "client.")
+
+ /**
+ * Filters a properties map to only include storage-related keys. FileIO.properties() may
+ * contain catalog URIs, bearer tokens, and other non-storage settings that should not be passed
+ * to the native FileIO builder.
+ */
+ def filterStorageProperties(props: Map[String, String]): Map[String, String] = {
+ props.filter { case (key, _) =>
+ storagePropertyPrefixes.exists(prefix => key.startsWith(prefix))
+ }
+ }
+
/**
* Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
*
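To make the filtering above concrete, here is a standalone sketch that mirrors the same prefix list (the real method lives on CometIcebergNativeScan; the sample keys are invented):

    object StorageFilterSketch extends App {
      val storagePrefixes = Seq("s3.", "gcs.", "adls.", "client.")

      def filterStorageProperties(props: Map[String, String]): Map[String, String] =
        props.filter { case (key, _) => storagePrefixes.exists(p => key.startsWith(p)) }

      val fileIOProps = Map(
        "s3.access-key-id" -> "vended-key",
        "s3.endpoint" -> "http://localhost:9000",
        "client.region" -> "us-east-1",
        "uri" -> "http://localhost:8181",       // REST catalog endpoint: dropped
        "token" -> "bearer-token-from-catalog") // catalog auth token: dropped

      // Prints only the s3.* and client.* keys; catalog settings never reach the native FileIO.
      println(filterStorageProperties(fileIOProps).keys.toList.sorted)
    }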
diff --git a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 7d5d6ce6b..7b04110d3 100644
--- a/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -98,6 +98,14 @@ public class RESTCatalogAdapter implements RESTClient {
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;
+ // Optional credentials to inject into loadTable responses, simulating REST catalog
+ // credential vending. When non-empty, these are added to LoadTableResponse.config().
+ private Map<String, String> vendedCredentials = ImmutableMap.of();
+
+ public void setVendedCredentials(Map<String, String> credentials) {
+ this.vendedCredentials = credentials;
+ }
+
public RESTCatalogAdapter(Catalog catalog) {
this.catalog = catalog;
this.asNamespaceCatalog =
@@ -279,6 +287,26 @@ public class RESTCatalogAdapter implements RESTClient {
@SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
public <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
+ T response = doHandleRequest(route, vars, body, responseType);
+ // Inject vended credentials into any LoadTableResponse, simulating REST catalog
+ // credential vending. This covers CREATE_TABLE, LOAD_TABLE, UPDATE_TABLE, etc.
+ if (!vendedCredentials.isEmpty() && response instanceof LoadTableResponse) {
+ LoadTableResponse original = (LoadTableResponse) response;
+ @SuppressWarnings("unchecked")
+ T withCreds =
+ (T)
+ LoadTableResponse.builder()
+ .withTableMetadata(original.tableMetadata())
+ .addAllConfig(original.config())
+ .addAllConfig(vendedCredentials)
+ .build();
+ return withCreds;
+ }
+ return response;
+ }
+
+ private <T extends RESTResponse> T doHandleRequest(
+ Route route, Map<String, String> vars, Object body, Class<T> responseType) {
switch (route) {
case TOKENS:
return castResponse(responseType, handleOAuthRequest(body));
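The adapter change above wraps the original request handler and rewrites only load-table responses. A hypothetical Scala rendering of that wrap-and-inject shape, with simplified types that are not the Iceberg API:

    object VendingAdapterSketch extends App {
      final case class LoadTableReply(metadataLocation: String, config: Map[String, String])

      class Handler(delegate: String => Any, vendedCredentials: Map[String, String]) {
        def handle(route: String): Any = delegate(route) match {
          case reply: LoadTableReply if vendedCredentials.nonEmpty =>
            // Credentials are merged last, so they win over existing config keys.
            reply.copy(config = reply.config ++ vendedCredentials)
          case other => other
        }
      }

      val backend: String => Any = _ =>
        LoadTableReply("s3://bucket/warehouse/db/t/metadata.json", Map("region" -> "us-east-1"))
      val handler = new Handler(backend, Map("s3.access-key-id" -> "vended-key"))
      println(handler.handle("loadTable"))
    }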
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index 00955e629..c1c90adfa 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -23,7 +23,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan
-class IcebergReadFromS3Suite extends CometS3TestBase {
+import org.apache.comet.iceberg.RESTCatalogHelper
+
+class IcebergReadFromS3Suite extends CometS3TestBase with RESTCatalogHelper {
override protected val testBucketName = "test-iceberg-bucket"
@@ -227,4 +229,74 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
}
+
+ test("REST catalog credential vending rejects wrong credentials") {
+ assume(icebergAvailable, "Iceberg not available in classpath")
+
+ val wrongCreds = Map(
+ "s3.access-key-id" -> "WRONG_ACCESS_KEY",
+ "s3.secret-access-key" -> "WRONG_SECRET_KEY",
+ "s3.endpoint" -> minioContainer.getS3URL,
+ "s3.path-style-access" -> "true")
+ val warehouse = s"s3a://$testBucketName/warehouse-bad-creds"
+
+ withRESTCatalog(vendedCredentials = wrongCreds, warehouseLocation = Some(warehouse)) {
+ (restUri, _, _) =>
+ withSQLConf(
+ "spark.sql.catalog.bad_cat" ->
"org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.bad_cat.catalog-impl" ->
"org.apache.iceberg.rest.RESTCatalog",
+ "spark.sql.catalog.bad_cat.uri" -> restUri,
+ "spark.sql.catalog.bad_cat.warehouse" -> warehouse) {
+
+ spark.sql("CREATE NAMESPACE bad_cat.db")
+
+ // CREATE TABLE succeeds (metadata only, no S3 access needed)
+ spark.sql("CREATE TABLE bad_cat.db.test (id INT) USING iceberg")
+
+ // INSERT fails because S3FileIO uses the wrong vended credentials
+ val e = intercept[Exception] {
+ spark.sql("INSERT INTO bad_cat.db.test VALUES (1)")
+ }
+ assert(e.getMessage.contains("403"), s"Expected S3 403 error but got: ${e.getMessage}")
+ }
+ }
+ }
+
+ test("REST catalog credential vending with native Iceberg scan on S3") {
+ assume(icebergAvailable, "Iceberg not available in classpath")
+
+ val vendedCreds = Map(
+ "s3.access-key-id" -> userName,
+ "s3.secret-access-key" -> password,
+ "s3.endpoint" -> minioContainer.getS3URL,
+ "s3.path-style-access" -> "true")
+ val warehouse = s"s3a://$testBucketName/warehouse-vending"
+
+ withRESTCatalog(vendedCredentials = vendedCreds, warehouseLocation = Some(warehouse)) {
+ (restUri, _, _) =>
+ withSQLConf(
+ "spark.sql.catalog.vend_cat" ->
"org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.vend_cat.catalog-impl" ->
"org.apache.iceberg.rest.RESTCatalog",
+ "spark.sql.catalog.vend_cat.uri" -> restUri,
+ "spark.sql.catalog.vend_cat.warehouse" -> warehouse,
+ CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+ spark.sql("CREATE NAMESPACE vend_cat.db")
+
+ spark.sql("""
+ CREATE TABLE vend_cat.db.simple (
+ id INT, name STRING, value DOUBLE
+ ) USING iceberg
+ """)
+ spark.sql("""
+ INSERT INTO vend_cat.db.simple
+ VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
+ """)
+ checkIcebergNativeScan("SELECT * FROM vend_cat.db.simple ORDER BY id")
+
+ spark.sql("DROP TABLE vend_cat.db.simple")
+ spark.sql("DROP NAMESPACE vend_cat.db")
+ }
+ }
+ }
}
diff --git a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
index 6230ee33e..856700c2c 100644
--- a/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ b/spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala
@@ -26,7 +26,22 @@ import java.nio.file.Files
trait RESTCatalogHelper {
/** Helper to set up REST catalog with embedded Jetty server (Spark 3.x / Jetty 9.4) */
- def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
+ def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
+ withRESTCatalog()(f)
+
+ /**
+ * Helper to set up REST catalog with optional credential vending.
+ *
+ * @param vendedCredentials
+ * Storage credentials to inject into loadTable responses, simulating REST catalog credential
+ * vending. When non-empty, these are added to every LoadTableResponse.config().
+ * @param warehouseLocation
+ * Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
+ */
+ def withRESTCatalog(
+ vendedCredentials: Map[String, String] = Map.empty,
+ warehouseLocation: Option[String] = None)(
+ f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
import org.apache.iceberg.inmemory.InMemoryCatalog
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
import org.eclipse.jetty.server.handler.gzip.GzipHandler
val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+ val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
val backendCatalog = new InMemoryCatalog()
backendCatalog.initialize(
"in-memory",
- java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+ java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
val adapter = new RESTCatalogAdapter(backendCatalog)
+ if (vendedCredentials.nonEmpty) {
+ import scala.jdk.CollectionConverters._
+ adapter.setVendedCredentials(vendedCredentials.asJava)
+ }
val servlet = new RESTCatalogServlet(adapter)
val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
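The pair of withRESTCatalog definitions above keeps existing call sites source-compatible while adding options for credential vending. A small sketch of the same overload-plus-defaults pattern, using hypothetical names (withServer) rather than the real helper:

    object CurriedDefaultsSketch extends App {
      // Old-style call sites keep working: withServer { uri => ... }
      def withServer(f: String => Unit): Unit = withServer()(f)

      // New-style call sites can opt into extra settings without touching old ones.
      def withServer(
          vendedCredentials: Map[String, String] = Map.empty,
          warehouseLocation: Option[String] = None)(f: String => Unit): Unit = {
        val warehouse = warehouseLocation.getOrElse("/tmp/warehouse")
        // A real helper would start the server here, configured with the credentials.
        f(s"http://localhost:8181 (warehouse=$warehouse, vendedKeys=${vendedCredentials.size})")
      }

      withServer { (uri: String) => println(s"old-style call: $uri") }
      withServer(vendedCredentials = Map("s3.access-key-id" -> "k")) { (uri: String) =>
        println(s"new-style call: $uri")
      }
    }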
diff --git a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
index ccd03c544..bd53804b8 100644
--- a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
+++ b/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala
@@ -26,7 +26,22 @@ import java.nio.file.Files
trait RESTCatalogHelper {
/** Helper to set up REST catalog with embedded Jetty server (Spark 4.0 / Jetty 11) */
- def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
+ def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit =
+ withRESTCatalog()(f)
+
+ /**
+ * Helper to set up REST catalog with optional credential vending.
+ *
+ * @param vendedCredentials
+ * Storage credentials to inject into loadTable responses, simulating REST catalog credential
+ * vending. When non-empty, these are added to every LoadTableResponse.config().
+ * @param warehouseLocation
+ * Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
+ */
+ def withRESTCatalog(
+ vendedCredentials: Map[String, String] = Map.empty,
+ warehouseLocation: Option[String] = None)(
+ f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
import org.apache.iceberg.inmemory.InMemoryCatalog
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
import org.eclipse.jetty.server.handler.gzip.GzipHandler
val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
+ val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
+
val backendCatalog = new InMemoryCatalog()
backendCatalog.initialize(
"in-memory",
- java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
+ java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
val adapter = new RESTCatalogAdapter(backendCatalog)
+ if (vendedCredentials.nonEmpty) {
+ import scala.jdk.CollectionConverters._
+ adapter.setVendedCredentials(vendedCredentials.asJava)
+ }
val servlet = new RESTCatalogServlet(adapter)
val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]