This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b100bcba3 [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM
74b100bcba3 is described below

commit 74b100bcba33ce18b21a44b26db7e162507f7830
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Mon Jun 10 20:50:14 2024 +0800

    [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM
---
 .../generated/yarn_config_configuration.html       |  6 +++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  6 ++-
 .../yarn/configuration/YarnConfigOptions.java      | 10 ++++
 .../flink/yarn/YarnClusterDescriptorTest.java      | 31 +++++++++++
 .../token/TestYarnAMDelegationTokenProvider.java   | 63 ++++++++++++++++++++++
 .../token/TestYarnAMDelegationTokenReceiver.java   | 36 +++++++++++++
 ...ink.core.security.token.DelegationTokenProvider | 16 ++++++
 ...ink.core.security.token.DelegationTokenReceiver | 16 ++++++
 8 files changed, 182 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index 41c32c77c94..cb0b0f935ba 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -152,6 +152,12 @@
             <td>String</td>
             <td>The provided usrlib directory in remote. It should be pre-uploaded and world-readable. Flink will use it to exclude the local usrlib directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each application. An example could be hdfs://$namenode_address/path/of/flink/usrlib</td>
         </tr>
+        <tr>
+            <td><h5>yarn.security.appmaster.delegation.token.services</h5></td>
+            <td style="word-wrap: break-word;">"hadoopfs"</td>
+            <td>List&lt;String&gt;</td>
+            <td>The delegation token provider services are allowed to pass obtained tokens to YARN application master. For backward compatibility to make log aggregation to work, we add tokens obtained by `hadoopfs` provider to AM by default.</td>
+        </tr>
         <tr>
             <td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
             <td style="word-wrap: break-word;">"krb5.keytab"</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 31bd574b024..bc65a5ee93d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -147,6 +147,7 @@ import static org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr;
 import static org.apache.flink.yarn.Utils.getStartCommand;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static 
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.APP_MASTER_TOKEN_SERVICES;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 
 /** The descriptor with deployment information for deploying a Flink cluster 
on Yarn. */
@@ -1348,7 +1349,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                         });
     }
 
-    private void setTokensFor(ContainerLaunchContext containerLaunchContext, 
boolean fetchToken)
+    @VisibleForTesting
+    void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean 
fetchToken)
             throws Exception {
         Credentials credentials = new Credentials();
 
@@ -1372,7 +1374,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
             // This is here for backward compatibility to make log aggregation 
work
             for (Map.Entry<String, byte[]> e : 
container.getTokens().entrySet()) {
-                if (e.getKey().equals("hadoopfs")) {
+                if 
(flinkConfiguration.get(APP_MASTER_TOKEN_SERVICES).contains(e.getKey())) {
                     
credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
                 }
             }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 06ab7e7437d..5f45c1f86b4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -364,6 +364,16 @@ public class YarnConfigOptions {
                                     + "resource directory. If set to false, 
Flink"
                                     + " will try to directly locate the keytab 
from the path itself.");
 
+    public static final ConfigOption<List<String>> APP_MASTER_TOKEN_SERVICES =
+            key("yarn.security.appmaster.delegation.token.services")
+                    .stringType()
+                    .asList()
+                    .defaultValues("hadoopfs")
+                    .withDescription(
+                            "The delegation token provider services are 
allowed to pass obtained tokens to YARN application master."
+                                    + " For backward compatibility to make log 
aggregation to work, we add tokens obtained"
+                                    + " by `hadoopfs` provider to AM by 
default.");
+
     public static final ConfigOption<List<String>> PROVIDED_LIB_DIRS =
             key("yarn.provided.lib.dirs")
                     .stringType()
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index e0e581f75eb..f87313cf21b 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -35,23 +35,29 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
 import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
+import org.apache.flink.yarn.token.TestYarnAMDelegationTokenProvider;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -69,6 +75,7 @@ import java.util.UUID;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
 import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.APP_MASTER_TOKEN_SERVICES;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -974,4 +981,28 @@ class YarnClusterDescriptorTest {
                     appId.toString());
         }
     }
+
+    @Test
+    public void testSetTokensForYarnAppMaster() {
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.set(
+                APP_MASTER_TOKEN_SERVICES,
+                Arrays.asList(TestYarnAMDelegationTokenProvider.SERVICE_NAME));
+        YarnClusterDescriptor yarnClusterDescriptor = 
createYarnClusterDescriptor(flinkConfig);
+        ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
+        try {
+            yarnClusterDescriptor.setTokensFor(amContainer, true);
+            Credentials credentials = new Credentials();
+            try (DataInputStream dis =
+                    new DataInputStream(
+                            new 
ByteArrayInputStream(amContainer.getTokens().array()))) {
+                credentials.readTokenStorageStream(dis);
+            }
+            assertThat(credentials.getAllTokens())
+                    .hasSize(1)
+                    
.contains(TestYarnAMDelegationTokenProvider.TEST_YARN_AM_TOKEN);
+        } catch (Exception e) {
+            fail("Should not throw exception when setting tokens for AM 
container.");
+        }
+    }
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java
new file mode 100644
index 00000000000..cc7d839515c
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class TestYarnAMDelegationTokenProvider implements 
DelegationTokenProvider {
+
+    public static final String SERVICE_NAME = "yarn-am";
+    public static final Token<? extends TokenIdentifier> TEST_YARN_AM_TOKEN =
+            new Token<>(
+                    new byte[4],
+                    new byte[4],
+                    new Text("TEST_YARN_AM_TOKEN_KIND"),
+                    new Text("TEST_YARN_AM_TOKEN_SERVICE"));
+
+    @Override
+    public String serviceName() {
+        return SERVICE_NAME;
+    }
+
+    @Override
+    public void init(Configuration configuration) {}
+
+    @Override
+    public boolean delegationTokensRequired() {
+        return true;
+    }
+
+    @Override
+    public ObtainedDelegationTokens obtainDelegationTokens() throws 
IOException {
+        Credentials credentials = new Credentials();
+        credentials.addToken(TEST_YARN_AM_TOKEN.getService(), 
TEST_YARN_AM_TOKEN);
+        return new ObtainedDelegationTokens(
+                HadoopDelegationTokenConverter.serialize(credentials), 
Optional.empty());
+    }
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java
new file mode 100644
index 00000000000..38d3a7abfd6
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
+
+public class TestYarnAMDelegationTokenReceiver implements 
DelegationTokenReceiver {
+
+    @Override
+    public String serviceName() {
+        return "yarn-am";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {}
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {}
+}
diff --git a/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 00000000000..5b8223ff1d3
--- /dev/null
+++ b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.token.TestYarnAMDelegationTokenProvider
diff --git a/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..e4a9611e32c
--- /dev/null
+++ b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.token.TestYarnAMDelegationTokenReceiver

Reply via email to