>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21024?usp=email )
Change subject: [ASTERIXDB-3634][EXT]: Add support to Azure ADLS Iceberg FileIO ...................................................................... [ASTERIXDB-3634][EXT]: Add support to Azure ADLS Iceberg FileIO Ext-ref: MB-70848 Change-Id: Idd5819f653a20e207a5ca659850e4298316ec84f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21024 Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java M asterixdb/pom.xml 14 files changed, 136 insertions(+), 74 deletions(-) Approvals: Hussain Towaileb: Looks good to me, but someone else must approve Jenkins: Verified; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index a889126..000beff 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -657,6 +657,10 @@ </dependency> <dependency> <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-azure</artifactId> + </dependency> + <dependency> + <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-nessie</artifactId> </dependency> <dependency> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java index b7d1197e..e8f7d91 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java @@ -69,8 +69,9 @@ public class IcebergParquetRecordReaderFactory implements IIcebergRecordReaderFactory<Record> { private static final long serialVersionUID = 1L; - private static final List<String> RECORD_READER_NAMES = - Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, ExternalDataConstants.KEY_ADAPTER_NAME_GCS); + private static final List<String> RECORD_READER_NAMES = Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, + ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB, ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE, + ExternalDataConstants.KEY_ADAPTER_NAME_GCS); private final List<FileScanTask> fileScanTasks = new ArrayList<>(); private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java index 74c5f5e..cc65482 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java @@ -30,7 +30,7 @@ private static final long serialVersionUID = 1L; private static final List<String> recordReaderNames = - Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE); + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE); @Override public List<String> getRecordReaderNames() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java index cb40026..b97449f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java @@ -20,7 +20,6 @@ import static org.apache.asterix.external.util.azure.AzureConstants.HADOOP_AZURE_PROTOCOL; import static org.apache.asterix.external.util.azure.AzureUtils.extractEndPoint; -import static org.apache.asterix.external.util.azure.blob.BlobUtils.buildClient; import java.util.Collections; import java.util.List; @@ -40,7 +39,7 @@ public class AzureDeltaReaderFactory extends DeltaReaderFactory { private static final long serialVersionUID = 1L; private static final List<String> RECORD_READER_NAMES = - Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE); + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE); @Override protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java index 93774ee..fc6374c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java @@ -52,7 +52,7 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory { private static final long serialVersionUID = -6140824803254158253L; private static final List<String> recordReaderNames = - Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE); + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE); @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 2431a79..8f0d363 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -154,8 +154,8 @@ public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3"; public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB"; public static final String KEY_ADAPTER_NAME_AZURE_BLOB_ALIAS = "AZURE_BLOB"; - public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = "AZUREDATALAKE"; - public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE_ALIAS = "AZURE_DATALAKE"; + public static final String KEY_ADAPTER_NAME_AZURE_DATALAKE = "AZUREDATALAKE"; + public static final String KEY_ADAPTER_NAME_AZURE_DATALAKE_ALIAS = "AZURE_DATALAKE"; public static final String KEY_ADAPTER_NAME_GCS = "GCS"; public static final String KEY_ADAPTER_NAME_HDFS = "HDFS"; public static final String KEY_ADAPTER_NAME_HTTP_ADAPTER = "http"; @@ -164,7 +164,7 @@ KEY_ADAPTER_NAME_PUSH_TWITTER, KEY_ADAPTER_NAME_TWITTER_PULL, KEY_ADAPTER_NAME_PULL_TWITTER, KEY_ADAPTER_NAME_TWITTER_USER_STREAM, KEY_ADAPTER_NAME_LOCALFS, KEY_ADAPTER_NAME_SOCKET, KEY_ADAPTER_NAME_HTTP, KEY_ADAPTER_NAME_AWS_S3, KEY_ADAPTER_NAME_AZURE_BLOB, - KEY_ADAPTER_NAME_AZURE_DATA_LAKE, KEY_ADAPTER_NAME_GCS, KEY_ADAPTER_NAME_HDFS); + KEY_ADAPTER_NAME_AZURE_DATALAKE, KEY_ADAPTER_NAME_GCS, KEY_ADAPTER_NAME_HDFS); /** * HDFS class names diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 5b59677..dba9bd7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -30,6 +30,8 @@ import static org.apache.asterix.common.utils.CSVConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME; import static org.apache.asterix.external.util.ExternalDataConstants.DISABLE_SSL_VERIFY_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE_ALIAS; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE; @@ -37,10 +39,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf; -import static org.apache.asterix.external.util.azure.blob.BlobUtils.validateAzureBlobProperties; -import static org.apache.asterix.external.util.azure.datalake.DatalakeUtils.validateAzureDataLakeProperties; import static org.apache.asterix.external.util.google.GCSUtils.configureHdfsJobConf; -import static org.apache.asterix.external.util.google.GCSUtils.validateProperties; import static org.apache.asterix.external.util.iceberg.IcebergUtils.isIcebergTable; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; @@ -750,13 +749,13 @@ S3Utils.validateProperties(appCtx, configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: - validateAzureBlobProperties(configuration, srcLoc, collector, appCtx); + BlobUtils.validateProperties(configuration, srcLoc, collector, appCtx); break; - case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE: - validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx); + case KEY_ADAPTER_NAME_AZURE_DATALAKE, KEY_ADAPTER_NAME_AZURE_DATALAKE_ALIAS: + DatalakeUtils.validateProperties(configuration, srcLoc, collector, appCtx); break; case ExternalDataConstants.KEY_ADAPTER_NAME_GCS: - validateProperties(appCtx, configuration, srcLoc, collector); + GCSUtils.validateProperties(appCtx, configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS: HDFSUtils.validateProperties(configuration, srcLoc, collector); @@ -1166,8 +1165,8 @@ protocol = AzureConstants.HADOOP_AZURE_PROTOCOL; String blobEndpoint = BlobUtils.getEndpointFromClient(configurations); return protocol + "://" + container + "@" + blobEndpoint + "/"; - case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE: - case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE_ALIAS: + case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE: + case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE_ALIAS: protocol = AzureConstants.HADOOP_AZURE_PROTOCOL; String dataLakeEndpoint = DatalakeUtils.getEndpointFromClient(configurations); return protocol + "://" + container + "@" + dataLakeEndpoint + "/"; @@ -1202,7 +1201,7 @@ switch (normalizedAdapter) { case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: - case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE: + case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE: case ExternalDataConstants.KEY_ADAPTER_NAME_GCS: return ExternalDataConstants.DEFINITION_FIELD_NAME; default: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java index 0314fff..030c9f1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java @@ -99,4 +99,6 @@ public static final String HADOOP_MANAGED_IDENTITY_ENDPOINT = "fs.azure.account.oauth2.msi.endpoint"; public static final String HADOOP_MANAGED_IDENTITY_ENDPOINT_VALUE = "http://169.254.169.254/metadata/identity/oauth2/token"; + + public static final String TOKEN_REQUEST_CONTEXT_SCOPE = "https://storage.azure.com/.default"; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java index b4caa52..72a5def 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java @@ -53,6 +53,7 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.azure.AzureConstants; import org.apache.asterix.external.util.azure.AzureUtils; +import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; @@ -116,9 +117,11 @@ BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); builder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS); - int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); - RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); - builder.retryOptions(requestRetryOptions); + if (appCtx != null) { + int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); + RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); + builder.retryOptions(requestRetryOptions); + } // Endpoint is required if (endpoint == null) { @@ -310,7 +313,7 @@ * @param configuration properties * @throws CompilationException Compilation exception */ - public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc, + public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { if (isDeltaTable(configuration)) { throw new CompilationException(ErrorCode.EXTERNAL_COLLECTION_NOT_SUPPORTED, "delta-table", @@ -320,6 +323,11 @@ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + if (IcebergUtils.isIcebergTable(configuration)) { + return; + } + validateIncludeExclude(configuration); try { // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation @@ -331,7 +339,6 @@ // Check if the bucket is present BlobServiceClient blobServiceClient; try { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); blobServiceClient = buildClient(appCtx, configuration); BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container); @@ -366,4 +373,8 @@ return AzureUtils.extractEndPoint(builder.buildClient().getAccountUrl()); } + public static boolean isBlobAdapter(String type) { + return type.equalsIgnoreCase(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB) + || type.equalsIgnoreCase(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB_ALIAS); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java index 395cb0f..b469678 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java @@ -34,8 +34,10 @@ import static org.apache.asterix.external.util.azure.AzureConstants.MANAGED_IDENTITY_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.SHARED_ACCESS_SIGNATURE_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.TENANT_ID_FIELD_NAME; +import static org.apache.asterix.external.util.azure.AzureConstants.TOKEN_REQUEST_CONTEXT_SCOPE; import static org.apache.asterix.external.util.azure.datalake.DatalakeConstants.DEFAULT_RECUSRIVE_VALUE; import static org.apache.asterix.external.util.azure.datalake.DatalakeConstants.RECURSIVE_FIELD_NAME; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.util.ArrayList; @@ -55,12 +57,16 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.azure.AzureConstants; import org.apache.asterix.external.util.azure.AzureUtils; +import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.iceberg.azure.AzureProperties; +import com.azure.core.credential.AccessToken; import com.azure.core.credential.AzureSasCredential; +import com.azure.core.credential.TokenRequestContext; import com.azure.core.http.rest.PagedIterable; import com.azure.identity.ClientSecretCredentialBuilder; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -109,9 +115,11 @@ DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder(); builder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS); - int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); - RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); - builder.retryOptions(requestRetryOptions); + if (appCtx != null) { + int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); + RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); + builder.retryOptions(requestRetryOptions); + } // Endpoint is required if (endpoint == null) { @@ -282,7 +290,7 @@ * @param configuration properties * @throws CompilationException Compilation exception */ - public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc, + public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { // check if the format property is present @@ -292,12 +300,16 @@ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + if (IcebergUtils.isIcebergTable(configuration)) { + return; + } + validateIncludeExclude(configuration); // Check if the bucket is present DataLakeServiceClient dataLakeServiceClient; try { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); dataLakeServiceClient = buildClient(appCtx, configuration); DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container); @@ -332,4 +344,60 @@ return AzureUtils.extractEndPoint(builder.buildClient().getAccountUrl()); } + public static boolean isDatalakeAdapter(String adapter) { + return adapter.equalsIgnoreCase(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE) + || adapter.equalsIgnoreCase(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATALAKE_ALIAS); + } + + public static void setIcebergAdlsAuthParams(Map<String, String> properties) throws CompilationException { + String managedIdentity = + properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + MANAGED_IDENTITY_FIELD_NAME); + String accountName = properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + ACCOUNT_NAME_FIELD_NAME); + String accountKey = properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + ACCOUNT_KEY_FIELD_NAME); + String sharedAccessSignature = + properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + SHARED_ACCESS_SIGNATURE_FIELD_NAME); + String tenantId = properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + TENANT_ID_FIELD_NAME); + String clientId = properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + CLIENT_ID_FIELD_NAME); + String clientSecret = properties.get(ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL + CLIENT_SECRET_FIELD_NAME); + + Map<String, String> collectionProperties = IcebergUtils.filterCollectionProperties(properties); + DataLakeServiceClient dataLakeServiceClient = DatalakeUtils.buildClient(null, collectionProperties); + String endpoint = AzureUtils.extractEndPoint(dataLakeServiceClient.getAccountUrl()); + + if (accountName != null && accountKey != null) { + properties.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, accountName); + properties.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, accountKey); + return; + } + + if (sharedAccessSignature != null) { + properties.put(AzureProperties.ADLS_SAS_TOKEN_PREFIX + endpoint, sharedAccessSignature); + return; + } + + if (managedIdentity != null) { + ManagedIdentityCredentialBuilder builder = new ManagedIdentityCredentialBuilder(); + if (clientId != null) { + builder.clientId(clientId); + } + TokenRequestContext context = new TokenRequestContext().addScopes(TOKEN_REQUEST_CONTEXT_SCOPE); + AccessToken token = builder.build().getToken(context).block(); + if (token != null) { + properties.put(AzureProperties.ADLS_TOKEN, token.getToken()); + } + return; + } + + if (clientSecret != null) { + ClientSecretCredentialBuilder builder = new ClientSecretCredentialBuilder(); + builder.clientId(clientId); + builder.clientSecret(clientSecret); + builder.tenantId(tenantId); + TokenRequestContext context = new TokenRequestContext().addScopes(TOKEN_REQUEST_CONTEXT_SCOPE); + AccessToken token = builder.build().getToken(context).block(); + if (token != null) { + properties.put(AzureProperties.ADLS_TOKEN, token.getToken()); + } + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java index 7b5b913..24c6df6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java @@ -23,6 +23,8 @@ import org.apache.asterix.external.util.aws.AwsConstants; import org.apache.asterix.external.util.azure.AzureConstants; import org.apache.asterix.external.util.google.GCSConstants; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.azure.adlsv2.ADLSFileIO; public class IcebergConstants { private IcebergConstants() { @@ -53,7 +55,7 @@ public static final String REST_SIG4_GLUE_SIGNING_NAME = "glue"; // catalog properties - public static final String S3_FILE_IO = "org.apache.iceberg.aws.s3.S3FileIO"; + public static final String S3_FILE_IO = S3FileIO.class.getName(); public static final String REST_SIG4_SIGNING_NAME = "rest.signing-name"; public static final String REST_SIG4_SIGNING_REGION = "rest.signing-region"; } @@ -63,6 +65,10 @@ public static final String QUOTA_PROJECT_ID_KEY = "quotaProjectId"; } + public static class Azure { + public static final String ADLS_FILE_IO = ADLSFileIO.class.getName(); + } + public static class Rest { } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java index b23f6a1..6b16404 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.external.util.iceberg; -import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; import static org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT; import static org.apache.asterix.external.util.aws.EnsureCloseClientsFactoryRegistry.FACTORY_INSTANCE_ID_KEY; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT; @@ -46,6 +45,8 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.aws.EnsureCloseClientsFactoryRegistry; import org.apache.asterix.external.util.aws.iceberg.glue.GlueUtils; +import org.apache.asterix.external.util.azure.blob.BlobUtils; +import org.apache.asterix.external.util.azure.datalake.DatalakeUtils; import org.apache.asterix.external.util.google.iceberg.biglake_metastore.BiglakeMetastoreUtils; import org.apache.asterix.external.util.google.iceberg.fileio.GCSFileIO; import org.apache.asterix.external.util.iceberg.nessie.NessieUtils; @@ -379,7 +380,8 @@ } } - public static void setFileIoProperties(Map<String, String> catalogProperties, IcebergCatalogSource catalogSource) { + public static void setFileIoProperties(Map<String, String> catalogProperties, IcebergCatalogSource catalogSource) + throws CompilationException { if (catalogSource == IcebergCatalogSource.NESSIE_REST) { // NESSIE_REST should not set any FileIO properties, it is provided by Nessie server return; @@ -393,6 +395,9 @@ setIcebergS3FileIoProperties(catalogProperties); } else if (ioType.equalsIgnoreCase(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) { setIcebergGcsFileIoProperties(catalogProperties); + } else if (BlobUtils.isBlobAdapter(ioType) || DatalakeUtils.isDatalakeAdapter(ioType)) { + // ADLSFileIO is used for both Blob storage and Datalake storage + setIcebergAzureAdlsFileIoProperties(catalogProperties); } } @@ -405,6 +410,11 @@ properties.put(CatalogProperties.FILE_IO_IMPL, GCSFileIO.class.getName()); } + public static void setIcebergAzureAdlsFileIoProperties(Map<String, String> properties) throws CompilationException { + properties.put(CatalogProperties.FILE_IO_IMPL, IcebergConstants.Azure.ADLS_FILE_IO); + DatalakeUtils.setIcebergAdlsAuthParams(properties); + } + public static void putCatalogProperties(Map<String, String> addTo, Map<String, String> toAdd) { for (Map.Entry<String, String> entry : toAdd.entrySet()) { addTo.putIfAbsent(ICEBERG_CATALOG_PROPERTY_PREFIX_INTERNAL + entry.getKey(), entry.getValue()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java index 9473008..0e36ac6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java @@ -163,7 +163,7 @@ private static void setBasicProperties(Map<String, String> catalogProperties) throws CompilationException { String username = catalogProperties.get(USERNAME_FIELD_NAME); String password = catalogProperties.get(PASSWORD_FIELD_NAME); - if (password != null) { + if (password == null) { throw CompilationException.create(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, PASSWORD_FIELD_NAME, USERNAME_FIELD_NAME); } diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 7db6e3d..cffa215 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -2012,43 +2012,10 @@ <!-- Iceberg --> <dependency> <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-core</artifactId> + <artifactId>iceberg-bom</artifactId> <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-common</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-api</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-data</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-parquet</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-aws</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-gcp</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-nessie</artifactId> - <version>${icebergjavasdk.version}</version> + <type>pom</type> + <scope>import</scope> </dependency> <dependency> <groupId>org.projectnessie.nessie</groupId> @@ -2060,11 +2027,6 @@ <artifactId>nessie-client</artifactId> <version>${nessieproject.version}</version> </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-bundled-guava</artifactId> - <version>${icebergjavasdk.version}</version> - </dependency> </dependencies> </dependencyManagement> -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21024?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: Idd5819f653a20e207a5ca659850e4298316ec84f Gerrit-Change-Number: 21024 Gerrit-PatchSet: 5 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
