This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e282a65f7f NIFI-14567 Added OAuth support to
SnowflakeComputingConnectionPool
e282a65f7f is described below
commit e282a65f7f01dd02eeff7229c9c9cdbee24edf9a
Author: Mark Bathori <[email protected]>
AuthorDate: Wed May 14 11:08:33 2025 +0200
NIFI-14567 Added OAuth support to SnowflakeComputingConnectionPool
This closes #9944.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-snowflake-services/pom.xml | 4 ++
.../service/SnowflakeComputingConnectionPool.java | 44 ++++++++++++++++++++++
2 files changed, 48 insertions(+)
diff --git
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
index aaf8d09aa4..1a0f61f194 100644
---
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
+++
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/pom.xml
@@ -65,6 +65,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
index abf3465874..a7e747e26a 100644
---
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
+++
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java
@@ -24,6 +24,8 @@ import
org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -33,6 +35,7 @@ import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService;
@@ -40,9 +43,11 @@ import
org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
+import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collection;
@@ -50,6 +55,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static net.snowflake.client.core.SFSessionProperty.AUTHENTICATOR;
+import static net.snowflake.client.core.SFSessionProperty.TOKEN;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
@@ -134,6 +141,12 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
.description("The password for the Snowflake user.")
.build();
+ public static final PropertyDescriptor ACCESS_TOKEN_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("OAuth2 Access Token Provider")
+ .description("Service providing OAuth2 Access Tokens for
authenticating using the HTTP Authorization Header")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .build();
+
public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new
PropertyDescriptor.Builder()
.name("warehouse")
.displayName("Warehouse")
@@ -142,6 +155,8 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+ private volatile OAuth2AccessTokenProvider accessTokenProvider;
+
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
CONNECTION_URL_FORMAT,
SNOWFLAKE_URL,
@@ -155,6 +170,7 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
SnowflakeProperties.DATABASE,
SnowflakeProperties.SCHEMA,
SNOWFLAKE_WAREHOUSE,
+ ACCESS_TOKEN_PROVIDER,
ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
VALIDATION_QUERY,
MAX_WAIT_TIME,
@@ -190,6 +206,23 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
return Collections.emptyList();
}
+ @OnEnabled
+ public void onConfigured(final ConfigurationContext context) throws
InitializationException {
+ super.onConfigured(context);
+ accessTokenProvider =
context.getProperty(ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ accessTokenProvider = null;
+ }
+
+ private void refreshAccessToken() {
+ if (accessTokenProvider != null) {
+ dataSource.addConnectionProperty(TOKEN.getPropertyKey(),
accessTokenProvider.getAccessDetails().getAccessToken());
+ }
+ }
+
@Override
protected DataSourceConfiguration getDataSourceConfiguration(final
ConfigurationContext context) {
final String url = getUrl(context);
@@ -241,6 +274,7 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
final String database =
context.getProperty(SnowflakeProperties.DATABASE).evaluateAttributeExpressions().getValue();
final String schema =
context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue();
final String warehouse =
context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue();
+ final OAuth2AccessTokenProvider tokenProvider =
context.getProperty(ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
final Map<String, String> connectionProperties =
super.getConnectionProperties(context);
if (database != null) {
@@ -252,6 +286,10 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
if (warehouse != null) {
connectionProperties.put("warehouse", warehouse);
}
+ if (tokenProvider != null) {
+ connectionProperties.put(AUTHENTICATOR.getPropertyKey(), "oauth");
+ connectionProperties.put(TOKEN.getPropertyKey(),
tokenProvider.getAccessDetails().getAccessToken());
+ }
final ProxyConfigurationService proxyConfigurationService = context
.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
@@ -278,6 +316,12 @@ public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool
return connectionProperties;
}
+ @Override
+ public Connection getConnection() throws ProcessException {
+ refreshAccessToken();
+ return super.getConnection();
+ }
+
@Override
public SnowflakeConnectionWrapper getSnowflakeConnection() {
return new SnowflakeConnectionWrapper(getConnection());