>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email )
Change subject: [NO ISSUE][EXT]: add knobs to s3 parquet ...................................................................... [NO ISSUE][EXT]: add knobs to s3 parquet Details: - add inputStreamType knob - add changeDetectionMode knob Ext-ref: MB-71701 Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Tested-by: Hussain Towaileb <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java 3 files changed, 112 insertions(+), 9 deletions(-) Approvals: Hussain Towaileb: Looks good to me, approved; Verified Jenkins: Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java index 2e5d213..79f12be 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java @@ -48,11 +48,21 @@ public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint"; public static final String HADOOP_REGION = "fs.s3a.endpoint.region"; - // input stream + // input stream type + public static final String INPUT_STREAM_TYPE_FIELD_NAME = "inputStreamType"; public static final String HADOOP_INPUT_STREAM_TYPE = "fs.s3a.input.stream.type"; + public static final String HADOOP_INPUT_STREAM_TYPE_VAL_AUTO = "auto"; public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC = "classic"; public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS = "analytics"; + // change detection mode + public static final String CHANGE_DETECTION_MODE_FIELD_NAME = "changeDetectionMode"; + public static final String HADOOP_CHANGE_DETECTION_MODE = "fs.s3a.change.detection.mode"; + public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO = "auto"; + public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER = "server"; + public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT = "client"; + public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none"; + /* * Internal configurations */ diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 07424b1..7556187 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -43,6 +43,7 @@ import static org.apache.asterix.external.util.aws.AwsUtils.getCrossRegion; import static org.apache.asterix.external.util.aws.AwsUtils.getPathStyleAddressing; import static org.apache.asterix.external.util.aws.AwsUtils.getRegion; +import static org.apache.asterix.external.util.aws.s3.S3Constants.CHANGE_DETECTION_MODE_FIELD_NAME; import static org.apache.asterix.external.util.aws.s3.S3Constants.FILES; import static org.apache.asterix.external.util.aws.s3.S3Constants.FOLDERS; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID; @@ -54,9 +55,15 @@ import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS; +import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_AUTO; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS; @@ -67,6 +74,7 @@ import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY; +import static org.apache.asterix.external.util.aws.s3.S3Constants.INPUT_STREAM_TYPE_FIELD_NAME; import static org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; @@ -190,7 +198,6 @@ } awsClients.setConsumingClient(builder.build()); return awsClients; - } @AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = AiProvenance.Tool.GITHUB_COPILOT) @@ -285,15 +292,11 @@ if (serviceEndpoint != null) { // Validation of the URL should be done at hadoop-aws level jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint); - - // The analytics-accelerator stream factory (default in Hadoop 3.4+) performs a HeadObject call during - // stream initialization to fetch the ETag. Non-AWS S3-compatible endpoints (e.g. mock servers) may not - // return an ETag on HeadObject, which causes a NullPointerException. Fall back to the classic stream - // implementation when a custom service endpoint is in use. - // TODO: make configurable - jobConf.set(HADOOP_INPUT_STREAM_TYPE, HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); } + setInputStreamType(configuration, jobConf, serviceEndpoint); + setChangeDetectionMode(configuration, jobConf, serviceEndpoint); + boolean pathStyleAddressing = validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); if (pathStyleAddressing) { @@ -310,6 +313,48 @@ } } + private static void setInputStreamType(Map<String, String> configuration, JobConf jobConf, String serviceEndpoint) { + String configuredInputStreamType = configuration.get(INPUT_STREAM_TYPE_FIELD_NAME); + if (configuredInputStreamType == null) { + configuredInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_AUTO; + } + + if (HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equals(configuredInputStreamType)) { + // Auto mode: decide based on endpoint + if (serviceEndpoint != null) { + // The analytics-accelerator stream factory (default in Hadoop 3.4+) performs a HeadObject call during + // stream initialization to fetch the ETag. Non-AWS S3-compatible endpoints may not return an ETag on + // HeadObject, which causes a NullPointerException. Fall back to the classic stream + // implementation when a custom service endpoint is in use. + jobConf.set(HADOOP_INPUT_STREAM_TYPE, HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); + } else { + jobConf.set(HADOOP_INPUT_STREAM_TYPE, HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS); + } + } else { + // Explicit override: use the user-specified stream type + jobConf.set(HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType); + } + } + + private static void setChangeDetectionMode(Map<String, String> configuration, JobConf jobConf, + String serviceEndpoint) { + String configuredChangeDetectionMode = configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME); + if (configuredChangeDetectionMode == null) { + configuredChangeDetectionMode = HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO; + } + + String inputStreamType = jobConf.get(HADOOP_INPUT_STREAM_TYPE); + if (HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equals(configuredChangeDetectionMode)) { + // If using a custom endpoint with classic streams, default to no change detection as + // S3-compatible implementations may not support ETag/version metadata consistently. + if (serviceEndpoint != null && HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(inputStreamType)) { + jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, HADOOP_CHANGE_DETECTION_MODE_VAL_NONE); + } + } else { + jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, configuredChangeDetectionMode); + } + } + /** * Sets the credentials provider type and the credentials to hadoop based on the provided configuration * @@ -418,6 +463,9 @@ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } + validateAndNormalizeStreamInputType(configuration); + validateAndNormalizeChangeDetectionMode(configuration); + // container is not needed for iceberg tables, skip validation String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); if (IcebergUtils.isIcebergTable(configuration)) { @@ -743,4 +791,41 @@ "true, false"); } } + + public static void validateAndNormalizeStreamInputType(Map<String, String> configuration) + throws CompilationException { + String streamInputType = configuration.get(INPUT_STREAM_TYPE_FIELD_NAME); + if (streamInputType == null || streamInputType.isBlank()) { + configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, HADOOP_INPUT_STREAM_TYPE_VAL_AUTO); + return; + } + + if (!HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equalsIgnoreCase(streamInputType) + && !HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS.equalsIgnoreCase(streamInputType) + && !HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equalsIgnoreCase(streamInputType)) { + throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INPUT_STREAM_TYPE_FIELD_NAME, + HADOOP_INPUT_STREAM_TYPE_VAL_AUTO + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", " + + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); + } + configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, streamInputType.toLowerCase()); + } + + public static void validateAndNormalizeChangeDetectionMode(Map<String, String> configuration) + throws CompilationException { + String changeDetectionMode = configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME); + if (changeDetectionMode == null || changeDetectionMode.isBlank()) { + configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO); + return; + } + + if (!HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equalsIgnoreCase(changeDetectionMode) + && !HADOOP_CHANGE_DETECTION_MODE_VAL_NONE.equalsIgnoreCase(changeDetectionMode) + && !HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT.equalsIgnoreCase(changeDetectionMode) + && !HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER.equalsIgnoreCase(changeDetectionMode)) { + throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, CHANGE_DETECTION_MODE_FIELD_NAME, + HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", " + + HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER); + } + configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, changeDetectionMode.toLowerCase()); + } } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java index 8a3f388..43d3cc2 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java @@ -97,6 +97,14 @@ return this; } + public AdmObjectNode update(String fieldName, IAdmNode value) { + if (value == null) { + value = AdmNullNode.INSTANCE; + } + children.put(fieldName, value); + return this; + } + public IAdmNode remove(String fieldName) { return children.remove(fieldName); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8 Gerrit-Change-Number: 21192 Gerrit-PatchSet: 5 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
