This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 671c5b478e [core] RESTCatalog: support DLFECSTokenLoader (#5312)
671c5b478e is described below

commit 671c5b478e22c1989b2175b72b8f568ca73bf9c3
Author: shyjsarah <[email protected]>
AuthorDate: Wed Mar 19 14:04:20 2025 +0800

    [core] RESTCatalog: support DLFECSTokenLoader (#5312)
---
 docs/content/concepts/rest-catalog.md              |  15 +++
 .../org/apache/paimon/rest/RESTCatalogOptions.java |  13 +++
 .../apache/paimon/rest/auth/DLFECSTokenLoader.java | 120 +++++++++++++++++++++
 .../paimon/rest/auth/DLFECSTokenLoaderFactory.java |  40 +++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../apache/paimon/rest/auth/AuthSessionTest.java   | 120 +++++++++++++++++++++
 .../paimon/rest/auth/MockECSMetadataService.java   |  86 +++++++++++++++
 7 files changed, 395 insertions(+)

diff --git a/docs/content/concepts/rest-catalog.md 
b/docs/content/concepts/rest-catalog.md
index f11d7f3d69..ccf8445d33 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -109,8 +109,23 @@ WITH (
 );
 ```
 
+- DLF sts token using aliyun ecs role
+```sql
+CREATE CATALOG `paimon-rest-catalog`
+WITH (
+    'type' = 'paimon',
+    'uri' = '<catalog server url>',
+    'metastore' = 'rest',
+    'warehouse' = 'my_instance_name',
+    'token.provider' = 'dlf',
+    'dlf.token-loader' = 'ecs',
+    'dlf.token-ecs-role-name' = 'my_ecs_role_name'
+);
+```
+
 {{< hint info >}}
 The `'warehouse'` is your catalog instance name on the server, not the path.
+The `'dlf.token-ecs-role-name'` is an optional parameter; if it is omitted, the 
DLF token loader can automatically obtain it through the ECS metadata service. 
More information about ECS roles can be found 
[here](https://help.aliyun.com/zh/ecs/user-guide/attach-an-instance-ram-role-to-an-ecs-instance?spm=a2c4g.11186623.0.0.7e774adcRAJ0wK).
 {{< /hint >}}
 
 ## Conclusion
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 3980108e7e..0359f6fa25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -85,4 +85,17 @@ public class RESTCatalogOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("REST Catalog auth DLF token loader.");
+
+    public static final ConfigOption<String> DLF_TOKEN_ECS_METADATA_URL =
+            ConfigOptions.key("dlf.token-ecs-metadata-url")
+                    .stringType()
+                    .defaultValue(
+                            "http://100.100.100.200/latest/meta-data/Ram/security-credentials/")
+                    .withDescription("REST Catalog auth DLF token ecs metadata 
url.");
+
+    public static final ConfigOption<String> DLF_TOKEN_ECS_ROLE_NAME =
+            ConfigOptions.key("dlf.token-ecs-role-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("REST Catalog auth DLF token ecs role 
name.");
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
new file mode 100644
index 0000000000..c73f32c990
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.rest.ExponentialHttpRetryInterceptor;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Arrays;
+
+import static okhttp3.ConnectionSpec.CLEARTEXT;
+import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
+import static okhttp3.ConnectionSpec.MODERN_TLS;
+import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+
+/** DLF Token Loader for ECS Metadata Service. */
+public class DLFECSTokenLoader implements DLFTokenLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DLFECSTokenLoader.class);
+
+    private static final OkHttpClient HTTP_CLIENT =
+            new OkHttpClient.Builder()
+                    .retryOnConnectionFailure(true)
+                    .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, 
CLEARTEXT))
+                    .addInterceptor(new ExponentialHttpRetryInterceptor(3))
+                    .connectTimeout(Duration.ofMinutes(3))
+                    .readTimeout(Duration.ofMinutes(3))
+                    .build();
+
+    private String ecsMetadataURL;
+
+    private String roleName;
+
+    public DLFECSTokenLoader(String ecsMetaDataURL, @Nullable String roleName) 
{
+        this.ecsMetadataURL = ecsMetaDataURL;
+        this.roleName = roleName;
+    }
+
+    @Override
+    public DLFToken loadToken() {
+        if (roleName == null) {
+            roleName = getRole(ecsMetadataURL);
+        }
+        return getToken(ecsMetadataURL + roleName);
+    }
+
+    private static String getRole(String url) {
+        try {
+            return getResponseBody(url);
+        } catch (Exception e) {
+            throw new RuntimeException("get role failed, error : " + 
e.getMessage(), e);
+        }
+    }
+
+    private static DLFToken getToken(String url) {
+        try {
+            String token = getResponseBody(url);
+            return OBJECT_MAPPER.readValue(token, DLFToken.class);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } catch (Exception e) {
+            throw new RuntimeException("get token failed, error : " + 
e.getMessage(), e);
+        }
+    }
+
+    @VisibleForTesting
+    protected static String getResponseBody(String url) {
+        Request request = new Request.Builder().url(url).get().build();
+        long startTime = System.currentTimeMillis();
+        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
+            if (response == null) {
+                throw new RuntimeException("get response failed, response is 
null");
+            }
+            if (!response.isSuccessful()) {
+                throw new RuntimeException("get response failed, response : " 
+ response);
+            }
+            String responseBodyStr = response.body() != null ? 
response.body().string() : null;
+            if (responseBodyStr == null) {
+                throw new RuntimeException("get response failed, response body 
is null");
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "get response success, url : {}, cost : {} ms",
+                        url,
+                        System.currentTimeMillis() - startTime);
+            }
+            return responseBodyStr;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException("get response failed, error : " + 
e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
new file mode 100644
index 0000000000..12be6ef276
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFECSTokenLoaderFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import org.apache.paimon.options.Options;
+
+import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
+import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
+
+/** Factory for {@link DLFECSTokenLoader}. */
+public class DLFECSTokenLoaderFactory implements DLFTokenLoaderFactory {
+
+    @Override
+    public String identifier() {
+        return "ecs";
+    }
+
+    @Override
+    public DLFTokenLoader create(Options options) {
+        String ecsMetadataURL = options.get(DLF_TOKEN_ECS_METADATA_URL);
+        String roleName = options.get(DLF_TOKEN_ECS_ROLE_NAME);
+        return new DLFECSTokenLoader(ecsMetadataURL, roleName);
+    }
+}
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3789e93908..bf6317e847 100644
--- 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -41,3 +41,4 @@ 
org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
 org.apache.paimon.rest.auth.BearTokenAuthProviderFactory
 org.apache.paimon.rest.auth.DLFAuthProviderFactory
 org.apache.paimon.rest.auth.DLFLocalFileTokenLoaderFactory
+org.apache.paimon.rest.auth.DLFECSTokenLoaderFactory
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
index b0ab1fd8dd..9a17f08906 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java
@@ -46,6 +46,8 @@ import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_SECURITY_TOKEN;
+import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
+import static 
org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
 import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
@@ -291,6 +293,124 @@ public class AuthSessionTest {
         Assert.assertEquals(fetchToken.getSecurityToken(), 
customToken.getSecurityToken());
     }
 
+    @Test
+    public void testCreateDlfAuthProviderByECSTokenProvider()
+            throws IOException, InterruptedException {
+        MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("EcsTestRole");
+        mockECSMetadataService.start();
+        try {
+            DLFToken theFirstMockToken = generateToken();
+            mockECSMetadataService.setMockToken(theFirstMockToken);
+            String theFirstMockTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
+            long tokenRefreshInMills = 1000;
+            // create options with token loader
+            Options options = new Options();
+            options.set(DLF_TOKEN_LOADER.key(), "ecs");
+            options.set(
+                    DLF_TOKEN_ECS_METADATA_URL.key(),
+                    mockECSMetadataService.getUrl() + 
"latest/meta-data/Ram/security-credentials/");
+            options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+            options.set(DLF_REGION.key(), "cn-hangzhou");
+            options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
+            AuthProvider authProvider =
+                    AuthProviderFactory.createAuthProvider(
+                            AuthProviderEnum.DLF.identifier(), options);
+            ScheduledExecutorService executor =
+                    ThreadPoolUtils.createScheduledThreadPool(1, 
"refresh-token");
+            AuthSession session = 
AuthSession.fromRefreshAuthProvider(executor, authProvider);
+            DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+            String theFirstFetchTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+            assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
+
+            DLFToken theSecondMockToken = generateToken();
+            String theSecondMockTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
+            mockECSMetadataService.setMockToken(theSecondMockToken);
+            Thread.sleep(tokenRefreshInMills * 2);
+            String theSecondFetchTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+            assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+        } finally {
+            mockECSMetadataService.shutdown();
+        }
+    }
+
+    @Test
+    public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
+            throws IOException, InterruptedException {
+        MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("CustomRole");
+        mockECSMetadataService.start();
+        try {
+            DLFToken theFirstMockToken = generateToken();
+            mockECSMetadataService.setMockToken(theFirstMockToken);
+            String theFirstMockTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
+            long tokenRefreshInMills = 1000;
+            // create options with token loader
+            Options options = new Options();
+            options.set(DLF_TOKEN_LOADER.key(), "ecs");
+            options.set(
+                    DLF_TOKEN_ECS_METADATA_URL.key(),
+                    mockECSMetadataService.getUrl() + 
"latest/meta-data/Ram/security-credentials/");
+            options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
+            options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+            options.set(DLF_REGION.key(), "cn-hangzhou");
+            options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
+            AuthProvider authProvider =
+                    AuthProviderFactory.createAuthProvider(
+                            AuthProviderEnum.DLF.identifier(), options);
+            ScheduledExecutorService executor =
+                    ThreadPoolUtils.createScheduledThreadPool(1, 
"refresh-token");
+            AuthSession session = 
AuthSession.fromRefreshAuthProvider(executor, authProvider);
+            DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) 
session.getAuthProvider();
+            String theFirstFetchTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+            assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
+
+            DLFToken theSecondMockToken = generateToken();
+            String theSecondMockTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
+            mockECSMetadataService.setMockToken(theSecondMockToken);
+            Thread.sleep(tokenRefreshInMills * 2);
+            String theSecondFetchTokenStr =
+                    
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
+            assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
+        } finally {
+            mockECSMetadataService.shutdown();
+        }
+    }
+
+    @Test
+    public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
+            throws IOException, InterruptedException {
+        MockECSMetadataService mockECSMetadataService = new 
MockECSMetadataService("EcsTestRole");
+        mockECSMetadataService.start();
+        try {
+            DLFToken theFirstMockToken = generateToken();
+            mockECSMetadataService.setMockToken(theFirstMockToken);
+            // create options with token loader
+            Options options = new Options();
+            options.set(DLF_TOKEN_LOADER.key(), "ecs");
+            options.set(
+                    DLF_TOKEN_ECS_METADATA_URL.key(),
+                    mockECSMetadataService.getUrl() + 
"latest/meta-data/Ram/security-credentials/");
+            options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
+            options.set(RESTCatalogOptions.URI.key(), "serverUrl");
+            options.set(DLF_REGION.key(), "cn-hangzhou");
+            assertThrows(
+                    RuntimeException.class,
+                    () -> {
+                        AuthProvider authProvider =
+                                AuthProviderFactory.createAuthProvider(
+                                        AuthProviderEnum.DLF.identifier(), 
options);
+                    });
+        } finally {
+            mockECSMetadataService.shutdown();
+        }
+    }
+
     @Test
     public void testDLFAuthProviderAuthHeaderWhenDataIsNotEmpty() throws 
Exception {
         String fileName = UUID.randomUUID().toString();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
new file mode 100644
index 0000000000..32ae496d6b
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/MockECSMetadataService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.auth;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+import java.io.IOException;
+
+/** Mock ECS metadata service for testing. */
+public class MockECSMetadataService {
+
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new 
ObjectMapper();
+
+    private final MockWebServer server;
+
+    private DLFToken mockToken;
+
+    public MockECSMetadataService(String ecsRoleName) {
+        server = new MockWebServer();
+        server.setDispatcher(initDispatcher(ecsRoleName));
+    }
+
+    public void start() throws IOException {
+        server.start();
+    }
+
+    public void shutdown() throws IOException {
+        server.shutdown();
+    }
+
+    public String getUrl() {
+        return server.url("").toString();
+    }
+
+    public Dispatcher initDispatcher(String ecsRoleName) {
+        return new Dispatcher() {
+            @Override
+            public MockResponse dispatch(RecordedRequest request) {
+                String path = request.getPath();
+                if 
(path.equals("/latest/meta-data/Ram/security-credentials/")) {
+                    return new 
MockResponse().setResponseCode(200).setBody(ecsRoleName);
+                } else if (path.equals(
+                        "/latest/meta-data/Ram/security-credentials/" + 
ecsRoleName)) {
+                    try {
+                        String body = 
OBJECT_MAPPER_INSTANCE.writeValueAsString(mockToken);
+                        return new 
MockResponse().setResponseCode(200).setBody(body);
+                    } catch (JsonProcessingException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else {
+                    return new MockResponse().setResponseCode(404);
+                }
+            }
+        };
+    }
+
+    public void setMockToken(DLFToken mockToken) {
+        this.mockToken = mockToken;
+    }
+
+    public DLFToken getMockToken() {
+        return mockToken;
+    }
+}

Reply via email to