pvillard31 commented on code in PR #11346:
URL: https://github.com/apache/nifi/pull/11346#discussion_r3444069168


##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -635,17 +693,30 @@ public void setup(ProcessContext context) {
         final SSLMode sslMode = 
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
         final SSLContextService sslContextService = sslMode == 
SSLMode.DISABLED ? null : 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
+        final PasswordSource passwordSource = 
context.getProperty(PASSWORD_SOURCE).asAllowableValue(PasswordSource.class);
+        switch (passwordSource) {
+            case PASSWORD -> {
+                passwordProvider = null;
+                passwordRequestContext = null;
+                password = 
StringUtils.defaultString(context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
+            }
+            case PASSWORD_PROVIDER -> {
+                password = null;
+                passwordProvider = 
context.getProperty(DB_PASSWORD_PROVIDER).asControllerService(DatabasePasswordProvider.class);
+                passwordRequestContext = 
DatabasePasswordRequestContext.builder()
+                        
.jdbcUrl(JDBC_URL_FORMAT.formatted(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue()))
+                        
.driverClassName(context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue())
+                        
.databaseUser(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue())
+                        .build();
+            }
+        }
+
         // Save off MySQL cluster and JDBC driver information, will be used to 
connect for event enrichment as well as for the binlog connector
         try {
             List<InetSocketAddress> hosts = 
getHosts(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue());
 
             String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
-            String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
-
-            // BinaryLogClient expects a non-null password, so set it to the 
empty string if it is not provided
-            if (password == null) {
-                password = "";
-            }
+            String resolvedPassword = resolvePassword();

Review Comment:
   The binlog client password is resolved only once at setup, while the JDBC 
connection re-fetches it on each connection. For a provider that returns 
short-lived tokens (like RDS IAM), should the binlog connection also refresh 
the password when it reconnects?



##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -635,17 +693,30 @@ public void setup(ProcessContext context) {
         final SSLMode sslMode = 
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
         final SSLContextService sslContextService = sslMode == 
SSLMode.DISABLED ? null : 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
+        final PasswordSource passwordSource = 
context.getProperty(PASSWORD_SOURCE).asAllowableValue(PasswordSource.class);
+        switch (passwordSource) {
+            case PASSWORD -> {
+                passwordProvider = null;
+                passwordRequestContext = null;
+                password = 
StringUtils.defaultString(context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
+            }
+            case PASSWORD_PROVIDER -> {
+                password = null;
+                passwordProvider = 
context.getProperty(DB_PASSWORD_PROVIDER).asControllerService(DatabasePasswordProvider.class);
+                passwordRequestContext = 
DatabasePasswordRequestContext.builder()
+                        
.jdbcUrl(JDBC_URL_FORMAT.formatted(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue()))

Review Comment:
   The jdbcUrl in this request context is built from the raw MySQL Nodes value, 
so with multiple hosts it becomes jdbc:mysql://h1:3306,h2:3306. Should we pass 
the actual connected host so that providers that sign per host (RDS IAM) work 
without needing the optional endpoint override?



##########
nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java:
##########
@@ -190,6 +195,86 @@ public void 
testSslModeRequiredSslContextServiceConnected(@Mock SSLContextServic
         assertEquals(BinaryLogSSLSocketFactory.class, 
sslSocketFactory.getClass(), "Binary Log SSLSocketFactory class not matched");
     }
 
+    @Test
+    public void testPasswordSourceProviderWithoutControllerServiceNotValid() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 
LOCAL_HOST_DEFAULT_PORT);
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE, 
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testPasswordSourceProviderWithControllerServiceValid(@Mock 
DatabasePasswordProvider passwordProvider) throws InitializationException {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 
LOCAL_HOST_DEFAULT_PORT);
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE, 
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+
+        final String identifier = DatabasePasswordProvider.class.getName();
+        when(passwordProvider.getIdentifier()).thenReturn(identifier);
+        testRunner.addControllerService(identifier, passwordProvider);
+        testRunner.enableControllerService(passwordProvider);
+        testRunner.setProperty(CaptureChangeMySQL.DB_PASSWORD_PROVIDER, 
identifier);
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testPasswordProviderTokenUsedOnRun(@Mock 
DatabasePasswordProvider passwordProvider) throws InitializationException {
+        final String identifier = DatabasePasswordProvider.class.getName();
+        when(passwordProvider.getIdentifier()).thenReturn(identifier);
+        
when(passwordProvider.getPassword(any())).thenReturn("token".toCharArray());
+        testRunner.addControllerService(identifier, passwordProvider);
+        testRunner.enableControllerService(passwordProvider);
+
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 
LOCAL_HOST_DEFAULT_PORT);
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION);
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, ROOT_USER);
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, 
CONNECT_TIMEOUT);
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD_SOURCE, 
CaptureChangeMySQL.PasswordSource.PASSWORD_PROVIDER);
+        testRunner.setProperty(CaptureChangeMySQL.DB_PASSWORD_PROVIDER, 
identifier);
+
+        testRunner.run(1, false, true);
+
+        verify(passwordProvider, 
atLeastOnce()).getPassword(any(DatabasePasswordRequestContext.class));
+    }
+
+    @Test
+    public void testPasswordProviderEmptyTokenThrowsProcessException(@Mock 
DatabasePasswordProvider passwordProvider) throws InitializationException {

Review Comment:
   Could the empty token and null token tests be merged into one parameterized 
test, since they assert the same branch and the same message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to