This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 671c5b478e [core] RESTCatalog: support DLFECSTokenLoader (#5312)
671c5b478e is described below
commit 671c5b478e22c1989b2175b72b8f568ca73bf9c3
Author: shyjsarah <[email protected]>
AuthorDate: Wed Mar 19 14:04:20 2025 +0800
[core] RESTCatalog: support DLFECSTokenLoader (#5312)
---
docs/content/concepts/rest-catalog.md | 15 +++
.../org/apache/paimon/rest/RESTCatalogOptions.java | 13 +++
.../apache/paimon/rest/auth/DLFECSTokenLoader.java | 120 +++++++++++++++++++++
.../paimon/rest/auth/DLFECSTokenLoaderFactory.java | 40 +++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../apache/paimon/rest/auth/AuthSessionTest.java | 120 +++++++++++++++++++++
.../paimon/rest/auth/MockECSMetadataService.java | 86 +++++++++++++++
7 files changed, 395 insertions(+)
diff --git a/docs/content/concepts/rest-catalog.md
b/docs/content/concepts/rest-catalog.md
index f11d7f3d69..ccf8445d33 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -109,8 +109,23 @@ WITH (
);
```
+- DLF sts token using aliyun ecs role
+```sql
+CREATE CATALOG `paimon-rest-catalog`
+WITH (
+ 'type' = 'paimon',
+ 'uri' = '<catalog server url>',
+ 'metastore' = 'rest',
+ 'warehouse' = 'my_instance_name',
+ 'token.provider' = 'dlf',
+ 'dlf.token-loader' = 'ecs',
+ 'dlf.token-ecs-role-name' = 'my_ecs_role_name'
+);
+```
+
{{< hint info >}}
The `'warehouse'` is your catalog instance name on the server, not the path.
+The `'dlf.token-ecs-role-name'` is an optional parameter; if it is not set, the DLF token loader automatically obtains the role name from the ECS metadata service. More information about ECS roles can be found [here](https://help.aliyun.com/zh/ecs/user-guide/attach-an-instance-ram-role-to-an-ecs-instance?spm=a2c4g.11186623.0.0.7e774adcRAJ0wK).
{{< /hint >}}
## Conclusion
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 3980108e7e..0359f6fa25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -85,4 +85,17 @@ public class RESTCatalogOptions {
.stringType()
.noDefaultValue()
.withDescription("REST Catalog auth DLF token loader.");
+
+ public static final ConfigOption<String> DLF_TOKEN_ECS_METADATA_URL =
+ ConfigOptions.key("dlf.token-ecs-metadata-url")
+ .stringType()
+ .defaultValue(
+ "http://100.100.100.200/latest/meta-data/Ram/security-credentials/")
+ .withDescription("REST Catalog auth DLF token ecs metadata url.");
+
+ public static final ConfigOption<String> DLF_TOKEN_ECS_ROLE_NAME =
+ ConfigOptions.key("dlf.token-ecs-role-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("REST Catalog auth DLF token ecs role name.");
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
new file mode 100644
index 0000000000..c73f32c990
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.rest.ExponentialHttpRetryInterceptor;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Arrays;
+
+import static okhttp3.ConnectionSpec.CLEARTEXT;
+import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
+import static okhttp3.ConnectionSpec.MODERN_TLS;
+import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+
+/** DLF Token Loader for ECS Metadata Service. */
+public class DLFECSTokenLoader implements DLFTokenLoader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DLFECSTokenLoader.class);
+
+ private static final OkHttpClient HTTP_CLIENT =
+ new OkHttpClient.Builder()
+ .retryOnConnectionFailure(true)
+ .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS,
CLEARTEXT))
+ .addInterceptor(new ExponentialHttpRetryInterceptor(3))
+ .connectTimeout(Duration.ofMinutes(3))
+ .readTimeout(Duration.ofMinutes(3))
+ .build();
+
+ private String ecsMetadataURL;
+
+ private String roleName;
+
+ public DLFECSTokenLoader(String ecsMetaDataURL, @Nullable String roleName)
{
+ this.ecsMetadataURL = ecsMetaDataURL;
+ this.roleName = roleName;
+ }
+
+ @Override
+ public DLFToken loadToken() {
+ if (roleName == null) {
+ roleName = getRole(ecsMetadataURL);
+ }
+ return getToken(ecsMetadataURL + roleName);
+ }
+
+ private static String getRole(String url) {
+ try {
+ return getResponseBody(url);
+ } catch (Exception e) {
+ throw new RuntimeException("get role failed, error : " +
e.getMessage(), e);
+ }
+ }
+
+ private static DLFToken getToken(String url) {
+ try {
+ String token = getResponseBody(url);
+ return OBJECT_MAPPER.readValue(token, DLFToken.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (Exception e) {
+ throw new RuntimeException("get token failed, error : " +
e.getMessage(), e);
+ }
+ }
+
+ @VisibleForTesting
+ protected static String getResponseBody(String url) {
+ Request request = new Request.Builder().url(url).get().build();
+ long startTime = System.currentTimeMillis();
+ try (Response response = HTTP_CLIENT.newCall(request).execute()) {
+ if (response == null) {
+ throw new RuntimeException("get response failed, response is null");
+ }
+ if (!response.isSuccessful()) {
+ throw new RuntimeException("get response failed, response : "
+ response);
+ }
+ String responseBodyStr = response.body() != null ?
response.body().string() : null;
+ if (responseBodyStr == null) {
+ throw new RuntimeException("get response failed, response body is null");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "get response success, url : {}, cost : {} ms",
+ url,
+ System.currentTimeMillis() - startTime);
+ }
+ return responseBodyStr;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("get response failed, error : " +
e.getMessage(), e);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
new file mode 100644
index 0000000000..12be6ef276
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+
+import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
+import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
+
+/** Factory for {@link DLFECSTokenLoader}. */
+public class DLFECSTokenLoaderFactory implements DLFTokenLoaderFactory {
+
+ @Override
+ public String identifier() {
+ return "ecs";
+ }
+
+ @Override
+ public DLFTokenLoader create(Options options) {
+ String ecsMetadataURL = options.get(DLF_TOKEN_ECS_METADATA_URL);
+ String roleName = options.get(DLF_TOKEN_ECS_ROLE_NAME);
+ return new DLFECSTokenLoader(ecsMetadataURL, roleName);
+ }
+}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3789e93908..bf6317e847 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -41,3 +41,4 @@
org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
org.apache.paimon.rest.auth.BearTokenAuthProviderFactory
org.apache.paimon.rest.auth.DLFAuthProviderFactory
org.apache.paimon.rest.auth.DLFLocalFileTokenLoaderFactory
+org.apache.paimon.rest.auth.DLFECSTokenLoaderFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
index b0ab1fd8dd..9a17f08906 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -46,6 +46,8 @@ import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_SECURITY_TOKEN;
+import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
+import static
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
@@ -291,6 +293,124 @@ public class AuthSessionTest {
Assert.assertEquals(fetchToken.getSecurityToken(),
customToken.getSecurityToken());
}
+ @Test
+ public void testCreateDlfAuthProviderByECSTokenProvider()
+ throws IOException, InterruptedException {
+ MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("EcsTestRole");
+ mockECSMetadataService.start();
+ try {
+ DLFToken theFirstMockToken = generateToken();
+ mockECSMetadataService.setMockToken(theFirstMockToken);
+ String theFirstMockTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
+ long tokenRefreshInMills = 1000;
+ // create options with token loader
+ Options options = new Options();
+ options.set(DLF_TOKEN_LOADER.key(), "ecs");
+ options.set(
+ DLF_TOKEN_ECS_METADATA_URL.key(),
+ mockECSMetadataService.getUrl() +
"latest/meta-data/Ram/security-credentials/");
+ options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+ options.set(DLF_REGION.key(), "cn-hangzhou");
+ options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
+ AuthProvider authProvider =
+ AuthProviderFactory.createAuthProvider(
+ AuthProviderEnum.DLF.identifier(), options);
+ ScheduledExecutorService executor =
+ ThreadPoolUtils.createScheduledThreadPool(1,
"refresh-token");
+ AuthSession session =
AuthSession.fromRefreshAuthProvider(executor, authProvider);
+ DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ String theFirstFetchTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
+
+ DLFToken theSecondMockToken = generateToken();
+ String theSecondMockTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
+ mockECSMetadataService.setMockToken(theSecondMockToken);
+ Thread.sleep(tokenRefreshInMills * 2);
+ String theSecondFetchTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+ } finally {
+ mockECSMetadataService.shutdown();
+ }
+ }
+
+ @Test
+ public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
+ throws IOException, InterruptedException {
+ MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("CustomRole");
+ mockECSMetadataService.start();
+ try {
+ DLFToken theFirstMockToken = generateToken();
+ mockECSMetadataService.setMockToken(theFirstMockToken);
+ String theFirstMockTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
+ long tokenRefreshInMills = 1000;
+ // create options with token loader
+ Options options = new Options();
+ options.set(DLF_TOKEN_LOADER.key(), "ecs");
+ options.set(
+ DLF_TOKEN_ECS_METADATA_URL.key(),
+ mockECSMetadataService.getUrl() +
"latest/meta-data/Ram/security-credentials/");
+ options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
+ options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+ options.set(DLF_REGION.key(), "cn-hangzhou");
+ options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
+ AuthProvider authProvider =
+ AuthProviderFactory.createAuthProvider(
+ AuthProviderEnum.DLF.identifier(), options);
+ ScheduledExecutorService executor =
+ ThreadPoolUtils.createScheduledThreadPool(1,
"refresh-token");
+ AuthSession session =
AuthSession.fromRefreshAuthProvider(executor, authProvider);
+ DLFAuthProvider dlfAuthProvider = (DLFAuthProvider)
session.getAuthProvider();
+ String theFirstFetchTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
+
+ DLFToken theSecondMockToken = generateToken();
+ String theSecondMockTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
+ mockECSMetadataService.setMockToken(theSecondMockToken);
+ Thread.sleep(tokenRefreshInMills * 2);
+ String theSecondFetchTokenStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+ assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+ } finally {
+ mockECSMetadataService.shutdown();
+ }
+ }
+
+ @Test
+ public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
+ throws IOException, InterruptedException {
+ MockECSMetadataService mockECSMetadataService = new
MockECSMetadataService("EcsTestRole");
+ mockECSMetadataService.start();
+ try {
+ DLFToken theFirstMockToken = generateToken();
+ mockECSMetadataService.setMockToken(theFirstMockToken);
+ // create options with token loader
+ Options options = new Options();
+ options.set(DLF_TOKEN_LOADER.key(), "ecs");
+ options.set(
+ DLF_TOKEN_ECS_METADATA_URL.key(),
+ mockECSMetadataService.getUrl() +
"latest/meta-data/Ram/security-credentials/");
+ options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
+ options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+ options.set(DLF_REGION.key(), "cn-hangzhou");
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+ AuthProvider authProvider =
+ AuthProviderFactory.createAuthProvider(
+ AuthProviderEnum.DLF.identifier(),
options);
+ });
+ } finally {
+ mockECSMetadataService.shutdown();
+ }
+ }
+
@Test
public void testDLFAuthProviderAuthHeaderWhenDataIsNotEmpty() throws
Exception {
String fileName = UUID.randomUUID().toString();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
new file mode 100644
index 0000000000..32ae496d6b
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+import java.io.IOException;
+
+/** Mock ECS metadata service for testing. */
+public class MockECSMetadataService {
+
+ private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new
ObjectMapper();
+
+ private final MockWebServer server;
+
+ private DLFToken mockToken;
+
+ public MockECSMetadataService(String ecsRoleName) {
+ server = new MockWebServer();
+ server.setDispatcher(initDispatcher(ecsRoleName));
+ }
+
+ public void start() throws IOException {
+ server.start();
+ }
+
+ public void shutdown() throws IOException {
+ server.shutdown();
+ }
+
+ public String getUrl() {
+ return server.url("").toString();
+ }
+
+ public Dispatcher initDispatcher(String ecsRoleName) {
+ return new Dispatcher() {
+ @Override
+ public MockResponse dispatch(RecordedRequest request) {
+ String path = request.getPath();
+ if
(path.equals("/latest/meta-data/Ram/security-credentials/")) {
+ return new
MockResponse().setResponseCode(200).setBody(ecsRoleName);
+ } else if (path.equals(
+ "/latest/meta-data/Ram/security-credentials/" +
ecsRoleName)) {
+ try {
+ String body =
OBJECT_MAPPER_INSTANCE.writeValueAsString(mockToken);
+ return new
MockResponse().setResponseCode(200).setBody(body);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new MockResponse().setResponseCode(404);
+ }
+ }
+ };
+ }
+
+ public void setMockToken(DLFToken mockToken) {
+ this.mockToken = mockToken;
+ }
+
+ public DLFToken getMockToken() {
+ return mockToken;
+ }
+}