This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e282a65f7f NIFI-14567 Added OAuth support to SnowflakeComputingConnectionPool
e282a65f7f is described below

commit e282a65f7f01dd02eeff7229c9c9cdbee24edf9a
Author: Mark Bathori <[email protected]>
AuthorDate: Wed May 14 11:08:33 2025 +0200

    NIFI-14567 Added OAuth support to SnowflakeComputingConnectionPool
    
    This closes #9944.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi-snowflake-services/pom.xml                |  4 ++
 .../service/SnowflakeComputingConnectionPool.java  | 44 ++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
index aaf8d09aa4..1a0f61f194 100644
--- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
+++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
@@ -65,6 +65,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
index abf3465874..a7e747e26a 100644
--- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
+++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
@@ -24,6 +24,8 @@ import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -33,6 +35,7 @@ import org.apache.nifi.dbcp.utils.DBCPProperties;
 import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService;
@@ -40,9 +43,11 @@ import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper;
 import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
 import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
 
+import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
 import java.util.Collection;
@@ -50,6 +55,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static net.snowflake.client.core.SFSessionProperty.AUTHENTICATOR;
+import static net.snowflake.client.core.SFSessionProperty.TOKEN;
 import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
 import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
 import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
@@ -134,6 +141,12 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
             .description("The password for the Snowflake user.")
             .build();
 
+    public static final PropertyDescriptor ACCESS_TOKEN_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("OAuth2 Access Token Provider")
+            .description("Service providing OAuth2 Access Tokens for 
authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .build();
+
     public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new 
PropertyDescriptor.Builder()
             .name("warehouse")
             .displayName("Warehouse")
@@ -142,6 +155,8 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
 
+    private volatile OAuth2AccessTokenProvider accessTokenProvider;
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             CONNECTION_URL_FORMAT,
             SNOWFLAKE_URL,
@@ -155,6 +170,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
             SnowflakeProperties.DATABASE,
             SnowflakeProperties.SCHEMA,
             SNOWFLAKE_WAREHOUSE,
+            ACCESS_TOKEN_PROVIDER,
             ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
             VALIDATION_QUERY,
             MAX_WAIT_TIME,
@@ -190,6 +206,23 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
         return Collections.emptyList();
     }
 
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws 
InitializationException {
+        super.onConfigured(context);
+        accessTokenProvider = 
context.getProperty(ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        accessTokenProvider = null;
+    }
+
+    private void refreshAccessToken() {
+        if (accessTokenProvider != null) {
+            dataSource.addConnectionProperty(TOKEN.getPropertyKey(), 
accessTokenProvider.getAccessDetails().getAccessToken());
+        }
+    }
+
     @Override
     protected DataSourceConfiguration getDataSourceConfiguration(final 
ConfigurationContext context) {
         final String url = getUrl(context);
@@ -241,6 +274,7 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
         final String database = 
context.getProperty(SnowflakeProperties.DATABASE).evaluateAttributeExpressions().getValue();
         final String schema = 
context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue();
         final String warehouse = 
context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue();
+        final OAuth2AccessTokenProvider tokenProvider = 
context.getProperty(ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
 
         final Map<String, String> connectionProperties = 
super.getConnectionProperties(context);
         if (database != null) {
@@ -252,6 +286,10 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
         if (warehouse != null) {
             connectionProperties.put("warehouse", warehouse);
         }
+        if (tokenProvider != null) {
+            connectionProperties.put(AUTHENTICATOR.getPropertyKey(), "oauth");
+            connectionProperties.put(TOKEN.getPropertyKey(), 
tokenProvider.getAccessDetails().getAccessToken());
+        }
 
         final ProxyConfigurationService proxyConfigurationService = context
                 
.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
@@ -278,6 +316,12 @@ public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool
         return connectionProperties;
     }
 
+    @Override
+    public Connection getConnection() throws ProcessException {
+        refreshAccessToken();
+        return super.getConnection();
+    }
+
     @Override
     public SnowflakeConnectionWrapper getSnowflakeConnection() {
         return new SnowflakeConnectionWrapper(getConnection());

Reply via email to