This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a1070f86 Add a Spark session builder for the tests (#1985)
2a1070f86 is described below

commit 2a1070f86e943214d7571fa1e9673e1ef3a4def4
Author: Yufei Gu <[email protected]>
AuthorDate: Wed Jul 2 13:55:00 2025 -0700

    Add a Spark session builder for the tests (#1985)
---
 .../it/ext/PolarisSparkIntegrationTestBase.java    |  43 +----
 .../service/it/ext/SparkSessionBuilder.java        | 213 +++++++++++++++++++++
 .../spark/quarkus/it/SparkCatalogIcebergIT.java    |  28 +--
 .../spark/quarkus/it/SparkIntegrationBase.java     |  43 +----
 4 files changed, 236 insertions(+), 91 deletions(-)
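
For reference, the new SparkSessionBuilder collapses the hand-rolled SparkSession.Builder chains below into one fluent call. A minimal sketch of the resulting test setup, reusing the warehouseDir, endpoints, and sparkToken fixtures the test base classes already provide (the catalog name "polaris" is illustrative only):

    SparkSession spark =
        SparkSessionBuilder.buildWithTestDefaults()   // local[1] master, UI disabled, mock S3 credentials
            .withWarehouse(warehouseDir)              // sets spark.sql.warehouse.dir
            .addCatalog("polaris", "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
            .getOrCreate();
    spark.sql("USE polaris");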

diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
index 9ab63aa50..466af5347 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
@@ -159,45 +159,18 @@ public abstract class PolarisSparkIntegrationTestBase {
 
     managementApi.createCatalog(externalCatalog);
 
-    SparkSession.Builder sessionBuilder =
-        SparkSession.builder()
-            .master("local[1]")
-            .config("spark.hadoop.fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
-            .config(
-                "spark.hadoop.fs.s3.aws.credentials.provider",
-                "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
-            .config("spark.hadoop.fs.s3.access.key", "foo")
-            .config("spark.hadoop.fs.s3.secret.key", "bar")
-            .config(
-                "spark.sql.extensions",
-                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-            .config("spark.ui.showConsoleProgress", false)
-            .config("spark.ui.enabled", "false");
-    spark =
-        withCatalog(withCatalog(sessionBuilder, catalogName), externalCatalogName).getOrCreate();
+    spark = buildSparkSession();
 
     onSpark("USE " + catalogName);
   }
 
-  protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
-    return builder
-        .config(
-            String.format("spark.sql.catalog.%s", catalogName),
-            "org.apache.iceberg.spark.SparkCatalog")
-        .config("spark.sql.warehouse.dir", warehouseDir.toString())
-        .config(String.format("spark.sql.catalog.%s.type", catalogName), 
"rest")
-        .config(
-            String.format("spark.sql.catalog.%s.uri", catalogName),
-            endpoints.catalogApiEndpoint().toString())
-        .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), 
catalogName)
-        .config(String.format("spark.sql.catalog.%s.scope", catalogName), 
"PRINCIPAL_ROLE:ALL")
-        .config(
-            String.format("spark.sql.catalog.%s.header.realm", catalogName), 
endpoints.realmId())
-        .config(String.format("spark.sql.catalog.%s.token", catalogName), 
sparkToken)
-        .config(String.format("spark.sql.catalog.%s.s3.access-key-id", 
catalogName), "fakekey")
-        .config(
-            String.format("spark.sql.catalog.%s.s3.secret-access-key", 
catalogName), "fakesecret")
-        .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), 
"us-west-2");
+  protected SparkSession buildSparkSession() {
+    return SparkSessionBuilder.buildWithTestDefaults()
+        .withWarehouse(warehouseDir)
+        .addCatalog(catalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
+        .addCatalog(
+            externalCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
+        .getOrCreate();
   }
 
   @AfterEach
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/SparkSessionBuilder.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/SparkSessionBuilder.java
new file mode 100644
index 000000000..5646e4e4b
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/SparkSessionBuilder.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it.ext;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A fluent builder for configuring SparkSession instances with Polaris catalogs.
+ *
+ * <p>This builder creates a SparkSession with sensible test defaults and allows easy configuration
+ * of multiple Polaris catalogs. The resulting SparkSession will be configured for local execution
+ * with S3Mock support for testing.
+ *
+ * <p>Example usage:
+ *
+ * <pre>
+ * SparkSession session = SparkSessionBuilder
+ *     .buildWithTestDefaults()
+ *     .withExtensions("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+ *     .withWarehouse(warehouseUri)
+ *     .addCatalog("catalog1", "org.apache.iceberg.spark.SparkCatalog", 
endpoints, token)
+ *     .addCatalog("catalog2", "org.apache.polaris.spark.SparkCatalog", 
endpoints, token)
+ *     .createSession();
+ * </pre>
+ *
+ * <p>The final SparkSession will be configured with:
+ *
+ * <ul>
+ *   <li>Local master execution (local[1])
+ *   <li>Disabled Spark UI for clean test output
+ *   <li>S3A filesystem with mock credentials (foo/bar)
+ *   <li>Multiple Polaris catalogs with REST endpoints
+ *   <li>Iceberg extensions for table format support
+ *   <li>Custom warehouse directory location
+ * </ul>
+ *
+ * <p>Each catalog will be configured as:
+ *
+ * <pre>
+ * spark.sql.catalog.{catalogName} = {catalogType}
+ * spark.sql.catalog.{catalogName}.type = rest
+ * spark.sql.catalog.{catalogName}.uri = {polarisEndpoint}
+ * spark.sql.catalog.{catalogName}.token = {authToken}
+ * spark.sql.catalog.{catalogName}.warehouse = {catalogName}
+ * spark.sql.catalog.{catalogName}.scope = PRINCIPAL_ROLE:ALL
+ * </pre>
+ */
+public class SparkSessionBuilder {
+  private final SparkSession.Builder builder;
+  private final List<CatalogConfig> catalogs = new ArrayList<>();
+  private final List<ConfigPair> additionalConfigs = new ArrayList<>();
+
+  private String extensions = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
+  private URI warehouseDir;
+
+  private SparkSessionBuilder(SparkSession.Builder builder) {
+    this.builder = builder;
+  }
+
+  /**
+   * Creates a SparkSessionBuilder with common test defaults.
+   *
+   * @return new builder instance with test defaults
+   */
+  public static SparkSessionBuilder buildWithTestDefaults() {
+    // local master
+    var builder = SparkSession.builder();
+    builder.master("local[1]");
+    // disable UI
+    builder.config("spark.ui.showConsoleProgress", "false");
+    builder.config("spark.ui.enabled", "false");
+
+    var sparkSessionBuilder = new SparkSessionBuilder(builder);
+    sparkSessionBuilder.withS3MockContainer();
+    return sparkSessionBuilder;
+  }
+
+  private void withS3MockContainer() {
+    withConfig("spark.hadoop.fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+        .withConfig(
+            "spark.hadoop.fs.s3.aws.credentials.provider",
+            "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
+        .withConfig("spark.hadoop.fs.s3.access.key", "foo")
+        .withConfig("spark.hadoop.fs.s3.secret.key", "bar");
+  }
+
+  public SparkSessionBuilder withWarehouse(URI warehouseDir) {
+    this.warehouseDir = warehouseDir;
+    return this;
+  }
+
+  public SparkSessionBuilder withExtensions(String extensions) {
+    this.extensions = extensions;
+    return this;
+  }
+
+  public SparkSessionBuilder addCatalog(
+      String catalogName, String catalogImplClass, PolarisApiEndpoints endpoints, String token) {
+    this.catalogs.add(new CatalogConfig(catalogName, catalogImplClass, endpoints, token));
+    return this;
+  }
+
+  public SparkSessionBuilder withConfig(String key, String value) {
+    this.additionalConfigs.add(new ConfigPair(key, value));
+    return this;
+  }
+
+  public SparkSession getOrCreate() {
+    if (extensions != null) {
+      builder.config("spark.sql.extensions", extensions);
+    }
+
+    if (warehouseDir != null) {
+      builder.config("spark.sql.warehouse.dir", warehouseDir.toString());
+    }
+
+    // Apply catalog configurations
+    applyCatalogConfigurations();
+
+    // Apply additional configurations
+    applyAdditionalConfigurations();
+
+    return builder.getOrCreate();
+  }
+
+  private void applyCatalogConfigurations() {
+    for (CatalogConfig catalog : catalogs) {
+      applySingleCatalogConfig(catalog);
+    }
+  }
+
+  private void applySingleCatalogConfig(CatalogConfig catalog) {
+    // Basic catalog configuration
+    builder
+        .config(
+            String.format("spark.sql.catalog.%s", catalog.catalogName), 
catalog.catalogImplClass)
+        .config(String.format("spark.sql.catalog.%s.type", 
catalog.catalogName), "rest")
+        .config(
+            String.format("spark.sql.catalog.%s.warehouse", 
catalog.catalogName),
+            catalog.catalogName)
+        .config(
+            String.format("spark.sql.catalog.%s.scope", catalog.catalogName), 
"PRINCIPAL_ROLE:ALL");
+
+    // Add endpoint configuration
+    Preconditions.checkNotNull(catalog.endpoints, "endpoints is required");
+    builder
+        .config(
+            String.format("spark.sql.catalog.%s.uri", catalog.catalogName),
+            catalog.endpoints.catalogApiEndpoint().toString())
+        .config(
+            String.format("spark.sql.catalog.%s.header.realm", 
catalog.catalogName),
+            catalog.endpoints.realmId());
+
+    // Add token configuration
+    if (catalog.token != null) {
+      builder.config(
+          String.format("spark.sql.catalog.%s.token", catalog.catalogName), 
catalog.token);
+    }
+  }
+
+  private void applyAdditionalConfigurations() {
+    for (ConfigPair config : additionalConfigs) {
+      builder.config(config.key, config.value);
+    }
+  }
+
+  private static class ConfigPair {
+    final String key;
+    final String value;
+
+    ConfigPair(String key, String value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  /** Configuration for a single catalog. */
+  private static class CatalogConfig {
+    final String catalogName;
+    final String catalogImplClass;
+    final PolarisApiEndpoints endpoints;
+    final String token;
+
+    CatalogConfig(
+        String catalogName, String catalogImplClass, PolarisApiEndpoints endpoints, String token) {
+      this.catalogName = catalogName;
+      this.catalogImplClass = catalogImplClass;
+      this.endpoints = endpoints;
+      this.token = token;
+    }
+  }
+}
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java
index d9182e6e8..812d8f19d 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java
@@ -19,33 +19,17 @@
 package org.apache.polaris.spark.quarkus.it;
 
 import io.quarkus.test.junit.QuarkusIntegrationTest;
+import org.apache.polaris.service.it.ext.SparkSessionBuilder;
 import org.apache.spark.sql.SparkSession;
 
 @QuarkusIntegrationTest
 public class SparkCatalogIcebergIT extends SparkCatalogBaseIT {
   /** Initialize the spark catalog to use the iceberg spark catalog. */
   @Override
-  protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
-    return builder
-        .config(
-            "spark.sql.extensions",
-            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-        .config(
-            String.format("spark.sql.catalog.%s", catalogName),
-            "org.apache.iceberg.spark.SparkCatalog")
-        .config("spark.sql.warehouse.dir", warehouseDir.toString())
-        .config(String.format("spark.sql.catalog.%s.type", catalogName), 
"rest")
-        .config(
-            String.format("spark.sql.catalog.%s.uri", catalogName),
-            endpoints.catalogApiEndpoint().toString())
-        .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), 
catalogName)
-        .config(String.format("spark.sql.catalog.%s.scope", catalogName), 
"PRINCIPAL_ROLE:ALL")
-        .config(
-            String.format("spark.sql.catalog.%s.header.realm", catalogName), 
endpoints.realmId())
-        .config(String.format("spark.sql.catalog.%s.token", catalogName), 
sparkToken)
-        .config(String.format("spark.sql.catalog.%s.s3.access-key-id", 
catalogName), "fakekey")
-        .config(
-            String.format("spark.sql.catalog.%s.s3.secret-access-key", 
catalogName), "fakesecret")
-        .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), 
"us-west-2");
+  protected SparkSession buildSparkSession() {
+    return SparkSessionBuilder.buildWithTestDefaults()
+        .withWarehouse(warehouseDir)
+        .addCatalog(catalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
+        .getOrCreate();
   }
 }
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
index 8d16c36ad..bd30997d9 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
@@ -43,6 +43,7 @@ import org.apache.polaris.service.it.env.IntegrationTestsHelper;
 import org.apache.polaris.service.it.env.ManagementApi;
 import org.apache.polaris.service.it.env.PolarisApiEndpoints;
 import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.service.it.ext.SparkSessionBuilder;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -129,46 +130,20 @@ public abstract class SparkIntegrationBase {
 
     managementApi.createCatalog(catalog);
 
-    SparkSession.Builder sessionBuilder =
-        SparkSession.builder()
-            .master("local[1]")
-            .config("spark.hadoop.fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
-            .config(
-                "spark.hadoop.fs.s3.aws.credentials.provider",
-                "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
-            .config("spark.hadoop.fs.s3.access.key", "foo")
-            .config("spark.hadoop.fs.s3.secret.key", "bar")
-            .config("spark.ui.showConsoleProgress", false)
-            .config("spark.ui.enabled", "false");
-    spark = withCatalog(sessionBuilder, catalogName).getOrCreate();
+    spark = buildSparkSession();
 
     onSpark("USE " + catalogName);
   }
 
-  protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
-    return builder
-        .config(
-            "spark.sql.extensions",
+  protected SparkSession buildSparkSession() {
+    return SparkSessionBuilder.buildWithTestDefaults()
+        .withExtensions(
             "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
-        .config(
+        .withConfig(
             "spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
-        .config(
-            String.format("spark.sql.catalog.%s", catalogName),
-            "org.apache.polaris.spark.SparkCatalog")
-        .config("spark.sql.warehouse.dir", warehouseDir.toString())
-        .config(String.format("spark.sql.catalog.%s.type", catalogName), 
"rest")
-        .config(
-            String.format("spark.sql.catalog.%s.uri", catalogName),
-            endpoints.catalogApiEndpoint().toString())
-        .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), 
catalogName)
-        .config(String.format("spark.sql.catalog.%s.scope", catalogName), 
"PRINCIPAL_ROLE:ALL")
-        .config(
-            String.format("spark.sql.catalog.%s.header.realm", catalogName), 
endpoints.realmId())
-        .config(String.format("spark.sql.catalog.%s.token", catalogName), 
sparkToken)
-        .config(String.format("spark.sql.catalog.%s.s3.access-key-id", 
catalogName), "fakekey")
-        .config(
-            String.format("spark.sql.catalog.%s.s3.secret-access-key", 
catalogName), "fakesecret")
-        .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), 
"us-west-2");
+        .withWarehouse(warehouseDir)
+        .addCatalog(catalogName, "org.apache.polaris.spark.SparkCatalog", endpoints, sparkToken)
+        .getOrCreate();
   }
 
   @AfterEach

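Note that subclasses now customize the session by overriding buildSparkSession() and chaining extra options onto the shared builder, rather than overriding the old withCatalog(builder, name) hook. A hypothetical sketch (the shuffle-partitions setting is illustrative and not part of this commit):

    @Override
    protected SparkSession buildSparkSession() {
      return SparkSessionBuilder.buildWithTestDefaults()
          .withConfig("spark.sql.shuffle.partitions", "1") // illustrative extra setting
          .withWarehouse(warehouseDir)
          .addCatalog(catalogName, "org.apache.polaris.spark.SparkCatalog", endpoints, sparkToken)
          .getOrCreate();
    }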