This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 8751e69 [FLINK-15561] Unify Kerberos credentials checking
8751e69 is described below
commit 8751e69037d8a9b1756b75eed62a368c3ef29137
Author: Rong Rong <[email protected]>
AuthorDate: Sun Jan 12 15:16:51 2020 -0800
[FLINK-15561] Unify Kerberos credentials checking
Before, we had duplicate code in HadoopModule and YarnClusterDescriptor,
now we use the same code for both. That code is refactored to a util.
---
.../java/org/apache/flink/runtime/util/HadoopUtils.java | 17 +++++++++++++++++
.../flink/runtime/security/modules/HadoopModule.java | 15 +++------------
.../org/apache/flink/yarn/YarnClusterDescriptor.java | 10 +++++-----
3 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index f9244d3..e91cd99 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -112,6 +112,23 @@ public class HadoopUtils {
return result;
}
+	public static boolean isCredentialsConfigured(UserGroupInformation ugi, boolean useTicketCache) throws Exception {
+		if (UserGroupInformation.isSecurityEnabled()) {
+			if (useTicketCache && !ugi.hasKerberosCredentials()) {
+				// a delegation token is an adequate substitute in most cases
+				if (!HadoopUtils.hasHDFSDelegationToken()) {
+					LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
+						"nor delegation tokens!");
+					return false;
+				} else {
+					LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
+						"use delegation token instead. Flink application will terminate after token expires.");
+				}
+			}
+		}
+		return true;
+	}
+
/**
* Indicates whether the current user has an HDFS delegation token.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 1e045ad..b9250e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,19 +137,10 @@ public class HadoopModule implements SecurityModule {
loginUser = UserGroupInformation.getLoginUser();
}
-			if (UserGroupInformation.isSecurityEnabled()) {
-				// note: UGI::hasKerberosCredentials inaccurately reports false
-				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
-				// so we check only in ticket cache scenario.
-				if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
-					// a delegation token is an adequate substitute in most cases
-					if (!HadoopUtils.hasHDFSDelegationToken()) {
-						LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
-					}
-				}
-			}
+			boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
+				loginUser, securityConfig.useTicketCache());

-			LOG.info("Hadoop user set to {}", loginUser);
+			LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
 		} catch (Throwable ex) {
 			throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 4a905ae..31bd6a1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
@@ -432,12 +433,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
// so we check only in ticket cache scenario.
 			boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
-			UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-			if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
-					&& useTicketCache && !loginUser.hasKerberosCredentials()) {
-				LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
+			boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
+				UserGroupInformation.getCurrentUser(), useTicketCache);
+			if (!isCredentialsConfigured) {
 				throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
-						"does not have Kerberos credentials");
+						"does not have Kerberos credentials or delegation tokens!");
 			}
 		}