>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20689?usp=email )
Change subject: [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.4
......................................................................
[ASTERIXDB-3634][EXT]: Add support to Iceberg pt.4
Details:
- Validate table properties at creation.
- Namespace is required when creating an Iceberg table.
- Validate snapshot id/timestamp at creation.
- Pin snapshot id during compile and runtime phases.
- Properly handle deletes when reading tables.
Ext-ref: MB-63115
Change-Id: I0d9d962720e3a8fb4935d35ec3efa50c5ffc030a
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/pom.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
7 files changed, 121 insertions(+), 17 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/89/20689/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3a62746..c3f7c55 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1193,6 +1193,7 @@
return;
}
IcebergUtils.setDefaultFormat(properties);
+ IcebergUtils.validateIcebergTableProperties(properties);
// ensure the specified catalog exists
String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 29aa4a5..5eb48e3 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -343,6 +343,8 @@
ICEBERG_NAMESPACE_DOES_NOT_EXIST(1235),
ICEBERG_TABLE_DOES_NOT_EXIST(1236),
UNSUPPORTED_ICEBERG_DATA_FORMAT(1237),
+ ICEBERG_SNAPSHOT_ID_NOT_FOUND(1238),
+ INVALID_ICEBERG_SNAPSHOT_VALUE(1239),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 607eb0b..2968701 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -345,6 +345,8 @@
1235 = The provided Iceberg namespace '%1$s' does not exist
1236 = The provided Iceberg table '%1$s' does not exist
1237 = The provided data format '%1$s' is not supported for Iceberg tables
+1238 = Iceberg snapshot '%1$s' not found
+1239 = Invalid Iceberg snapshot value: '%1$s'
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index c9ff38d..283a7ac 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -649,6 +649,10 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ </dependency>
</dependencies>
<!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
<repositories>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
index 9f779b8..4f71d49 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -35,10 +35,12 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
@@ -60,6 +62,7 @@
private int nextTaskIndex = 0;
private Catalog catalog;
private FileIO tableFileIo;
+ private Schema schemaAtSnapshot;
private CloseableIterable<Record> iterable;
private Iterator<Record> recordsIterator;
@@ -91,8 +94,14 @@
}
Table table = catalog.loadTable(tableIdentifier);
tableFileIo = table.io();
+
+ // we always have a snapshot id since we pin it at compile time
+ long snapshotId = getSnapshotId(configuration);
+ Snapshot snapshot = table.snapshot(snapshotId);
+ schemaAtSnapshot = table.schemas().get(snapshot.schemaId());
}
+ @Override
public boolean hasNext() throws Exception {
// iterator has more records
if (recordsIterator != null && recordsIterator.hasNext()) {
@@ -169,9 +178,24 @@
private void setNextRecordsIterator() {
FileScanTask task = fileScanTasks.get(nextTaskIndex++);
+ GenericDeleteFilter deleteFilter =
+ new GenericDeleteFilter(tableFileIo, task, schemaAtSnapshot,
projectedSchema);
+ Schema requiredSchema = deleteFilter.requiredSchema();
InputFile inFile = tableFileIo.newInputFile(task.file().location());
- iterable =
Parquet.read(inFile).project(projectedSchema).split(task.start(), task.length())
- .createReaderFunc(fs ->
GenericParquetReaders.buildReader(projectedSchema, fs)).build();
- recordsIterator = iterable.iterator();
+
+ iterable =
+
Parquet.read(inFile).project(requiredSchema).filter(task.residual()).split(task.start(),
task.length())
+ .createReaderFunc(fs ->
GenericParquetReaders.buildReader(requiredSchema, fs)).build();
+
+ recordsIterator = deleteFilter.filter(iterable).iterator();
+ }
+
+ private long getSnapshotId(Map<String, String> configuration) {
+ String snapshotStr =
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ if (snapshotStr != null) {
+ return Long.parseLong(snapshotStr);
+ }
+ // should not reach here as we validate at compile time
+ throw new IllegalStateException("Snapshot must've been pinned during
compilation phase");
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index f9af1d1..f782a95 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -18,8 +18,11 @@
*/
package org.apache.asterix.external.input.record.reader.aws.iceberg;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
import static
org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields;
-import static
org.apache.asterix.external.util.iceberg.IcebergUtils.setSnapshot;
+import static
org.apache.asterix.external.util.iceberg.IcebergUtils.snapshotIdExists;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.Serializable;
import java.util.ArrayList;
@@ -56,6 +59,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.SnapshotUtil;
public class IcebergParquetRecordReaderFactory implements
IIcebergRecordReaderFactory<Record> {
@@ -130,7 +134,7 @@
Table table = catalog.loadTable(tableIdentifier);
TableScan scan = table.newScan();
- scan = setSnapshot(configuration, scan);
+ scan = setAndPinScanSnapshot(configuration, configurationCopy,
table, scan);
String[] projectedFields = getProjectedFields(configuration);
projectedSchema = table.schema();
if (projectedFields != null && projectedFields.length > 0) {
@@ -182,6 +186,52 @@
return projectedSchema;
}
+ /**
+ * Sets the snapshot id (or timestamp) if present and pin it to be used by
both compile and runtime phases. If no
+ * snapshot is provided, the latest snapshot is used and pinned.
+ *
+ * @param configuration configuration
+ * @param configurationCopy configurationCopy
+ * @param table table
+ * @param scan scan
+ * @return table scan
+ * @throws CompilationException CompilationException
+ */
+ private TableScan setAndPinScanSnapshot(Map<String, String> configuration,
Map<String, String> configurationCopy,
+ Table table, TableScan scan) throws CompilationException {
+ long snapshot;
+ String snapshotIdStr =
configuration.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ String asOfTimestampStr =
configuration.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+
+ if (snapshotIdStr != null) {
+ snapshot = Long.parseLong(snapshotIdStr);
+ if (!snapshotIdExists(table, snapshot)) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot);
+ }
+ } else if (asOfTimestampStr != null) {
+ try {
+ snapshot = SnapshotUtil.snapshotIdAsOfTime(table,
Long.parseLong(asOfTimestampStr));
+ } catch (IllegalArgumentException e) {
+ throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e,
getMessageOrToString(e));
+ }
+ } else {
+ snapshot = table.currentSnapshot().snapshotId();
+ }
+
+ scan = scan.useSnapshot(snapshot);
+ pinSnapshotId(configuration, configurationCopy, snapshot);
+ return scan;
+ }
+
+ private void pinSnapshotId(Map<String, String> configuration, Map<String,
String> configurationCopy,
+ long snapshotId) {
+ String snapshotIdStr = Long.toString(snapshotId);
+ configuration.remove(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ configurationCopy.remove(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ configuration.put(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY, snapshotIdStr);
+ configurationCopy.put(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY, snapshotIdStr);
+ }
+
public static class PartitionWorkLoadBasedOnSize implements Serializable {
private static final long serialVersionUID = 3L;
private final List<FileScanTask> fileScanTasks = new ArrayList<>();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 3569aca..30a6858 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.external.util.iceberg;
+import static
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
import static
org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT;
import java.io.IOException;
@@ -39,7 +42,7 @@
import org.apache.asterix.external.util.aws.glue.GlueUtils;
import
org.apache.asterix.external.util.google.biglake_metastore.BiglakeMetastore;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.iceberg.TableScan;
+import org.apache.iceberg.Table;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Catalog;
@@ -105,11 +108,38 @@
}
public static void validateIcebergTableProperties(Map<String, String>
properties) throws CompilationException {
+ // required table name
String tableName =
properties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
if (tableName == null || tableName.isEmpty()) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
}
+
+ // required namespace
+ String namespace =
properties.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+ if (namespace == null || namespace.isEmpty()) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+ IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+ }
+
+ // validate snapshot id and timestamp
+ String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ String snapshotTimestamp =
properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ if (snapshotId != null && snapshotTimestamp != null) {
+ throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+ ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY,
ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ }
+
+ try {
+ if (snapshotId != null) {
+ Long.parseLong(snapshotId);
+ } else if (snapshotTimestamp != null) {
+ Long.parseLong(snapshotTimestamp);
+ }
+ } catch (NumberFormatException e) {
+ throw new
CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE,
+ snapshotId != null ? snapshotId : snapshotTimestamp);
+ }
}
/**
@@ -186,17 +216,8 @@
}
}
- public static TableScan setSnapshot(Map<String, String> configuration,
TableScan scan) {
- String snapshot =
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
- if (snapshot != null) {
- scan = scan.useSnapshot(Long.parseLong(snapshot));
- } else {
- String asOfTimestamp =
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
- if (asOfTimestamp != null) {
- scan = scan.asOfTime(Long.parseLong(asOfTimestamp));
- }
- }
- return scan;
+ public static boolean snapshotIdExists(Table table, long snapshot) {
+ return table.snapshot(snapshot) != null;
}
public static String[] getProjectedFields(Map<String, String>
configuration) throws IOException {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20689?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I0d9d962720e3a8fb4935d35ec3efa50c5ffc030a
Gerrit-Change-Number: 20689
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>