This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris-tools.git
The following commit(s) were added to refs/heads/main by this push:
new ab310d5 Add token refresh and token exchange OAuth2 flows to
polaris-synchronizer tool. (#8)
ab310d5 is described below
commit ab310d564ccb7abe36c9766653e7033bbc0c6294
Author: Mansehaj Singh <[email protected]>
AuthorDate: Fri Apr 25 10:04:53 2025 -0700
Add token refresh and token exchange OAuth2 flows to polaris-synchronizer
tool. (#8)
* Add token refresh and token exchange auth flow
* use credential instead of client id and client secret
* Added token exchange flow
* Remove spurious change
* Addressed comments
* Removed token exchange
* update comment
* Services properly close resources and enforce closeability on abstractions
* Removed credential and token from parent auth
* addressed comments
* Closed iceberg catalog service properly
---
polaris-synchronizer/README.md | 16 +-
.../tools/sync/polaris/PolarisSynchronizer.java | 41 ++---
.../polaris/auth/AuthenticationSessionWrapper.java | 90 ++++++++++
.../tools/sync/polaris/catalog/ETagManager.java | 2 +-
.../sync/polaris/catalog/NoOpETagManager.java | 4 +
.../tools/sync/polaris/catalog/PolarisCatalog.java | 44 ++---
.../polaris/tools/sync/polaris/http/HttpUtil.java | 39 -----
.../tools/sync/polaris/http/OAuth2Util.java | 78 ---------
.../polaris/service/IcebergCatalogService.java | 3 +-
.../tools/sync/polaris/service/PolarisService.java | 3 +-
.../polaris/service/impl/PolarisApiService.java | 42 +++--
.../service/impl/PolarisIcebergCatalogService.java | 6 +
.../apache/polaris/tools/sync/polaris/CLIUtil.java | 26 +++
.../polaris/CreateOmnipotentPrincipalCommand.java | 194 ++++++++++-----------
.../tools/sync/polaris/SyncPolarisCommand.java | 90 +++-------
15 files changed, 318 insertions(+), 360 deletions(-)
diff --git a/polaris-synchronizer/README.md b/polaris-synchronizer/README.md
index fb9e4f6..6ac82df 100644
--- a/polaris-synchronizer/README.md
+++ b/polaris-synchronizer/README.md
@@ -61,8 +61,7 @@ with 10 concurrent catalog setup threads:
java -jar cli/build/libs/polaris-synchronizer-cli.jar
create-omnipotent-principal \
--polaris-api-connection-properties base-url=http://localhost:8181 \
--polaris-api-connection-properties
oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
---polaris-api-connection-properties client-id=root \
---polaris-api-connection-properties client-secret=<client_secret> \
+--polaris-api-connection-properties credential=<client_id>:<client_secret> \
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
--replace \ # replace it if it already exists
--concurrency 10 # 10 concurrent catalog setup threads
@@ -105,8 +104,7 @@ java -jar cli/build/libs/polaris-synchronizer-cli.jar \
create-omnipotent-principal \
--polaris-api-connection-properties base-url=http://localhost:8181 \
--polaris-api-connection-properties
oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
---polaris-api-connection-properties client-id=root \
---polaris-api-connection-properties client-secret=<client_secret> \
+--polaris-api-connection-properties credential=<client_id>:<client_secret> \
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
--replace \ # replace if it already exists
--concurrency 10 \ # 10 concurrent catalog setup threads
@@ -143,22 +141,18 @@ diff between the source and target Polaris instances.
This can be achieved using
> Polaris instance. The new credentials will be logged to stdout, ONLY for
> each newly created or overwritten principal.
> Please note that this output should be securely managed, client credentials
> should only ever be stored in a secure vault.
-**Example** Running the synchronization between source Polaris instance using
an access token, and a target Polaris instance
+**Example** Running the synchronization between a source Polaris instance using
a bearer token, and a target Polaris instance
using client credentials.
```
java -jar cli/build/libs/polaris-synchronizer-cli.jar sync-polaris \
--source-properties base-url=http://localhost:8181 \
---source-properties client-id=root \
---source-properties client-secret=<client_secret> \
---source-properties
oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
---source-properties scope=PRINCIPAL_ROLE:ALL \
+--source-properties token=<bearer_token> \
--source-properties omnipotent-principal-name=omnipotent-principal-XXXXX \
--source-properties omnipotent-principal-client-id=589550e8b23d271e \
--source-properties omnipotent-principal-client-secret=<omni_client_secret> \
--source-properties
omnipotent-principal-oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens
\
--target-properties base-url=http://localhost:5858 \
---target-properties client-id=root \
---target-properties client-secret=<client_secret> \
+--target-properties credential=<client_id>:<client_secret> \
--target-properties
oauth2-server-uri=http://localhost:5858/api/catalog/v1/oauth/tokens \
--target-properties scope=PRINCIPAL_ROLE:ALL \
--target-properties omnipotent-principal-name=omnipotent-principal-YYYYY \
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java
index c6a2cc4..a521ecf 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java
@@ -630,41 +630,30 @@ public class PolarisSynchronizer {
}
for (Catalog catalog : catalogSyncPlan.entitiesToSyncChildren()) {
- IcebergCatalogService sourceIcebergCatalogService;
- try {
- sourceIcebergCatalogService =
source.initializeIcebergCatalogService(catalog.getName());
+ try (IcebergCatalogService sourceIcebergCatalogService =
source.initializeIcebergCatalogService(catalog.getName())) {
clientLogger.info(
- "Initialized Iceberg REST catalog for Polaris catalog {} on
source.",
- catalog.getName());
- } catch (Exception e) {
- if (haltOnFailure) throw e;
- clientLogger.error(
- "Failed to initialize Iceberg REST catalog for Polaris catalog {}
on source.",
- catalog.getName(),
- e);
- continue;
- }
+ "Initialized Iceberg REST catalog for Polaris catalog {} on
source.",
+ catalog.getName());
- IcebergCatalogService targetIcebergCatalogService;
+ try (IcebergCatalogService targetIcebergCatalogService =
target.initializeIcebergCatalogService(catalog.getName())) {
+ clientLogger.info(
+ "Initialized Iceberg REST catalog for Polaris catalog {} on
target.",
+ catalog.getName());
+
+ syncNamespaces(
+ catalog.getName(), Namespace.empty(),
sourceIcebergCatalogService, targetIcebergCatalogService);
+ }
- try {
- targetIcebergCatalogService =
target.initializeIcebergCatalogService(catalog.getName());
- clientLogger.info(
- "Initialized Iceberg REST catalog for Polaris catalog {} on
target.",
- catalog.getName());
} catch (Exception e) {
- if (haltOnFailure) throw e;
clientLogger.error(
- "Failed to initialize Iceberg REST catalog for Polaris catalog {}
on target.",
- catalog.getName(),
- e);
+ "Failed to synchronize Iceberg REST catalog for Polaris
catalog {}.",
+ catalog.getName(),
+ e);
+ if (haltOnFailure) throw new RuntimeException(e);
continue;
}
- syncNamespaces(
- catalog.getName(), Namespace.empty(), sourceIcebergCatalogService,
targetIcebergCatalogService);
-
// NOTE: Grants are synced on a per catalog role basis, so we need to
ensure that catalog roles
// are only synced AFTER Iceberg catalog entities, because they may
depend on the Iceberg catalog
// entities already existing
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java
new file mode 100644
index 0000000..5ce0304
--- /dev/null
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/auth/AuthenticationSessionWrapper.java
@@ -0,0 +1,90 @@
+package org.apache.polaris.tools.sync.polaris.auth;
+
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthConfig;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.util.ThreadPools;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Wraps {@link OAuth2Util.AuthSession} to provide supported authentication
flows.
+ */
+public class AuthenticationSessionWrapper implements Closeable {
+
+ private final RESTClient restClient;
+
+ private final OAuth2Util.AuthSession authSession;
+
+ private final ScheduledExecutorService executor;
+
+ public AuthenticationSessionWrapper(Map<String, String> properties) {
+ this.restClient = HTTPClient.builder(Map.of())
+ .uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
+ .build();
+ this.authSession = this.newAuthSession(this.restClient, properties);
+ this.executor = ThreadPools.newScheduledPool(UUID.randomUUID() +
"-token-refresh", 1);
+ }
+
+ /**
+ * Initializes a new authentication session. Supports client_credentials
and bearer token flow.
+ * @param properties properties to initialize the session with
+ * @return an authentication session, with token refresh if applicable
+ */
+ private OAuth2Util.AuthSession newAuthSession(RESTClient restClient,
Map<String, String> properties) {
+ OAuth2Util.AuthSession parent = new OAuth2Util.AuthSession(
+ Map.of(),
+ AuthConfig.builder()
+ .scope(properties.get(OAuth2Properties.SCOPE))
+
.oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
+
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
+ .build()
+ );
+
+ // This is for client_credentials flow
+ if (properties.containsKey(OAuth2Properties.CREDENTIAL)) {
+ return OAuth2Util.AuthSession.fromCredential(
+ restClient,
+ // threads created here will be daemon threads, so
termination of main program
+ // will terminate the token refresh thread automatically
+ this.executor,
+ properties.get(OAuth2Properties.CREDENTIAL),
+ parent
+ );
+ }
+
+ // This is for regular bearer token flow
+ if (properties.containsKey(OAuth2Properties.TOKEN)) {
+ return OAuth2Util.AuthSession.fromAccessToken(
+ restClient,
+ // threads created here will be daemon threads, so
termination of main program
+ // will terminate the token refresh thread automatically
+ this.executor,
+ properties.get(OAuth2Properties.TOKEN),
+ null, /* defaultExpiresAtMillis */
+ parent
+ );
+ }
+
+ throw new IllegalArgumentException("Unable to construct authenticated
session with the provided properties.");
+ }
+
+ /**
+ * Get refreshed authentication headers for session.
+ */
+ public Map<String, String> getSessionHeaders() {
+ return this.authSession.headers();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try (restClient; executor) {}
+ }
+
+}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java
index 832d75c..bd6fcf5 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java
@@ -26,7 +26,7 @@ import java.util.Map;
* Generic interface to provide and store ETags for tables within catalogs.
This allows the storage
* of the ETag to be completely independent from the tool.
*/
-public interface ETagManager {
+public interface ETagManager extends AutoCloseable {
/**
* Used to initialize the instance for use. Should be called prior to
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java
index 1bd1499..a17cbf4 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java
@@ -35,4 +35,8 @@ public class NoOpETagManager implements ETagManager {
@Override
public void storeETag(String catalogName, TableIdentifier tableIdentifier,
String etag) {}
+
+ @Override
+ public void close() {}
+
}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java
index 5c489ae..1e76e34 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadTableResponseParser;
-import org.apache.polaris.tools.sync.polaris.http.OAuth2Util;
+import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper;
/**
* Overrides loadTable default implementation to issue a custom loadTable
request to the Polaris
@@ -58,14 +58,14 @@ public class PolarisCatalog extends RESTCatalog
private Map<String, String> properties = null;
- private String accessToken = null;
-
private HttpClient httpClient = null;
private ObjectMapper objectMapper = null;
private ResourcePaths resourcePaths = null;
+ private AuthenticationSessionWrapper authenticationSession = null;
+
public PolarisCatalog() {
super();
}
@@ -82,22 +82,8 @@ public class PolarisCatalog extends RESTCatalog
super.initialize(name, props);
- if (accessToken == null || httpClient == null || this.objectMapper ==
null) {
- String oauth2ServerUri = props.get("uri") + "/v1/oauth/tokens";
- String credential = props.get("credential");
-
- String clientId = credential.split(":")[0];
- String clientSecret = credential.split(":")[1];
-
- String scope = props.get("scope");
-
- // TODO: Add token refresh
- try {
- this.accessToken = OAuth2Util.fetchToken(oauth2ServerUri, clientId,
clientSecret, scope);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
+ if (authenticationSession == null || httpClient == null ||
this.objectMapper == null) {
+ this.authenticationSession = new
AuthenticationSessionWrapper(this.properties);
this.httpClient = HttpClient.newBuilder().build();
this.objectMapper = new ObjectMapper();
}
@@ -127,9 +113,10 @@ public class PolarisCatalog extends RESTCatalog
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder()
.uri(URI.create(tablePath))
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken)
.GET();
+
this.authenticationSession.getSessionHeaders().forEach(requestBuilder::header);
+
// specify last known etag in if-none-match header
if (etag != null) {
requestBuilder.header(HttpHeaders.IF_NONE_MATCH, etag);
@@ -171,4 +158,21 @@ public class PolarisCatalog extends RESTCatalog
return new BaseTable(ops, CatalogUtil.fullTableName(catalogName, ident));
}
+
+ @Override
+ public void close() throws IOException {
+ final AuthenticationSessionWrapper session = this.authenticationSession;
+ final HttpClient httpClient = this.httpClient;
+
+ try (session; httpClient) {
+ super.close();
+ } finally {
+ this.authenticationSession = null;
+ this.httpClient = null;
+ this.objectMapper = null;
+ this.resourcePaths = null;
+ }
+
+ super.close();
+ }
}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java
deleted file mode 100644
index 50c1428..0000000
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.tools.sync.polaris.http;
-
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/** Encapsulates handy http utility methods. */
-public class HttpUtil {
-
- /** Turn a {@link Map} into an xxx-url-form-encoded compatible String form
body. */
- public static String constructFormEncodedString(Map<String, String>
parameters) {
- return parameters.entrySet().stream()
- .map(
- entry ->
- URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8)
- + "="
- + URLEncoder.encode(entry.getValue(),
StandardCharsets.UTF_8))
- .collect(Collectors.joining("&"));
- }
-}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java
deleted file mode 100644
index 98ada78..0000000
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/OAuth2Util.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.tools.sync.polaris.http;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import org.apache.http.HttpHeaders;
-import org.apache.http.entity.ContentType;
-
-/** Utility class to manage OAuth2 flow for a Polaris instance. */
-public class OAuth2Util {
-
- private static final HttpClient httpClient = HttpClient.newHttpClient();
-
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
- public static String fetchToken(
- String oauth2ServerUri, String clientId, String clientSecret, String
scope)
- throws IOException {
-
- Map<String, String> formBody =
- Map.of(
- "grant_type", "client_credentials",
- "scope", scope,
- "client_id", clientId,
- "client_secret", clientSecret);
-
- String formBodyAsString = HttpUtil.constructFormEncodedString(formBody);
-
- HttpRequest request =
- HttpRequest.newBuilder()
- .uri(URI.create(oauth2ServerUri))
- .header(HttpHeaders.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType())
- .POST(HttpRequest.BodyPublishers.ofString(formBodyAsString))
- .build();
-
- try {
- HttpResponse<String> response =
- httpClient.send(request, HttpResponse.BodyHandlers.ofString());
- Map<String, String> responseBody =
- objectMapper.readValue(response.body(), new TypeReference<>() {});
-
- String accessToken = responseBody.getOrDefault("access_token", null);
-
- if (accessToken != null) {
- return accessToken;
- }
-
- throw new NoSuchElementException(
- "No field 'access_token' found in response from oauth2-server-uri.");
- } catch (Exception e) {
- throw new RuntimeException("Could not fetch access token", e);
- }
- }
-}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java
index fe7020b..240e5f8 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/IcebergCatalogService.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import java.io.Closeable;
import java.util.List;
import java.util.Map;
@@ -30,7 +31,7 @@ import java.util.Map;
* Wrapper around {@link org.apache.iceberg.catalog.Catalog} that exposes
functionality
* that uses multiple Iceberg operations. For example, cascading drops of
namespaces.
*/
-public interface IcebergCatalogService {
+public interface IcebergCatalogService extends AutoCloseable {
// NAMESPACES
List<Namespace> listNamespaces(Namespace parentNamespace);
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java
index f011880..b1cb8e8 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/PolarisService.java
@@ -26,13 +26,14 @@ import org.apache.polaris.core.admin.model.Principal;
import org.apache.polaris.core.admin.model.PrincipalRole;
import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import java.io.Closeable;
import java.util.List;
import java.util.Map;
/**
* Generic wrapper for a Polaris entity store.
*/
-public interface PolarisService {
+public interface PolarisService extends AutoCloseable {
/**
* Called to perform initializing tasks for a Polaris entity store.
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java
index f454775..9afc389 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisApiService.java
@@ -19,7 +19,6 @@
package org.apache.polaris.tools.sync.polaris.service.impl;
-import org.apache.http.HttpHeaders;
import org.apache.iceberg.catalog.Namespace;
import org.apache.polaris.core.admin.model.AddGrantRequest;
import org.apache.polaris.core.admin.model.Catalog;
@@ -39,10 +38,11 @@ import
org.apache.polaris.core.admin.model.RevokeGrantRequest;
import org.apache.polaris.management.ApiClient;
import org.apache.polaris.management.client.PolarisManagementDefaultApi;
import org.apache.polaris.tools.sync.polaris.access.AccessControlService;
-import org.apache.polaris.tools.sync.polaris.http.OAuth2Util;
+import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper;
import org.apache.polaris.tools.sync.polaris.service.IcebergCatalogService;
import org.apache.polaris.tools.sync.polaris.service.PolarisService;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -68,6 +68,8 @@ public class PolarisApiService implements PolarisService {
private boolean icebergWriteAccess = false;
+ private AuthenticationSessionWrapper authenticationSession = null;
+
public PolarisApiService() {}
@Override
@@ -75,25 +77,14 @@ public class PolarisApiService implements PolarisService {
this.properties = properties;
String baseUrl = properties.get("base-url");
- String token = properties.get("bearer-token");
-
- if (token == null) {
- String oauth2ServerUri = properties.get("oauth2-server-uri");
- String clientId = properties.get("client-id");
- String clientSecret = properties.get("client-secret");
- String scope = properties.get("scope");
-
- token = OAuth2Util.fetchToken(oauth2ServerUri, clientId,
clientSecret, scope);
- }
-
- String bearerToken = token; // to make it effectively final to use it
in a lambda
ApiClient client = new ApiClient();
client.updateBaseUri(baseUrl + "/api/management/v1");
- // TODO: Add token refresh
- client.setRequestInterceptor(requestBuilder ->
- requestBuilder.header(HttpHeaders.AUTHORIZATION, "Bearer " +
bearerToken));
+ this.authenticationSession = new
AuthenticationSessionWrapper(properties);
+
+ client.setRequestInterceptor(requestBuilder
+ ->
authenticationSession.getSessionHeaders().forEach(requestBuilder::header));
this.baseUrl = baseUrl;
this.api = new PolarisManagementDefaultApi(client);
@@ -284,4 +275,21 @@ public class PolarisApiService implements PolarisService {
baseUrl, catalogName, omnipotentPrincipal, properties);
}
+ @Override
+ public void close() throws IOException {
+ AuthenticationSessionWrapper session = this.authenticationSession;
+
+ try (session) {}
+ finally {
+ this.properties = null;
+ this.baseUrl = null;
+ this.api = null;
+ this.accessControlService = null;
+ this.omnipotentPrincipal = null;
+ this.omnipotentPrincipalRole = null;
+ this.icebergWriteAccess = false;
+ this.authenticationSession = null;
+ }
+ }
+
}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java
index 3955c67..cb9b50a 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/service/impl/PolarisIcebergCatalogService.java
@@ -28,6 +28,7 @@ import
org.apache.polaris.core.admin.model.PrincipalWithCredentials;
import org.apache.polaris.tools.sync.polaris.catalog.PolarisCatalog;
import org.apache.polaris.tools.sync.polaris.service.IcebergCatalogService;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -160,4 +161,9 @@ public class PolarisIcebergCatalogService implements
IcebergCatalogService {
this.catalog.dropTable(tableIdentifier, false /* purge */);
}
+ @Override
+ public void close() throws IOException {
+ this.catalog.close();
+ }
+
}
diff --git
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java
new file mode 100644
index 0000000..1232fe6
--- /dev/null
+++
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CLIUtil.java
@@ -0,0 +1,26 @@
+package org.apache.polaris.tools.sync.polaris;
+
+/**
+ * CLI specific utilities and constants.
+ */
+public class CLIUtil {
+
+ public static final String API_SERVICE_PROPERTIES_DESCRIPTION =
+ "\nProperties:" +
+ "\n\t- base-url: the base url of the Polaris instance (eg.
http://localhost:8181)" +
+ "\n\t- token: the bearer token to authenticate against the Polaris
instance with." +
+ "\n\t- oauth2-server-uri: the uri of the OAuth2 server to
authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" +
+ "\n\t- credential: the client credentials to use to authenticate
against the Polaris instance (eg. <client_id>:<client_secret>)" +
+ "\n\t- scope: the scope to authenticate with for the service_admin
(eg. PRINCIPAL_ROLE:ALL)";
+
+ public static final String OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION =
+ "\nOmnipotent Principal Properties:" +
+ "\n\t- omnipotent-principal-name: the name of the omnipotent
principal created using create-omnipotent-principal on the source Polaris" +
+ "\n\t- omnipotent-principal-client-id: the client id of the
omnipotent principal created using create-omnipotent-principal on the source
Polaris" +
+ "\n\t- omnipotent-principal-client-secret: the client secret of
the omnipotent principal created using create-omnipotent-principal on the
source Polaris" +
+ "\n\t- omnipotent-principal-oauth2-server-uri: (default:
/v1/oauth/tokens endpoint for provided Polaris base-url) "
+ + "the OAuth2 server to use to authenticate the
omnipotent-principal for Iceberg catalog access";
+
+ private CLIUtil() {}
+
+}
diff --git
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java
index df65f81..39394c7 100644
---
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java
+++
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/CreateOmnipotentPrincipalCommand.java
@@ -56,15 +56,7 @@ public class CreateOmnipotentPrincipalCommand implements
Callable<Integer> {
@CommandLine.Option(
names = {"--polaris-api-connection-properties"},
required = true,
- description = "The connection properties to connect to the Polaris
API." +
- "\nProperties:" +
- "\n\t- base-url: the base url of the Polaris instance (eg.
http://localhost:8181)" +
- "\n\t- bearer-token: the bearer token to authenticate
against the Polaris instance with. Must " +
- "be provided if any of oauth2-server-uri, client-id,
client-secret, or scope are not provided." +
- "\n\t- oauth2-server-uri: the uri of the OAuth2 server to
authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" +
- "\n\t- client-id: the client id belonging to a service admin
to authenticate with" +
- "\n\t- client-secret: the client secret belong to a service
admin to authenticate with" +
- "\n\t- scope: the scope to authenticate with for the
service_admin (eg. PRINCIPAL_ROLE:ALL)"
+ description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION
)
private Map<String, String> polarisApiConnectionProperties;
@@ -96,100 +88,102 @@ public class CreateOmnipotentPrincipalCommand implements
Callable<Integer> {
polarisApiConnectionProperties.putIfAbsent(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY,
String.valueOf(withWriteAccess));
- PolarisService polaris = PolarisServiceFactory.createPolarisService(
- PolarisServiceFactory.ServiceType.API,
polarisApiConnectionProperties);
+ try (PolarisService polaris = PolarisServiceFactory.createPolarisService(
+ PolarisServiceFactory.ServiceType.API,
polarisApiConnectionProperties)) {
+
+ AccessControlService accessControlService = new
AccessControlService((PolarisApiService) polaris);
+
+ PrincipalWithCredentials principalWithCredentials;
+
+ try {
+ principalWithCredentials =
accessControlService.createOmnipotentPrincipal(replace);
+ } catch (Exception e) {
+ consoleLog.error("Failed to create omnipotent principal.", e);
+ return 1;
+ }
+
+ consoleLog.info(
+ "Created omnipotent principal {}.",
principalWithCredentials.getPrincipal().getName());
+
+ PrincipalRole principalRole;
+
+ try {
+ principalRole =
+
accessControlService.createAndAssignPrincipalRole(principalWithCredentials,
replace);
+ } catch (Exception e) {
+ consoleLog.error("Failed to create omnipotent principal role and
assign it to principal.", e);
+ return 1;
+ }
+
+ consoleLog.info(
+ "Created omnipotent principal role {} and assigned it to
omnipotent principal {}.",
+ principalRole.getName(),
+ principalWithCredentials.getPrincipal().getName());
+
+ List<Catalog> catalogs = polaris.listCatalogs();
+
+ consoleLog.info("Identified {} catalogs to create catalog roles for.",
catalogs.size());
+
+ final String permissionLevel = withWriteAccess ? "write" : "readonly";
+
+ AtomicInteger completedCatalogSetups = new AtomicInteger(0);
+
+ Queue<Catalog> failedCatalogs = new ConcurrentLinkedQueue<>();
+
+ ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ for (Catalog catalog : catalogs) {
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ accessControlService.setupOmnipotentRoleForCatalog(
+ catalog.getName(), principalRole, replace,
withWriteAccess);
+ } catch (Exception e) {
+ failedCatalogs.add(catalog);
+ consoleLog.error(
+ "Failed to setup omnipotent catalog role
for catalog {} with {} access. - {}/{}",
+ catalog.getName(),
+ permissionLevel,
+ completedCatalogSetups.incrementAndGet(),
+ catalogs.size(),
+ e);
+ return;
+ }
+
+ consoleLog.info(
+ "Finished omnipotent principal setup for
catalog {} with {} access. - {}/{}",
+ catalog.getName(),
+ permissionLevel,
+ completedCatalogSetups.incrementAndGet(),
+ catalogs.size());
+ },
+ executor);
+
+ futures.add(future);
+ }
+
+ futures.forEach(CompletableFuture::join);
+
+ consoleLog.info(
+ "Encountered issues creating catalog roles for the following
catalogs: {}",
+ failedCatalogs.stream().map(Catalog::getName).toList());
+
+ consoleLog.info(
+ "\n======================================================\n"
+ + "Omnipotent Principal Credentials:\n"
+ + "\tname = {}\n"
+ + "\tclientId = {}\n"
+ + "\tclientSecret = {}\n"
+ +
"======================================================",
+ principalWithCredentials.getPrincipal().getName(),
+ principalWithCredentials.getCredentials().getClientId(),
+ principalWithCredentials.getCredentials().getClientSecret());
- AccessControlService accessControlService = new
AccessControlService((PolarisApiService) polaris);
-
- PrincipalWithCredentials principalWithCredentials;
-
- try {
- principalWithCredentials =
accessControlService.createOmnipotentPrincipal(replace);
- } catch (Exception e) {
- consoleLog.error("Failed to create omnipotent principal.", e);
- return 1;
}
- consoleLog.info(
- "Created omnipotent principal {}.",
principalWithCredentials.getPrincipal().getName());
-
- PrincipalRole principalRole;
-
- try {
- principalRole =
-
accessControlService.createAndAssignPrincipalRole(principalWithCredentials,
replace);
- } catch (Exception e) {
- consoleLog.error("Failed to create omnipotent principal role and assign
it to principal.", e);
- return 1;
- }
-
- consoleLog.info(
- "Created omnipotent principal role {} and assigned it to omnipotent
principal {}.",
- principalWithCredentials.getPrincipal().getName(),
- principalRole.getName());
-
- List<Catalog> catalogs = polaris.listCatalogs();
-
- consoleLog.info("Identified {} catalogs to create catalog roles for.",
catalogs.size());
-
- final String permissionLevel = withWriteAccess ? "write" : "readonly";
-
- AtomicInteger completedCatalogSetups = new AtomicInteger(0);
-
- Queue<Catalog> failedCatalogs = new ConcurrentLinkedQueue<>();
-
- ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
-
- for (Catalog catalog : catalogs) {
- CompletableFuture<Void> future =
- CompletableFuture.runAsync(
- () -> {
- try {
- accessControlService.setupOmnipotentRoleForCatalog(
- catalog.getName(), principalRole, replace,
withWriteAccess);
- } catch (Exception e) {
- failedCatalogs.add(catalog);
- consoleLog.error(
- "Failed to setup omnipotent catalog role for catalog {}
with {} access. - {}/{}",
- catalog.getName(),
- permissionLevel,
- completedCatalogSetups.incrementAndGet(),
- catalogs.size(),
- e);
- return;
- }
-
- consoleLog.info(
- "Finished omnipotent principal setup for catalog {} with
{} access. - {}/{}",
- catalog.getName(),
- permissionLevel,
- completedCatalogSetups.incrementAndGet(),
- catalogs.size());
- },
- executor);
-
- futures.add(future);
- }
-
- futures.forEach(CompletableFuture::join);
-
- consoleLog.info(
- "Encountered issues creating catalog roles for the following catalogs:
{}",
- failedCatalogs.stream().map(Catalog::getName).toList());
-
- consoleLog.info(
- "\n======================================================\n"
- + "Omnipotent Principal Credentials:\n"
- + "\tname = {}\n"
- + "\tclientId = {}\n"
- + "\tclientSecret = {}\n"
- + "======================================================",
- principalWithCredentials.getPrincipal().getName(),
- principalWithCredentials.getCredentials().getClientId(),
- principalWithCredentials.getCredentials().getClientSecret());
-
return 0;
}
}
diff --git
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
index bfd64e2..b3f4cc9 100644
---
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
+++
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
@@ -18,8 +18,6 @@
*/
package org.apache.polaris.tools.sync.polaris;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.polaris.tools.sync.polaris.catalog.ETagManager;
@@ -49,42 +47,14 @@ public class SyncPolarisCommand implements
Callable<Integer> {
@CommandLine.Option(
names = {"--source-properties"},
required = true,
- description = "Properties to initialize Polaris entity source." +
- "\nProperties:" +
- "\n\t- base-url: the base url of the Polaris instance (eg.
http://localhost:8181)" +
- "\n\t- bearer-token: the bearer token to authenticate
against the Polaris instance with. Must " +
- "be provided if any of oauth2-server-uri, client-id,
client-secret, or scope are not provided." +
- "\n\t- oauth2-server-uri: the uri of the OAuth2 server to
authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" +
- "\n\t- client-id: the client id belonging to a service admin
to authenticate with" +
- "\n\t- client-secret: the client secret belong to a service
admin to authenticate with" +
- "\n\t- scope: the scope to authenticate with for the
service_admin (eg. PRINCIPAL_ROLE:ALL)" +
- "\nOmnipotent Principal Properties:" +
- "\n\t- omnipotent-principal-name: the name of the omnipotent
principal created using create-omnipotent-principal on the source Polaris" +
- "\n\t- omnipotent-principal-client-id: the client id of the
omnipotent principal created using create-omnipotent-principal on the source
Polaris" +
- "\n\t- omnipotent-principal-client-secret: the client secret
of the omnipotent principal created using create-omnipotent-principal on the
source Polaris" +
- "\n\t- omnipotent-principal-oauth2-server-uri: (default:
/v1/oauth/tokens endpoint for provided Polaris base-url) "
- + "the OAuth2 server to use to authenticate the
omnipotent-principal for Iceberg catalog access"
+ description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION +
CLIUtil.OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION
)
private Map<String, String> sourceProperties;
@CommandLine.Option(
names = {"--target-properties"},
required = true,
- description = "Properties to initialize Polaris entity target." +
- "\nProperties:" +
- "\n\t- base-url: the base url of the Polaris instance (eg.
http://localhost:8181)" +
- "\n\t- bearer-token: the bearer token to authenticate
against the Polaris instance with. Must " +
- "be provided if any of oauth2-server-uri, client-id,
client-secret, or scope are not provided." +
- "\n\t- oauth2-server-uri: the uri of the OAuth2 server to
authenticate to. (eg. http://localhost:8181/api/catalog/v1/oauth/tokens)" +
- "\n\t- client-id: the client id belonging to a service admin
to authenticate with" +
- "\n\t- client-secret: the client secret belong to a service
admin to authenticate with" +
- "\n\t- scope: the scope to authenticate with for the
service_admin (eg. PRINCIPAL_ROLE:ALL)" +
- "\nOmnipotent Principal Properties:" +
- "\n\t- omnipotent-principal-name: the name of the omnipotent
principal created using create-omnipotent-principal on the target Polaris" +
- "\n\t- omnipotent-principal-client-id: the client id of the
omnipotent principal created using create-omnipotent-principal on the target
Polaris" +
- "\n\t- omnipotent-principal-client-secret: the client secret
of the omnipotent principal created using create-omnipotent-principal on the
target Polaris" +
- "\n\t- omnipotent-principal-oauth2-server-uri: (default:
/v1/oauth/tokens endpoint for provided Polaris base-url) "
- + "the OAuth2 server to use to retrieve a bearer token for
the omnipotent-principal"
+ description = CLIUtil.API_SERVICE_PROPERTIES_DESCRIPTION +
CLIUtil.OMNIPOTENT_PRINCIPAL_PROPERTIES_DESCRIPTION
)
private Map<String, String> targetProperties;
@@ -130,41 +100,29 @@ public class SyncPolarisCommand implements
Callable<Integer> {
sourceProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY,
Boolean.toString(false));
targetProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY,
Boolean.toString(true));
- PolarisService source =
-
PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API,
sourceProperties);
- PolarisService target =
-
PolarisServiceFactory.createPolarisService(PolarisServiceFactory.ServiceType.API,
targetProperties);
-
- ETagManager etagService =
ETagManagerFactory.createETagManager(etagManagerType, etagManagerProperties);
-
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- if (etagService instanceof Closeable closableETagService) {
- try {
- closableETagService.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }));
-
- PolarisSynchronizer synchronizer =
- new PolarisSynchronizer(
- consoleLog,
- haltOnFailure,
- accessControlAwarePlanner,
- source,
- target,
- etagService);
- synchronizer.syncPrincipalRoles();
- if (shouldSyncPrincipals) {
- consoleLog.warn("Principal migration will reset credentials on the
target Polaris instance. " +
- "Principal migration will log the new target Principal
credentials to stdout.");
- synchronizer.syncPrincipals();
+ try (
+ PolarisService source = PolarisServiceFactory.createPolarisService(
+ PolarisServiceFactory.ServiceType.API, sourceProperties);
+ PolarisService target = PolarisServiceFactory.createPolarisService(
+ PolarisServiceFactory.ServiceType.API, targetProperties);
+ ETagManager etagManager =
ETagManagerFactory.createETagManager(etagManagerType, etagManagerProperties)
+ ) {
+ PolarisSynchronizer synchronizer =
+ new PolarisSynchronizer(
+ consoleLog,
+ haltOnFailure,
+ accessControlAwarePlanner,
+ source,
+ target,
+ etagManager);
+ synchronizer.syncPrincipalRoles();
+ if (shouldSyncPrincipals) {
+ consoleLog.warn("Principal migration will reset credentials on the
target Polaris instance. " +
+ "Principal migration will log the new target Principal
credentials to stdout.");
+ synchronizer.syncPrincipals();
+ }
+ synchronizer.syncCatalogs();
}
- synchronizer.syncCatalogs();
return 0;
}