From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386 )
Change subject: [ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth ...................................................................... [ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth Ext-ref: MB-63505 Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 6 files changed, 119 insertions(+), 124 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Hussain Towaileb: Looks good to me, but someone else must approve Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java index ed26f7c..fcdbabd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java @@ -18,26 +18,20 @@ */ package org.apache.asterix.app.external; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.aws.s3.S3Constants; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.util.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; - public class ExternalCredentialsCache implements IExternalCredentialsCache { private static final Logger LOGGER = LogManager.getLogger(); - private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, TemporaryCredentials> cache = new ConcurrentHashMap<>(); private final int awsAssumeRoleDuration; private final int refreshAwsAssumeRoleThresholdPercentage; @@ -51,7 +45,7 @@ public synchronized Object get(String key) { invalidateCache(); if (cache.containsKey(key)) { - return cache.get(key).getRight(); + return cache.get(key).getCredentials(); } return null; } @@ -65,20 +59,9 @@ } @Override - public synchronized void put(String key, String type, Map<String, String> credentials) { - if 
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) { - updateAwsCache(key, credentials); - } - } - - private void updateAwsCache(String name, Map<String, String> credentials) { - String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME); - String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME); - - AwsSessionCredentials sessionCreds = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); - cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), sessionCreds)); - LOGGER.info("Received and cached new credentials for {}", name); + public synchronized void put(String key, Object credentials) { + cache.put(key, new TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials)); + LOGGER.info("Received and cached new credentials for {}", key); } /** @@ -86,7 +69,7 @@ */ private void invalidateCache() { cache.entrySet().removeIf(entry -> { - boolean shouldRemove = needsRefresh(entry.getValue().getLeft()); + boolean shouldRemove = needsRefresh(entry.getValue().getDuration()); if (shouldRemove) { LOGGER.info("Removing cached credentials for {} because it expired", entry.getKey()); } @@ -106,4 +89,22 @@ int passedPercentage = (int) (passed * 100); return passedPercentage > refreshAwsAssumeRoleThresholdPercentage; } + + static class TemporaryCredentials { + private final Span duration; + private final Object credentials; + + public TemporaryCredentials(Span duration, Object credentials) { + this.duration = duration; + this.credentials = credentials; + } + + public Span getDuration() { + return duration; + } + + public Object getCredentials() { + return credentials; + } + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java index 29e3239..4c382d0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java @@ -24,7 +24,6 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -39,10 +38,10 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.external.IExternalCredentialsCache; import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.aws.s3.S3AuthUtils; -import org.apache.asterix.external.util.aws.s3.S3Constants; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.messaging.NCMessageBroker; import org.apache.hyracks.api.application.INCServiceContext; @@ -53,6 +52,11 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +/** + * The class is responsible for generating new credentials based on the adapter type. 
Given a request: + * - if we are the CC, generate new creds and ask all NCs to update their cache + * - if we are the NC, send a message to the CC to generate new creds + */ public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater { private static final Logger LOGGER = LogManager.getLogger(); @@ -66,49 +70,46 @@ public synchronized Object generateAndCacheCredentials(Map<String, String> configuration) throws HyracksDataException, CompilationException { IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); - Object credentials = cache.get(name); + String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); + Object credentials = cache.get(key); if (credentials != null) { return credentials; } - /* - * if we are the CC, generate new creds and ask all NCs to update their cache - * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache - */ - if (appCtx instanceof ICcApplicationContext ccAppCtx) { - IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState(); - if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) { - throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state); - } + String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); + if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) { + credentials = generateAwsCredentials(configuration); + } - String accessKeyId; - String secretAccessKey; - String sessionToken; - Map<String, String> credentialsMap = new HashMap<>(); + return credentials; + } + + // TODO: this can probably be refactored out into something that is AWS-specific + private Object generateAwsCredentials(Map<String, String> configuration) + throws HyracksDataException, CompilationException { + String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); + AwsSessionCredentials credentials; + if 
(appCtx instanceof ICcApplicationContext) { + validateClusterState(); try { - LOGGER.info("attempting to update credentials for {}", name); + LOGGER.info("attempting to update AWS credentials for {}", key); AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration); - LOGGER.info("updated credentials successfully for {}", name); - AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) newCredentials.resolveCredentials(); - accessKeyId = sessionCredentials.accessKeyId(); - secretAccessKey = sessionCredentials.secretAccessKey(); - sessionToken = sessionCredentials.sessionToken(); + LOGGER.info("updated AWS credentials successfully for {}", key); + credentials = (AwsSessionCredentials) newCredentials.resolveCredentials(); + appCtx.getExternalCredentialsCache().put(key, credentials); } catch (CompilationException ex) { - LOGGER.info("failed to refresh credentials for {}", name, ex); + LOGGER.info("failed to refresh AWS credentials for {}", key, ex); throw ex; } - // credentials need refreshing - credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, accessKeyId); - credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, secretAccessKey); - credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, sessionToken); + String accessKeyId = credentials.accessKeyId(); + String secretAccessKey = credentials.secretAccessKey(); + String sessionToken = credentials.sessionToken(); + UpdateAwsCredentialsCacheRequest request = + new UpdateAwsCredentialsCacheRequest(configuration, accessKeyId, secretAccessKey, sessionToken); // request all NCs to update their credentials cache with the latest creds - updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration); - String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); - cache.put(name, type, credentialsMap); - credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); + updateNcsCredentialsCache(key, request); } else 
{ NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); MessageFuture messageFuture = broker.registerMessageFuture(); @@ -116,7 +117,7 @@ long futureId = messageFuture.getFutureId(); RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration); try { - LOGGER.info("no valid credentials found for {}, requesting credentials from CC", name); + LOGGER.info("no valid AWS credentials found for {}, requesting AWS credentials from CC", key); broker.sendMessageToPrimaryCC(request); RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); @@ -126,7 +127,7 @@ credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(), response.getSessionToken()); } catch (Exception ex) { - LOGGER.info("failed to refresh credentials for {}", name, ex); + LOGGER.info("failed to refresh AWS credentials for {}", key, ex); throw HyracksDataException.create(ex); } finally { broker.deregisterMessageFuture(futureId); @@ -135,14 +136,12 @@ return credentials; } - private void updateNcsCredentialsCache(ICcApplicationContext appCtx, String name, Map<String, String> credentials, - Map<String, String> configuration) throws HyracksDataException { - final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes()); + private void updateNcsCredentialsCache(String key, INcAddressedMessage request) throws HyracksDataException { + ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx; + final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes()); CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - UpdateAwsCredentialsCacheRequest request = new UpdateAwsCredentialsCacheRequest(configuration, credentials); - try { - LOGGER.info("requesting all NCs to update their credentials for {}", name); + 
LOGGER.info("requesting all NCs to update their credentials for {}", key); for (String nc : ncs) { broker.sendApplicationMessageToNC(request, nc); } @@ -151,4 +150,12 @@ throw HyracksDataException.create(e); } } + + private void validateClusterState() throws HyracksDataException { + ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx; + IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState(); + if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) { + throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state); + } + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java index 438e425..753dbf1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java @@ -23,26 +23,31 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; + +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage { - private static final Logger LOGGER = LogManager.getLogger(); private static final long serialVersionUID = 1L; private final Map<String, String> configuration; - private final Map<String, String> credentials; + private final String accessKeyId; + private final String secretAccessKey; + private final String sessionToken; - public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) { + public UpdateAwsCredentialsCacheRequest(Map<String, String> 
configuration, String accessKeyId, + String secretAccessKey, String sessionToken) { this.configuration = configuration; - this.credentials = credentials; + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + this.sessionToken = sessionToken; + } @Override public void handle(INcApplicationContext appCtx) { String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); - String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); - appCtx.getExternalCredentialsCache().put(name, type, credentials); + AwsSessionCredentials credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); + appCtx.getExternalCredentialsCache().put(name, credentials); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 97bc3cd..8ba9a02 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -579,17 +579,11 @@ // async queries are completed after their job completes if (ResultDelivery.ASYNC != resultDelivery) { appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid()); - postRequestCompleteCleanup(requestParameters); } Thread.currentThread().setName(threadName); } } - protected void postRequestCompleteCleanup(IRequestParameters requestParameters) { - String uuid = requestParameters.getRequestReference().getUuid(); - appCtx.getExternalCredentialsCache().delete(uuid); - } - protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config, Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters, Statement statement) { @@ -1038,12 +1032,8 @@ ExternalDataUtils.normalize(properties); 
ExternalDataUtils.validate(properties); ExternalDataUtils.validateType(properties, (ARecordType) itemType); - Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties, - dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider); - // do any necessary validation on the copy to avoid changing the original and storing it in the metadata - metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, dataverseName, - datasetName, false); - validateAdapterSpecificProperties(propertiesCopy, dd.getSourceLocation(), appCtx); + validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx, + appCtx, metadataProvider); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), TransactionState.COMMIT); break; @@ -2459,7 +2449,6 @@ sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset()); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); - deleteDatasetCachedCredentials(ds); return true; } catch (Exception e) { LOGGER.error("failed to drop dataset; executing compensating operations", e); @@ -2507,10 +2496,6 @@ } } - protected void deleteDatasetCachedCredentials(Dataset dataset) throws CompilationException { - appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString()); - } - protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; @@ -4057,11 +4042,8 @@ copyStmt, itemType, mdTxnCtx, metadataProvider); ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); - Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties, - copyStmt.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider); - // do any necessary validation on the 
copy to avoid changing the original and storing it in the metadata - metadataProvider.setExternalEntityId(propertiesCopy, dataset); - validateAdapterSpecificProperties(propertiesCopy, copyStmt.getSourceLocation(), appCtx); + validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx, + appCtx, metadataProvider); CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName, copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties); cls.setSourceLocation(stmt.getSourceLocation()); @@ -4118,9 +4100,6 @@ ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams, Stats stats) throws Exception { CopyToStatement copyTo = (CopyToStatement) stmt; - Namespace namespace = copyTo.getNamespace(); - DataverseName dataverseName = namespace.getDataverseName(); - String databaseName = namespace.getDatabaseName(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); @@ -4149,14 +4128,9 @@ metadataProvider.setMetadataTxnContext(mdTxnCtx); try { ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl(); - Map<String, String> properties = createAndValidateAdapterConfigurationForCopyToStmt(edd, + edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd, ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx, - metadataProvider); - - // request id is used to cache credentials if needed, and clear them after request is done - String uuid = requestParameters.getRequestReference().getUuid(); - metadataProvider.setExternalEntityIdFromParts(properties, databaseName, dataverseName, uuid, true); - edd.setProperties(properties); + metadataProvider)); if (ExternalDataConstants.FORMAT_PARQUET .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) { @@ 
-5585,7 +5559,6 @@ // complete async jobs after their job completes if (ResultDelivery.ASYNC == resultDelivery) { requestTracker.complete(clientRequest.getId()); - postRequestCompleteCleanup(requestParameters); } locker.unlock(); } @@ -5802,8 +5775,7 @@ * @param details external details * @param sourceLoc source location */ - protected void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) - throws CompilationException { + private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) throws CompilationException { String adapter = details.getAdapter(); Optional<String> normalizedAdapter = getSupportedAdapters().stream().filter(k -> k.equalsIgnoreCase(adapter)).findFirst(); @@ -5817,15 +5789,17 @@ return ExternalDataConstants.EXTERNAL_READ_ADAPTERS; } - protected Map<String, String> preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails, + protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails, Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx, IApplicationContext appCtx, MetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException { + // Validate adapter specific properties normalizeAdapters(externalDetails, srcLoc); String adapter = externalDetails.getAdapter(); Map<String, String> details = new HashMap<>(properties); details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter); - return details; + metadataProvider.setExternalEntityId(details); + validateAdapterSpecificProperties(details, srcLoc, appCtx); } protected Map<String, String> createAndValidateAdapterConfigurationForCopyToStmt( @@ -5835,6 +5809,7 @@ String adapterName = externalDetailsDecl.getAdapter(); Map<String, String> properties = externalDetailsDecl.getProperties(); properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName); + md.setExternalEntityId(properties); 
WriterValidationUtil.validateWriterConfiguration(adapterName, supportedAdapters, properties, sourceLocation); return properties; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java index 3a9ae1c..689ff16 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.common.external; -import java.util.Map; - public interface IExternalCredentialsCache { /** @@ -41,8 +39,7 @@ * Updates the credentials cache with the provided credentials for the specified name * * @param key credentials key - * @param type credentials type * @param credentials credentials to cache */ - void put(String key, String type, Map<String, String> credentials); + void put(String key, Object credentials); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 6f3a9b6..20299d5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.asterix.common.api.INamespaceResolver; @@ -48,7 +49,6 @@ import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.common.functions.FunctionSignature; -import 
org.apache.asterix.common.metadata.DatasetFullyQualifiedName; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.LockList; import org.apache.asterix.common.metadata.MetadataConstants; @@ -999,7 +999,8 @@ configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName()); configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, dataset.getDataverseName().getCanonicalForm()); - setExternalEntityId(configuration, dataset); + setExternalEntityId(configuration); + setSourceType(configuration, adapterName); return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName, configuration, itemType, null, warningCollector, filterEvaluatorFactory); } catch (Exception e) { @@ -1007,18 +1008,12 @@ } } - public void setExternalEntityId(Map<String, String> configuration, Dataset dataset) throws AlgebricksException { - configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset.getDatasetFullyQualifiedName().toString()); + protected void setSourceType(Map<String, String> configuration, String adapterName) { + configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName); } - public void setExternalEntityIdFromParts(Map<String, String> configuration, String database, - DataverseName dataverse, String dataset, boolean isUuid) throws AlgebricksException { - if (isUuid) { - configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset); - } else { - DatasetFullyQualifiedName fqn = new DatasetFullyQualifiedName(database, dataverse, dataset); - configuration.put(ExternalDataConstants.KEY_ENTITY_ID, fqn.toString()); - } + public void setExternalEntityId(Map<String, String> configuration) throws AlgebricksException { + configuration.put(ExternalDataConstants.KEY_ENTITY_ID, UUID.randomUUID().toString()); } public TxnId getTxnId() { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386 To unsubscribe, or for help writing mail 
filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762 Gerrit-Change-Number: 19386 Gerrit-PatchSet: 2 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
