From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21272?usp=email )
Change subject: [NO ISSUE][EXT]: ensure all required params are set before
validation
......................................................................
[NO ISSUE][EXT]: ensure all required params are set before validation
Details:
- Ensure all required Iceberg parameters are already assigned
before performing any validation.
Ext-ref: MB-71974
Change-Id: I02c35ea1f874da33ff995f1992734b66153b7cf8
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
6 files changed, 38 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/72/21272/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 86b71d4..8aa308f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -117,6 +117,7 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.IQueryRewriter;
@@ -1072,7 +1073,10 @@
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType)
itemType);
- validateIfIcebergTable(metadataProvider,
requestParameters, properties, mdTxnCtx, sourceLoc,
+ beforeExternalCollectionValidation(properties);
+
+ Map<String, String> propertiesCopy = new
HashMap<>(properties);
+ validateIfIcebergTable(metadataProvider,
requestParameters, propertiesCopy, mdTxnCtx, sourceLoc,
externalDetails.getAdapter());
validateExternalDatasetProperties(externalDetails,
properties, dd.getSourceLocation(), mdTxnCtx,
appCtx, metadataProvider);
@@ -1186,6 +1190,18 @@
return Optional.of(dataset);
}
+ /**
+ * extensions can use this method to apply any changes on the original
external collection properties
+ * before validation
+ *
+ * @param properties external collection properties
+ */
+ protected void beforeExternalCollectionValidation(Map<String, String>
properties) {
+ if (IcebergUtils.isIcebergTable(properties)) {
+ IcebergUtils.setDefaults(properties);
+ }
+ }
+
protected void validateIfIcebergTable(MetadataProvider metadataProvider,
IRequestParameters requestParameters,
Map<String, String> properties, MetadataTransactionContext
mdTxnCtx, SourceLocation srcLoc, String adapter)
throws AlgebricksException {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
index 8df53b2..7ac35b6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
@@ -19,7 +19,6 @@
package org.apache.asterix.app.translator.helpers;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -49,29 +48,26 @@
}
public static void validateIfIcebergTable(ICcApplicationContext appCtx,
MetadataProvider metadataProvider,
- MetadataTransactionContext mdTxnCtx, Map<String, String>
collectionProperties,
- Map<String, String> extraCollectionProperties, SourceLocation
srcLoc, String adapter)
- throws AlgebricksException {
- if (!IcebergUtils.isIcebergTable(collectionProperties)) {
+ MetadataTransactionContext mdTxnCtx, Map<String, String>
properties, Map<String, String> extraProperties,
+ SourceLocation srcLoc, String adapter) throws AlgebricksException {
+ if (!IcebergUtils.isIcebergTable(properties)) {
return;
}
- IcebergUtils.setDefaultFormat(collectionProperties);
- IcebergUtils.validateIcebergTableProperties(collectionProperties);
+ IcebergUtils.validateIcebergTableProperties(properties);
// work on a copy of the properties from now onward to avoid modifying
the original collection properties
- Map<String, String> propertiesCopy = new
HashMap<>(collectionProperties);
- propertiesCopy.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE,
adapter);
+ properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE,
adapter);
// ensure the specified catalog exists
- String catalogName =
propertiesCopy.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+ String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx,
catalogName);
if (catalog == null) {
throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc,
catalogName);
}
// validate snapshot exists if provided
- propertiesCopy.putAll(extraCollectionProperties);
- metadataProvider.addIcebergCatalogPropertiesIfNeeded(appCtx,
propertiesCopy);
- IcebergSnapshotUtils.validateSnapshotExists(propertiesCopy);
+ properties.putAll(extraProperties);
+ metadataProvider.addIcebergCatalogPropertiesIfNeeded(appCtx,
properties);
+ IcebergSnapshotUtils.validateSnapshotExists(properties);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index dbdd41e..6254733 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -353,6 +353,7 @@
PARQUET_WRITER_ERROR(1245),
UPDATE_INSERT_POSITION_OUT_OF_BOUNDS(1246),
ASYNC_NOT_SUPPORTED_FOR_STATEMENT(1247),
+ UNSUPPORTED_FILE_IO_TYPE(1248),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 0850ad1..78d3e20 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -355,6 +355,7 @@
1245 = Parquet writer error: %1$s
1246 = Insert position %1$s is out of bounds for array of length %2$s
1247 = Async mode not supported for the statement %1$s
+1248 = Unsupported Iceberg FileIO type. Found: %1$s
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index e88a9cc..79840f2 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -328,7 +328,7 @@
binarySerde.serialize(aBinary, out);
}
- public void serializeDate(Object value, DataOutput output) throws
HyracksDataException {
+ private void serializeDate(Object value, DataOutput output) throws
HyracksDataException {
LocalDate localDate = (LocalDate) value;
if (parserContext.isDateAsInt()) {
serializeInteger((int) localDate.toEpochDay(), output);
@@ -338,7 +338,7 @@
}
}
- public void serializeTime(Object value, DataOutput output) throws
HyracksDataException {
+ private void serializeTime(Object value, DataOutput output) throws
HyracksDataException {
LocalTime localTime = (LocalTime) value;
int timeInMillis = (int)
TimeUnit.NANOSECONDS.toMillis(localTime.toNanoOfDay());
if (parserContext.isTimeAsInt()) {
@@ -349,7 +349,7 @@
}
}
- public void serializeTimestamp(Type type, Object value, DataOutput output)
throws HyracksDataException {
+ private void serializeTimestamp(Type type, Object value, DataOutput
output) throws HyracksDataException {
Instant instant;
switch (value) {
case OffsetDateTime offsetDateTime -> instant =
offsetDateTime.toInstant();
@@ -385,7 +385,7 @@
}
}
- public static ATypeTag getTypeTag(Type type, boolean isNull,
IcebergConverterContext parserContext)
+ private static ATypeTag getTypeTag(Type type, boolean isNull,
IcebergConverterContext parserContext)
throws HyracksDataException {
if (isNull) {
return ATypeTag.NULL;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 4006a65..6366275 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -359,6 +359,10 @@
return projectedRecordType.getFieldNames();
}
+ public static void setDefaults(Map<String, String> configuration) {
+ setDefaultFormat(configuration);
+ }
+
/**
* Sets the default format to Parquet if the format is not provided for
Iceberg tables
* @param configuration configuration
@@ -425,6 +429,8 @@
} else if (BlobUtils.isBlobAdapter(ioType) ||
DatalakeUtils.isDatalakeAdapter(ioType)) {
// ADLSFileIO is used for both Blob storage and Datalake storage
setIcebergAzureAdlsFileIoProperties(catalogProperties);
+ } else {
+ throw
CompilationException.create(ErrorCode.UNSUPPORTED_FILE_IO_TYPE, ioType);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21272?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I02c35ea1f874da33ff995f1992734b66153b7cf8
Gerrit-Change-Number: 21272
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>