>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20648?usp=email )
Change subject: [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
......................................................................
[ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
Details:
- Support parquet format by default unless format is provided.
- All some method to be overridden by extensions.
- Remove null properties values before init'ing catalog.
- Disable failing test.
Ext-ref: MB-63115
Change-Id: I1726c2168bfec1f137390c7c2112c2df59151dc2
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/IcebergFileRecordReader.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/IcebergConverterContext.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
8 files changed, 301 insertions(+), 6 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/48/20648/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0946e4a..3a62746 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1187,11 +1187,12 @@
return Optional.of(dataset);
}
- private void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
+ protected void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
SourceLocation srcLoc) throws AlgebricksException {
if (!IcebergUtils.isIcebergTable(properties)) {
return;
}
+ IcebergUtils.setDefaultFormat(properties);
// ensure the specified catalog exists
String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index c6d6d32..ae4154e 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -1479,12 +1479,14 @@
<expected-error>ASX1178: Unsupported iceberg
table</expected-error>
</compilation-unit>
</test-case>
+ <!-- old iceberg test, check why failing
<test-case FilePath="external-dataset/s3">
<compilation-unit name="iceberg-mixed-data-format">
<output-dir compare="Text">none</output-dir>
<expected-error>avro-file.avro. Reason: not a Parquet
file</expected-error>
</compilation-unit>
</test-case>
+ -->
<test-case FilePath="external-dataset/s3">
<compilation-unit name="iceberg-empty">
<output-dir compare="Text">iceberg-empty</output-dir>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/IcebergFileRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/IcebergFileRecordReader.java
new file mode 100644
index 0000000..e77859e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/IcebergFileRecordReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+/**
+ * Iceberg record reader.
+ * The reader returns records in Iceberg Record format.
+ */
+public class IcebergFileRecordReader implements IRecordReader<Record> {
+
+ private final List<FileScanTask> fileScanTasks;
+ private final Schema projectedSchema;
+ private final Map<String, String> configuration;
+ private final IRawRecord<Record> record;
+
+ private int nextTaskIndex = 0;
+ private FileIO tableFileIo;
+ private CloseableIterable<Record> iterable;
+ private Iterator<Record> recordsIterator;
+
+ public IcebergFileRecordReader(List<FileScanTask> fileScanTasks, Schema
projectedSchema,
+ Map<String, String> configuration) throws HyracksDataException {
+ this.fileScanTasks = fileScanTasks;
+ this.projectedSchema = projectedSchema;
+ this.configuration = configuration;
+ this.record = new GenericRecord<>();
+
+ try {
+ initializeTable();
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void initializeTable() throws CompilationException {
+ if (fileScanTasks.isEmpty()) {
+ return;
+ }
+
+ String namespace = IcebergUtils.getNamespace(configuration);
+ String tableName =
configuration.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+ Catalog catalog =
+
IcebergUtils.initializeCatalog(IcebergUtils.filterCatalogProperties(configuration),
namespace);
+ TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of(namespace), tableName);
+ if (!catalog.tableExists(tableIdentifier)) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_TABLE_DOES_NOT_EXIST, tableName);
+ }
+ Table table = catalog.loadTable(tableIdentifier);
+ tableFileIo = table.io();
+ }
+
+ public boolean hasNext() throws Exception {
+ // iterator has more records
+ if (recordsIterator != null && recordsIterator.hasNext()) {
+ return true;
+ }
+
+ // go to next task
+ // if a file is empty, we will go to the next task
+ while (nextTaskIndex < fileScanTasks.size()) {
+
+ // close previous iterable
+ if (iterable != null) {
+ iterable.close();
+ iterable = null;
+ }
+
+ // Load next task
+ setNextRecordsIterator();
+
+ // if the new iterator has rows → good
+ if (recordsIterator != null && recordsIterator.hasNext()) {
+ return true;
+ }
+
+ // else: this task is empty → continue the loop to the next task
+ }
+
+ // no more tasks & no more rows
+ return false;
+ }
+
+ @Override
+ public IRawRecord<Record> next() throws IOException, InterruptedException {
+ Record icebergRecord = recordsIterator.next();
+ record.set(icebergRecord);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (tableFileIo != null) {
+ tableFileIo.close();
+ }
+ if (iterable != null) {
+ iterable.close();
+ }
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+
+ }
+
+ @Override
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws
HyracksDataException {
+
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ private void setNextRecordsIterator() {
+ FileScanTask task = fileScanTasks.get(nextTaskIndex++);
+ InputFile inFile = tableFileIo.newInputFile(task.file().location());
+ iterable =
Parquet.read(inFile).project(projectedSchema).split(task.start(), task.length())
+ .createReaderFunc(fs ->
GenericParquetReaders.buildReader(projectedSchema, fs)).build();
+ recordsIterator = iterable.iterator();
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/IcebergConverterContext.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/IcebergConverterContext.java
new file mode 100644
index 0000000..916e536
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/IcebergConverterContext.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+public class IcebergConverterContext extends ParserContext {
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+ private final boolean decimalToDouble;
+ private final boolean timestampAsLong;
+ private final boolean dateAsInt;
+
+ private final int timeZoneOffset;
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final List<Warning> warnings;
+
+ public IcebergConverterContext(Map<String, String> configuration,
List<Warning> warnings) {
+ this.warnings = warnings;
+ decimalToDouble = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE,
ExternalDataConstants.FALSE));
+ timestampAsLong = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG,
ExternalDataConstants.TRUE));
+ dateAsInt = Boolean.parseBoolean(
+
configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT,
ExternalDataConstants.TRUE));
+ String configuredTimeZoneId =
configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+ timeZoneOffset =
TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
+ }
+ }
+
+ public void serializeDate(int value, DataOutput output) {
+ try {
+ mutableDate.setValue(value);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ public boolean isTimestampAsLong() {
+ return timestampAsLong;
+ }
+
+ public boolean isDateAsInt() {
+ return dateAsInt;
+ }
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index f9af1d1..1b7d2ae 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -39,6 +39,7 @@
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IIcebergRecordReaderFactory;
import org.apache.asterix.external.api.IRecordReader;
+import
org.apache.asterix.external.input.record.reader.aws.delta.IcebergFileRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.iceberg.IcebergConstants;
import org.apache.asterix.external.util.iceberg.IcebergUtils;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 9608108..c026f29 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -318,10 +318,9 @@
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_FORMAT);
}
- // iceberg tables can be created without passing the bucket,
- // only validate bucket presence if container is passed
+ // container is not needed for iceberg tables, skip validation
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- if (IcebergUtils.isIcebergTable(configuration) && container == null) {
+ if (IcebergUtils.isIcebergTable(configuration)) {
return;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 369b5fe..3569aca 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import org.apache.asterix.common.config.CatalogConfig;
@@ -168,6 +169,8 @@
throw
CompilationException.create(ErrorCode.UNSUPPORTED_ICEBERG_CATALOG_SOURCE,
source);
}
+ // remove null values to avoid failures in internal checks
+ catalogProperties.values().removeIf(Objects::isNull);
return switch (catalogSource.get()) {
case CatalogConfig.IcebergCatalogSource.AWS_GLUE ->
GlueUtils.initializeCatalog(catalogProperties, namespace);
case CatalogConfig.IcebergCatalogSource.BIGLAKE_METASTORE ->
BiglakeMetastore.initializeCatalog(catalogProperties, namespace);
@@ -201,4 +204,14 @@
ARecordType projectedRecordType =
ExternalDataUtils.getExpectedType(encoded);
return projectedRecordType.getFieldNames();
}
+
+ /**
+ * Sets the default format to Parquet if the format is not provided for
Iceberg tables
+ * @param configuration configuration
+ */
+ public static void setDefaultFormat(Map<String, String> configuration) {
+ if (IcebergUtils.isIcebergTable(configuration) &&
configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ configuration.put(ExternalDataConstants.KEY_FORMAT,
ExternalDataConstants.FORMAT_PARQUET);
+ }
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f0a803a..3eb1940 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1009,7 +1009,7 @@
setSourceType(configuration, adapterName);
// for iceberg table, add catalog properties to the configuration
- addIcebergCatalogPropertiesIfNeeded(configuration);
+ addIcebergCatalogPropertiesIfNeeded(appCtx, configuration);
return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
configuration, itemType, null, warningCollector,
filterEvaluatorFactory);
} catch (AlgebricksException e) {
@@ -1019,7 +1019,8 @@
}
}
- private void addIcebergCatalogPropertiesIfNeeded(Map<String, String>
configuration) throws AlgebricksException {
+ protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext
appCtx, Map<String, String> configuration)
+ throws AlgebricksException {
if (IcebergUtils.isIcebergTable(configuration)) {
String catalogName =
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
IcebergCatalog catalog =
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20648?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I1726c2168bfec1f137390c7c2112c2df59151dc2
Gerrit-Change-Number: 20648
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>