From Ayush Tripathi <[email protected]>: Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19098 )
Change subject: [ASTERIXDB-3503][EXT] Fixing Internal Error issue when Delta table does not exist. ...................................................................... [ASTERIXDB-3503][EXT] Fixing Internal Error issue when Delta table does not exist. - user model changes: yes - storage format changes: no - interface changes: no Ext-ref : MB-64314 Change-Id: I8d403c8c0698d9d39dc8988eb8b57588a684dbed --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp 5 files changed, 117 insertions(+), 12 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/98/19098/1 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp new file mode 100644 index 0000000..c57de93 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/s1"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 2c7af49..1ac9394 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -328,6 +328,13 @@ </compilation-unit> </test-case> <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-table-not-exists"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">none</output-dir> + <expected-error>ASX1108: External source error. 
io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> <output-dir compare="Text">common/avro/avro-types/avro-map</output-dir> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 9d93fce..e7ee3eb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.reader.aws.delta; import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; import java.io.Serializable; @@ -33,6 +34,8 @@ import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IRecordReader; @@ -60,6 +63,7 @@ import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import 
io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; @@ -109,7 +113,13 @@ Engine engine = DefaultEngine.create(conf); io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); - Snapshot snapshot = table.getLatestSnapshot(engine); + Snapshot snapshot; + try { + snapshot = table.getLatestSnapshot(engine); + } catch (KernelException e) { + LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } List<Warning> warnings = new ArrayList<>(); DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index b311870..2dc6bf9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.external.util; +import static com.azure.core.implementation.http.rest.RestProxyUtils.LOGGER; import static org.apache.asterix.common.metadata.MetadataConstants.DEFAULT_DATABASE; import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER; @@ -29,12 +30,14 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; +import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import static org.msgpack.core.MessagePack.Code.ARRAY16; import java.io.ByteArrayOutputStream; @@ -109,6 +112,12 @@ import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; public class ExternalDataUtils { @@ -117,6 +126,8 @@ private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024; private static final int HEADER_FUDGE = 64; + private static final Logger LOGGER = LogManager.getLogger(); + static { valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE); valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE); @@ -504,6 +515,35 @@ } } + public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException { + Configuration conf = new Configuration(); + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + 
if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { + conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); + } + conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + if (serviceEndpoint != null) { + conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); + } + conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, + configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); + conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, + configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); + String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + Engine engine = DefaultEngine.create(conf); + io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); + try { + table.getLatestSnapshot(engine); + } catch (KernelException e) { + LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } + } + public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf, String tableMetadataPath) throws AlgebricksException { HadoopTables tables = new HadoopTables(conf); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 891d7f3..1dfe13a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -22,10 +22,7 @@ import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; -import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; -import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; -import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; -import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; +import static org.apache.asterix.external.util.ExternalDataUtils.*; import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME; import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR; import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED; @@ -274,13 +271,6 @@ */ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { - if (isDeltaTable(configuration)) { - validateDeltaTableProperties(configuration); - } - // check if the format property is present - else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } // Both parameters should be passed, or neither should be passed (for anonymous/no auth) String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); @@ -346,6 +336,14 @@ if (!response.sdkHttpResponse().isSuccessful()) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); } + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + 
validateDeltaTableExists(configuration); + } + // check if the format property is present + else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); + } } /** -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19098 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: goldfish Gerrit-Change-Id: I8d403c8c0698d9d39dc8988eb8b57588a684dbed Gerrit-Change-Number: 19098 Gerrit-PatchSet: 1 Gerrit-Owner: Ayush Tripathi <[email protected]> Gerrit-MessageType: newchange
