From Hussain Towaileb <hussainht@gmail.com>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20689?usp=email )
Change subject: [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.4 ...................................................................... [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.4 Details: - Validate table properties at creation. - namespace is required when creation iceberg table. - validate snapshot id/timestamp at creation. - pin snapshot id during compile and runtime phases. - properly handle deletes when reading tables. Ext-ref: MB-63115 Change-Id: I0d9d962720e3a8fb4935d35ec3efa50c5ffc030a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20689 Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-external-data/pom.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java 8 files changed, 158 insertions(+), 26 deletions(-) Approvals: Jenkins: Verified; Verified Hussain Towaileb: Looks good to me, but someone else must approve Peeyush Gupta: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 3a62746..c3f7c55 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -1193,6 +1193,7 @@ return; } IcebergUtils.setDefaultFormat(properties); + IcebergUtils.validateIcebergTableProperties(properties); // ensure the specified catalog exists String catalogName = properties.get(IcebergConstants.ICEBERG_CATALOG_NAME); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 29aa4a5..5eb48e3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -343,6 +343,8 @@ ICEBERG_NAMESPACE_DOES_NOT_EXIST(1235), ICEBERG_TABLE_DOES_NOT_EXIST(1236), UNSUPPORTED_ICEBERG_DATA_FORMAT(1237), + ICEBERG_SNAPSHOT_ID_NOT_FOUND(1238), + INVALID_ICEBERG_SNAPSHOT_VALUE(1239), // Feed errors DATAFLOW_ILLEGAL_STATE(3001), diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 607eb0b..2968701 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -345,6 +345,8 @@ 1235 = The provided Iceberg namespace '%1$s' does not exist 1236 = The provided Iceberg table '%1$s' does not exist 1237 = The provided data format '%1$s' is not supported for Iceberg tables +1238 = Iceberg snapshot '%1$s' not found +1239 = Invalid Iceberg snapshot value: '%1$s' # Feed Errors 3001 = Illegal state. 
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index c9ff38d..283a7ac 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -649,6 +649,10 @@ <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-parquet</artifactId> </dependency> + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-data</artifactId> + </dependency> </dependencies> <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 --> <repositories> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java index 9f779b8..8e6273a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java @@ -35,10 +35,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericDeleteFilter; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.io.CloseableIterable; @@ -60,6 +62,7 @@ private int nextTaskIndex = 0; private Catalog catalog; private FileIO tableFileIo; + private Schema schemaAtSnapshot; private CloseableIterable<Record> iterable; private Iterator<Record> recordsIterator; @@ -91,8 +94,23 @@ } Table table = catalog.loadTable(tableIdentifier); 
tableFileIo = table.io(); + + // we always have a snapshot id since we pin it at compile time + long snapshotId = getSnapshotId(configuration); + Snapshot snapshot = table.snapshot(snapshotId); + if (snapshot == null) { + // Snapshot might have been expired/GC'd between compile and runtime + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshotId); + } + + this.schemaAtSnapshot = table.schemas().get(snapshot.schemaId()); + if (schemaAtSnapshot == null) { + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, + "Missing schemaId=" + snapshot.schemaId() + " for snapshotId=" + snapshotId); + } } + @Override public boolean hasNext() throws Exception { // iterator has more records if (recordsIterator != null && recordsIterator.hasNext()) { @@ -138,12 +156,12 @@ @Override public void close() throws IOException { - if (tableFileIo != null) { - tableFileIo.close(); - } if (iterable != null) { iterable.close(); } + if (tableFileIo != null) { + tableFileIo.close(); + } try { IcebergUtils.closeCatalog(catalog); @@ -154,12 +172,12 @@ @Override public void setController(AbstractFeedDataFlowController controller) { - + // no-op } @Override public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException { - + // no-op } @Override @@ -170,8 +188,33 @@ private void setNextRecordsIterator() { FileScanTask task = fileScanTasks.get(nextTaskIndex++); InputFile inFile = tableFileIo.newInputFile(task.file().location()); - iterable = Parquet.read(inFile).project(projectedSchema).split(task.start(), task.length()) - .createReaderFunc(fs -> GenericParquetReaders.buildReader(projectedSchema, fs)).build(); - recordsIterator = iterable.iterator(); + + int deletesCount = (task.deletes() == null) ? 
0 : task.deletes().size(); + if (deletesCount == 0) { + // No deletes: read only projected schema + iterable = Parquet.read(inFile).project(projectedSchema).filter(task.residual()) + .split(task.start(), task.length()) + .createReaderFunc(fs -> GenericParquetReaders.buildReader(projectedSchema, fs)).build(); + recordsIterator = iterable.iterator(); + return; + } + + // Has deletes: read required schema, then apply delete filter + GenericDeleteFilter deleteFilter = + new GenericDeleteFilter(tableFileIo, task, schemaAtSnapshot, projectedSchema); + + Schema requiredSchema = deleteFilter.requiredSchema(); + iterable = + Parquet.read(inFile).project(requiredSchema).filter(task.residual()).split(task.start(), task.length()) + .createReaderFunc(fs -> GenericParquetReaders.buildReader(requiredSchema, fs)).build(); + recordsIterator = deleteFilter.filter(iterable).iterator(); + } + + private long getSnapshotId(Map<String, String> configuration) { + String snapshotStr = configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + if (snapshotStr != null) { + return Long.parseLong(snapshotStr); + } + throw new IllegalStateException("Snapshot must've been pinned during compilation phase"); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java index f9af1d1..6c37242 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java @@ -18,8 +18,13 @@ */ package org.apache.asterix.external.input.record.reader.aws.iceberg; +import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SCHEMA_ID_PROPERTY_KEY; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY; import static org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields; -import static org.apache.asterix.external.util.iceberg.IcebergUtils.setSnapshot; +import static org.apache.asterix.external.util.iceberg.IcebergUtils.snapshotIdExists; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.Serializable; import java.util.ArrayList; @@ -49,6 +54,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; @@ -56,6 +62,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.util.SnapshotUtil; public class IcebergParquetRecordReaderFactory implements IIcebergRecordReaderFactory<Record> { @@ -130,13 +137,16 @@ Table table = catalog.loadTable(tableIdentifier); TableScan scan = table.newScan(); - scan = setSnapshot(configuration, scan); + scan = setAndPinScanSnapshot(configurationCopy, table, scan); + long snapshotId = Long.parseLong(configurationCopy.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY)); + Schema schemaAtSnapshot = table.schemas().get(table.snapshot(snapshotId).schemaId()); + String[] projectedFields = getProjectedFields(configuration); - projectedSchema = table.schema(); + projectedSchema = schemaAtSnapshot; if (projectedFields != null && projectedFields.length > 0) { projectedSchema 
= projectedSchema.select(projectedFields); } - scan.project(projectedSchema); + scan = scan.project(projectedSchema); try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) { tasks.forEach(fileScanTasks::add); } @@ -146,7 +156,7 @@ throw ex; } catch (Exception ex) { throwable = ex; - throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, ex.getMessage()); + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, ex.getMessage()); } finally { try { IcebergUtils.closeCatalog(catalog); @@ -154,7 +164,7 @@ if (throwable != null) { throwable.addSuppressed(ex); } else { - throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, ex.getMessage()); + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, ex.getMessage()); } } } @@ -182,6 +192,54 @@ return projectedSchema; } + /** + * Sets the snapshot id (or timestamp) if present and pin it to be used by both compile and runtime phases. If no + * snapshot is provided, the latest snapshot is used and pinned. 
+ * + * @param configurationCopy configurationCopy + * @param table table + * @param scan scan + * @return table scan + * @throws CompilationException CompilationException + */ + private TableScan setAndPinScanSnapshot(Map<String, String> configurationCopy, Table table, TableScan scan) + throws CompilationException { + long snapshot; + String snapshotIdStr = configurationCopy.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + String asOfTimestampStr = configurationCopy.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); + + if (snapshotIdStr != null) { + snapshot = Long.parseLong(snapshotIdStr); + if (!snapshotIdExists(table, snapshot)) { + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot); + } + } else if (asOfTimestampStr != null) { + try { + snapshot = SnapshotUtil.snapshotIdAsOfTime(table, Long.parseLong(asOfTimestampStr)); + if (!snapshotIdExists(table, snapshot)) { + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot); + } + } catch (IllegalArgumentException e) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } + } else { + if (table.currentSnapshot() == null) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "table " + table.name() + " has no snapshots"); + } + snapshot = table.currentSnapshot().snapshotId(); + } + + scan = scan.useSnapshot(snapshot); + pinSnapshotId(configurationCopy, table, snapshot); + return scan; + } + + private void pinSnapshotId(Map<String, String> configurationCopy, Table table, long snapshotId) { + Snapshot snapshot = table.snapshot(snapshotId); + configurationCopy.put(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY, String.valueOf(snapshot.snapshotId())); + configurationCopy.put(ICEBERG_SCHEMA_ID_PROPERTY_KEY, Integer.toString(snapshot.schemaId())); + } + public static class PartitionWorkLoadBasedOnSize implements Serializable { private static final long serialVersionUID = 3L; private final List<FileScanTask> fileScanTasks = new 
ArrayList<>(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java index 3921e36..b3ba665 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java @@ -30,6 +30,7 @@ public static final String ICEBERG_TABLE_NAME_PROPERTY_KEY = "tableName"; public static final String ICEBERG_NAMESPACE_PROPERTY_KEY = "namespace"; public static final String ICEBERG_SNAPSHOT_ID_PROPERTY_KEY = "snapshotId"; + public static final String ICEBERG_SCHEMA_ID_PROPERTY_KEY = "schemaId"; public static final String ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY = "snapshotTimestamp"; public static final String ICEBERG_PROPERTY_PREFIX_INTERNAL = "catalog-property#"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java index 3569aca..30a6858 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java @@ -18,9 +18,12 @@ */ package org.apache.asterix.external.util.iceberg; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; import static org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT; +import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT; import java.io.IOException; @@ -39,7 +42,7 @@ import org.apache.asterix.external.util.aws.glue.GlueUtils; import org.apache.asterix.external.util.google.biglake_metastore.BiglakeMetastore; import org.apache.asterix.om.types.ARecordType; -import org.apache.iceberg.TableScan; +import org.apache.iceberg.Table; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.catalog.Catalog; @@ -105,11 +108,38 @@ } public static void validateIcebergTableProperties(Map<String, String> properties) throws CompilationException { + // required table name String tableName = properties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY); if (tableName == null || tableName.isEmpty()) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY); } + + // required namespace + String namespace = properties.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY); + if (namespace == null || namespace.isEmpty()) { + throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, + IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY); + } + + // validate snapshot id and timestamp + String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + String snapshotTimestamp = properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); + if (snapshotId != null && snapshotTimestamp != null) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, + ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + } + + try { + if (snapshotId != null) { + Long.parseLong(snapshotId); + } else if (snapshotTimestamp != null) { + Long.parseLong(snapshotTimestamp); + } + } catch (NumberFormatException e) { + throw 
new CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE, + snapshotId != null ? snapshotId : snapshotTimestamp); + } } /** @@ -186,17 +216,8 @@ } } - public static TableScan setSnapshot(Map<String, String> configuration, TableScan scan) { - String snapshot = configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); - if (snapshot != null) { - scan = scan.useSnapshot(Long.parseLong(snapshot)); - } else { - String asOfTimestamp = configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); - if (asOfTimestamp != null) { - scan = scan.asOfTime(Long.parseLong(asOfTimestamp)); - } - } - return scan; + public static boolean snapshotIdExists(Table table, long snapshot) { + return table.snapshot(snapshot) != null; } public static String[] getProjectedFields(Map<String, String> configuration) throws IOException { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20689?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I0d9d962720e3a8fb4935d35ec3efa50c5ffc030a Gerrit-Change-Number: 20689 Gerrit-PatchSet: 5 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]>
