This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1678531 NIFI-7025: Initial commit adding Kerberos Password feature
for Hive components Kerberos Password property should not support EL, this
includes a change to KerberosProperties which is also used by the HDFS
processors (AbstractHadoopProcessor) Added wiring in a KerberosContext to a
TestRunner's MockProcessorInitializationContext Removed synchronization blocks
around KerberosUser.checkTGTAndRelogin, since that method is already
synchronized Updated AbstractHadoopProcessor t [...]
1678531 is described below
commit 167853163884a20c27cd44b38679918220774411
Author: jstorck <[email protected]>
AuthorDate: Fri Feb 28 01:18:06 2020 -0500
NIFI-7025: Initial commit adding Kerberos Password feature for Hive
components
Kerberos Password property should not support Expression Language (EL); this
includes a change to KerberosProperties, which is also used by the HDFS
processors (AbstractHadoopProcessor)
Added wiring in a KerberosContext to a TestRunner's
MockProcessorInitializationContext
Removed synchronization blocks around KerberosUser.checkTGTAndRelogin,
since that method is already synchronized
Updated AbstractHadoopProcessor to have a boolean accessor method to
determine if explicit keytab configuration is allowed
Removed synchronization block from HiveConnectionPool's getConnection
method (in Hive, Hive_1_1, Hive3 modules), since acquisition of a new TGT is
handled by the KerberosUser implementation. If UGI is used to relogin,
synchronization is handled internally by UGI.
Added Kerberos Principal and Kerberos Password properties to Hive,
Hive_1_1, and Hive3 components
Hive, Hive_1_1, and Hive3 components now use KerberosUser implementations
to authenticate with a KDC
Updated handling of the NIFI_ALLOW_EXPLICIT_KEYTAB environment variable in
Hive and Hive3 components. An accessor method has been added that uses
Boolean.parseBoolean, which returns true if the environment variable is set to
true, and false otherwise (including when the environment variable is unset).
Addressing PR feedback
Addressing PR feedback
This closes #4102.
---
.../nifi/security/krb/AbstractKerberosUser.java | 4 +-
.../nifi/security/krb/KeytabConfiguration.java | 4 +
.../util/MockProcessorInitializationContext.java | 15 +++-
.../nifi/util/StandardProcessorTestRunner.java | 15 +++-
.../java/org/apache/nifi/util/TestRunners.java | 22 +++++
.../org/apache/nifi/hadoop/KerberosProperties.java | 3 +-
.../java/org/apache/nifi/hadoop/SecurityUtil.java | 9 ++
.../processors/hadoop/AbstractHadoopProcessor.java | 29 +++----
.../hadoop/AbstractHadoopProcessorSpec.groovy | 30 +++----
.../processors/hadoop/SimpleHadoopProcessor.java | 10 +++
.../apache/nifi/dbcp/hive/HiveConnectionPool.java | 83 ++++++++++++------
.../nifi/processors/hive/PutHiveStreaming.java | 65 ++++++++++++---
.../apache/nifi/util/hive/HiveConfigurator.java | 24 +++++-
.../nifi/processors/hive/TestPutHiveStreaming.java | 24 +++++-
.../apache/nifi/dbcp/hive/Hive3ConnectionPool.java | 95 ++++++++++++++-------
.../nifi/processors/hive/PutHive3Streaming.java | 65 +++++++++++++--
.../apache/nifi/util/hive/HiveConfigurator.java | 24 +++++-
.../processors/hive/TestPutHive3Streaming.java | 10 ++-
.../nifi/dbcp/hive/Hive_1_1ConnectionPool.java | 97 +++++++++++++++++-----
.../apache/nifi/util/hive/HiveConfigurator.java | 24 +++++-
20 files changed, 505 insertions(+), 147 deletions(-)
diff --git
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
index 6764497..5278618 100644
---
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
+++
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
@@ -82,7 +82,9 @@ public abstract class AbstractKerberosUser implements
KerberosUser {
loggedIn.set(true);
LOGGER.debug("Successful login for {}", new Object[]{principal});
} catch (LoginException le) {
- throw new LoginException("Unable to login with " + principal + "
due to: " + le.getMessage());
+ LoginException loginException = new LoginException("Unable to
login with " + principal + " due to: " + le.getMessage());
+ loginException.setStackTrace(le.getStackTrace());
+ throw loginException;
}
}
diff --git
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java
index 24af9b2..a038c1d 100644
---
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java
+++
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabConfiguration.java
@@ -17,6 +17,8 @@
package org.apache.nifi.security.krb;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
@@ -27,6 +29,7 @@ import java.util.Map;
* Custom JAAS Configuration object for a provided principal and keytab.
*/
public class KeytabConfiguration extends Configuration {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KeytabConfiguration.class);
private final String principal;
private final String keytabFile;
@@ -63,6 +66,7 @@ public class KeytabConfiguration extends Configuration {
final String krbLoginModuleName = ConfigurationUtil.IS_IBM
? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE :
ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
+ LOGGER.debug("krbLoginModuleName: {}, configuration options: {}",
krbLoginModuleName, options);
this.kerberosKeytabConfigEntry = new AppConfigurationEntry(
krbLoginModuleName,
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options);
}
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
index 6a26371..d48fc3d 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -31,15 +32,21 @@ public class MockProcessorInitializationContext implements
ProcessorInitializati
private final MockComponentLog logger;
private final String processorId;
private final MockProcessContext context;
+ private final KerberosContext kerberosContext;
public MockProcessorInitializationContext(final Processor processor, final
MockProcessContext context) {
- this(processor, context, null);
+ this(processor, context, null, null);
}
public MockProcessorInitializationContext(final Processor processor, final
MockProcessContext context, final MockComponentLog logger) {
+ this(processor, context, logger, null);
+ }
+
+ public MockProcessorInitializationContext(final Processor processor, final
MockProcessContext context, final MockComponentLog logger, KerberosContext
kerberosContext) {
processorId = UUID.randomUUID().toString();
this.logger = logger == null ? new MockComponentLog(processorId,
processor) : logger;
this.context = context;
+ this.kerberosContext = kerberosContext;
}
@Override
@@ -94,16 +101,16 @@ public class MockProcessorInitializationContext implements
ProcessorInitializati
@Override
public String getKerberosServicePrincipal() {
- return null; //this needs to be wired in.
+ return kerberosContext != null ?
kerberosContext.getKerberosServicePrincipal() : null;
}
@Override
public File getKerberosServiceKeytab() {
- return null; //this needs to be wired in.
+ return kerberosContext != null ?
kerberosContext.getKerberosServiceKeytab() : null;
}
@Override
public File getKerberosConfigurationFile() {
- return null; //this needs to be wired in.
+ return kerberosContext != null ?
kerberosContext.getKerberosConfigurationFile() : null;
}
}
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index d995f8e..40010c6 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -64,6 +64,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
@@ -77,6 +78,7 @@ public class StandardProcessorTestRunner implements
TestRunner {
private final Processor processor;
private final MockProcessContext context;
+ private final KerberosContext kerberosContext;
private final MockFlowFileQueue flowFileQueue;
private final SharedSessionState sharedState;
private final AtomicLong idGenerator;
@@ -99,10 +101,18 @@ public class StandardProcessorTestRunner implements
TestRunner {
}
StandardProcessorTestRunner(final Processor processor, String
processorName) {
- this(processor, processorName, null);
+ this(processor, processorName, null, null);
+ }
+
+ StandardProcessorTestRunner(final Processor processor, String
processorName, KerberosContext kerberosContext) {
+ this(processor, processorName, null, kerberosContext);
}
StandardProcessorTestRunner(final Processor processor, String
processorName, MockComponentLog logger) {
+ this(processor, processorName, logger, null);
+ }
+
+ StandardProcessorTestRunner(final Processor processor, String
processorName, MockComponentLog logger, KerberosContext kerberosContext) {
this.processor = processor;
this.idGenerator = new AtomicLong(0L);
this.sharedState = new SharedSessionState(processor, idGenerator);
@@ -111,8 +121,9 @@ public class StandardProcessorTestRunner implements
TestRunner {
this.processorStateManager = new MockStateManager(processor);
this.variableRegistry = new MockVariableRegistry();
this.context = new MockProcessContext(processor, processorName,
processorStateManager, variableRegistry);
+ this.kerberosContext = kerberosContext;
- final MockProcessorInitializationContext mockInitContext = new
MockProcessorInitializationContext(processor, context, logger);
+ final MockProcessorInitializationContext mockInitContext = new
MockProcessorInitializationContext(processor, context, logger, kerberosContext);
processor.initialize(mockInitContext);
this.logger = mockInitContext.getLogger();
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
index 9792473..8370e88 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.util;
+import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.processor.Processor;
public class TestRunners {
@@ -31,6 +32,16 @@ public class TestRunners {
}
/**
+ * Returns a {@code TestRunner} for the given {@code Processor} which uses
the given {@code KerberosContext}.
+ * @param processor the {@code Processor} under test
+ * @param kerberosContext the {@code KerberosContext} used during the test
+ * @return a {@code TestRunner}
+ */
+ public static TestRunner newTestRunner(final Processor processor,
KerberosContext kerberosContext) {
+ return newTestRunner(processor,processor.getClass().getName(),
kerberosContext);
+ }
+
+ /**
* Returns a {@code TestRunner} for the given {@code Processor}.
* The processor name available from {@code
TestRunner.getProcessContext().getName()} will have the default name of {@code
processor.getClass().getName()}
* @param processor the {@code Processor} under test
@@ -53,6 +64,17 @@ public class TestRunners {
}
/**
+ * Returns a {@code TestRunner} for the given {@code Processor} and {@code
KerberosContext}.
+ * @param processor the {@code Processor} under test
+ * @param name the name to give the {@code Processor}
+ * @param kerberosContext the {@code KerberosContext} used during the test
+ * @return a {@code TestRunner}
+ */
+ public static TestRunner newTestRunner(final Processor processor, String
name, KerberosContext kerberosContext) {
+ return new StandardProcessorTestRunner(processor, name,
kerberosContext);
+ }
+
+ /**
* Returns a {@code TestRunner} for the given {@code Processor}.
* The processor name available from {@code
TestRunner.getProcessContext().getName()} will be the passed name.
* @param processor the {@code Processor} under test
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
index ba5e9ec..5977b6c 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
@@ -104,8 +104,7 @@ public class KerberosProperties {
.name("Kerberos Password")
.required(false)
.description("Kerberos password associated with the
principal.")
-
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
index 6a8f079..eeabaf0 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
@@ -78,6 +78,15 @@ public class SecurityUtil {
return UserGroupInformation.getCurrentUser();
}
+ /**
+ * Authenticates a {@link KerberosUser} and acquires a {@link
UserGroupInformation} instance using {@link
UserGroupInformation#getUGIFromSubject(Subject)}.
+ * The {@link UserGroupInformation} will use the given {@link
Configuration}.
+ *
+ * @param config The Configuration to apply to the acquired
UserGroupInformation instance
+ * @param kerberosUser The KerberosUser to authenticate
+ * @return A UserGroupInformation instance created using the Subject of
the given KerberosUser
+ * @throws IOException if authentication fails
+ */
public static synchronized UserGroupInformation
getUgiForKerberosUser(final Configuration config, final KerberosUser
kerberosUser) throws IOException {
UserGroupInformation.setConfiguration(config);
try {
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 69ea716..7a59170 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -184,12 +184,11 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
final String configResources =
validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
final String explicitPrincipal =
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
- final String explicitPassword =
validationContext.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- final String resolvedPassword;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
@@ -197,7 +196,6 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
- resolvedPassword = explicitPassword;
final List<ValidationResult> results = new ArrayList<>();
@@ -220,7 +218,7 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
final Configuration conf = resources.getConfiguration();
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
- this.getClass().getSimpleName(), conf, resolvedPrincipal,
resolvedKeytab, resolvedPassword, getLogger()));
+ this.getClass().getSimpleName(), conf, resolvedPrincipal,
resolvedKeytab, explicitPassword, getLogger()));
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
@@ -238,8 +236,7 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
.build());
}
- final String allowExplicitKeytabVariable =
getAllowExplicitKeytabEnvironmentVariable();
- if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) &&
explicitKeytab != null) {
+ if (!isAllowExplicitKeytab() && explicitKeytab != null) {
results.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
@@ -390,7 +387,7 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
if (SecurityUtil.isSecurityEnabled(config)) {
String principal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
String keyTab =
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
- String password =
context.getProperty(kerberosProperties.getKerberosPassword()).evaluateAttributeExpressions().getValue();
+ String password =
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
// If the Kerberos Credentials Service is specified, we need
to use its configuration, not the explicit properties for principal/keytab.
// The customValidate method ensures that only one can be set,
so we know that the principal & keytab above are null.
@@ -545,15 +542,13 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
if (hdfsResources.get().getKerberosUser() != null) {
// if there's a KerberosUser associated with this UGI, check the
TGT and relogin if it is close to expiring
KerberosUser kerberosUser = hdfsResources.get().getKerberosUser();
- synchronized (kerberosUser) {
- getLogger().debug("kerberosUser is " + kerberosUser);
- try {
- getLogger().debug("checking TGT on kerberosUser [{}]" +
kerberosUser);
- kerberosUser.checkTGTAndRelogin();
- } catch (LoginException e) {
- throw new ProcessException("Unable to relogin with
kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ getLogger().debug("kerberosUser is " + kerberosUser);
+ try {
+ getLogger().debug("checking TGT on kerberosUser " +
kerberosUser);
+ kerberosUser.checkTGTAndRelogin();
+ } catch (LoginException e) {
+ throw new ProcessException("Unable to relogin with kerberos
credentials for " + kerberosUser.getPrincipal(), e);
}
- }
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT
with KerberosUser");
}
@@ -563,8 +558,8 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
/*
* Overridable by subclasses in the same package, mainly intended for
testing purposes to allow verification without having to set environment
variables.
*/
- String getAllowExplicitKeytabEnvironmentVariable() {
- return System.getenv(ALLOW_EXPLICIT_KEYTAB);
+ boolean isAllowExplicitKeytab() {
+ return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
static protected class HdfsResources {
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy
index 9b4faf5..49a835f 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/groovy/org/apache/nifi/processors/hadoop/AbstractHadoopProcessorSpec.groovy
@@ -74,22 +74,22 @@ class AbstractHadoopProcessorSpec extends Specification {
actualValidationErrors.size() == expectedValidationErrorCount
where:
- testName | configuredPrincipal | configuredKeytab | configuredPassword
| allowExplicitKeytab | configuredKeytabCredentialsService |
configuredKeytabCredentialsServicePrincipal |
configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount
- "success case 1" | "principal" | "keytab" | null
| "true" | false | null
| null
|| 0
- "success case 2" | "principal" | null |
"password" | "true" | false |
null | null
|| 0
- "success case 3" | "principal" | null |
"password" | "false" | false |
null | null
|| 0
- "success case 4" | null | null | null
| "true" | true |
"principal" | "keytab"
|| 0
- "success case 5" | null | null | null
| "false" | true |
"principal" | "keytab"
|| 0
+ testName | configuredPrincipal | configuredKeytab |
configuredPassword | allowExplicitKeytab | configuredKeytabCredentialsService |
configuredKeytabCredentialsServicePrincipal |
configuredKeytabCredentialsServiceKeytab || expectedValidationErrorCount
+ "success case 1" | "principal" | "keytab" | null
| true | false | null
| null || 0
+ "success case 2" | "principal" | null | "password"
| true | false | null
| null || 0
+ "success case 3" | "principal" | null | "password"
| false | false | null
| null || 0
+ "success case 4" | null | null | null
| true | true |
"principal" | "keytab"
|| 0
+ "success case 5" | null | null | null
| false | true |
"principal" | "keytab"
|| 0
// do not allow explicit keytab, but provide one anyway; validation
fails
- "failure case 1" | "principal" | "keytab" | null
| "false" | false | null
| null
|| 1
- "failure case 2" | null | "keytab" | null
| "false" | false | null
| null
|| 2
+ "failure case 1" | "principal" | "keytab" | null
| false | false | null
| null || 1
+ "failure case 2" | null | "keytab" | null
| false | false | null
| null || 2
// keytab credentials service is provided, but explicit properties for
principal, password, or keytab are also provided; validation fails
- "failure case 3" | "principal" | null | null
| "true" | true |
"principal" | "keytab"
|| 1
- "failure case 4" | null | "keytab" | null
| "true" | true |
"principal" | "keytab"
|| 1
- "failure case 5" | null | null |
"password" | "true" | true |
"principal" | "keytab"
|| 2
- "failure case 6" | "principal" | null | null
| "false" | true |
"principal" | "keytab"
|| 1
- "failure case 7" | null | "keytab" | null
| "false" | true |
"principal" | "keytab"
|| 2
- "failure case 8" | null | null |
"password" | "false" | true |
"principal" | "keytab"
|| 2
+ "failure case 3" | "principal" | null | null
| true | true |
"principal" | "keytab"
|| 1
+ "failure case 4" | null | "keytab" | null
| true | true |
"principal" | "keytab"
|| 1
+ "failure case 5" | null | null | "password"
| true | true |
"principal" | "keytab"
|| 2
+ "failure case 6" | "principal" | null | null
| false | true |
"principal" | "keytab"
|| 1
+ "failure case 7" | null | "keytab" | null
| false | true |
"principal" | "keytab"
|| 2
+ "failure case 8" | null | null | "password"
| false | true |
"principal" | "keytab"
|| 2
}
private class TestAbstractHadoopProcessor extends AbstractHadoopProcessor {
@@ -105,7 +105,7 @@ class AbstractHadoopProcessorSpec extends Specification {
}
@Override
- String getAllowExplicitKeytabEnvironmentVariable() {
+ boolean isAllowExplicitKeytab() {
allowExplicitKeytab
}
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java
index 4aa8e54..f8622da 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java
@@ -26,9 +26,15 @@ import java.io.File;
public class SimpleHadoopProcessor extends AbstractHadoopProcessor {
private KerberosProperties testKerberosProperties;
+ private boolean allowExplicitKeytab;
public SimpleHadoopProcessor(KerberosProperties kerberosProperties) {
+ this(kerberosProperties, true);
+ }
+
+ public SimpleHadoopProcessor(KerberosProperties kerberosProperties,
boolean allowExplicitKeytab) {
this.testKerberosProperties = kerberosProperties;
+ this.allowExplicitKeytab = allowExplicitKeytab;
}
@Override
@@ -40,4 +46,8 @@ public class SimpleHadoopProcessor extends
AbstractHadoopProcessor {
return testKerberosProperties;
}
+ @Override
+ boolean isAllowExplicitKeytab() {
+ return allowExplicitKeytab;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 378799e..f84af6f 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -39,6 +39,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveUtils;
@@ -59,6 +62,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import javax.security.auth.login.LoginException;
+
/**
* Implementation for Database Connection Pooling Service used for Apache Hive
* connections. Apache DBCP is used for connection pooling functionality.
@@ -165,6 +170,7 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
private volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
private volatile UserGroupInformation ugi;
+ private final AtomicReference<KerberosUser> kerberosUserReference = new
AtomicReference<>();
private volatile File kerberosConfigFile = null;
private volatile KerberosProperties kerberosProperties;
@@ -184,6 +190,7 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
kerberosProperties = new KerberosProperties(kerberosConfigFile);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
+ props.add(kerberosProperties.getKerberosPassword());
properties = props;
}
@@ -201,37 +208,37 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
if (confFileProvided) {
final String explicitPrincipal =
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = explicitPrincipal;
- resolvedKeytab = explicitKeytab;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = explicitKeytab;
}
final String configFiles =
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
- problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+ problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder,
getLogger()));
- if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null)) {
+ if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
- .explanation("Cannot specify both a Kerberos Credentials
Service and a principal/keytab")
+ .explanation("Cannot specify a Kerberos Credentials
Service while also specifying a Kerberos Principal, Kerberos Keytab, or
Kerberos Password")
.build());
}
- final String allowExplicitKeytabVariable =
System.getenv(ALLOW_EXPLICIT_KEYTAB);
- if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) &&
(explicitPrincipal != null || explicitKeytab != null)) {
+ if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
- .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system
environment variable is configured to forbid explicitly configuring
principal/keytab in processors. "
- + "The Kerberos Credentials Service should be used
instead of setting the Kerberos Keytab or Kerberos Principal property.")
+ .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system
environment variable is configured to forbid explicitly configuring Kerberos
Keytab in processors. "
+ + "The Kerberos Credentials Service should be used
instead of setting the Kerberos Keytab or Kerberos Principal property.")
.build());
}
}
@@ -293,28 +300,37 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final String explicitPrincipal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = explicitPrincipal;
- resolvedKeytab = explicitKeytab;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = explicitKeytab;
}
- log.info("Hive Security Enabled, logging in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ if (resolvedKeytab != null) {
+ kerberosUserReference.set(new
KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ } else if (explicitPassword != null) {
+ kerberosUserReference.set(new
KerberosPasswordUser(resolvedPrincipal, explicitPassword));
+ log.info("Hive Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ } else {
+ throw new InitializationException("Unable to authenticate with
Kerberos, no keytab or password was provided");
+ }
try {
- ugi = hiveConfigurator.authenticate(hiveConfig,
resolvedPrincipal, resolvedKeytab);
+ ugi = hiveConfigurator.authenticate(hiveConfig,
kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
throw new InitializationException(ae);
}
- getLogger().info("Successfully logged in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ getLogger().info("Successfully logged in as principal " +
resolvedPrincipal);
}
final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
@@ -356,13 +372,24 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
public Connection getConnection() throws ProcessException {
try {
if (ugi != null) {
- synchronized(this) {
- /*
- * Make sure that only one thread can request that the UGI
relogin at a time. This
- * explicit relogin attempt is necessary due to the Hive
client/thrift not implicitly handling
- * the acquisition of a new TGT after the current one has
expired.
- * https://issues.apache.org/jira/browse/NIFI-5134
- */
+ /*
+ * Explicitly check the TGT and relogin if necessary with the
KerberosUser instance. No synchronization
+ * is necessary in the client code, since
AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
+ */
+ getLogger().trace("getting UGI instance");
+ if (kerberosUserReference.get() != null) {
+ // if there's a KerberosUser associated with this UGI,
check the TGT and relogin if it is close to expiring
+ KerberosUser kerberosUser = kerberosUserReference.get();
+ getLogger().debug("kerberosUser is " + kerberosUser);
+ try {
+ getLogger().debug("checking TGT on kerberosUser [{}]",
new Object[]{kerberosUser});
+ kerberosUser.checkTGTAndRelogin();
+ } catch (LoginException e) {
+ throw new ProcessException("Unable to relogin with
kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ }
+ } else {
+ getLogger().debug("kerberosUser was null, will not refresh
TGT with KerberosUser");
+ // no synchronization is needed for
UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the
synchronization internally
ugi.checkTGTAndReloginFromKeytab();
}
try {
@@ -395,4 +422,10 @@ public class HiveConnectionPool extends
AbstractControllerService implements Hiv
return connectionUrl;
}
+ /*
+ * Reports whether explicitly configuring a Kerberos keytab on this component is allowed, based on the
+ * environment variable named by ALLOW_EXPLICIT_KEYTAB. The value is parsed with Boolean.parseBoolean,
+ * so an unset variable, or any value other than "true" (ignoring case), yields false.
+ *
+ * Overridable by subclasses in the same package, mainly intended for testing purposes to allow
+ * verification without having to set environment variables.
+ */
+ boolean isAllowExplicitKeytab() {
+ return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 1627159..0ef4211 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -62,6 +62,9 @@ import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
@@ -70,6 +73,7 @@ import org.apache.nifi.util.hive.HiveWriter;
import org.xerial.snappy.Snappy;
import org.apache.nifi.util.hive.ValidationResources;
+import javax.security.auth.login.LoginException;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
@@ -318,6 +322,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
protected volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
protected volatile UserGroupInformation ugi;
+ final protected AtomicReference<KerberosUser> kerberosUserReference = new
AtomicReference<>();
protected volatile HiveConf hiveConfig;
protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
@@ -353,6 +358,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
kerberosProperties = new KerberosProperties(kerberosConfigFile);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
+ props.add(kerberosProperties.getKerberosPassword());
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> _relationships = new HashSet<>();
@@ -382,6 +388,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
if (confFileProvided) {
final String explicitPrincipal =
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
@@ -396,23 +403,22 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
final String configFiles =
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
- problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+ problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder,
getLogger()));
- if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null)) {
+ if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
- .explanation("Cannot specify both a Kerberos Credentials
Service and a principal/keytab")
+ .explanation("Cannot specify a Kerberos Credentials
Service while also specifying a Kerberos Principal, Kerberos Keytab, or
Kerberos Password")
.build());
}
- final String allowExplicitKeytabVariable =
System.getenv(ALLOW_EXPLICIT_KEYTAB);
- if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) &&
(explicitPrincipal != null || explicitKeytab != null)) {
+ if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
- .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system
environment variable is configured to forbid explicitly configuring
principal/keytab in processors. "
- + "The Kerberos Credentials Service should be used
instead of setting the Kerberos Keytab or Kerberos Principal property.")
+ .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system
environment variable is configured to forbid explicitly configuring Kerberos
Keytab in processors. "
+ + "The Kerberos Credentials Service should be used
instead of setting the Kerberos Keytab or Kerberos Principal property.")
.build());
}
}
@@ -446,6 +452,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final String explicitPrincipal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
@@ -458,16 +465,26 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
resolvedKeytab = credentialsService.getKeytab();
}
- log.info("Hive Security Enabled, logging in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ if (resolvedKeytab != null) {
+ kerberosUserReference.set(new
KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ } else if (explicitPassword != null) {
+ kerberosUserReference.set(new
KerberosPasswordUser(resolvedPrincipal, explicitPassword));
+ log.info("Hive Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ } else {
+ throw new ProcessException("Unable to authenticate with
Kerberos, no keytab or password was provided");
+ }
+
try {
- ugi = hiveConfigurator.authenticate(hiveConfig,
resolvedPrincipal, resolvedKeytab);
+ ugi = hiveConfigurator.authenticate(hiveConfig,
kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
throw new ProcessException("Kerberos authentication failed for
Hive Streaming", ae);
}
- log.info("Successfully logged in as principal {} with keytab {}",
new Object[] {resolvedPrincipal, resolvedKeytab});
+ log.info("Successfully logged in as principal " +
resolvedPrincipal);
} else {
ugi = null;
+ kerberosUserReference.set(null);
}
callTimeout =
context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() *
1000; // milliseconds
@@ -967,6 +984,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
}
ugi = null;
+ kerberosUserReference.set(null);
}
private void setupHeartBeatTimer(int heartbeatInterval) {
@@ -1048,7 +1066,7 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
HiveWriter writer = writers.get(endPoint);
if (writer == null) {
log.debug("Creating Writer to Hive end point : " + endPoint);
- writer = makeHiveWriter(endPoint, callTimeoutPool, ugi,
options);
+ writer = makeHiveWriter(endPoint, callTimeoutPool, getUgi(),
options);
if (writers.size() > (options.getMaxOpenConnections() - 1)) {
log.info("cached HiveEndPoint size {} exceeded
maxOpenConnections {} ", new Object[]{writers.size(),
options.getMaxOpenConnections()});
int retired = retireIdleWriters(writers,
options.getIdleTimeout());
@@ -1144,6 +1162,31 @@ public class PutHiveStreaming extends
AbstractSessionFactoryProcessor {
return kerberosProperties;
}
+ /*
+ * Returns the current UGI for creating Hive writers. If a KerberosUser is associated with this processor,
+ * its TGT is checked first and a relogin is attempted when needed; a failed relogin is reported as a
+ * ProcessException. The returned value is whatever the ugi field currently holds, which may be null.
+ */
+ UserGroupInformation getUgi() {
+ getLogger().trace("getting UGI instance");
+ // Read the reference exactly once: the null check and the calls below must see the same instance,
+ // otherwise a concurrent kerberosUserReference.set(null) could turn the second read into an NPE.
+ final KerberosUser kerberosUser = kerberosUserReference.get();
+ if (kerberosUser != null) {
+ // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
+ getLogger().debug("kerberosUser is " + kerberosUser);
+ try {
+ getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
+ kerberosUser.checkTGTAndRelogin();
+ } catch (LoginException e) {
+ throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ }
+ } else {
+ getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
+ }
+ return ugi;
+ }
+
+ /*
+ * Reports whether explicitly configuring a Kerberos keytab on this component is allowed, based on the
+ * environment variable named by ALLOW_EXPLICIT_KEYTAB. The value is parsed with Boolean.parseBoolean,
+ * so an unset variable, or any value other than "true" (ignoring case), yields false.
+ *
+ * Overridable by subclasses in the same package, mainly intended for testing purposes to allow
+ * verification without having to set environment variables.
+ */
+ boolean isAllowExplicitKeytab() {
+ return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
+ }
+
protected class HiveStreamingRecord {
private List<String> partitionValues;
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
index 5fed81a..b212207 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class HiveConfigurator {
- public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, AtomicReference<ValidationResources>
validationResourceHolder, ComponentLog log) {
+ public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, String password,
+
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog
log) {
final List<ValidationResult> problems = new ArrayList<>();
ValidationResources resources = validationResourceHolder.get();
@@ -50,7 +52,7 @@ public class HiveConfigurator {
final Configuration hiveConfig = resources.getConfiguration();
-
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, null, log));
+
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, password, log));
return problems;
}
@@ -75,6 +77,22 @@ public class HiveConfigurator {
}
/**
+ * Acquires a {@link UserGroupInformation} for the given {@link KerberosUser} by delegating to
+ * {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}.
+ *
+ * @param hiveConfig The Configuration handed to SecurityUtil when acquiring the UserGroupInformation
+ * @param kerberosUser The KerberosUser to authenticate
+ * @return The UserGroupInformation returned by SecurityUtil for the given KerberosUser
+ * @throws AuthenticationFailedException if SecurityUtil throws an IOException; that IOException is
+ * passed along to the new exception
+ * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
+ */
+ public UserGroupInformation authenticate(final Configuration hiveConfig,
KerberosUser kerberosUser) throws AuthenticationFailedException {
+ try {
+ return SecurityUtil.getUgiForKerberosUser(hiveConfig,
kerberosUser);
+ } catch (IOException ioe) {
+ throw new AuthenticationFailedException("Kerberos Authentication
for Hive failed", ioe);
+ }
+ }
+
+ /**
* As of Apache NiFi 1.5.0, due to changes made to
* {@link SecurityUtil#loginKerberos(Configuration, String, String)},
which is used by this
* class to authenticate a principal with Kerberos, Hive controller
services no longer
@@ -91,7 +109,9 @@ public class HiveConfigurator {
* authentication attempts that would leave the Hive controller service in
an unrecoverable state.
*
* @see SecurityUtil#loginKerberos(Configuration, String, String)
+ * @deprecated Use {@link
SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
*/
+ @Deprecated
public UserGroupInformation authenticate(final Configuration hiveConfig,
String principal, String keyTab) throws AuthenticationFailedException {
UserGroupInformation ugi;
try {
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index ed3461d..3681755 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -34,7 +34,10 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -67,6 +70,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -103,7 +107,11 @@ public class TestPutHiveStreaming {
when(hiveConfigurator.getConfigurationFromFiles(AdditionalMatchers.or(anyString(),
isNull()))).thenReturn(hiveConf);
processor.hiveConfigurator = hiveConfigurator;
processor.setKerberosProperties(kerberosPropsWithFile);
- runner = TestRunners.newTestRunner(processor);
+ KerberosContext mockKerberosContext = mock(KerberosContext.class);
+
when(mockKerberosContext.getKerberosConfigurationFile()).thenReturn(kerberosPropsWithFile.getKerberosConfigFile());
+ when(mockKerberosContext.getKerberosServiceKeytab()).thenReturn(null);
+
when(mockKerberosContext.getKerberosServicePrincipal()).thenReturn(null);
+ runner = TestRunners.newTestRunner(processor, mockKerberosContext);
}
@Test
@@ -118,23 +126,27 @@ public class TestPutHiveStreaming {
}
@Test
- public void testUgiGetsCleared() {
+ public void testUgiAndKerberosUserGetsCleared() {
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
processor.ugi = mock(UserGroupInformation.class);
+ processor.kerberosUserReference.set(new KerberosPasswordUser("user",
"password"));
runner.run();
assertNull(processor.ugi);
+ assertNull(processor.kerberosUserReference.get());
}
@Test
public void testUgiGetsSetIfSecure() throws AuthenticationFailedException,
IOException {
when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
ugi = mock(UserGroupInformation.class);
- when(hiveConfigurator.authenticate(eq(hiveConf),
AdditionalMatchers.or(anyString(), isNull()),
AdditionalMatchers.or(anyString(), isNull()))).thenReturn(ugi);
+ when(hiveConfigurator.authenticate(eq(hiveConf),
any(KerberosUser.class))).thenReturn(ugi);
runner.setProperty(PutHiveStreaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(),
"src/test/resources/fake.keytab");
+ runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(),
"principal");
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -163,7 +175,7 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.HIVE_CONFIGURATION_RESOURCES,
"src/test/resources/core-site-security.xml,
src/test/resources/hive-site-security.xml");
runner.setProperty(kerberosPropsWithFile.getKerberosPrincipal(),
"test@REALM");
- runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(),
"src/test/resources/fake.keytab");
+ runner.setProperty(kerberosPropsWithFile.getKerberosKeytab(),
"src/test/resources/missing.keytab");
runner.run();
}
@@ -1037,6 +1049,10 @@ public class TestPutHiveStreaming {
this.generateExceptionOnFlushAndClose =
generateExceptionOnFlushAndClose;
}
+ @Override
+ UserGroupInformation getUgi() {
+ return ugi; // hand back the ugi field as-is, with no KerberosUser TGT check or relogin attempt
+ }
}
private class MockHiveWriter extends HiveWriter {
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
index 01597fa..0a49a40 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive3ConnectionPool.java
@@ -43,6 +43,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveUtils;
@@ -62,6 +65,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import javax.security.auth.login.LoginException;
+
/**
* Implementation for Database Connection Pooling Service used for Apache Hive
* connections. Apache DBCP is used for connection pooling functionality.
@@ -264,6 +269,7 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
private volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
private volatile UserGroupInformation ugi;
+ private final AtomicReference<KerberosUser> kerberosUserReference = new
AtomicReference<>();
private volatile File kerberosConfigFile = null;
private volatile KerberosProperties kerberosProperties;
@@ -289,6 +295,7 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
kerberosProperties = new KerberosProperties(kerberosConfigFile);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
+ props.add(kerberosProperties.getKerberosPassword());
properties = props;
}
@@ -306,38 +313,38 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
if (confFileProvided) {
final String explicitPrincipal =
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = explicitPrincipal;
- resolvedKeytab = explicitKeytab;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = explicitKeytab;
}
final String configFiles =
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
- problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+ problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder,
getLogger()));
- if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null)) {
+ if (credentialsService != null && (explicitPrincipal != null ||
explicitKeytab != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
- .subject("Kerberos Credentials")
- .valid(false)
- .explanation("Cannot specify both a Kerberos
Credentials Service and a principal/keytab")
- .build());
+ .subject("Kerberos Credentials")
+ .valid(false)
+ .explanation("Cannot specify a Kerberos Credentials
Service while also specifying a Kerberos Principal, Kerberos Keytab, or
Kerberos Password")
+ .build());
}
- final String allowExplicitKeytabVariable =
System.getenv(ALLOW_EXPLICIT_KEYTAB);
- if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) &&
(explicitPrincipal != null || explicitKeytab != null)) {
+ if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
- .subject("Kerberos Credentials")
- .valid(false)
- .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "'
system environment variable is configured to forbid explicitly configuring
principal/keytab in processors. "
- + "The Kerberos Credentials Service should be
used instead of setting the Kerberos Keytab or Kerberos Principal property.")
- .build());
+ .subject("Kerberos Credentials")
+ .valid(false)
+ .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system
environment variable is configured to forbid explicitly configuring Kerberos
Keytab in processors. "
+ + "The Kerberos Credentials Service should be used
instead of setting the Kerberos Keytab or Kerberos Principal property.")
+ .build());
}
}
@@ -398,28 +405,37 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final String explicitPrincipal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab =
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = explicitPrincipal;
- resolvedKeytab = explicitKeytab;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = explicitKeytab;
}
- log.info("Hive Security Enabled, logging in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ if (resolvedKeytab != null) {
+ kerberosUserReference.set(new
KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ } else if (explicitPassword != null) {
+ kerberosUserReference.set(new
KerberosPasswordUser(resolvedPrincipal, explicitPassword));
+ log.info("Hive Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ } else {
+ throw new InitializationException("Unable to authenticate with
Kerberos, no keytab or password was provided");
+ }
try {
- ugi = hiveConfigurator.authenticate(hiveConfig,
resolvedPrincipal, resolvedKeytab);
+ ugi = hiveConfigurator.authenticate(hiveConfig,
kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
throw new InitializationException(ae);
}
- getLogger().info("Successfully logged in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ getLogger().info("Successfully logged in as principal " +
resolvedPrincipal);
}
final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
@@ -476,13 +492,24 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
public Connection getConnection() throws ProcessException {
try {
if (ugi != null) {
- synchronized(this) {
- /*
- * Make sure that only one thread can request that the UGI
relogin at a time. This
- * explicit relogin attempt is necessary due to the Hive
client/thrift not implicitly handling
- * the acquisition of a new TGT after the current one has
expired.
- * https://issues.apache.org/jira/browse/NIFI-5134
- */
+ /*
+ * Explicitly check the TGT and relogin if necessary with the
KerberosUser instance. No synchronization
+ * is necessary in the client code, since
AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
+ */
+ getLogger().trace("getting UGI instance");
+ if (kerberosUserReference.get() != null) {
+ // if there's a KerberosUser associated with this UGI,
check the TGT and relogin if it is close to expiring
+ KerberosUser kerberosUser = kerberosUserReference.get();
+ getLogger().debug("kerberosUser is " + kerberosUser);
+ try {
+ getLogger().debug("checking TGT on kerberosUser " +
kerberosUser);
+ kerberosUser.checkTGTAndRelogin();
+ } catch (LoginException e) {
+ throw new ProcessException("Unable to relogin with
kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ }
+ } else {
+ getLogger().debug("kerberosUser was null, will not refresh
TGT with KerberosUser");
+ // no synchronization is needed for
UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the
synchronization internally
ugi.checkTGTAndReloginFromKeytab();
}
try {
@@ -515,4 +542,10 @@ public class Hive3ConnectionPool extends
AbstractControllerService implements Hi
return connectionUrl;
}
+ /*
+ * Reports whether explicitly configuring a Kerberos keytab on this component is allowed, based on the
+ * environment variable named by ALLOW_EXPLICIT_KEYTAB. The value is parsed with Boolean.parseBoolean,
+ * so an unset variable, or any value other than "true" (ignoring case), yields false.
+ *
+ * Overridable by subclasses in the same package, mainly intended for testing purposes to allow
+ * verification without having to set environment variables.
+ */
+ boolean isAllowExplicitKeytab() {
+ return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 535754f..a1123d2 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.StringUtils;
@@ -201,6 +205,25 @@ public class PutHive3Streaming extends AbstractProcessor {
.required(false)
.build();
+ static final PropertyDescriptor KERBEROS_PRINCIPAL = new
PropertyDescriptor.Builder()
+ .name("kerberos-principal")
+ .displayName("Kerberos Principal")
+ .description("The principal to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor KERBEROS_PASSWORD = new
PropertyDescriptor.Builder()
+ .name("kerberos-password")
+ .displayName("Kerberos Password")
+ .description("The password to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -225,6 +248,7 @@ public class PutHive3Streaming extends AbstractProcessor {
protected volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
protected volatile UserGroupInformation ugi;
+ final protected AtomicReference<KerberosUser> kerberosUserReference = new
AtomicReference<>();
protected volatile HiveConf hiveConfig;
protected volatile int callTimeout;
@@ -247,6 +271,8 @@ public class PutHive3Streaming extends AbstractProcessor {
props.add(DISABLE_STREAMING_OPTIMIZATIONS);
props.add(ROLLBACK_ON_FAILURE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
+ props.add(KERBEROS_PRINCIPAL);
+ props.add(KERBEROS_PASSWORD);
propertyDescriptors = Collections.unmodifiableList(props);
@@ -274,14 +300,23 @@ public class PutHive3Streaming extends AbstractProcessor {
final List<ValidationResult> problems = new ArrayList<>();
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final String explicitPrincipal =
validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(KERBEROS_PASSWORD).getValue();
- final String resolvedPrincipal = credentialsService != null ?
credentialsService.getPrincipal() : null;
+ final String resolvedPrincipal = credentialsService != null ?
credentialsService.getPrincipal() : explicitPrincipal;
final String resolvedKeytab = credentialsService != null ?
credentialsService.getKeytab() : null;
if (confFileProvided) {
final String configFiles =
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
- problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+ problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder,
getLogger()));
}
+ if (credentialsService != null && (explicitPrincipal != null ||
explicitPassword != null)) {
+ problems.add(new ValidationResult.Builder()
+ .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation("kerberos principal/password and kerberos
credential service cannot be configured at the same time")
+ .build());
+ }
return problems;
}
@@ -310,20 +345,33 @@ public class PutHive3Streaming extends AbstractProcessor {
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final String explicitPrincipal =
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
context.getProperty(KERBEROS_PASSWORD).getValue();
+
+ final String resolvedPrincipal = credentialsService != null ?
credentialsService.getPrincipal() : explicitPrincipal;
+ final String resolvedKeytab = credentialsService != null ?
credentialsService.getKeytab() : null;
+
+ if (resolvedKeytab != null) {
+ kerberosUserReference.set(new
KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ } else if (explicitPassword != null) {
+ kerberosUserReference.set(new
KerberosPasswordUser(resolvedPrincipal, explicitPassword));
+ log.info("Hive Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ } else {
+ throw new ProcessException("Unable to authenticate with
Kerberos, no keytab or password was provided");
+ }
- final String resolvedPrincipal = credentialsService.getPrincipal();
- final String resolvedKeytab = credentialsService.getKeytab();
-
- log.info("Hive Security Enabled, logging in as principal {} with
keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
try {
- ugi = hiveConfigurator.authenticate(hiveConfig,
resolvedPrincipal, resolvedKeytab);
+ ugi = hiveConfigurator.authenticate(hiveConfig,
kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
- throw new ProcessException("Kerberos authentication failed for
Hive Streaming", ae);
+ log.error(ae.getMessage(), ae);
+ throw new ProcessException(ae);
}
log.info("Successfully logged in as principal {} with keytab {}",
new Object[]{resolvedPrincipal, resolvedKeytab});
} else {
ugi = null;
+ kerberosUserReference.set(null);
}
callTimeout =
context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() *
1000; // milliseconds
@@ -532,6 +580,7 @@ public class PutHive3Streaming extends AbstractProcessor {
}
ugi = null;
+ kerberosUserReference.set(null);
}
private void abortAndCloseConnection(StreamingConnection connection) {
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
index 092557b..3832944 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +39,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class HiveConfigurator {
- public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, AtomicReference<ValidationResources>
validationResourceHolder, ComponentLog log) {
+ public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, String password,
+
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog
log) {
final List<ValidationResult> problems = new ArrayList<>();
ValidationResources resources = validationResourceHolder.get();
@@ -53,7 +55,7 @@ public class HiveConfigurator {
final Configuration hiveConfig = resources.getConfiguration();
-
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, null, log));
+
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, password, log));
return problems;
}
@@ -78,6 +80,22 @@ public class HiveConfigurator {
}
/**
+ * Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}.
+ * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
+ * @param hiveConfig The Configuration to apply to the acquired UserGroupInformation
+ * @param kerberosUser The KerberosUser to authenticate
+ * @return A UserGroupInformation instance created using the Subject of the given KerberosUser
+ * @throws AuthenticationFailedException if authentication fails
+ */
+ public UserGroupInformation authenticate(final Configuration hiveConfig,
KerberosUser kerberosUser) throws AuthenticationFailedException {
+ try {
+ return SecurityUtil.getUgiForKerberosUser(hiveConfig,
kerberosUser);
+ } catch (IOException ioe) {
+ throw new AuthenticationFailedException("Kerberos Authentication
for Hive failed", ioe);
+ }
+ }
+
+ /**
* As of Apache NiFi 1.5.0, due to changes made to
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
* class to authenticate a principal with Kerberos, Hive controller services no longer
@@ -94,7 +112,9 @@ public class HiveConfigurator {
* authentication attempts that would leave the Hive controller service in an unrecoverable state.
*
* @see SecurityUtil#loginKerberos(Configuration, String, String)
+ * @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
*/
+ @Deprecated
public UserGroupInformation authenticate(final Configuration hiveConfig,
String principal, String keyTab) throws AuthenticationFailedException {
UserGroupInformation ugi;
try {
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 16c42e5..d9113fe 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -61,6 +61,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
@@ -115,9 +116,12 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -262,7 +266,7 @@ public class TestPutHive3Streaming {
}
@Test
- public void testUgiGetsCleared() throws Exception {
+ public void testUgiAndKerberosUserGetsCleared() throws Exception {
configure(processor, 0);
runner.setProperty(PutHive3Streaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
@@ -270,6 +274,7 @@ public class TestPutHive3Streaming {
processor.ugi = mock(UserGroupInformation.class);
runner.run();
assertNull(processor.ugi);
+ assertNull(processor.kerberosUserReference.get());
}
@Test
@@ -281,12 +286,13 @@ public class TestPutHive3Streaming {
runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs");
runner.enableControllerService(kcs);
ugi = mock(UserGroupInformation.class);
- when(hiveConfigurator.authenticate(eq(hiveConf), anyString(),
anyString())).thenReturn(ugi);
+ when(hiveConfigurator.authenticate(eq(hiveConf),
any(KerberosUser.class))).thenReturn(ugi);
runner.setProperty(PutHive3Streaming.METASTORE_URI,
"thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
runner.enqueue(new byte[0]);
runner.run();
+ verify(hiveConfigurator, times(1)).authenticate(eq(hiveConf),
any(KerberosUser.class));
}
@Test(expected = AssertionError.class)
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
index dd2e1fe..1e460bc 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
@@ -31,12 +31,16 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveUtils;
@@ -57,6 +61,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import javax.security.auth.login.LoginException;
+
/**
* Implementation for Database Connection Pooling Service used for Apache Hive 1.1
* connections. Apache DBCP is used for connection pooling functionality.
@@ -149,6 +155,25 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
.required(false)
.build();
+ static final PropertyDescriptor KERBEROS_PRINCIPAL = new
PropertyDescriptor.Builder()
+ .name("kerberos-principal")
+ .displayName("Kerberos Principal")
+ .description("The principal to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor KERBEROS_PASSWORD = new
PropertyDescriptor.Builder()
+ .name("kerberos-password")
+ .displayName("Kerberos Password")
+ .description("The password to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+
private List<PropertyDescriptor> properties;
@@ -161,6 +186,7 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
private volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
private volatile UserGroupInformation ugi;
+ private final AtomicReference<KerberosUser> kerberosUserReference = new
AtomicReference<>();
@Override
protected void init(final ControllerServiceInitializationContext context) {
@@ -173,6 +199,8 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
props.add(MAX_TOTAL_CONNECTIONS);
props.add(VALIDATION_QUERY);
props.add(KERBEROS_CREDENTIALS_SERVICE);
+ props.add(KERBEROS_PRINCIPAL);
+ props.add(KERBEROS_PASSWORD);
properties = props;
}
@@ -190,19 +218,29 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
if (confFileProvided) {
final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final String explicitPrincipal =
validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
validationContext.getProperty(KERBEROS_PASSWORD).getValue();
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = null;
- resolvedKeytab = null;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = null;
}
final String configFiles =
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
- problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+ problems.addAll(hiveConfigurator.validate(configFiles,
resolvedPrincipal, resolvedKeytab, explicitPassword, validationResourceHolder,
getLogger()));
+
+ if (credentialsService != null && (explicitPrincipal != null ||
explicitPassword != null)) {
+ problems.add(new ValidationResult.Builder()
+ .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation("kerberos principal/password and kerberos
credential service cannot be configured at the same time")
+ .build());
+ }
}
return problems;
@@ -260,28 +298,38 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
final String drv = HiveDriver.class.getName();
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+ final String explicitPrincipal =
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String explicitPassword =
context.getProperty(KERBEROS_PASSWORD).getValue();
final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
- if (credentialsService == null) {
- resolvedPrincipal = null;
- resolvedKeytab = null;
- } else {
+ if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
+ } else {
+ resolvedPrincipal = explicitPrincipal;
+ resolvedKeytab = null;
}
- log.info("Hive Security Enabled, logging in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ if (resolvedKeytab != null) {
+ kerberosUserReference.set(new
KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ } else if (explicitPassword != null) {
+ kerberosUserReference.set(new
KerberosPasswordUser(resolvedPrincipal, explicitPassword));
+ log.info("Hive Security Enabled, logging in as principal {}
with password", new Object[] {resolvedPrincipal});
+ } else {
+ throw new InitializationException("Unable to authenticate with
Kerberos, no keytab or password was provided");
+ }
try {
- ugi = hiveConfigurator.authenticate(hiveConfig,
resolvedPrincipal, resolvedKeytab);
+ ugi = hiveConfigurator.authenticate(hiveConfig,
kerberosUserReference.get());
} catch (AuthenticationFailedException ae) {
log.error(ae.getMessage(), ae);
throw new InitializationException(ae);
}
- getLogger().info("Successfully logged in as principal {} with
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
+ getLogger().info("Successfully logged in as principal " +
resolvedPrincipal);
}
final String user =
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
@@ -325,13 +373,24 @@ public class Hive_1_1ConnectionPool extends
AbstractControllerService implements
public Connection getConnection() throws ProcessException {
try {
if (ugi != null) {
- synchronized(this) {
- /*
- * Make sure that only one thread can request that the UGI relogin at a time. This
- * explicit relogin attempt is necessary due to the Hive client/thrift not implicitly handling
- * the acquisition of a new TGT after the current one has expired.
- * https://issues.apache.org/jira/browse/NIFI-5134
- */
+ /*
+ * Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization
+ * is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
+ */
+ getLogger().trace("getting UGI instance");
+ if (kerberosUserReference.get() != null) {
+ // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
+ KerberosUser kerberosUser = kerberosUserReference.get();
+ getLogger().debug("kerberosUser is " + kerberosUser);
+ try {
+ getLogger().debug("checking TGT on kerberosUser " +
kerberosUser);
+ kerberosUser.checkTGTAndRelogin();
+ } catch (LoginException e) {
+ throw new ProcessException("Unable to relogin with
kerberos credentials for " + kerberosUser.getPrincipal(), e);
+ }
+ } else {
+ getLogger().debug("kerberosUser was null, will not refresh
TGT with KerberosUser");
+ // no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally
ugi.checkTGTAndReloginFromKeytab();
}
try {
diff --git
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
index 5fed81a..b212207 100644
---
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
+++
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,7 +36,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class HiveConfigurator {
- public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, AtomicReference<ValidationResources>
validationResourceHolder, ComponentLog log) {
+ public Collection<ValidationResult> validate(String configFiles, String
principal, String keyTab, String password,
+
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog
log) {
final List<ValidationResult> problems = new ArrayList<>();
ValidationResources resources = validationResourceHolder.get();
@@ -50,7 +52,7 @@ public class HiveConfigurator {
final Configuration hiveConfig = resources.getConfiguration();
-
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, null, log));
+
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(),
hiveConfig, principal, keyTab, password, log));
return problems;
}
@@ -75,6 +77,22 @@ public class HiveConfigurator {
}
/**
+ * Acquires a {@link UserGroupInformation} using the given {@link Configuration} and {@link KerberosUser}.
+ * @see SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)
+ * @param hiveConfig The Configuration to apply to the acquired UserGroupInformation
+ * @param kerberosUser The KerberosUser to authenticate
+ * @return A UserGroupInformation instance created using the Subject of the given KerberosUser
+ * @throws AuthenticationFailedException if authentication fails
+ */
+ public UserGroupInformation authenticate(final Configuration hiveConfig,
KerberosUser kerberosUser) throws AuthenticationFailedException {
+ try {
+ return SecurityUtil.getUgiForKerberosUser(hiveConfig,
kerberosUser);
+ } catch (IOException ioe) {
+ throw new AuthenticationFailedException("Kerberos Authentication
for Hive failed", ioe);
+ }
+ }
+
+ /**
* As of Apache NiFi 1.5.0, due to changes made to
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
* class to authenticate a principal with Kerberos, Hive controller services no longer
@@ -91,7 +109,9 @@ public class HiveConfigurator {
* authentication attempts that would leave the Hive controller service in an unrecoverable state.
*
* @see SecurityUtil#loginKerberos(Configuration, String, String)
+ * @deprecated Use {@link SecurityUtil#getUgiForKerberosUser(Configuration, KerberosUser)}
*/
+ @Deprecated
public UserGroupInformation authenticate(final Configuration hiveConfig,
String principal, String keyTab) throws AuthenticationFailedException {
UserGroupInformation ugi;
try {