exceptionfactory commented on a change in pull request #5277:
URL: https://github.com/apache/nifi/pull/5277#discussion_r691202562



##########
File path: nifi-assembly/pom.xml
##########
@@ -742,6 +742,12 @@ language governing permissions and limitations under the 
License. -->
             <version>1.15.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+       <dependency>

Review comment:
       It looks like the indentation of this line is slightly off.

##########
File path: nifi-commons/nifi-security-kerberos/pom.xml
##########
@@ -26,6 +26,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
             <version>1.15.0-SNAPSHOT</version>
+        </dependency>
+           <dependency>

Review comment:
       It looks like this line's indentation could be adjusted.
   ```suggestion
              <dependency>
   ```

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -232,6 +251,22 @@ protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
                 .build());
         }
 
+        if (kerberosUserService != null && (explicitPrincipal != null || 
explicitKeytab != null || explicitPassword != null)) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Kerberos User")
+                    .valid(false)
+                    .explanation("Cannot specify a Kerberos User Service while 
also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
+                    .build());
+        }
+
+        if (kerberosUserService != null && credentialsService != null) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Kerberos User")
+                    .valid(false)
+                    .explanation("Cannot specify a Kerberos User Service while 
also specifying a Kerberos Credential Service")

Review comment:
       Minor wording adjustment to match service name:
   ```suggestion
                       .explanation("Cannot specify a Kerberos User Service 
while also specifying a Kerberos Credentials Service")
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
##########
@@ -199,6 +203,13 @@
         .identifiesControllerService(KerberosCredentialsService.class)
         .required(false)
         .build();
+    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()

Review comment:
       With this being a public variable, is it worth renaming to 
`SELF_CONTAINED_KERBEROS_USER_SERVICE` to indicate the more specific type of 
`KerberosUserService` it identifies?

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-kerberos-user-service-bundle/nifi-kerberos-user-service/.gitignore
##########
@@ -0,0 +1 @@
+/bin/

Review comment:
       Should the `.gitignore` entry be set at a higher parent directory?

##########
File path: 
nifi-commons/nifi-security-kerberos/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
##########
@@ -75,7 +87,9 @@ public synchronized void login() throws LoginException {
                     // other classes may be referencing an existing subject 
and replacing it may break functionality of those other classes after relogin
                     this.subject = new Subject();
                 }
-                this.loginContext = createLoginContext(subject);
+
+                // the Configuration implementations have only one config 
entry and always return it regardless of the passed in name
+                this.loginContext = new LoginContext("KerberosUser", subject, 
createCallbackHandler(), createConfiguration());

Review comment:
       It looks like the name `KerberosUser` has special meaning and is reused 
elsewhere in this class, so it would be helpful to pull it out into a static 
variable.
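
       A rough sketch of that extraction, assuming a hypothetical constant name 
`LOGIN_CONTEXT_NAME` and a stand-in class rather than the actual 
`AbstractKerberosUser`. The `Configuration` here only mirrors the behavior 
described in the diff comment (one entry, returned for any name):

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Stand-in class for illustration only; not the class from the pull request.
final class LoginContextNameSketch {

    // Hypothetical constant name; the value is the literal used in the diff.
    static final String LOGIN_CONTEXT_NAME = "KerberosUser";

    // Returns a Configuration that holds a single entry and returns it
    // regardless of the name passed in, as the diff comment describes.
    static Configuration singleEntryConfiguration(final AppConfigurationEntry entry) {
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                return new AppConfigurationEntry[] {entry};
            }
        };
    }
}
```

       Any other place in the class that needs the name could then reference 
the constant instead of repeating the string literal.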

##########
File path: 
nifi-commons/nifi-security-kerberos/src/test/java/org/apache/nifi/security/krb/TestKerberosTicketCacheUser.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestKerberosTicketCacheUser {
+
+    @Test
+    public void testGetConfigurationEntry() {
+        final String principal = "f...@nifi.com";
+
+        final KerberosUser kerberosUser = new 
KerberosTicketCacheUser(principal);
+        assertEquals(principal, kerberosUser.getPrincipal());
+
+        final AppConfigurationEntry entry = 
kerberosUser.getConfigurationEntry();
+        assertNotNull(entry);
+        assertEquals(ConfigurationUtil.SUN_KRB5_LOGIN_MODULE, 
entry.getLoginModuleName());
+        assertEquals(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, 
entry.getControlFlag());
+        assertEquals(principal, entry.getOptions().get("principal"));
+        assertEquals("true", entry.getOptions().get("useTicketCache"));
+        assertNull(entry.getOptions().get("ticketCache"));
+    }
+
+    @Test
+    public void testGetConfigurationEntryWithSpecificTicketCache() {
+        final String principal = "f...@nifi.com";
+        final String ticketCache = "/tmp/cache";

Review comment:
       Is this just a placeholder value, or is the directory actually used? If 
it is used, it would be better to replace it with a base directory using 
`java.io.tmpdir`.
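
       If the path does turn out to be used, something along these lines could 
work. This is only a sketch; the class and method names are hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of the pull request.
final class TicketCachePathSketch {

    // Builds a path under the JVM's temporary directory instead of hard-coding /tmp.
    static Path ticketCachePath(final String fileName) {
        final String tmpDir = System.getProperty("java.io.tmpdir");
        return Paths.get(tmpDir, fileName);
    }

    public static void main(final String[] args) {
        // The printed value depends on the platform's temporary directory.
        System.out.println(ticketCachePath("cache"));
    }
}
```

       The test could then pass `ticketCachePath("cache").toString()` as the 
`ticketCache` value.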

##########
File path: nifi-commons/nifi-security-kerberos-api/pom.xml
##########
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>1.15.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-security-kerberos-api</artifactId>
+    <dependencies>
+    </dependencies>

Review comment:
       Should this element be removed since there are no dependencies?

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -321,6 +356,15 @@ public final void abstractOnStopped() {
                 }
             }
 
+            final KerberosUser kerberosUser = resources.getKerberosUser();
+            if (kerberosUser != null) {
+                try {
+                    kerberosUser.logout();
+                } catch (LoginException e) {
+                    getLogger().warn("Error logging out KerberosUser: " + 
e.getMessage(), e);

Review comment:
       This could be adjusted to use placeholders instead of concatenation. Is 
there value in including the KerberosUser name in the message?
   ```suggestion
                       getLogger().warn("Error logging out KerberosUser: {}", 
e.getMessage(), e);
   ```

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -212,8 +224,15 @@ protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
 
         try {
             final Configuration conf = 
getHadoopConfigurationForValidation(locations);
-            
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
-                    this.getClass().getSimpleName(), conf, resolvedPrincipal, 
resolvedKeytab, explicitPassword, getLogger()));
+            if (kerberosUserService == null) {
+                
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
+                        this.getClass().getSimpleName(), conf, 
resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+            } else {
+                final boolean securityEnabled = 
SecurityUtil.isSecurityEnabled(conf);
+                if (!securityEnabled) {
+                    getLogger().warn("Configuration does not have security 
enabled, KerberosUserService will be ignored");

Review comment:
       For clarity, what do you think about prefixing the message with `Hadoop`?
   ```suggestion
                       getLogger().warn("Hadoop Configuration does not have 
security enabled, KerberosUserService will be ignored");
   ```

##########
File path: 
nifi-commons/nifi-security-kerberos/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
##########
@@ -212,13 +260,16 @@ private boolean isTGSPrincipal(final KerberosPrincipal 
principal) {
     private long getRefreshTime(final KerberosTicket tgt) {
         long start = tgt.getStartTime().getTime();
         long end = tgt.getEndTime().getTime();
+        long renewUntil = tgt.getRenewTill().getTime();

Review comment:
       Can this be final?

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -450,6 +476,36 @@ HdfsResources resetHDFSResources(final List<String> 
resourceLocations, ProcessCo
         return new HdfsResources(config, fs, ugi, kerberosUser);
     }
 
+    private KerberosUser getKerberosUser(final ProcessContext context) throws 
IOException {
+        // Check Kerberos User Service first, if present then get the 
KerberosUser from the service
+        // The customValidate method ensures that KerberosUserService can't be 
set at the same time as the credentials service or explicit properties
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            return kerberosUserService.createKerberosUser();
+        }
+
+        // Kerberos User Service wasn't set, so create KerberosUser based on 
credentials service or explicit properties...
+        String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+        String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+        String password = 
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
+
+        // If the Kerberos Credentials Service is specified, we need to use 
its configuration, not the explicit properties for principal/keytab.
+        // The customValidate method ensures that only one can be set, so we 
know that the principal & keytab above are null.
+        final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        if (credentialsService != null) {
+            principal = credentialsService.getPrincipal();
+            keyTab = credentialsService.getKeytab();
+        }
+
+        if (keyTab != null) {
+            return new KerberosKeytabUser(principal, keyTab);
+        } else if (password != null) {
+            return new KerberosPasswordUser(principal, password);
+        } else {
+            throw new IOException("Unable to authenticate with Kerberos, no 
keytab or password was provided");

Review comment:
       It looks like this is a carry-over from the previous implementation, but 
`IOException` doesn't seem like the best option. Although it would change the 
signature, what do you think about `IllegalArgumentException`, or perhaps some 
other exception that indicates a configuration problem?
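
       To illustrate the idea, here is a simplified sketch of the selection 
logic with an unchecked exception. The class, enum, and method names are 
hypothetical, and the real method returns a `KerberosUser` rather than an enum:

```java
// Simplified stand-in for the keytab/password selection in getKerberosUser.
final class CredentialSelectionSketch {

    enum CredentialType { KEYTAB, PASSWORD }

    // A keytab takes precedence over a password, as in the diff above.
    static CredentialType select(final String keytab, final String password) {
        if (keytab != null) {
            return CredentialType.KEYTAB;
        } else if (password != null) {
            return CredentialType.PASSWORD;
        } else {
            // Signals a configuration problem rather than an I/O failure.
            throw new IllegalArgumentException(
                    "Unable to authenticate with Kerberos, no keytab or password was provided");
        }
    }
}
```

       Since `IllegalArgumentException` is unchecked, the `throws IOException` 
clause could be dropped if nothing else in the method needs it.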

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link 
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the 
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used, 
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser 
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting 
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+    private static final Logger log = 
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+    private Thread t;

Review comment:
       Recommend renaming this variable for clarity:
   ```suggestion
       private Thread refreshThread;
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
##########
@@ -284,6 +314,16 @@
                         + "must be set or neither must be set.")
                     .build());
             }
+
+            final String jvmJaasConfigFile = 
System.getProperty("java.security.auth.login.config");

Review comment:
       For clarity, it might be helpful to declare the System property name as 
a static variable.
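
       For example, roughly as follows. The class, constant, and method names 
are hypothetical; only the property name itself comes from the diff:

```java
// Stand-in class for illustration; not the actual KafkaProcessorUtils.
final class JaasConfigPropertySketch {

    // Hypothetical constant name for the standard JAAS login configuration property.
    static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";

    // Returns the JVM-wide JAAS configuration file path, or null when the property is not set.
    static String getJvmJaasConfigFile() {
        return System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
    }
}
```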

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link 
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the 
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used, 
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser 
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting 
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+    private static final Logger log = 
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+    private Thread t;
+    private boolean isKrbTicket;
+
+    private String principal;
+
+    private double ticketRenewWindowFactor;
+    private long minTimeBeforeRelogin;
+
+    private volatile Subject subject;
+
+    private LoginContext loginContext;
+    private String serviceName;
+
+    @Override
+    public void configure(Map<String, ?> configs, String contextName, 
Configuration configuration,
+                          AuthenticateCallbackHandler callbackHandler) {
+        super.configure(configs, contextName, configuration, callbackHandler);
+        this.ticketRenewWindowFactor = (Double) 
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+        this.minTimeBeforeRelogin = (Long) 
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+        this.serviceName = getServiceName(configs, contextName, configuration);
+    }
+
+    /**
+     * Performs login for each login module specified for the login context of 
this instance and starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     */
+    @Override
+    public LoginContext login() throws LoginException {
+        loginContext = super.login();
+        subject = loginContext.getSubject();
+        isKrbTicket = 
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+        AppConfigurationEntry[] entries = 
configuration().getAppConfigurationEntry(contextName());
+        if (entries.length == 0) {
+            principal = null;
+        } else {
+            // there will only be a single entry
+            AppConfigurationEntry entry = entries[0];
+            if (entry.getOptions().get("principal") != null)
+                principal = (String) entry.getOptions().get("principal");
+            else
+                principal = null;
+        }
+
+        if (!isKrbTicket) {
+            log.debug("[Principal={}]: It is not a Kerberos ticket", 
principal);
+            t = null;
+            // if no TGT, do not bother with ticket management.
+            return loginContext;
+        }
+        log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+        t = 
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", 
principal), () -> {
+            log.info("[Principal={}]: TGT refresh thread started, 
minTimeBeforeRelogin = {}", principal, minTimeBeforeRelogin);
+            while (true) {
+                try {
+                    Thread.sleep(minTimeBeforeRelogin);
+                } catch (InterruptedException ie) {
+                    log.warn("[Principal={}]: TGT renewal thread has been 
interrupted and will exit.", principal);
+                    return;
+                }
+                try {
+                    checkTGTAndReLogin();
+                } catch (Throwable t) {
+                    log.error("[Principal={}]: Error from TGT refresh thread", 
principal, t);
+                }
+            }
+        });
+        t.start();
+        return loginContext;
+    }
+
+    @Override
+    public void close() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                log.warn("[Principal={}]: Error while waiting for Login thread 
to shutdown.", principal, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public Subject subject() {
+        return subject;
+    }
+
+    @Override
+    public String serviceName() {
+        return serviceName;
+    }
+
+    private synchronized void checkTGTAndReLogin() throws LoginException {
+        final KerberosTicket tgt = getTGT();
+        if (tgt == null) {
+            log.info("[Principal={}]: TGT was not found, performing login", 
principal);
+            reLogin();
+            return;
+        }
+
+        if (System.currentTimeMillis() < getRefreshTime(tgt)) {
+            log.debug("[Principal={}]: TGT was found, but has not reached 
expiration window", principal);
+            return;
+        }
+
+        try {
+            tgt.refresh();
+            log.info("[Principal={}]: TGT refreshed", principal);
+            getRefreshTime(tgt);
+        } catch (RefreshFailedException rfe) {
+            log.warn("[Principal={}]: TGT refresh failed, will attempt 
relogin", principal);
+            log.debug("", rfe);
+            reLogin();
+        }
+    }
+
+    private static String getServiceName(Map<String, ?> configs, String 
contextName, Configuration configuration) {
+        List<AppConfigurationEntry> configEntries = 
Arrays.asList(configuration.getAppConfigurationEntry(contextName));
+        String jaasServiceName = JaasContext.configEntryOption(configEntries, 
JaasUtils.SERVICE_NAME, null);
+        String configServiceName = (String) 
configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+        if (jaasServiceName != null && configServiceName != null && 
!jaasServiceName.equals(configServiceName)) {
+            String message = String.format("Conflicting serviceName values 
found in JAAS and Kafka configs " +
+                    "value in JAAS file %s, value in Kafka config %s", 
jaasServiceName, configServiceName);
+            throw new IllegalArgumentException(message);
+        }
+
+        if (jaasServiceName != null)
+            return jaasServiceName;
+        if (configServiceName != null)
+            return configServiceName;
+
+        throw new IllegalArgumentException("No serviceName defined in either 
JAAS or Kafka config");
+    }
+
+
+    private long getRefreshTime(final KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+
+        log.debug("[Principal={}]: TGT valid starting at: {}", principal, 
tgt.getStartTime());
+        log.debug("[Principal={}]: TGT expires: {}", principal, 
tgt.getEndTime());
+        log.debug("[Principal={}]: TGT renew until: {}", principal, 
tgt.getRenewTill());
+
+        return start + (long) ((expires - start) * ticketRenewWindowFactor);
+    }
+
+    private KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = 
subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + 
server.getRealm())) {

Review comment:
       This would be easier to follow if the value for comparison were formed 
on a separate line:
   ```suggestion
               final String expectedServerName = String.format("krbtgt/%s@%s", 
server.getRealm(), server.getRealm());
               if (server.getName().equals(expectedServerName)) {
   ```
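
       Going a step further, the comparison could live in a small helper. The 
sketch below works on plain strings so it can run on its own; the class and 
method names are hypothetical, and in `getTGT()` the arguments would come from 
`server.getName()` and `server.getRealm()`:

```java
// Hypothetical helper for illustration; not part of the pull request.
final class TgsNameSketch {

    // A ticket-granting ticket's server principal has the form krbtgt/REALM@REALM.
    static boolean isTgsPrincipal(final String serverName, final String realm) {
        final String expectedServerName = String.format("krbtgt/%s@%s", realm, realm);
        return serverName.equals(expectedServerName);
    }
}
```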

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link 
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the 
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used, 
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser 
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting 
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+    private static final Logger log = 
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+    private Thread t;
+    private boolean isKrbTicket;
+
+    private String principal;
+
+    private double ticketRenewWindowFactor;
+    private long minTimeBeforeRelogin;
+
+    private volatile Subject subject;
+
+    private LoginContext loginContext;
+    private String serviceName;
+
+    @Override
+    public void configure(Map<String, ?> configs, String contextName, 
Configuration configuration,
+                          AuthenticateCallbackHandler callbackHandler) {
+        super.configure(configs, contextName, configuration, callbackHandler);
+        this.ticketRenewWindowFactor = (Double) 
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+        this.minTimeBeforeRelogin = (Long) 
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+        this.serviceName = getServiceName(configs, contextName, configuration);
+    }
+
+    /**
+     * Performs login for each login module specified for the login context of 
this instance and starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     */
+    @Override
+    public LoginContext login() throws LoginException {
+        loginContext = super.login();
+        subject = loginContext.getSubject();
+        isKrbTicket = 
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+        AppConfigurationEntry[] entries = 
configuration().getAppConfigurationEntry(contextName());
+        if (entries.length == 0) {
+            principal = null;
+        } else {
+            // there will only be a single entry
+            AppConfigurationEntry entry = entries[0];
+            if (entry.getOptions().get("principal") != null)
+                principal = (String) entry.getOptions().get("principal");
+            else
+                principal = null;
+        }
+
+        if (!isKrbTicket) {
+            log.debug("[Principal={}]: It is not a Kerberos ticket", 
principal);
+            t = null;
+            // if no TGT, do not bother with ticket management.
+            return loginContext;
+        }
+        log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+        t = 
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", 
principal), () -> {
+            log.info("[Principal={}]: TGT refresh thread started, 
minTimeBeforeRelogin = {}", principal, minTimeBeforeRelogin);
+            while (true) {
+                try {
+                    Thread.sleep(minTimeBeforeRelogin);
+                } catch (InterruptedException ie) {
+                    log.warn("[Principal={}]: TGT renewal thread has been 
interrupted and will exit.", principal);
+                    return;
+                }
+                try {
+                    checkTGTAndReLogin();
+                } catch (Throwable t) {
+                    log.error("[Principal={}]: Error from TGT refresh thread", 
principal, t);
+                }
+            }
+        });
+        t.start();
+        return loginContext;
+    }
+
+    @Override
+    public void close() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                log.warn("[Principal={}]: Error while waiting for Login thread 
to shutdown.", principal, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public Subject subject() {
+        return subject;
+    }
+
+    @Override
+    public String serviceName() {
+        return serviceName;
+    }
+
+    private synchronized void checkTGTAndReLogin() throws LoginException {
+        final KerberosTicket tgt = getTGT();
+        if (tgt == null) {
+            log.info("[Principal={}]: TGT was not found, performing login", 
principal);
+            reLogin();
+            return;
+        }
+
+        if (System.currentTimeMillis() < getRefreshTime(tgt)) {
+            log.debug("[Principal={}]: TGT was found, but has not reached 
expiration window", principal);
+            return;
+        }
+
+        try {
+            tgt.refresh();
+            log.info("[Principal={}]: TGT refreshed", principal);
+            getRefreshTime(tgt);
+        } catch (RefreshFailedException rfe) {
+            log.warn("[Principal={}]: TGT refresh failed, will attempt 
relogin", principal);
+            log.debug("", rfe);
+            reLogin();
+        }
+    }
+
+    private static String getServiceName(Map<String, ?> configs, String 
contextName, Configuration configuration) {
+        List<AppConfigurationEntry> configEntries = 
Arrays.asList(configuration.getAppConfigurationEntry(contextName));
+        String jaasServiceName = JaasContext.configEntryOption(configEntries, 
JaasUtils.SERVICE_NAME, null);
+        String configServiceName = (String) 
configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+        if (jaasServiceName != null && configServiceName != null && 
!jaasServiceName.equals(configServiceName)) {
+            String message = String.format("Conflicting serviceName values 
found in JAAS and Kafka configs " +
+                    "value in JAAS file %s, value in Kafka config %s", 
jaasServiceName, configServiceName);
+            throw new IllegalArgumentException(message);
+        }
+
+        if (jaasServiceName != null)
+            return jaasServiceName;
+        if (configServiceName != null)
+            return configServiceName;

Review comment:
       Recommend wrapping each conditional body in braces rather than using 
the brace-less single-statement form.
   ```suggestion
           if (jaasServiceName != null) {
               return jaasServiceName;
           }
           if (configServiceName != null) {
               return configServiceName;
           }    
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link 
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the 
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used, 
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser 
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting 
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+    private static final Logger log = 
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+    private Thread t;
+    private boolean isKrbTicket;
+
+    private String principal;
+
+    private double ticketRenewWindowFactor;
+    private long minTimeBeforeRelogin;
+
+    private volatile Subject subject;
+
+    private LoginContext loginContext;
+    private String serviceName;
+
+    @Override
+    public void configure(Map<String, ?> configs, String contextName, 
Configuration configuration,
+                          AuthenticateCallbackHandler callbackHandler) {
+        super.configure(configs, contextName, configuration, callbackHandler);
+        this.ticketRenewWindowFactor = (Double) 
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+        this.minTimeBeforeRelogin = (Long) 
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+        this.serviceName = getServiceName(configs, contextName, configuration);
+    }
+
+    /**
+     * Performs login for each login module specified for the login context of 
this instance and starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     */
+    @Override
+    public LoginContext login() throws LoginException {
+        loginContext = super.login();
+        subject = loginContext.getSubject();
+        isKrbTicket = 
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+        AppConfigurationEntry[] entries = 
configuration().getAppConfigurationEntry(contextName());
+        if (entries.length == 0) {
+            principal = null;
+        } else {
+            // there will only be a single entry
+            AppConfigurationEntry entry = entries[0];
+            if (entry.getOptions().get("principal") != null)
+                principal = (String) entry.getOptions().get("principal");
+            else
+                principal = null;
+        }
+
+        if (!isKrbTicket) {
+            log.debug("[Principal={}]: It is not a Kerberos ticket", 
principal);
+            t = null;
+            // if no TGT, do not bother with ticket management.
+            return loginContext;
+        }
+        log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+        t = 
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", 
principal), () -> {

Review comment:
       Is the `login()` method called only once in the lifecycle of this class? 
This line overwrites the previous Thread reference without interrupting the 
thread that may still be running. In addition, it might be helpful to refactor 
this approach into an explicit `Runnable` and submit it to an `ExecutorService`, 
as opposed to creating a new Thread.
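
       A minimal sketch of the `ExecutorService` idea, assuming a single 
scheduled task that is cancelled before a new one is started. The class name 
`RefreshScheduler` and its methods are hypothetical, for illustration only; 
they are not part of this PR or of the Kafka client API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not in the PR): owns the periodic refresh task. */
public class RefreshScheduler implements AutoCloseable {
    // Single daemon thread, mirroring the daemon refresh thread in the diff.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, "kerberos-refresh");
                thread.setDaemon(true);
                return thread;
            });

    private ScheduledFuture<?> current;

    /** Cancels any previously scheduled task, then schedules the new one. */
    public synchronized void start(Runnable refreshTask, long periodMillis) {
        if (current != null) {
            current.cancel(true);
        }
        Runnable guarded = () -> {
            try {
                refreshTask.run();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs,
                // so report it and keep the schedule alive.
                System.err.println("Refresh task failed: " + e);
            }
        };
        current = executor.scheduleWithFixedDelay(
                guarded, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** True while a task is scheduled and has not been cancelled. */
    public synchronized boolean isRunning() {
        return current != null && !current.isDone();
    }

    /** Cancels the current task and stops the executor thread. */
    @Override
    public synchronized void close() {
        if (current != null) {
            current.cancel(true);
        }
        executor.shutdownNow();
    }
}
```

       With this shape, a second call to `start(...)` cannot leave an orphaned 
refresh thread behind, which is the concern raised about calling `login()` 
more than once.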

##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-kerberos-user-service-bundle/nifi-kerberos-user-service/src/main/java/org/apache/nifi/kerberos/KerberosTicketCacheUserService.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kerberos;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.security.krb.KerberosTicketCacheUser;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.util.Collections;
+import java.util.List;
+
+@CapabilityDescription("Provides a mechanism for creating a KerberosUser from 
a principal and ticket cache that other components " +
+        "are able to use in order to perform authentication using Kerberos. By 
encapsulating this information into a Controller Service " +
+        "and allowing other components to make use of it an administrator is 
able to choose which users are allowed to use which ticket " +
+        "caches and principals. This provides a more robust security model for 
multi-tenant use cases.")
+@Tags({"Kerberos", "Ticket", "Cache", "Principal", "Credentials", 
"Authentication", "Security"})
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = 
RequiredPermission.ACCESS_TICKET_CACHE,
+                explanation = "Allows user to define a ticket cache and 
principal that can then be used by other components.")
+})
+public class KerberosTicketCacheUserService extends 
AbstractKerberosUserService implements SelfContainedKerberosUserService {
+
+    static final PropertyDescriptor TICKET_CACHE_FILE = new 
PropertyDescriptor.Builder()
+            .name("Kerberos Ticket Cache File")
+            .description("Kerberos ticket cache associated with the 
principal.")
+            .identifiesExternalResource(ResourceCardinality.SINGLE, 
ResourceType.FILE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    private volatile String ticketCache;

Review comment:
       Should this be named something like `ticketCachePath` or 
`ticketCacheLocation`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

