[
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503255#comment-15503255
]
ASF GitHub Bot commented on FLINK-3929:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2275#discussion_r79355653
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -75,34 +85,66 @@ public static void runYarnTaskManager(String[] args,
final Class<? extends YarnT
"specified in the Flink config: " +
flinkTempDirs);
}
- LOG.info("YARN daemon runs as '" +
UserGroupInformation.getCurrentUser().getShortUserName() +
- "' setting user to execute Flink TaskManager to '" +
yarnClientUsername + "'");
-
// tell akka to die in case of an error
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
- UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(yarnClientUsername);
- for (Token<? extends TokenIdentifier> toks :
UserGroupInformation.getCurrentUser().getTokens()) {
- ugi.addToken(toks);
+ String keytabPath = null;
+ if(remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.info("keytabPath: {}", keytabPath);
}
+ UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client user
obtainer: {}",
+ currentUser.getShortUserName(),
yarnClientUsername );
+
// Infer the resource identifier from the environment variable
String containerID =
Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
final ResourceID resourceId = new ResourceID(containerID);
LOG.info("ResourceID assigned for this container: {}",
resourceId);
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
-
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId,
taskManager);
- }
- catch (Throwable t) {
- LOG.error("Error while starting the
TaskManager", t);
-
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- }
- return null;
+ try {
+
+ SecurityContext.SecurityConfiguration sc = new
SecurityContext.SecurityConfiguration();
+
+ //To support Yarn Secure Integration Test Scenario
+ File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+ if(krb5Conf.exists() && krb5Conf.canRead()) {
+ String krb5Path = krb5Conf.getAbsolutePath();
+ LOG.info("KRB5 Conf: {}", krb5Path);
+ org.apache.hadoop.conf.Configuration conf = new
org.apache.hadoop.conf.Configuration();
+
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
+
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+ sc.setHadoopConfiguration(conf);
+ }
+
+ if(keytabPath != null && remoteKeytabPrincipal != null)
{
+
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
remoteKeytabPrincipal);
}
- });
+
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
+
+
SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+ SecurityContext.getInstalled().runSecured(new
SecurityContext.FlinkSecuredRunner<Integer>() {
+ @Override
+ public Integer run() {
+ try {
+
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId,
taskManager);
+ }
+ catch (Throwable t) {
+ LOG.error("Error while starting
the TaskManager", t);
+
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+ }
+ return null;
+ }
+ });
+ } catch(Exception e) {
+ LOG.error("Exception occurred while launching Task
Manager. Reason: {}", e);
--- End diff --
The signature is error(String msg, Throwable t). You can remove the
"Reason: {}".
> Support for Kerberos Authentication with Keytab Credential
> ----------------------------------------------------------
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
> Issue Type: New Feature
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: kerberos, security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
> design doc._
> Add support for a keytab credential to be associated with the Flink cluster,
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)