This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new fd8acd57b8 NIFI-13555 Added Verification to HikariDBCPConnectionPool (#9085) fd8acd57b8 is described below commit fd8acd57b8172417e382fd004a7a7480792d2da7 Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Tue Aug 6 14:26:19 2024 -0400 NIFI-13555 Added Verification to HikariDBCPConnectionPool (#9085) Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../apache/nifi/dbcp/HikariCPConnectionPool.java | 201 +++++++++++++++++---- .../nifi/dbcp/HikariCPConnectionPoolTest.java | 51 +++++- .../nifi/util/MockControllerServiceLookup.java | 2 +- 3 files changed, 211 insertions(+), 43 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java index 230df5a6f3..59413b71e4 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/main/java/org/apache/nifi/dbcp/HikariCPConnectionPool.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.RequiredPermission; @@ -34,12 +35,15 @@ import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosUserService; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.krb.KerberosAction; +import org.apache.nifi.security.krb.KerberosLoginException; import org.apache.nifi.security.krb.KerberosUser; import javax.security.auth.login.LoginException; @@ -48,10 +52,14 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; + /** * Implementation of Database Connection Pooling Service. HikariCP is used for connection pooling functionality. */ @@ -71,7 +79,7 @@ import java.util.stream.Collectors; ) } ) -public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService { +public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService { /** * Property Name Prefix for Sensitive Dynamic Properties */ @@ -81,6 +89,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements private static final String DEFAULT_TOTAL_CONNECTIONS = "10"; private static final String DEFAULT_MAX_CONN_LIFETIME = "-1"; + private static final int DEFAULT_MIN_VALIDATION_TIMEOUT = 250; + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() .name("hikaricp-connection-url") .displayName("Database Connection URL") @@ -254,7 +264,132 @@ public class HikariCPConnectionPool extends AbstractControllerService implements */ @OnEnabled public void onConfigured(final ConfigurationContext context) { + dataSource = new HikariDataSource(); + configureDataSource(context, dataSource); + } + + private long extractMillisWithInfinite(PropertyValue prop) { + return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS : prop.asTimePeriod(TimeUnit.MILLISECONDS); + } + + /** + * Shutdown pool, close all open connections. + * If a principal is authenticated with a KDC, that principal is logged out. + * <p> + * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, + * an attempt will still be made to shut down the pool and close open connections. + * + */ + @OnDisabled + public void shutdown() { + try { + if (kerberosUser != null) { + kerberosUser.logout(); + } + } finally { + kerberosUser = null; + try { + if (dataSource != null) { + dataSource.close(); + } + } finally { + dataSource = null; + } + } + } + + @Override + public Connection getConnection() throws ProcessException { + try { + final Connection con; + if (kerberosUser != null) { + KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger()); + con = kerberosAction.execute(); + } else { + con = dataSource.getConnection(); + } + return con; + } catch (final SQLException e) { + // If using Kerberos, attempt to re-login + if (kerberosUser != null) { + getLogger().info("Error getting connection, performing Kerberos re-login"); + kerberosUser.login(); + } + throw new ProcessException("Connection retrieval failed", e); + } + } + + @Override + public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) { + List<ConfigVerificationResult> results = new ArrayList<>(); + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + KerberosUser kerberosUser = null; + try { + if (kerberosUserService != null) { + kerberosUser = kerberosUserService.createKerberosUser(); + if (kerberosUser != null) { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Kerberos User") + .outcome(SUCCESSFUL) + .explanation("Successfully configured Kerberos user") + .build()); + } + } + } catch (final Exception e) { + verificationLogger.error("Failed to configure Kerberos user", e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Kerberos User") + .outcome(FAILED) + .explanation("Failed to configure Kerberos user: " + e.getMessage()) + .build()); + } + final HikariDataSource hikariDataSource = new HikariDataSource(); + try { + configureDataSource(context, hikariDataSource); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Data Source") + .outcome(SUCCESSFUL) + .explanation("Successfully configured data source") + .build()); + + try (final Connection conn = getConnection(hikariDataSource, kerberosUser)) { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Establish Connection") + .outcome(SUCCESSFUL) + .explanation("Successfully established Database Connection") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to establish Database Connection", e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Establish Connection") + .outcome(FAILED) + .explanation("Failed to establish Database Connection: " + e.getMessage()) + .build()); + } + } catch (final Exception e) { + String message = "Failed to configure Data Source."; + if (e.getCause() instanceof ClassNotFoundException) { + message += String.format(" Ensure changes to the '%s' property are applied before verifying", + DB_DRIVER_LOCATION.getDisplayName()); + } + verificationLogger.error(message, e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Configure Data Source") + .outcome(FAILED) + .explanation(message + ": " + e.getMessage()) + .build()); + } finally { + try { + shutdown(dataSource, kerberosUser); + } catch (final SQLException e) { + verificationLogger.error("Failed to shut down data source", e); + } + } + return results; + } + + protected void configureDataSource(final ConfigurationContext context, final HikariDataSource dataSource) { final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); @@ -264,6 +399,7 @@ public class HikariCPConnectionPool extends AbstractControllerService implements final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); if (kerberosUserService != null) { @@ -273,9 +409,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements } } - dataSource = new HikariDataSource(); - dataSource.setDriverClassName(driverName); dataSource.setConnectionTimeout(maxWaitMillis); + dataSource.setValidationTimeout(Math.max(maxWaitMillis, DEFAULT_MIN_VALIDATION_TIMEOUT)); dataSource.setMaximumPoolSize(maxTotal); dataSource.setMinimumIdle(minIdle); dataSource.setMaxLifetime(maxConnLifetimeMillis); @@ -284,6 +419,7 @@ public class HikariCPConnectionPool extends AbstractControllerService implements dataSource.setConnectionTestQuery(validationQuery); } + dataSource.setDriverClassName(driverName); dataSource.setJdbcUrl(dburl); dataSource.setUsername(user); dataSource.setPassword(passw); @@ -308,42 +444,11 @@ public class HikariCPConnectionPool extends AbstractControllerService implements dataSource.setPoolName(toString()); } - private long extractMillisWithInfinite(PropertyValue prop) { - return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS : prop.asTimePeriod(TimeUnit.MILLISECONDS); - } - - /** - * Shutdown pool, close all open connections. - * If a principal is authenticated with a KDC, that principal is logged out. - * <p> - * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, - * an attempt will still be made to shut down the pool and close open connections. - * - */ - @OnDisabled - public void shutdown() { - try { - if (kerberosUser != null) { - kerberosUser.logout(); - } - } finally { - kerberosUser = null; - try { - if (dataSource != null) { - dataSource.close(); - } - } finally { - dataSource = null; - } - } - } - - @Override - public Connection getConnection() throws ProcessException { + private Connection getConnection(final HikariDataSource dataSource, final KerberosUser kerberosUser) { try { final Connection con; if (kerberosUser != null) { - KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger()); + KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger()); con = kerberosAction.execute(); } else { con = dataSource.getConnection(); @@ -352,14 +457,30 @@ public class HikariCPConnectionPool extends AbstractControllerService implements } catch (final SQLException e) { // If using Kerberos, attempt to re-login if (kerberosUser != null) { - getLogger().info("Error getting connection, performing Kerberos re-login"); - kerberosUser.login(); + try { + getLogger().info("Error getting connection, performing Kerberos re-login", e); + kerberosUser.login(); + } catch (KerberosLoginException le) { + throw new ProcessException("Unable to authenticate Kerberos principal", le); + } } - throw new ProcessException("Connection retrieval failed", e); + throw new ProcessException(e); } } - @Override + private void shutdown(final HikariDataSource dataSource, final KerberosUser kerberosUser) throws SQLException { + try { + if (kerberosUser != null) { + kerberosUser.logout(); + } + } finally { + if (dataSource != null) { + dataSource.close(); + } + } + } + + @Override public String toString() { return String.format("%s[id=%s]", getClass().getSimpleName(), getIdentifier()); } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java index 3abc75cb19..5d2f925a0a 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-hikari-dbcp-service/src/test/java/org/apache/nifi/dbcp/HikariCPConnectionPoolTest.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.dbcp; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.ControllerServiceConfiguration; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -26,8 +30,14 @@ import org.junit.jupiter.api.Test; import java.sql.Connection; import java.sql.SQLException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; public class HikariCPConnectionPoolTest { @@ -35,6 +45,10 @@ public class HikariCPConnectionPoolTest { private static final String INVALID_CONNECTION_URL = "jdbc:h2"; + private static final String DB_DRIVERNAME_VALUE = "jdbc:mock"; + + private static final String MAX_WAIT_TIME_VALUE = "5 s"; + private TestRunner runner; @BeforeEach @@ -134,11 +148,44 @@ public class HikariCPConnectionPoolTest { } } + @Test + void testVerifySuccessful() throws Exception { + final HikariCPConnectionPool service = new HikariCPConnectionPool(); + runner.addControllerService(SERVICE_ID, service); + final Connection mockConnection = mock(Connection.class); + MockDriver.setConnection(mockConnection); + setDatabaseProperties(service); + runner.setProperty(service, HikariCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2"); + runner.enableControllerService(service); + runner.assertValid(service); + MockProcessContext processContext = (MockProcessContext) runner.getProcessContext(); + final ControllerServiceConfiguration configuration = processContext.getConfiguration(service.getIdentifier()); + final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), processContext, Collections.emptyMap()); + final List<ConfigVerificationResult> results = service.verify(configContext, runner.getLogger(), configContext.getAllProperties()); + + assertOutcomeSuccessful(results); + } + private void setDatabaseProperties(final HikariCPConnectionPool service) { - runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, "jdbc:mock"); + runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, DB_DRIVERNAME_VALUE); runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME, MockDriver.class.getName()); - runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "5 s"); + runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, MAX_WAIT_TIME_VALUE); runner.setProperty(service, HikariCPConnectionPool.DB_USER, String.class.getSimpleName()); runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD, String.class.getName()); } + + private void assertOutcomeSuccessful(final List<ConfigVerificationResult> results) { + assertNotNull(results); + final Iterator<ConfigVerificationResult> resultsFound = results.iterator(); + + assertTrue(resultsFound.hasNext()); + final ConfigVerificationResult firstResult = resultsFound.next(); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, firstResult.getOutcome(), firstResult.getExplanation()); + + assertTrue(resultsFound.hasNext()); + final ConfigVerificationResult secondResult = resultsFound.next(); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, secondResult.getOutcome(), secondResult.getExplanation()); + + assertFalse(resultsFound.hasNext()); + } } \ No newline at end of file diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index 5bec0ce078..9d5de109d6 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -55,7 +55,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo this.controllerServiceMap.putAll(other.controllerServiceMap); } - protected ControllerServiceConfiguration getConfiguration(final String identifier) { + public ControllerServiceConfiguration getConfiguration(final String identifier) { return controllerServiceMap.get(identifier); }