This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 24683d86b Implement federation to HadoopCatalog (#1466)
24683d86b is described below

commit 24683d86b46c63a198b2f7b42725037f32d603b8
Author: Eric Maynard <[email protected]>
AuthorDate: Fri May 9 15:18:05 2025 -0700

    Implement federation to HadoopCatalog (#1466)
    
    * wip
    
    * quarkus fixes
    
    * autolint
    
    * hadoop impl
    
    * autolint
    
    * Refactors
    
    * refactored
    
    * autolint
    
    * add config
    
    * autolint
    
    * stable
    
    * Remove breakpoint anchor
    
    * add line to application.properties
    
    * yank HADOOP
    
    * autolint
---
 .../polaris/core/config/FeatureConfiguration.java  |  8 +++
 .../connection/AuthenticationParametersDpo.java    |  1 +
 .../core/connection/ConnectionConfigInfoDpo.java   | 20 +++++++-
 .../polaris/core/connection/ConnectionType.java    |  1 +
 .../HadoopConnectionConfigInfoDpo.java}            | 59 +++++++++++-----------
 .../IcebergCatalogPropertiesProvider.java          |  2 +-
 .../IcebergRestConnectionConfigInfoDpo.java        |  5 +-
 .../src/main/resources/application.properties      |  1 +
 .../quarkus/admin/PolarisAuthzTestBase.java        |  4 +-
 .../catalog/IcebergCatalogHandlerAuthzTest.java    |  4 +-
 .../polaris/service/admin/PolarisServiceImpl.java  | 27 ++++++++++
 .../catalog/iceberg/IcebergCatalogAdapter.java     |  2 +-
 .../catalog/iceberg/IcebergCatalogHandler.java     | 13 ++++-
 .../{ => catalog}/CallContextCatalogFactory.java   |  5 +-
 .../PolarisCallContextCatalogFactory.java          |  5 +-
 .../org/apache/polaris/service/TestServices.java   |  4 +-
 spec/polaris-management-service.yml                | 12 +++++
 17 files changed, 127 insertions(+), 46 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 26437a8f6..64e2ae1a3 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.polaris.core.config;
 import java.util.List;
 import java.util.Optional;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.connection.ConnectionType;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.cache.EntityWeigher;
 
@@ -241,4 +242,11 @@ public class FeatureConfiguration<T> extends 
PolarisConfiguration<T> {
           .description("If true, the policy-store endpoints are enabled")
           .defaultValue(true)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<List<String>> 
SUPPORTED_CATALOG_CONNECTION_TYPES =
+      PolarisConfiguration.<List<String>>builder()
+          .key("SUPPORTED_CATALOG_CONNECTION_TYPES")
+          .description("The list of supported catalog connection types for 
federation")
+          .defaultValue(List.of(ConnectionType.ICEBERG_REST.name()))
+          .buildFeatureConfiguration();
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
index bf07c4b79..f2267b12a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.polaris.core.admin.model.AuthenticationParameters;
 import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
 import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
+import 
org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
 import org.apache.polaris.core.secrets.UserSecretReference;
 
 /**
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
index 482dc00fc..4313ede12 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
@@ -33,7 +33,11 @@ import java.net.URL;
 import java.util.Map;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
 import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import 
org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
+import 
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
 import org.apache.polaris.core.secrets.UserSecretReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +52,7 @@ import org.slf4j.LoggerFactory;
     property = "connectionTypeCode")
 @JsonSubTypes({
   @JsonSubTypes.Type(value = IcebergRestConnectionConfigInfoDpo.class, name = 
"1"),
+  @JsonSubTypes.Type(value = HadoopConnectionConfigInfoDpo.class, name = "2"),
 })
 public abstract class ConnectionConfigInfoDpo implements 
IcebergCatalogPropertiesProvider {
   private static final Logger logger = 
LoggerFactory.getLogger(ConnectionConfigInfoDpo.class);
@@ -140,11 +145,12 @@ public abstract class ConnectionConfigInfoDpo implements 
IcebergCatalogPropertie
       ConnectionConfigInfo connectionConfigurationModel,
       Map<String, UserSecretReference> secretReferences) {
     ConnectionConfigInfoDpo config = null;
+    final AuthenticationParametersDpo authenticationParameters;
     switch (connectionConfigurationModel.getConnectionType()) {
       case ICEBERG_REST:
         IcebergRestConnectionConfigInfo icebergRestConfigModel =
             (IcebergRestConnectionConfigInfo) connectionConfigurationModel;
-        AuthenticationParametersDpo authenticationParameters =
+        authenticationParameters =
             
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
                 icebergRestConfigModel.getAuthenticationParameters(), 
secretReferences);
         config =
@@ -153,6 +159,18 @@ public abstract class ConnectionConfigInfoDpo implements 
IcebergCatalogPropertie
                 authenticationParameters,
                 icebergRestConfigModel.getRemoteCatalogName());
         break;
+      case HADOOP:
+        HadoopConnectionConfigInfo hadoopConfigModel =
+            (HadoopConnectionConfigInfo) connectionConfigurationModel;
+        authenticationParameters =
+            
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
+                hadoopConfigModel.getAuthenticationParameters(), 
secretReferences);
+        config =
+            new HadoopConnectionConfigInfoDpo(
+                hadoopConfigModel.getUri(),
+                authenticationParameters,
+                hadoopConfigModel.getWarehouse());
+        break;
       default:
         throw new IllegalStateException(
             "Unsupported connection type: " + 
connectionConfigurationModel.getConnectionType());
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
index 6d4a419a5..3c2d4430a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 public enum ConnectionType {
   NULL_TYPE(0),
   ICEBERG_REST(1),
+  HADOOP(2),
   ;
 
   private static final ConnectionType[] REVERSE_MAPPING_ARRAY;
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
similarity index 67%
copy from 
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
copy to 
polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
index 11fac0e22..5f29482c1 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.hadoop;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
@@ -26,30 +26,41 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
-import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 
 /**
- * The internal persistence-object counterpart to 
IcebergRestConnectionConfigInfo defined in the API
- * model.
+ * The internal persistence-object counterpart to {@link
+ * org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo} defined in 
the API model.
  */
-public class IcebergRestConnectionConfigInfoDpo extends ConnectionConfigInfoDpo
-    implements IcebergCatalogPropertiesProvider {
+public class HadoopConnectionConfigInfoDpo extends ConnectionConfigInfoDpo {
 
-  private final String remoteCatalogName;
+  private final String warehouse;
 
-  public IcebergRestConnectionConfigInfoDpo(
+  public HadoopConnectionConfigInfoDpo(
       @JsonProperty(value = "uri", required = true) @Nonnull String uri,
       @JsonProperty(value = "authenticationParameters", required = true) 
@Nonnull
           AuthenticationParametersDpo authenticationParameters,
-      @JsonProperty(value = "remoteCatalogName", required = false) @Nullable
-          String remoteCatalogName) {
-    super(ConnectionType.ICEBERG_REST.getCode(), uri, 
authenticationParameters);
-    this.remoteCatalogName = remoteCatalogName;
+      @JsonProperty(value = "warehouse", required = false) @Nullable String 
remoteCatalogName) {
+    super(ConnectionType.HADOOP.getCode(), uri, authenticationParameters);
+    this.warehouse = remoteCatalogName;
   }
 
-  public String getRemoteCatalogName() {
-    return remoteCatalogName;
+  public String getWarehouse() {
+    return warehouse;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("connectionTypeCode", getConnectionTypeCode())
+        .add("uri", getUri())
+        .add("warehouse", getWarehouse())
+        .add("authenticationParameters", 
getAuthenticationParameters().toString())
+        .toString();
   }
 
   @Override
@@ -57,8 +68,8 @@ public class IcebergRestConnectionConfigInfoDpo extends 
ConnectionConfigInfoDpo
       UserSecretsManager secretsManager) {
     HashMap<String, String> properties = new HashMap<>();
     properties.put(CatalogProperties.URI, getUri());
-    if (getRemoteCatalogName() != null) {
-      properties.put(CatalogProperties.WAREHOUSE_LOCATION, 
getRemoteCatalogName());
+    if (getWarehouse() != null) {
+      properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
     }
     
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
     return properties;
@@ -66,22 +77,12 @@ public class IcebergRestConnectionConfigInfoDpo extends 
ConnectionConfigInfoDpo
 
   @Override
   public ConnectionConfigInfo asConnectionConfigInfoModel() {
-    return IcebergRestConnectionConfigInfo.builder()
-        
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
+    return HadoopConnectionConfigInfo.builder()
+        .setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HADOOP)
         .setUri(getUri())
-        .setRemoteCatalogName(getRemoteCatalogName())
+        .setWarehouse(getWarehouse())
         .setAuthenticationParameters(
             getAuthenticationParameters().asAuthenticationParametersModel())
         .build();
   }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("connectionTypeCode", getConnectionTypeCode())
-        .add("uri", getUri())
-        .add("remoteCatalogName", getRemoteCatalogName())
-        .add("authenticationParameters", 
getAuthenticationParameters().toString())
-        .toString();
-  }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
similarity index 96%
rename from 
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
rename to 
polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
index e7955bc61..75af01100 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.iceberg;
 
 import jakarta.annotation.Nonnull;
 import java.util.Map;
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
similarity index 93%
rename from 
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
rename to 
polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
index 11fac0e22..236dcee29 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.iceberg;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
@@ -27,6 +27,9 @@ import java.util.Map;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
 import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
 import org.apache.polaris.core.secrets.UserSecretsManager;
 
 /**
diff --git a/quarkus/defaults/src/main/resources/application.properties 
b/quarkus/defaults/src/main/resources/application.properties
index 0482f3de0..11f23de1b 100644
--- a/quarkus/defaults/src/main/resources/application.properties
+++ b/quarkus/defaults/src/main/resources/application.properties
@@ -111,6 +111,7 @@ polaris.realm-context.require-header=false
 
polaris.features.defaults."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
 
polaris.features.defaults."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
 # polaris.features.defaults."ENABLE_CATALOG_FEDERATION"=true
+polaris.features.defaults."SUPPORTED_CATALOG_CONNECTION_TYPES"=["ICEBERG_REST"]
 
 # realm overrides
 # 
polaris.features.realm-overrides."my-realm"."INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST"=true
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index bb32d9915..91f0a33d6 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -87,8 +87,8 @@ import 
org.apache.polaris.service.catalog.policy.PolicyCatalog;
 import org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import 
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.events.PolarisEventListener;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
index 6a95f7cdb..08dd790e9 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -68,8 +68,8 @@ import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalogHandler;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import 
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.http.IfNoneMatch;
 import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
 import org.apache.polaris.service.types.NotificationRequest;
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
index c0eba7b0a..e4e351a99 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
@@ -23,6 +23,7 @@ import jakarta.inject.Inject;
 import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.core.SecurityContext;
 import java.util.List;
+import java.util.Locale;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -37,6 +38,7 @@ import 
org.apache.polaris.core.admin.model.CreateCatalogRequest;
 import org.apache.polaris.core.admin.model.CreateCatalogRoleRequest;
 import org.apache.polaris.core.admin.model.CreatePrincipalRequest;
 import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest;
+import org.apache.polaris.core.admin.model.ExternalCatalog;
 import org.apache.polaris.core.admin.model.GrantCatalogRoleRequest;
 import org.apache.polaris.core.admin.model.GrantPrincipalRoleRequest;
 import org.apache.polaris.core.admin.model.GrantResource;
@@ -141,6 +143,7 @@ public class PolarisServiceImpl
     PolarisAdminService adminService = newAdminService(realmContext, 
securityContext);
     Catalog catalog = request.getCatalog();
     validateStorageConfig(catalog.getStorageConfigInfo());
+    validateConnectionConfigInfo(catalog);
     Catalog newCatalog = new 
CatalogEntity(adminService.createCatalog(request)).asCatalog();
     LOGGER.info("Created new catalog {}", newCatalog);
     return Response.status(Response.Status.CREATED).build();
@@ -163,6 +166,30 @@ public class PolarisServiceImpl
     }
   }
 
+  private void validateConnectionConfigInfo(Catalog catalog) {
+    if (catalog.getType() == Catalog.TypeEnum.EXTERNAL) {
+      if (catalog instanceof ExternalCatalog externalCatalog) {
+        if (externalCatalog.getConnectionConfigInfo() != null) {
+          String connectionType =
+              
externalCatalog.getConnectionConfigInfo().getConnectionType().name();
+          List<String> supportedConnectionTypes =
+              callContext
+                  .getPolarisCallContext()
+                  .getConfigurationStore()
+                  .getConfiguration(
+                      callContext.getPolarisCallContext(),
+                      FeatureConfiguration.SUPPORTED_CATALOG_CONNECTION_TYPES)
+                  .stream()
+                  .map(s -> s.toUpperCase(Locale.ROOT))
+                  .toList();
+          if (!supportedConnectionTypes.contains(connectionType)) {
+            throw new IllegalStateException("Unsupported connection type: " + 
connectionType);
+          }
+        }
+      }
+    }
+  }
+
   /** From PolarisCatalogsApiService */
   @Override
   public Response deleteCatalog(
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 8dec97168..4484dc129 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -75,7 +75,7 @@ import 
org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
 import 
org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
 import org.apache.polaris.service.catalog.common.CatalogAdapter;
 import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
 import org.apache.polaris.service.http.IcebergHttpUtil;
 import org.apache.polaris.service.http.IfNoneMatch;
 import org.apache.polaris.service.types.CommitTableRequest;
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index e8912548d..aca12eb6f 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.rest.CatalogHandlers;
 import org.apache.iceberg.rest.HTTPClient;
 import org.apache.iceberg.rest.RESTCatalog;
@@ -79,7 +80,8 @@ import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
 import org.apache.polaris.core.connection.ConnectionType;
-import org.apache.polaris.core.connection.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import 
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -97,7 +99,7 @@ import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.service.catalog.SupportsNotifications;
 import org.apache.polaris.service.catalog.common.CatalogHandler;
 import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
 import org.apache.polaris.service.http.IcebergHttpUtil;
 import org.apache.polaris.service.http.IfNoneMatch;
 import org.apache.polaris.service.types.NotificationRequest;
@@ -212,6 +214,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       Catalog federatedCatalog;
       ConnectionType connectionType =
           
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
+
       switch (connectionType) {
         case ICEBERG_REST:
           SessionCatalog.SessionContext context = 
SessionCatalog.SessionContext.createEmpty();
@@ -226,6 +229,12 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
               ((IcebergRestConnectionConfigInfoDpo) 
connectionConfigInfoDpo).getRemoteCatalogName(),
               
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
           break;
+        case HADOOP:
+          federatedCatalog = new HadoopCatalog();
+          federatedCatalog.initialize(
+              ((HadoopConnectionConfigInfoDpo) 
connectionConfigInfoDpo).getWarehouse(),
+              
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
+          break;
         default:
           throw new UnsupportedOperationException(
               "Connection type not supported: " + connectionType);
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
similarity index 90%
rename from 
service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
rename to 
service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
index 7c812d6d3..5a89ed49d 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.context;
+package org.apache.polaris.service.context.catalog;
 
 import jakarta.ws.rs.core.SecurityContext;
 import org.apache.iceberg.catalog.Catalog;
@@ -25,6 +25,9 @@ import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 
 public interface CallContextCatalogFactory {
+
+  static final String WAREHOUSE_LOCATION_BASEDIR = 
"/tmp/iceberg_rest_server_warehouse_data/";
+
   Catalog createCallContextCatalog(
       CallContext context,
       AuthenticatedPolarisPrincipal authenticatedPrincipal,
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
similarity index 97%
rename from 
service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
rename to 
service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
index b1d28ce89..96445f949 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.context;
+package org.apache.polaris.service.context.catalog;
 
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -48,9 +48,6 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PolarisCallContextCatalogFactory.class);
 
-  private static final String WAREHOUSE_LOCATION_BASEDIR =
-      "/tmp/iceberg_rest_server_warehouse_data/";
-
   private final RealmEntityManagerFactory entityManagerFactory;
   private final TaskExecutor taskExecutor;
   private final FileIOFactory fileIOFactory;
diff --git 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 15a254a08..34a3caf45 100644
--- 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -55,8 +55,8 @@ import 
org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import 
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.events.PolarisEventListener;
 import org.apache.polaris.service.events.TestPolarisEventListener;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
diff --git a/spec/polaris-management-service.yml 
b/spec/polaris-management-service.yml
index 318f17a6c..795dd61ff 100644
--- a/spec/polaris-management-service.yml
+++ b/spec/polaris-management-service.yml
@@ -862,6 +862,7 @@ components:
           type: string
           enum:
             - ICEBERG_REST
+            - HADOOP
           description: The type of remote catalog service represented by this 
connection
         uri:
           type: string
@@ -874,6 +875,7 @@ components:
         propertyName: connectionType
         mapping:
           ICEBERG_REST: "#/components/schemas/IcebergRestConnectionConfigInfo"
+          HADOOP: "#/components/schemas/HadoopConnectionConfigInfo"
 
     IcebergRestConnectionConfigInfo:
       type: object
@@ -887,6 +889,16 @@ components:
             this is specified as the 'warehouse' when multiple logical 
catalogs are served under the same base
             uri, and often translates into a 'prefix' added to all REST 
resource paths
 
+    HadoopConnectionConfigInfo:
+      type: object
+      description: Configuration necessary for connecting to a Hadoop Catalog
+      allOf:
+        - $ref: '#/components/schemas/ConnectionConfigInfo'
+      properties:
+        warehouse:
+          type: string
+          description: The file path to where this catalog should store tables
+
     AuthenticationParameters:
       type: object
       description: Authentication-specific information for a REST connection

Reply via email to