From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email )

Change subject: [NO ISSUE][EXT]: add knobs to s3 parquet
......................................................................

[NO ISSUE][EXT]: add knobs to s3 parquet

Details:
- add inputStreamType knob
- add changeDetectionMode knob

Ext-ref: MB-71701
Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
3 files changed, 112 insertions(+), 9 deletions(-)

Approvals:
  Hussain Towaileb: Looks good to me, approved; Verified
  Jenkins: Verified




diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 2e5d213..79f12be 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -48,11 +48,21 @@
     public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
     public static final String HADOOP_REGION = "fs.s3a.endpoint.region";

-    // input stream
+    // input stream type
+    public static final String INPUT_STREAM_TYPE_FIELD_NAME = 
"inputStreamType";
     public static final String HADOOP_INPUT_STREAM_TYPE = 
"fs.s3a.input.stream.type";
+    public static final String HADOOP_INPUT_STREAM_TYPE_VAL_AUTO = "auto";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC = 
"classic";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS = 
"analytics";

+    // change detection mode
+    public static final String CHANGE_DETECTION_MODE_FIELD_NAME = 
"changeDetectionMode";
+    public static final String HADOOP_CHANGE_DETECTION_MODE = 
"fs.s3a.change.detection.mode";
+    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO = "auto";
+    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER = 
"server";
+    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT = 
"client";
+    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none";
+
     /*
      * Internal configurations
      */
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 07424b1..7556187 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -43,6 +43,7 @@
 import static org.apache.asterix.external.util.aws.AwsUtils.getCrossRegion;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.getPathStyleAddressing;
 import static org.apache.asterix.external.util.aws.AwsUtils.getRegion;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.CHANGE_DETECTION_MODE_FIELD_NAME;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.FILES;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.FOLDERS;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
@@ -54,9 +55,15 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
@@ -67,6 +74,7 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.INPUT_STREAM_TYPE_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

@@ -190,7 +198,6 @@
         }
         awsClients.setConsumingClient(builder.build());
         return awsClients;
-
     }

     @AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = 
AiProvenance.Tool.GITHUB_COPILOT)
@@ -285,15 +292,11 @@
         if (serviceEndpoint != null) {
             // Validation of the URL should be done at hadoop-aws level
             jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-
-            // The analytics-accelerator stream factory (default in Hadoop 
3.4+) performs a HeadObject call during
-            // stream initialization to fetch the ETag. Non-AWS S3-compatible 
endpoints (e.g. mock servers) may not
-            // return an ETag on HeadObject, which causes a 
NullPointerException. Fall back to the classic stream
-            // implementation when a custom service endpoint is in use.
-            // TODO: make configurable
-            jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
         }

+        setInputStreamType(configuration, jobConf, serviceEndpoint);
+        setChangeDetectionMode(configuration, jobConf, serviceEndpoint);
+
         boolean pathStyleAddressing =
                 
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
 serviceEndpoint);
         if (pathStyleAddressing) {
@@ -310,6 +313,48 @@
         }
     }

+    private static void setInputStreamType(Map<String, String> configuration, 
JobConf jobConf, String serviceEndpoint) {
+        String configuredInputStreamType = 
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
+        if (configuredInputStreamType == null) {
+            configuredInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
+        }
+
+        if 
(HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equals(configuredInputStreamType)) {
+            // Auto mode: decide based on endpoint
+            if (serviceEndpoint != null) {
+                // The analytics-accelerator stream factory (default in Hadoop 
3.4+) performs a HeadObject call during
+                // stream initialization to fetch the ETag. Non-AWS 
S3-compatible endpoints may not return an ETag on
+                // HeadObject, which causes a NullPointerException. Fall back 
to the classic stream
+                // implementation when a custom service endpoint is in use.
+                jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+            } else {
+                jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS);
+            }
+        } else {
+            // Explicit override: use the user-specified stream type
+            jobConf.set(HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType);
+        }
+    }
+
+    private static void setChangeDetectionMode(Map<String, String> 
configuration, JobConf jobConf,
+            String serviceEndpoint) {
+        String configuredChangeDetectionMode = 
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
+        if (configuredChangeDetectionMode == null) {
+            configuredChangeDetectionMode = 
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
+        }
+
+        String inputStreamType = jobConf.get(HADOOP_INPUT_STREAM_TYPE);
+        if 
(HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equals(configuredChangeDetectionMode)) {
+            // If using a custom endpoint with classic streams, default to no 
change detection as
+            // S3-compatible implementations may not support ETag/version 
metadata consistently.
+            if (serviceEndpoint != null && 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(inputStreamType)) {
+                jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE);
+            }
+        } else {
+            jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
configuredChangeDetectionMode);
+        }
+    }
+
     /**
      * Sets the credentials provider type and the credentials to hadoop based 
on the provided configuration
      *
@@ -418,6 +463,9 @@
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }

+        validateAndNormalizeStreamInputType(configuration);
+        validateAndNormalizeChangeDetectionMode(configuration);
+
         // container is not needed for iceberg tables, skip validation
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         if (IcebergUtils.isIcebergTable(configuration)) {
@@ -743,4 +791,41 @@
                     "true, false");
         }
     }
+
+    public static void validateAndNormalizeStreamInputType(Map<String, String> 
configuration)
+            throws CompilationException {
+        String streamInputType = 
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
+        if (streamInputType == null || streamInputType.isBlank()) {
+            configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, 
HADOOP_INPUT_STREAM_TYPE_VAL_AUTO);
+            return;
+        }
+
+        if 
(!HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equalsIgnoreCase(streamInputType)
+                && 
!HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS.equalsIgnoreCase(streamInputType)
+                && 
!HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equalsIgnoreCase(streamInputType)) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INPUT_STREAM_TYPE_FIELD_NAME,
+                    HADOOP_INPUT_STREAM_TYPE_VAL_AUTO + ", " + 
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", "
+                            + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+        }
+        configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, 
streamInputType.toLowerCase());
+    }
+
+    public static void validateAndNormalizeChangeDetectionMode(Map<String, 
String> configuration)
+            throws CompilationException {
+        String changeDetectionMode = 
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
+        if (changeDetectionMode == null || changeDetectionMode.isBlank()) {
+            configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, 
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO);
+            return;
+        }
+
+        if 
(!HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equalsIgnoreCase(changeDetectionMode)
+                && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_NONE.equalsIgnoreCase(changeDetectionMode)
+                && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT.equalsIgnoreCase(changeDetectionMode)
+                && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER.equalsIgnoreCase(changeDetectionMode)) 
{
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
CHANGE_DETECTION_MODE_FIELD_NAME,
+                    HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO + ", " + 
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", "
+                            + HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT + ", " + 
HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER);
+        }
+        configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, 
changeDetectionMode.toLowerCase());
+    }
 }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
index 8a3f388..43d3cc2 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/object/base/AdmObjectNode.java
@@ -97,6 +97,14 @@
         return this;
     }

+    public AdmObjectNode update(String fieldName, IAdmNode value) {
+        if (value == null) {
+            value = AdmNullNode.INSTANCE;
+        }
+        children.put(fieldName, value);
+        return this;
+    }
+
     public IAdmNode remove(String fieldName) {
         return children.remove(fieldName);
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
Gerrit-Change-Number: 21192
Gerrit-PatchSet: 5
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>

Reply via email to