This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f355348331 NIFI-14982 Bump Snowflake Ingest SDK to 4.3.0 and Snowflake 
JDBC to 3.25.1 (#10313)
f355348331 is described below

commit f355348331ea0587eed3d84f22fce8446619fa68
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Sep 18 15:38:55 2025 +0200

    NIFI-14982 Bump Snowflake Ingest SDK to 4.3.0 and Snowflake JDBC to 3.25.1 
(#10313)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-snowflake-processors/pom.xml              |  10 +
 .../snowflake/PutSnowflakeInternalStage.java       |  14 +-
 .../processors/snowflake/SnowflakeIngestIT.java    | 429 +++++++++++++++++++++
 .../nifi-snowflake-bundle/pom.xml                  |   4 +-
 4 files changed, 450 insertions(+), 7 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml
 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml
index 54afa998d8..a34015362d 100644
--- 
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml
+++ 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>snowflake-ingest-sdk</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-jdbc</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
@@ -91,5 +96,10 @@
             <artifactId>nifi-proxy-configuration-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
index 63b73cad17..fc3879eef0 100644
--- 
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
+++ 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java
@@ -147,7 +147,7 @@ public class PutSnowflakeInternalStage extends 
AbstractProcessor {
         }
 
         final SnowflakeInternalStageType internalStageType = 
context.getProperty(INTERNAL_STAGE_TYPE).asAllowableValue(SnowflakeInternalStageType.class);
-        final SnowflakeInternalStageTypeParameters parameters = 
getSnowflakeInternalStageTypeParameters(context, flowFile);
+        final SnowflakeInternalStageTypeParameters parameters = 
getSnowflakeInternalStageTypeParameters(internalStageType, context, flowFile);
         final String internalStageName = 
internalStageType.getStage(parameters);
         final SnowflakeConnectionProviderService connectionProviderService =
                 context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
@@ -170,12 +170,16 @@ public class PutSnowflakeInternalStage extends 
AbstractProcessor {
         session.transfer(flowFile, REL_SUCCESS);
     }
 
-    private SnowflakeInternalStageTypeParameters 
getSnowflakeInternalStageTypeParameters(ProcessContext context,
-            FlowFile flowFile) {
+    private SnowflakeInternalStageTypeParameters 
getSnowflakeInternalStageTypeParameters(final SnowflakeInternalStageType 
stageType,
+            final ProcessContext context, final FlowFile flowFile) {
         final String database = 
context.getProperty(DATABASE).evaluateAttributeExpressions(flowFile).getValue();
         final String schema = 
context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
-        final String table = 
context.getProperty(TABLE).evaluateAttributeExpressions(flowFile).getValue();
-        final String stageName = 
context.getProperty(INTERNAL_STAGE).evaluateAttributeExpressions(flowFile).getValue();
+        final String table = stageType == SnowflakeInternalStageType.TABLE
+                ? 
context.getProperty(TABLE).evaluateAttributeExpressions(flowFile).getValue()
+                : null;
+        final String stageName = stageType == SnowflakeInternalStageType.NAMED
+                ? 
context.getProperty(INTERNAL_STAGE).evaluateAttributeExpressions(flowFile).getValue()
+                : null;
         return new SnowflakeInternalStageTypeParameters(database, schema, 
table, stageName);
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeIngestIT.java
 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeIngestIT.java
new file mode 100644
index 0000000000..f8c5cdca64
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/test/java/org/apache/nifi/processors/snowflake/SnowflakeIngestIT.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import org.apache.nifi.key.service.StandardPrivateKeyService;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processors.snowflake.util.SnowflakeAttributes;
+import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType;
+import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
+import org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool;
+import 
org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus.REL_FAILURE;
+import static 
org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus.REL_RETRY;
+import static 
org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus.REL_SUCCESS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class SnowflakeIngestIT {
+
+    private static final String ENV_CONNECTION_URL = "NIFI_SNOWFLAKE_URL";
+    private static final String ENV_USERNAME = "NIFI_SNOWFLAKE_USER";
+    private static final String ENV_PASSWORD = "NIFI_SNOWFLAKE_PASSWORD";
+    private static final String ENV_WAREHOUSE = "NIFI_SNOWFLAKE_WAREHOUSE";
+    private static final String ENV_PRIVATE_KEY_PATH = 
"NIFI_SNOWFLAKE_PRIVATE_KEY_PATH";
+    private static final String ENV_PRIVATE_KEY_PASSPHRASE = 
"NIFI_SNOWFLAKE_PRIVATE_KEY_PASSPHRASE";
+    private static final String ENV_ROLE = "NIFI_SNOWFLAKE_ROLE";
+
+    private static final String TEST_DATABASE = "NIFI_TEST_DATABASE";
+    private static final String TEST_SCHEMA = "NIFI_TEST_SCHEMA";
+    private static final String TEST_TABLE = "NIFI_TEST_TABLE";
+    private static final String TEST_STAGE = "NIFI_CSV_STAGE";
+    private static final String TEST_PIPE = "NIFI_CSV_PIPE";
+
+    private static final byte[] CSV_CONTENT = 
"id,value\n1,foo\n".getBytes(StandardCharsets.UTF_8);
+
+    private static final int MAX_INGEST_ATTEMPTS = 10;
+    private static final Duration INGEST_POLL_INTERVAL = Duration.ofSeconds(2);
+
+    private SnowflakeEnvironment environment;
+
+    @BeforeAll
+    void setUpSuite() throws Exception {
+        final ValidationResult validationResult = 
ValidationResult.fromSystemEnv();
+        assumeTrue(validationResult.environment().isPresent(), 
validationResult.message());
+        environment = validationResult.environment().get();
+
+        Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");
+        initializeSnowflakeObjects();
+    }
+
+    @BeforeEach
+    void prepareTestData() throws SQLException {
+        if (environment == null) {
+            return;
+        }
+        truncateTable();
+        clearStage();
+    }
+
+    @AfterEach
+    void cleanupTestData() throws SQLException {
+        if (environment == null) {
+            return;
+        }
+        truncateTable();
+        clearStage();
+    }
+
+    @AfterAll
+    void tearDownSuite() throws SQLException {
+        if (environment == null) {
+            return;
+        }
+        dropSnowflakeObjects();
+    }
+
+    @Test
+    void testSnowflakeIngestFlow() throws Exception {
+        final MockFlowFile stagedFlowFile = stageFlowFile();
+        final MockFlowFile ingestRequest = startIngest(stagedFlowFile);
+        waitForIngestCompletion(ingestRequest);
+        verifyTableContents();
+    }
+
+    private MockFlowFile stageFlowFile() throws Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSnowflakeInternalStage.class);
+        runner.setValidateExpressionUsage(false);
+        configureConnectionProvider(runner, "snowflake-connection");
+        
runner.setProperty(PutSnowflakeInternalStage.SNOWFLAKE_CONNECTION_PROVIDER, 
"snowflake-connection");
+        runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE_TYPE, 
SnowflakeInternalStageType.NAMED.getValue());
+        runner.setProperty(PutSnowflakeInternalStage.DATABASE, TEST_DATABASE);
+        runner.setProperty(PutSnowflakeInternalStage.SCHEMA, TEST_SCHEMA);
+        runner.setProperty(PutSnowflakeInternalStage.INTERNAL_STAGE, 
TEST_STAGE);
+
+        runner.enqueue(CSV_CONTENT);
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(PutSnowflakeInternalStage.REL_SUCCESS, 1);
+        final MockFlowFile result = 
runner.getFlowFilesForRelationship(PutSnowflakeInternalStage.REL_SUCCESS).get(0);
+        final String stagedFilePath = 
result.getAttribute(SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH);
+        assertTrue(stagedFilePath != null && !stagedFilePath.isEmpty(), 
"Staged file path attribute missing");
+        return result;
+    }
+
+    private MockFlowFile startIngest(final MockFlowFile stagedFlowFile) throws 
Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(StartSnowflakeIngest.class);
+        runner.setValidateExpressionUsage(false);
+        configureIngestManager(runner, "snowflake-ingest");
+        runner.setProperty(StartSnowflakeIngest.INGEST_MANAGER_PROVIDER, 
"snowflake-ingest");
+
+        runner.enqueue(stagedFlowFile.getData(), new 
HashMap<>(stagedFlowFile.getAttributes()));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(StartSnowflakeIngest.REL_SUCCESS, 
1);
+        return 
runner.getFlowFilesForRelationship(StartSnowflakeIngest.REL_SUCCESS).get(0);
+    }
+
+    private void waitForIngestCompletion(final MockFlowFile flowFile) throws 
Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(GetSnowflakeIngestStatus.class);
+        runner.setValidateExpressionUsage(false);
+        configureIngestManager(runner, "snowflake-ingest-status");
+        runner.setProperty(GetSnowflakeIngestStatus.INGEST_MANAGER_PROVIDER, 
"snowflake-ingest-status");
+
+        final byte[] data = flowFile.getData();
+        final Map<String, String> attributes = new 
HashMap<>(flowFile.getAttributes());
+
+        for (int attempt = 0; attempt < MAX_INGEST_ATTEMPTS; attempt++) {
+            runner.enqueue(data, new HashMap<>(attributes));
+            runner.run();
+
+            if (!runner.getFlowFilesForRelationship(REL_SUCCESS).isEmpty()) {
+                runner.clearTransferState();
+                return;
+            }
+
+            if (!runner.getFlowFilesForRelationship(REL_FAILURE).isEmpty()) {
+                fail("Snowflake ingest reported failure");
+            }
+
+            if (!runner.getFlowFilesForRelationship(REL_RETRY).isEmpty()) {
+                runner.clearTransferState();
+                Thread.sleep(INGEST_POLL_INTERVAL.toMillis());
+                continue;
+            }
+
+            runner.clearTransferState();
+            Thread.sleep(INGEST_POLL_INTERVAL.toMillis());
+        }
+
+        fail("Snowflake ingest did not complete within the expected time");
+    }
+
+    private void verifyTableContents() throws Exception {
+        final TestRunner runner = 
TestRunners.newTestRunner(PutSnowflakeInternalStage.class);
+        runner.setValidateExpressionUsage(false);
+        final SnowflakeComputingConnectionPool connectionService = 
configureConnectionProvider(runner, "snowflake-connection");
+        try (Connection connection = connectionService.getConnection();
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery("SELECT ID, VALUE 
FROM " + fullTableName())) {
+            assertTrue(resultSet.next(), "Expected row in test table");
+            assertEquals(1, resultSet.getInt("ID"));
+            assertEquals("foo", resultSet.getString("VALUE"));
+            assertTrue(!resultSet.next(), "Unexpected additional rows in test 
table");
+        }
+    }
+
+    private SnowflakeComputingConnectionPool configureConnectionProvider(final 
TestRunner runner, final String identifier) throws Exception {
+        final SnowflakeComputingConnectionPool connectionService = new 
SnowflakeComputingConnectionPool();
+        runner.addControllerService(identifier, connectionService);
+        runner.setProperty(connectionService, 
SnowflakeComputingConnectionPool.CONNECTION_URL_FORMAT, 
ConnectionUrlFormat.FULL_URL.getValue());
+        runner.setProperty(connectionService, 
SnowflakeComputingConnectionPool.SNOWFLAKE_URL, environment.jdbcUrl());
+        runner.setProperty(connectionService, 
SnowflakeComputingConnectionPool.SNOWFLAKE_USER, environment.username());
+        runner.setProperty(connectionService, 
SnowflakeComputingConnectionPool.SNOWFLAKE_PASSWORD, environment.password());
+        runner.setProperty(connectionService, 
SnowflakeComputingConnectionPool.SNOWFLAKE_WAREHOUSE, environment.warehouse());
+        runner.setProperty(connectionService, SnowflakeProperties.DATABASE, 
TEST_DATABASE);
+        runner.setProperty(connectionService, SnowflakeProperties.SCHEMA, 
TEST_SCHEMA);
+        environment.role().ifPresent(role -> 
runner.setProperty(connectionService, "role", role));
+        runner.enableControllerService(connectionService);
+        return connectionService;
+    }
+
+    private void configureIngestManager(final TestRunner runner, final String 
identifier) throws Exception {
+        final StandardSnowflakeIngestManagerProviderService ingestService = 
new StandardSnowflakeIngestManagerProviderService();
+        runner.addControllerService(identifier, ingestService);
+
+        final PrivateKeyService privateKeyService = 
configurePrivateKeyService(runner, identifier + "-pk");
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.ACCOUNT_IDENTIFIER_FORMAT, 
AccountIdentifierFormat.FULL_URL.getValue());
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.HOST_URL, 
environment.connectionHost());
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.USER_NAME, 
environment.username());
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.DATABASE, TEST_DATABASE);
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.SCHEMA, TEST_SCHEMA);
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.PIPE, TEST_PIPE);
+        runner.setProperty(ingestService, 
StandardSnowflakeIngestManagerProviderService.PRIVATE_KEY_SERVICE, 
privateKeyService.getIdentifier());
+        runner.enableControllerService(ingestService);
+    }
+
+    private PrivateKeyService configurePrivateKeyService(final TestRunner 
runner, final String identifier) throws Exception {
+        final StandardPrivateKeyService privateKeyService = new 
StandardPrivateKeyService();
+        runner.addControllerService(identifier, privateKeyService);
+        runner.setProperty(privateKeyService, 
StandardPrivateKeyService.KEY_FILE, environment.privateKeyPath().toString());
+        runner.setProperty(privateKeyService, 
StandardPrivateKeyService.KEY_PASSWORD, environment.privateKeyPassphrase());
+        runner.enableControllerService(privateKeyService);
+        return privateKeyService;
+    }
+
+    private void initializeSnowflakeObjects() throws SQLException {
+        try (Connection connection = openConnection(); Statement statement = 
connection.createStatement()) {
+            statement.execute("CREATE DATABASE IF NOT EXISTS " + 
TEST_DATABASE);
+            statement.execute("CREATE SCHEMA IF NOT EXISTS " + TEST_DATABASE + 
"." + TEST_SCHEMA);
+            statement.execute("CREATE TABLE IF NOT EXISTS " + fullTableName() 
+ " (id INTEGER, value STRING)");
+            statement.execute("""
+                    CREATE STAGE IF NOT EXISTS %s.%s.%s
+                        FILE_FORMAT = (
+                            TYPE = 'CSV'
+                            FIELD_DELIMITER = ','
+                            SKIP_HEADER = 1
+                            FIELD_OPTIONALLY_ENCLOSED_BY = '"'
+                            ESCAPE_UNENCLOSED_FIELD = NONE
+                        )
+                    """.formatted(TEST_DATABASE, TEST_SCHEMA, TEST_STAGE));
+            statement.execute("""
+                    CREATE OR REPLACE PIPE %s.%s.%s
+                        AUTO_INGEST = FALSE
+                    AS
+                        COPY INTO %s.%s.%s
+                        FROM @%s.%s.%s
+                        FILE_FORMAT = (
+                            TYPE = 'CSV'
+                            FIELD_DELIMITER = ','
+                            SKIP_HEADER = 1
+                            FIELD_OPTIONALLY_ENCLOSED_BY = '"'
+                            ESCAPE_UNENCLOSED_FIELD = NONE
+                        )
+                    """.formatted(TEST_DATABASE, TEST_SCHEMA, TEST_PIPE,
+                    TEST_DATABASE, TEST_SCHEMA, TEST_TABLE,
+                    TEST_DATABASE, TEST_SCHEMA, TEST_STAGE));
+        }
+
+        waitForPipeReady();
+    }
+
+    private void truncateTable() throws SQLException {
+        try (Connection connection = openConnection(); Statement statement = 
connection.createStatement()) {
+            statement.execute("TRUNCATE TABLE IF EXISTS " + fullTableName());
+        }
+    }
+
+    private void clearStage() throws SQLException {
+        try (Connection connection = openConnection(); Statement statement = 
connection.createStatement()) {
+            final boolean hasResultSet = statement.execute("REMOVE @" + 
fullStageName());
+            if (hasResultSet) {
+                try (ResultSet resultSet = statement.getResultSet()) {
+                    while (resultSet != null && resultSet.next()) {
+                        // consume result set
+                    }
+                }
+            }
+        }
+    }
+
+    private void dropSnowflakeObjects() throws SQLException {
+        try (Connection connection = openConnection(); Statement statement = 
connection.createStatement()) {
+            statement.execute("DROP PIPE IF EXISTS " + fullPipeName());
+            statement.execute("DROP STAGE IF EXISTS " + fullStageName());
+            statement.execute("DROP TABLE IF EXISTS " + fullTableName());
+            statement.execute("DROP SCHEMA IF EXISTS " + TEST_DATABASE + "." + 
TEST_SCHEMA + " CASCADE");
+            statement.execute("DROP DATABASE IF EXISTS " + TEST_DATABASE);
+        }
+    }
+
+    private void waitForPipeReady() throws SQLException {
+        final String pipeName = fullPipeName();
+        final String query = "SELECT SYSTEM$PIPE_STATUS('" + pipeName + "')";
+        final int attempts = 10;
+        final long delayMillis = 1_000L;
+
+        for (int attempt = 0; attempt < attempts; attempt++) {
+            try (Connection connection = openConnection(); Statement statement 
= connection.createStatement(); ResultSet resultSet = 
statement.executeQuery(query)) {
+                if (resultSet.next()) {
+                    final String status = resultSet.getString(1);
+                    if (status != null && !status.isEmpty()) {
+                        return;
+                    }
+                }
+            }
+
+            try {
+                Thread.sleep(delayMillis);
+            } catch (InterruptedException interruptedException) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting for 
Snowflake pipe readiness", interruptedException);
+            }
+        }
+        throw new IllegalStateException("Snowflake pipe " + pipeName + " not 
ready after waiting");
+    }
+
+    private Connection openConnection() throws SQLException {
+        final Properties properties = new Properties();
+        properties.put("user", environment.username());
+        properties.put("password", environment.password());
+        properties.put("warehouse", environment.warehouse());
+        environment.role().ifPresent(role -> properties.put("role", role));
+        return DriverManager.getConnection(environment.jdbcUrl(), properties);
+    }
+
+    private String fullTableName() {
+        return TEST_DATABASE + "." + TEST_SCHEMA + "." + TEST_TABLE;
+    }
+
+    private String fullStageName() {
+        return TEST_DATABASE + "." + TEST_SCHEMA + "." + TEST_STAGE;
+    }
+
+    private String fullPipeName() {
+        return TEST_DATABASE + "." + TEST_SCHEMA + "." + TEST_PIPE;
+    }
+
+    private record SnowflakeEnvironment(String connectionHost,
+                                         String username,
+                                         String password,
+                                         String warehouse,
+                                         Path privateKeyPath,
+                                         String privateKeyPassphrase,
+                                         Optional<String> role) {
+        String jdbcUrl() {
+            final String url = connectionHost.startsWith("jdbc:snowflake://")
+                    ? connectionHost
+                    : "jdbc:snowflake://" + connectionHost;
+            return url;
+        }
+    }
+
+    private record ValidationResult(Optional<SnowflakeEnvironment> 
environment, String message) {
+        static ValidationResult fromSystemEnv() {
+            final Map<String, String> env = System.getenv();
+            final List<String> issues = new ArrayList<>();
+
+            final String connectionHost = readEnv(env, ENV_CONNECTION_URL, 
issues);
+            final String username = readEnv(env, ENV_USERNAME, issues);
+            final String password = readEnv(env, ENV_PASSWORD, issues);
+            final String warehouse = readEnv(env, ENV_WAREHOUSE, issues);
+            final String privateKeyPathValue = readEnv(env, 
ENV_PRIVATE_KEY_PATH, issues);
+            final String privateKeyPassphrase = readEnv(env, 
ENV_PRIVATE_KEY_PASSPHRASE, issues);
+            final Optional<String> role = 
Optional.ofNullable(clean(env.get(ENV_ROLE))).filter(value -> !value.isEmpty());
+
+            Path privateKeyPath = null;
+            if (privateKeyPathValue != null) {
+                privateKeyPath = Path.of(privateKeyPathValue).toAbsolutePath();
+                if (!Files.isRegularFile(privateKeyPath)) {
+                    issues.add("Private key file not found at " + 
privateKeyPath);
+                }
+            }
+
+            if (!issues.isEmpty()) {
+                return new ValidationResult(Optional.empty(), String.join(", 
", issues));
+            }
+
+            final SnowflakeEnvironment environment = new SnowflakeEnvironment(
+                    connectionHost,
+                    username,
+                    password,
+                    warehouse,
+                    privateKeyPath,
+                    privateKeyPassphrase,
+                    role
+            );
+            return new ValidationResult(Optional.of(environment), "Snowflake 
integration tests enabled");
+        }
+
+        private static String readEnv(final Map<String, String> env, final 
String key, final List<String> issues) {
+            final String value = clean(env.get(key));
+            if (value == null || value.isEmpty()) {
+                issues.add("Environment variable " + key + " is required");
+                return null;
+            }
+            return value;
+        }
+
+        private static String clean(final String value) {
+            return value == null ? null : value.trim();
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/pom.xml 
b/nifi-extension-bundles/nifi-snowflake-bundle/pom.xml
index b1675462fe..cc16afba9f 100644
--- a/nifi-extension-bundles/nifi-snowflake-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-snowflake-bundle/pom.xml
@@ -34,13 +34,13 @@
             <dependency>
                 <groupId>net.snowflake</groupId>
                 <artifactId>snowflake-ingest-sdk</artifactId>
-                <version>2.3.0</version>
+                <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>net.snowflake</groupId>
                 <artifactId>snowflake-jdbc</artifactId>
                 <!-- please check snowflake-ingest-sdk compatibility before 
upgrade -->
-                <version>3.23.2</version>
+                <version>3.25.1</version>
             </dependency>
         </dependencies>
     </dependencyManagement>

Reply via email to