From Peeyush Gupta <[email protected]>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20573?usp=email )
Change subject: [NO ISSUE][ET] Support Delta lake tables on Azure ...................................................................... [NO ISSUE][ET] Support Delta lake tables on Azure - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-69384 Change-Id: I528849556dc70d11cc5eb4ab55a7f7382c84061f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20573 Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java M asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory 10 files changed, 95 insertions(+), 13 deletions(-) Approvals: Hussain Towaileb: Looks good to me, approved Jenkins: Verified; Verified Peeyush Gupta: Looks good to me, but someone else must approve diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 1dadeba..326828e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -450,6 +450,7 @@ PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT(3122), // Avro error UNSUPPORTED_TYPE_FOR_AVRO(3123), + EXTERNAL_COLLECTION_NOT_SUPPORTED(3124), // Lifecycle management errors DUPLICATE_PARTITION_ID(4000), diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 5cff16c..b401b22 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -454,6 +454,7 @@ 3121 = Parameter '%1$s' or '%2$s' is required if '%3$s' is provided 3122 = Parameter '%1$s' is not allowed if '%2$s' is provided 3123 = Type '%1$s' contains declared fields, which is not supported for 'avro' format +3124 = External collection of type %1$s is not supported on %2$s # Lifecycle management errors 4000 = Partition id %1$s for node %2$s already in use by node %3$s diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java new file mode 100644 index 0000000..cb40026 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.azure.delta; + +import static org.apache.asterix.external.util.azure.AzureConstants.HADOOP_AZURE_PROTOCOL; +import static org.apache.asterix.external.util.azure.AzureUtils.extractEndPoint; +import static org.apache.asterix.external.util.azure.blob.BlobUtils.buildClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.azure.AzureConstants; +import org.apache.asterix.external.util.azure.AzureUtils; +import org.apache.asterix.external.util.azure.datalake.DatalakeUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +import com.azure.storage.file.datalake.DataLakeServiceClient; + +public class AzureDeltaReaderFactory extends DeltaReaderFactory { + private static final long serialVersionUID = 1L; + private static final List<String> RECORD_READER_NAMES = + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE); + + @Override + protected void 
configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration) + throws AlgebricksException { + // get endpoint + DataLakeServiceClient dataLakeServiceClient = DatalakeUtils.buildClient(appCtx, configuration); + String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl()); + configuration.put(AzureConstants.ACCOUNT_URL, dataLakeServiceClient.getAccountUrl()); + AzureUtils.configureAzureHdfsJobConf(conf, configuration, endPoint); + } + + @Override + protected String getTablePath(Map<String, String> configuration) throws AlgebricksException { + return HADOOP_AZURE_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '@' + + extractEndPoint(configuration.get(AzureConstants.ACCOUNT_URL)) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } + + @Override + public List<String> getRecordReaderNames() { + return RECORD_READER_NAMES; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java index d8f6a9e..428368b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java @@ -37,6 +37,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.azure.AzureUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import 
org.apache.hyracks.api.application.IServiceContext; @@ -61,7 +62,7 @@ // get endpoint BlobServiceClient blobServiceClient = buildClient(appCtx, configuration); - String endPoint = extractEndPoint(blobServiceClient.getAccountUrl()); + String endPoint = AzureUtils.extractEndPoint(blobServiceClient.getAccountUrl()); // get include/exclude matchers IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); @@ -138,12 +139,6 @@ return builder.toString(); } - private static String extractEndPoint(String uri) { - //The URI is in the form http(s)://<accountName>.blob.core.windows.net - //We need to Remove the protocol (i.e., http(s)://) from the URI - return uri.substring(uri.indexOf("//") + "//".length()); - } - private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) { builder.append(HADOOP_AZURE_PROTOCOL); builder.append("://"); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java index 9ae8359..385d1b6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java @@ -51,6 +51,7 @@ public static final String CLIENT_ID_FIELD_NAME = "clientId"; public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret"; public static final String ENDPOINT_FIELD_NAME = "endpoint"; + public static final String ACCOUNT_URL = "accountURL"; /* * Hadoop-Azure diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java index b3aeba1..d8c09bb 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java @@ -113,4 +113,10 @@ conf.set(HADOOP_CLIENT_ID, clientId); } } + + public static String extractEndPoint(String uri) { + //The URI is in the form http(s)://<accountName>.blob.core.windows.net + //We need to Remove the protocol (i.e., http(s)://) from the URI + return uri.substring(uri.indexOf("//") + "//".length()); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java index 66e7f98..4860188 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java @@ -24,6 +24,7 @@ import static org.apache.asterix.external.util.ExternalDataUtils.getDisableSslVerify; import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_KEY_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_NAME_FIELD_NAME; @@ -305,9 +306,11 @@ */ public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + if (isDeltaTable(configuration)) { + 
throw new CompilationException(ErrorCode.EXTERNAL_COLLECTION_NOT_SUPPORTED, "delta-table", + "azure blob storage"); + } else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + // check if the format property is present throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeConstants.java index a114530..513af32 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeConstants.java @@ -33,4 +33,5 @@ is file1.json, file2.json and file3.json. */ public static final String RECURSIVE_FIELD_NAME = "recursive"; + public static final boolean DEFAULT_RECUSRIVE_VALUE = true; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java index e37105f..475ede3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/datalake/DatalakeUtils.java @@ -23,6 +23,8 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_KEY_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_NAME_FIELD_NAME; @@ -32,6 +34,7 @@ import static org.apache.asterix.external.util.azure.AzureConstants.MANAGED_IDENTITY_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.SHARED_ACCESS_SIGNATURE_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.TENANT_ID_FIELD_NAME; +import static org.apache.asterix.external.util.azure.datalake.DatalakeConstants.DEFAULT_RECUSRIVE_VALUE; import static org.apache.asterix.external.util.azure.datalake.DatalakeConstants.RECURSIVE_FIELD_NAME; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; @@ -229,7 +232,8 @@ // Get all objects in a container and extract the paths to files ListPathsOptions listOptions = new ListPathsOptions(); - boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME)); + boolean recursive = configuration.containsKey(RECURSIVE_FIELD_NAME) + ? 
Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME)) : DEFAULT_RECUSRIVE_VALUE; listOptions.setRecursive(recursive); listOptions.setPath(getPrefix(configuration, false, false)); PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null); @@ -281,7 +285,9 @@ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + } else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory index 1f25c4b..0eef480 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -29,4 +29,5 @@ org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory -org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory \ No newline at end of file +org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory +org.apache.asterix.external.input.record.reader.azure.delta.AzureDeltaReaderFactory \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20573?usp=email To unsubscribe, or for 
help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I528849556dc70d11cc5eb4ab55a7f7382c84061f Gerrit-Change-Number: 20573 Gerrit-PatchSet: 5 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]>
