pvillard31 commented on code in PR #11346:
URL: https://github.com/apache/nifi/pull/11346#discussion_r3444069168
##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -635,17 +693,30 @@ public void setup(ProcessContext context) {
final SSLMode sslMode =
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
final SSLContextService sslContextService = sslMode ==
SSLMode.DISABLED ? null :
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final PasswordSource passwordSource =
context.getProperty(PASSWORD_SOURCE).asAllowableValue(PasswordSource.class);
+ switch (passwordSource) {
+ case PASSWORD -> {
+ passwordProvider = null;
+ passwordRequestContext = null;
+ password =
StringUtils.defaultString(context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
+ }
+ case PASSWORD_PROVIDER -> {
+ password = null;
+ passwordProvider =
context.getProperty(DB_PASSWORD_PROVIDER).asControllerService(DatabasePasswordProvider.class);
+ passwordRequestContext =
DatabasePasswordRequestContext.builder()
+
.jdbcUrl(JDBC_URL_FORMAT.formatted(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue()))
+
.driverClassName(context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue())
+
.databaseUser(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue())
+ .build();
+ }
+ }
+
// Save off MySQL cluster and JDBC driver information, will be used to
connect for event enrichment as well as for the binlog connector
try {
List<InetSocketAddress> hosts =
getHosts(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue());
String username =
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
- String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
-
- // BinaryLogClient expects a non-null password, so set it to the
empty string if it is not provided
- if (password == null) {
- password = "";
- }
+ String resolvedPassword = resolvePassword();
Review Comment:
The binlog client password is resolved only once at setup, while the JDBC
connection re-fetches it on each connection. For a provider that returns short
lived tokens (like RDS IAM), should the binlog connection also refresh the
password when it reconnects?
##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -635,17 +693,30 @@ public void setup(ProcessContext context) {
final SSLMode sslMode =
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
final SSLContextService sslContextService = sslMode ==
SSLMode.DISABLED ? null :
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final PasswordSource passwordSource =
context.getProperty(PASSWORD_SOURCE).asAllowableValue(PasswordSource.class);
+ switch (passwordSource) {
+ case PASSWORD -> {
+ passwordProvider = null;
+ passwordRequestContext = null;
+ password =
StringUtils.defaultString(context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
+ }
+ case PASSWORD_PROVIDER -> {
+ password = null;
+ passwordProvider =
context.getProperty(DB_PASSWORD_PROVIDER).asControllerService(DatabasePasswordProvider.class);
+ passwordRequestContext =
DatabasePasswordRequestContext.builder()
+
.jdbcUrl(JDBC_URL_FORMAT.formatted(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue()))
Review Comment:
This request context jdbcUrl is built from the raw MySQL Nodes value, so
with multiple hosts it becomes jdbc:mysql://h1:3306,h2:3306. Should we pass the
actual connected host so providers that sign per host (RDS IAM) work without
needing the optional endpoint override?
##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java:
##########
@@ -190,6 +195,86 @@ public void
testSslModeRequiredSslContextServiceConnected(@Mock SSLContextServic
assertEquals(BinaryLogSSLSocketFactory.class,
sslSocketFactory.getClass(), "Binary Log SSLSocketFactory class not matched");
}
+ @Test
+ public void testPasswordSourceProviderWithoutControllerServiceNotValid() {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS,
LOCAL_HOST_DEFAULT_PORT);
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE,
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+ testRunner.assertNotValid();
+ }
+
+ @Test
+ public void testPasswordSourceProviderWithControllerServiceValid(@Mock
DatabasePasswordProvider passwordProvider) throws InitializationException {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS,
LOCAL_HOST_DEFAULT_PORT);
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE,
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+
+ final String identifier = DatabasePasswordProvider.class.getName();
+ when(passwordProvider.getIdentifier()).thenReturn(identifier);
+ testRunner.addControllerService(identifier, passwordProvider);
+ testRunner.enableControllerService(passwordProvider);
+ testRunner.setProperty(CaptureChangeMySQL.DB_PASSWORD_PROVIDER,
identifier);
+ testRunner.assertValid();
+ }
+
+ @Test
+ public void testPasswordProviderTokenUsedOnRun(@Mock
DatabasePasswordProvider passwordProvider) throws InitializationException {
+ final String identifier = DatabasePasswordProvider.class.getName();
+ when(passwordProvider.getIdentifier()).thenReturn(identifier);
+
when(passwordProvider.getPassword(any())).thenReturn("token".toCharArray());
+ testRunner.addControllerService(identifier, passwordProvider);
+ testRunner.enableControllerService(passwordProvider);
+
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS,
LOCAL_HOST_DEFAULT_PORT);
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION,
DRIVER_LOCATION);
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, ROOT_USER);
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT,
CONNECT_TIMEOUT);
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE,
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+ testRunner.setProperty(CaptureChangeMySQL.DB_PASSWORD_PROVIDER,
identifier);
+
+ testRunner.run(1, false, true);
+
+ verify(passwordProvider,
atLeastOnce()).getPassword(any(DatabasePasswordRequestContext.class));
+ }
+
+ @Test
+ public void testPasswordProviderEmptyTokenThrowsProcessException(@Mock
DatabasePasswordProvider passwordProvider) throws InitializationException {
Review Comment:
Could the empty token and null token tests be merged into one parameterized
test, since they assert the same branch and the same message?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]