pvillard31 commented on code in PR #11090:
URL: https://github.com/apache/nifi/pull/11090#discussion_r3027644071
##########
nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -137,53 +139,63 @@ public class
StandardSnowflakeIngestManagerProviderService extends AbstractContr
PIPE
);
+ private static final String HTTPS_URI_FORMAT = "https://%s";
+
+ private static final String QUALIFIED_PIPE_FORMAT = "%s.%s.%s";
+
+ private static final char UNDERSCORE = '_';
+
+ private static final char HYPHEN = '-';
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
- private volatile String fullyQualifiedPipeName;
- private volatile SimpleIngestManager ingestManager;
+ private volatile SnowpipeIngestClient ingestClient;
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException {
final String user =
context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
final String database =
context.getProperty(DATABASE).evaluateAttributeExpressions().getValue();
final String schema =
context.getProperty(SCHEMA).evaluateAttributeExpressions().getValue();
final String pipe =
context.getProperty(PIPE).evaluateAttributeExpressions().getValue();
- fullyQualifiedPipeName = database + "." + schema + "." + pipe;
- final PrivateKeyService privateKeyService =
context.getProperty(PRIVATE_KEY_SERVICE)
- .asControllerService(PrivateKeyService.class);
+ final String qualifiedPipeName =
QUALIFIED_PIPE_FORMAT.formatted(database, schema, pipe);
+ final PrivateKeyService privateKeyService =
context.getProperty(PRIVATE_KEY_SERVICE).asControllerService(PrivateKeyService.class);
final PrivateKey privateKey = privateKeyService.getPrivateKey();
- final AccountIdentifierFormat accountIdentifierFormat =
context.getProperty(ACCOUNT_IDENTIFIER_FORMAT)
- .asAllowableValue(AccountIdentifierFormat.class);
- final AccountIdentifierFormatParameters parameters =
getAccountIdentifierFormatParameters(context);
- final String account = accountIdentifierFormat.getAccount(parameters);
- final String host = accountIdentifierFormat.getHostname(parameters);
- try {
- ingestManager = new SimpleIngestManager(account, user,
fullyQualifiedPipeName, privateKey, "https", host, 443);
- } catch (NoSuchAlgorithmException | InvalidKeySpecException e) {
- throw new InitializationException("Failed create Snowflake ingest
manager", e);
+ if (privateKey instanceof RSAPrivateCrtKey rsaPrivateKey) {
+ final AccountIdentifierFormat accountIdentifierFormat =
context.getProperty(ACCOUNT_IDENTIFIER_FORMAT).asAllowableValue(AccountIdentifierFormat.class);
+ final AccountIdentifierFormatParameters parameters =
getAccountIdentifierFormatParameters(context);
+ final String account =
accountIdentifierFormat.getAccount(parameters);
+ final String host =
accountIdentifierFormat.getHostname(parameters);
+ final String hostNormalized = host.replace(UNDERSCORE, HYPHEN);
+
+ final URI baseUri =
URI.create(HTTPS_URI_FORMAT.formatted(hostNormalized));
+ final RSAKeyAuthorizationProvider authorizationProvider = new
RSAKeyAuthorizationProvider(account, user, rsaPrivateKey);
+ ingestClient = new SnowpipeIngestClient(baseUri,
qualifiedPipeName, authorizationProvider);
+ } else {
+ throw new InitializationException("RSA Private Key not provided");
}
}
@OnDisabled
public void onDisabled() {
- if (ingestManager != null) {
- ingestManager.close();
- ingestManager = null;
+ if (ingestClient == null) {
+ getLogger().info("Ingest Client not initialized");
+ } else {
+ ingestClient.close();
}
Review Comment:
Is the info log really useful here? What about setting ingestClient (formerly
ingestManager) back to null after closing it, as the previous code did?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]