>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email )
Change subject: [NO ISSUE][EXT]: misc iceberg fixes ...................................................................... [NO ISSUE][EXT]: misc iceberg fixes Details: - Ensure "Bearer " is not part of bearer token when passed. - Add serialization replacement for Nessie exceptions. - Ensure projected columns filtration is applied. - Fix handling of serializing binary with respect to arrays. - Ensure record fields are accessed by name instead of position. Ext-ref: MB-71912 Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Tested-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java 10 files changed, 71 insertions(+), 35 deletions(-) Approvals: 
Murtadha Hubail: Looks good to me, approved Jenkins: Verified Anon. E. Moose #1000171: Hussain Towaileb: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 6d17fef..822ebc1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -139,6 +139,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.client.rest.NessieServiceException; import org.projectnessie.error.BaseNessieClientServerException; import com.azure.storage.blob.models.BlobStorageException; @@ -168,6 +169,7 @@ registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); registerReplacement(DataLakeStorageException.class, SerializableExceptionProxy::new); registerReplacement(BaseNessieClientServerException.class, SerializableExceptionProxy::new); + registerReplacement(NessieServiceException.class, SerializableExceptionProxy::new); registerReplacement(HttpClientException.class, SerializableExceptionProxy::new); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 7566206..be43cc0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -117,6 +117,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.projectnessie.client.http.HttpClientException; +import 
org.projectnessie.client.rest.NessieServiceException; import org.projectnessie.error.BaseNessieClientServerException; import com.azure.storage.blob.models.BlobStorageException; @@ -155,6 +156,7 @@ registerReplacement(BlobStorageException.class, SerializableExceptionProxy::new); registerReplacement(DataLakeStorageException.class, SerializableExceptionProxy::new); registerReplacement(BaseNessieClientServerException.class, SerializableExceptionProxy::new); + registerReplacement(NessieServiceException.class, SerializableExceptionProxy::new); registerReplacement(HttpClientException.class, SerializableExceptionProxy::new); } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm index 2835651..d5f405e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm @@ -1 +1 @@ -{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": 19723, "time_field": 37230000, "timestamp_field": 1707000000000, "timestamp_ntz_field": 1707048000000, "timestamp_nano_field": 1707000000000, "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string value", "unknown_field": null } \ No 
newline at end of file +{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": 19723, "time_field": 37230000, "timestamp_field": 1707000000000000, "timestamp_ntz_field": 1707048000000000, "timestamp_nano_field": 1707000000000000000, "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string value", "unknown_field": null } \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java index 6949ef8..06ff6ef 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java @@ -33,6 +33,8 @@ import org.apache.asterix.external.util.iceberg.IcebergConstants; import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ 
-76,7 +78,8 @@ try { initializeTable(); } catch (CompilationException e) { - throw HyracksDataException.create(e); + Throwable throwable = closeResources(e); + throw HyracksDataException.create(throwable); } } @@ -158,17 +161,17 @@ @Override public void close() throws IOException { - if (iterable != null) { - iterable.close(); - } - if (tableFileIo != null) { - tableFileIo.close(); - } - + Throwable throwable = CleanupUtils.closeSilently(iterable, null); + throwable = CleanupUtils.closeSilently(tableFileIo, throwable); try { - IcebergUtils.closeAndCleanup(catalog, catalogProperties); - } catch (CompilationException e) { - throw HyracksDataException.create(e); + if (catalog != null) { + IcebergUtils.closeAndCleanup(catalog, catalogProperties); + } + } catch (Exception ex) { + throwable = ExceptionUtils.suppress(throwable, ex); + } + if (throwable != null) { + throw HyracksDataException.create(throwable); } } @@ -219,4 +222,18 @@ } throw new IllegalStateException("Snapshot must've been pinned during compilation phase"); } + + private Throwable closeResources(Throwable throwable) { + if (tableFileIo != null) { + throwable = CleanupUtils.closeSilently(tableFileIo, throwable); + } + if (catalog != null) { + try { + IcebergUtils.closeAndCleanup(catalog, catalogProperties); + } catch (Exception ex) { + throwable = ExceptionUtils.suppress(throwable, ex); + } + } + return throwable; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java index e8f7d91..17d24ed 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java @@ -152,7 +152,7 @@ if (projectedFields != null && projectedFields.length > 0) { projectedSchema = projectedSchema.select(projectedFields); } - scan.project(projectedSchema); + scan = scan.project(projectedSchema); Expression filterExpression = ((IcebergTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); if (filterExpression != null) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java index a01be90..e88a9cc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java @@ -25,11 +25,13 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneId; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -95,13 +97,14 @@ NestedField field = projectedSchema.columns().get(i); String fieldName = field.name(); Type fieldType = field.type(); - ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null, parserContext); + Object fieldValue = record.getField(fieldName); + ATypeTag typeTag = getTypeTag(fieldType, fieldValue == null, parserContext); IValueReference value; if (valueEmbedder.shouldEmbed(fieldName, typeTag)) { value = valueEmbedder.getEmbeddedValue(); } else { valueBuffer.reset(); - parseValue(fieldType, record.get(i), valueBuffer.getDataOutput()); + parseValue(fieldType, 
fieldValue, valueBuffer.getDataOutput()); value = valueBuffer; } @@ -308,7 +311,14 @@ private void serializeBinary(Object value, DataOutput out) throws HyracksDataException { ByteBuffer byteBuffer = (ByteBuffer) value; - aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length); + if (byteBuffer.hasArray()) { + aBinary.setValue(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), + byteBuffer.remaining()); + } else { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.duplicate().get(bytes); + aBinary.setValue(bytes, 0, bytes.length); + } binarySerde.serialize(aBinary, out); } @@ -340,29 +350,25 @@ } public void serializeTimestamp(Type type, Object value, DataOutput output) throws HyracksDataException { - long timestampInMillis; + Instant instant; switch (value) { - case OffsetDateTime offsetDateTime -> - timestampInMillis = offsetDateTime.toInstant().toEpochMilli(); - + case OffsetDateTime offsetDateTime -> instant = offsetDateTime.toInstant(); case LocalDateTime localDateTime -> { ZoneId zoneId = parserContext.getTimeZoneId(); - timestampInMillis = localDateTime.atZone(zoneId).toInstant().toEpochMilli(); + instant = localDateTime.atZone(zoneId).toInstant(); } - - case null, default -> { - throw RuntimeDataException.create( - ErrorCode.EXTERNAL_SOURCE_ERROR, - value == null - ? "unexpected null value for field type (" + type + ")" - : "unexpected value type (" + value.getClass() + ") for field type (" + type + ")"); - } + case null, default -> throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, + value == null ? "unexpected null value for field type (" + type + ")" + : "unexpected value type (" + value.getClass() + ") for field type (" + type + ")"); } if (parserContext.isTimestampAsLong()) { - serializeLong(timestampInMillis, output); + long timestampAsLong = type.typeId() == Type.TypeID.TIMESTAMP_NANO + ? 
ChronoUnit.NANOS.between(Instant.EPOCH, instant) + : ChronoUnit.MICROS.between(Instant.EPOCH, instant); + serializeLong(timestampAsLong, output); } else { - aDateTime.setValue(timestampInMillis); + aDateTime.setValue(instant.toEpochMilli()); datetimeSerde.serialize(aDateTime, output); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java index 7acaeb3..dbd5789 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import org.apache.asterix.external.util.iceberg.rest.RestConstants; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.rest.HTTPHeaders; import org.apache.iceberg.rest.HTTPRequest; @@ -65,8 +66,8 @@ AccessToken token = credentials.getAccessToken(); if (token != null && token.getTokenValue() != null) { - HTTPHeaders newHeaders = request.headers().putIfAbsent( - HTTPHeaders.of(HTTPHeaders.HTTPHeader.of("Authorization", "Bearer " + token.getTokenValue()))); + HTTPHeaders newHeaders = request.headers().putIfAbsent(HTTPHeaders.of(HTTPHeaders.HTTPHeader + .of("Authorization", RestConstants.BEARER_TOKEN_PREFIX + token.getTokenValue()))); return newHeaders.equals(request.headers()) ? 
request : ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build(); } else { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java index f0df588..f25d20f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java @@ -49,6 +49,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.external.util.iceberg.rest.RestConstants; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.nessie.NessieCatalog; @@ -158,7 +159,10 @@ if (notAllowed != null) { throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, BEARER_TOKEN_FIELD_NAME); } - + // strip "Bearer " prefix if present - the Nessie client adds it internally + if (bearerToken.startsWith(RestConstants.BEARER_TOKEN_PREFIX)) { + bearerToken = bearerToken.substring(RestConstants.BEARER_TOKEN_PREFIX.length()); + } catalogProperties.put(NESSIE_AUTHENTICATION_TYPE_FIELD_NAME, NESSIE_AUTHENTICATION_TYPE_BEARER); catalogProperties.put(NESSIE_AUTHENTICATION_BEARER_TOKEN_FIELD_NAME, bearerToken); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java index 3b278c0..fe28ea5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java @@ -28,4 +28,5 @@ public static final String 
ICEBERG_CREDENTIAL_PROPERTY_NAME = "credential"; public static final String ICEBERG_SCOPE_PROPERTY_NAME = "scope"; + public static final String BEARER_TOKEN_PREFIX = "Bearer "; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java index 57121b8..820e44b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java @@ -134,7 +134,10 @@ if (notAllowed != null) { throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, BEARER_TOKEN_FIELD_NAME); } - + // Strip "Bearer " prefix if present - the Iceberg REST client adds it internally + if (bearerToken.startsWith(RestConstants.BEARER_TOKEN_PREFIX)) { + bearerToken = bearerToken.substring(RestConstants.BEARER_TOKEN_PREFIX.length()); + } catalogProperties.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_OAUTH2); catalogProperties.put(RestConstants.ICEBERG_BEARER_TOKEN_PROPERTY_NAME, bearerToken); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a Gerrit-Change-Number: 21263 Gerrit-PatchSet: 5 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
