This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 24683d86b Implement federation to HadoopCatalog (#1466)
24683d86b is described below
commit 24683d86b46c63a198b2f7b42725037f32d603b8
Author: Eric Maynard <[email protected]>
AuthorDate: Fri May 9 15:18:05 2025 -0700
Implement federation to HadoopCatalog (#1466)
* wip
* quarkus fixes
* autolint
* hadoop impl
* autolint
* Refactors
* refactored
* autolint
* add config
* autolint
* stable
* Remove breakpoint anchor
* add line to application.properties
* yank HADOOP
* autolint
---
.../polaris/core/config/FeatureConfiguration.java | 8 +++
.../connection/AuthenticationParametersDpo.java | 1 +
.../core/connection/ConnectionConfigInfoDpo.java | 20 +++++++-
.../polaris/core/connection/ConnectionType.java | 1 +
.../HadoopConnectionConfigInfoDpo.java} | 59 +++++++++++-----------
.../IcebergCatalogPropertiesProvider.java | 2 +-
.../IcebergRestConnectionConfigInfoDpo.java | 5 +-
.../src/main/resources/application.properties | 1 +
.../quarkus/admin/PolarisAuthzTestBase.java | 4 +-
.../catalog/IcebergCatalogHandlerAuthzTest.java | 4 +-
.../polaris/service/admin/PolarisServiceImpl.java | 27 ++++++++++
.../catalog/iceberg/IcebergCatalogAdapter.java | 2 +-
.../catalog/iceberg/IcebergCatalogHandler.java | 13 ++++-
.../{ => catalog}/CallContextCatalogFactory.java | 5 +-
.../PolarisCallContextCatalogFactory.java | 5 +-
.../org/apache/polaris/service/TestServices.java | 4 +-
spec/polaris-management-service.yml | 12 +++++
17 files changed, 127 insertions(+), 46 deletions(-)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 26437a8f6..64e2ae1a3 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.polaris.core.config;
import java.util.List;
import java.util.Optional;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.cache.EntityWeigher;
@@ -241,4 +242,11 @@ public class FeatureConfiguration<T> extends
PolarisConfiguration<T> {
.description("If true, the policy-store endpoints are enabled")
.defaultValue(true)
.buildFeatureConfiguration();
+
+ public static final FeatureConfiguration<List<String>>
SUPPORTED_CATALOG_CONNECTION_TYPES =
+ PolarisConfiguration.<List<String>>builder()
+ .key("SUPPORTED_CATALOG_CONNECTION_TYPES")
+ .description("The list of supported catalog connection types for
federation")
+ .defaultValue(List.of(ConnectionType.ICEBERG_REST.name()))
+ .buildFeatureConfiguration();
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
index bf07c4b79..f2267b12a 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/AuthenticationParametersDpo.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
+import
org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
import org.apache.polaris.core.secrets.UserSecretReference;
/**
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
index 482dc00fc..4313ede12 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java
@@ -33,7 +33,11 @@ import java.net.URL;
import java.util.Map;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import
org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
+import
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
import org.apache.polaris.core.secrets.UserSecretReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +52,7 @@ import org.slf4j.LoggerFactory;
property = "connectionTypeCode")
@JsonSubTypes({
@JsonSubTypes.Type(value = IcebergRestConnectionConfigInfoDpo.class, name =
"1"),
+ @JsonSubTypes.Type(value = HadoopConnectionConfigInfoDpo.class, name = "2"),
})
public abstract class ConnectionConfigInfoDpo implements
IcebergCatalogPropertiesProvider {
private static final Logger logger =
LoggerFactory.getLogger(ConnectionConfigInfoDpo.class);
@@ -140,11 +145,12 @@ public abstract class ConnectionConfigInfoDpo implements
IcebergCatalogPropertie
ConnectionConfigInfo connectionConfigurationModel,
Map<String, UserSecretReference> secretReferences) {
ConnectionConfigInfoDpo config = null;
+ final AuthenticationParametersDpo authenticationParameters;
switch (connectionConfigurationModel.getConnectionType()) {
case ICEBERG_REST:
IcebergRestConnectionConfigInfo icebergRestConfigModel =
(IcebergRestConnectionConfigInfo) connectionConfigurationModel;
- AuthenticationParametersDpo authenticationParameters =
+ authenticationParameters =
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
icebergRestConfigModel.getAuthenticationParameters(),
secretReferences);
config =
@@ -153,6 +159,18 @@ public abstract class ConnectionConfigInfoDpo implements
IcebergCatalogPropertie
authenticationParameters,
icebergRestConfigModel.getRemoteCatalogName());
break;
+ case HADOOP:
+ HadoopConnectionConfigInfo hadoopConfigModel =
+ (HadoopConnectionConfigInfo) connectionConfigurationModel;
+ authenticationParameters =
+
AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets(
+ hadoopConfigModel.getAuthenticationParameters(),
secretReferences);
+ config =
+ new HadoopConnectionConfigInfoDpo(
+ hadoopConfigModel.getUri(),
+ authenticationParameters,
+ hadoopConfigModel.getWarehouse());
+ break;
default:
throw new IllegalStateException(
"Unsupported connection type: " +
connectionConfigurationModel.getConnectionType());
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
index 6d4a419a5..3c2d4430a 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
public enum ConnectionType {
NULL_TYPE(0),
ICEBERG_REST(1),
+ HADOOP(2),
;
private static final ConnectionType[] REVERSE_MAPPING_ARRAY;
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
similarity index 67%
copy from
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
copy to
polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
index 11fac0e22..5f29482c1 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/hadoop/HadoopConnectionConfigInfoDpo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.hadoop;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
@@ -26,30 +26,41 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
-import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
- * The internal persistence-object counterpart to
IcebergRestConnectionConfigInfo defined in the API
- * model.
+ * The internal persistence-object counterpart to {@link
+ * org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo} defined in
the API model.
*/
-public class IcebergRestConnectionConfigInfoDpo extends ConnectionConfigInfoDpo
- implements IcebergCatalogPropertiesProvider {
+public class HadoopConnectionConfigInfoDpo extends ConnectionConfigInfoDpo {
- private final String remoteCatalogName;
+ private final String warehouse;
- public IcebergRestConnectionConfigInfoDpo(
+ public HadoopConnectionConfigInfoDpo(
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
@JsonProperty(value = "authenticationParameters", required = true)
@Nonnull
AuthenticationParametersDpo authenticationParameters,
- @JsonProperty(value = "remoteCatalogName", required = false) @Nullable
- String remoteCatalogName) {
- super(ConnectionType.ICEBERG_REST.getCode(), uri,
authenticationParameters);
- this.remoteCatalogName = remoteCatalogName;
+ @JsonProperty(value = "warehouse", required = false) @Nullable String
remoteCatalogName) {
+ super(ConnectionType.HADOOP.getCode(), uri, authenticationParameters);
+ this.warehouse = remoteCatalogName;
}
- public String getRemoteCatalogName() {
- return remoteCatalogName;
+ public String getWarehouse() {
+ return warehouse;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("connectionTypeCode", getConnectionTypeCode())
+ .add("uri", getUri())
+ .add("warehouse", getWarehouse())
+ .add("authenticationParameters",
getAuthenticationParameters().toString())
+ .toString();
}
@Override
@@ -57,8 +68,8 @@ public class IcebergRestConnectionConfigInfoDpo extends
ConnectionConfigInfoDpo
UserSecretsManager secretsManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
- if (getRemoteCatalogName() != null) {
- properties.put(CatalogProperties.WAREHOUSE_LOCATION,
getRemoteCatalogName());
+ if (getWarehouse() != null) {
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
}
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
return properties;
@@ -66,22 +77,12 @@ public class IcebergRestConnectionConfigInfoDpo extends
ConnectionConfigInfoDpo
@Override
public ConnectionConfigInfo asConnectionConfigInfoModel() {
- return IcebergRestConnectionConfigInfo.builder()
-
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
+ return HadoopConnectionConfigInfo.builder()
+ .setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HADOOP)
.setUri(getUri())
- .setRemoteCatalogName(getRemoteCatalogName())
+ .setWarehouse(getWarehouse())
.setAuthenticationParameters(
getAuthenticationParameters().asAuthenticationParametersModel())
.build();
}
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("connectionTypeCode", getConnectionTypeCode())
- .add("uri", getUri())
- .add("remoteCatalogName", getRemoteCatalogName())
- .add("authenticationParameters",
getAuthenticationParameters().toString())
- .toString();
- }
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
similarity index 96%
rename from
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
rename to
polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
index e7955bc61..75af01100 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergCatalogPropertiesProvider.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergCatalogPropertiesProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.iceberg;
import jakarta.annotation.Nonnull;
import java.util.Map;
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
similarity index 93%
rename from
polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
rename to
polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
index 11fac0e22..236dcee29 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/connection/IcebergRestConnectionConfigInfoDpo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/connection/iceberg/IcebergRestConnectionConfigInfoDpo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.core.connection;
+package org.apache.polaris.core.connection.iceberg;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
@@ -27,6 +27,9 @@ import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.connection.AuthenticationParametersDpo;
+import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.secrets.UserSecretsManager;
/**
diff --git a/quarkus/defaults/src/main/resources/application.properties
b/quarkus/defaults/src/main/resources/application.properties
index 0482f3de0..11f23de1b 100644
--- a/quarkus/defaults/src/main/resources/application.properties
+++ b/quarkus/defaults/src/main/resources/application.properties
@@ -111,6 +111,7 @@ polaris.realm-context.require-header=false
polaris.features.defaults."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
polaris.features.defaults."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
# polaris.features.defaults."ENABLE_CATALOG_FEDERATION"=true
+polaris.features.defaults."SUPPORTED_CATALOG_CONNECTION_TYPES"=["ICEBERG_REST"]
# realm overrides
#
polaris.features.realm-overrides."my-realm"."INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST"=true
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index bb32d9915..91f0a33d6 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -87,8 +87,8 @@ import
org.apache.polaris.service.catalog.policy.PolicyCatalog;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.events.PolarisEventListener;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
index 6a95f7cdb..08dd790e9 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -68,8 +68,8 @@ import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalogHandler;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.types.NotificationRequest;
diff --git
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
index c0eba7b0a..e4e351a99 100644
---
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
+++
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
@@ -23,6 +23,7 @@ import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.util.List;
+import java.util.Locale;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -37,6 +38,7 @@ import
org.apache.polaris.core.admin.model.CreateCatalogRequest;
import org.apache.polaris.core.admin.model.CreateCatalogRoleRequest;
import org.apache.polaris.core.admin.model.CreatePrincipalRequest;
import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest;
+import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.GrantCatalogRoleRequest;
import org.apache.polaris.core.admin.model.GrantPrincipalRoleRequest;
import org.apache.polaris.core.admin.model.GrantResource;
@@ -141,6 +143,7 @@ public class PolarisServiceImpl
PolarisAdminService adminService = newAdminService(realmContext,
securityContext);
Catalog catalog = request.getCatalog();
validateStorageConfig(catalog.getStorageConfigInfo());
+ validateConnectionConfigInfo(catalog);
Catalog newCatalog = new
CatalogEntity(adminService.createCatalog(request)).asCatalog();
LOGGER.info("Created new catalog {}", newCatalog);
return Response.status(Response.Status.CREATED).build();
@@ -163,6 +166,30 @@ public class PolarisServiceImpl
}
}
+ private void validateConnectionConfigInfo(Catalog catalog) {
+ if (catalog.getType() == Catalog.TypeEnum.EXTERNAL) {
+ if (catalog instanceof ExternalCatalog externalCatalog) {
+ if (externalCatalog.getConnectionConfigInfo() != null) {
+ String connectionType =
+
externalCatalog.getConnectionConfigInfo().getConnectionType().name();
+ List<String> supportedConnectionTypes =
+ callContext
+ .getPolarisCallContext()
+ .getConfigurationStore()
+ .getConfiguration(
+ callContext.getPolarisCallContext(),
+ FeatureConfiguration.SUPPORTED_CATALOG_CONNECTION_TYPES)
+ .stream()
+ .map(s -> s.toUpperCase(Locale.ROOT))
+ .toList();
+ if (!supportedConnectionTypes.contains(connectionType)) {
+ throw new IllegalStateException("Unsupported connection type: " +
connectionType);
+ }
+ }
+ }
+ }
+ }
+
/** From PolarisCatalogsApiService */
@Override
public Response deleteCatalog(
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 8dec97168..4484dc129 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -75,7 +75,7 @@ import
org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
import
org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
import org.apache.polaris.service.catalog.common.CatalogAdapter;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.types.CommitTableRequest;
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index e8912548d..aca12eb6f 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
@@ -79,7 +80,8 @@ import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
-import org.apache.polaris.core.connection.IcebergRestConnectionConfigInfoDpo;
+import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
+import
org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -97,7 +99,7 @@ import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogHandler;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
import org.apache.polaris.service.http.IcebergHttpUtil;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.types.NotificationRequest;
@@ -212,6 +214,7 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
Catalog federatedCatalog;
ConnectionType connectionType =
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
+
switch (connectionType) {
case ICEBERG_REST:
SessionCatalog.SessionContext context =
SessionCatalog.SessionContext.createEmpty();
@@ -226,6 +229,12 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
((IcebergRestConnectionConfigInfoDpo)
connectionConfigInfoDpo).getRemoteCatalogName(),
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
break;
+ case HADOOP:
+ federatedCatalog = new HadoopCatalog();
+ federatedCatalog.initialize(
+ ((HadoopConnectionConfigInfoDpo)
connectionConfigInfoDpo).getWarehouse(),
+
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
+ break;
default:
throw new UnsupportedOperationException(
"Connection type not supported: " + connectionType);
diff --git
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
similarity index 90%
rename from
service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
rename to
service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
index 7c812d6d3..5a89ed49d 100644
---
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/CallContextCatalogFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.context;
+package org.apache.polaris.service.context.catalog;
import jakarta.ws.rs.core.SecurityContext;
import org.apache.iceberg.catalog.Catalog;
@@ -25,6 +25,9 @@ import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
public interface CallContextCatalogFactory {
+
+ static final String WAREHOUSE_LOCATION_BASEDIR =
"/tmp/iceberg_rest_server_warehouse_data/";
+
Catalog createCallContextCatalog(
CallContext context,
AuthenticatedPolarisPrincipal authenticatedPrincipal,
diff --git
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
similarity index 97%
rename from
service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
rename to
service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
index b1d28ce89..96445f949 100644
---
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
+++
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.service.context;
+package org.apache.polaris.service.context.catalog;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -48,9 +48,6 @@ public class PolarisCallContextCatalogFactory implements
CallContextCatalogFacto
private static final Logger LOGGER =
LoggerFactory.getLogger(PolarisCallContextCatalogFactory.class);
- private static final String WAREHOUSE_LOCATION_BASEDIR =
- "/tmp/iceberg_rest_server_warehouse_data/";
-
private final RealmEntityManagerFactory entityManagerFactory;
private final TaskExecutor taskExecutor;
private final FileIOFactory fileIOFactory;
diff --git
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 15a254a08..34a3caf45 100644
---
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -55,8 +55,8 @@ import
org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.config.ReservedProperties;
-import org.apache.polaris.service.context.CallContextCatalogFactory;
-import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import
org.apache.polaris.service.context.catalog.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.events.TestPolarisEventListener;
import
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
diff --git a/spec/polaris-management-service.yml
b/spec/polaris-management-service.yml
index 318f17a6c..795dd61ff 100644
--- a/spec/polaris-management-service.yml
+++ b/spec/polaris-management-service.yml
@@ -862,6 +862,7 @@ components:
type: string
enum:
- ICEBERG_REST
+ - HADOOP
description: The type of remote catalog service represented by this
connection
uri:
type: string
@@ -874,6 +875,7 @@ components:
propertyName: connectionType
mapping:
ICEBERG_REST: "#/components/schemas/IcebergRestConnectionConfigInfo"
+ HADOOP: "#/components/schemas/HadoopConnectionConfigInfo"
IcebergRestConnectionConfigInfo:
type: object
@@ -887,6 +889,16 @@ components:
this is specified as the 'warehouse' when multiple logical
catalogs are served under the same base
uri, and often translates into a 'prefix' added to all REST
resource paths
+ HadoopConnectionConfigInfo:
+ type: object
+ description: Configuration necessary for connecting to a Hadoop Catalog
+ allOf:
+ - $ref: '#/components/schemas/ConnectionConfigInfo'
+ properties:
+ warehouse:
+ type: string
+ description: The file path to where this catalog should store tables
+
AuthenticationParameters:
type: object
description: Authentication-specific information for a REST connection