From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email )
Change subject: [NO ISSUE][EXT]: add knobs to s3 parquet
......................................................................
[NO ISSUE][EXT]: add knobs to s3 parquet
Ext-ref: MB-71701
Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
A
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
5 files changed, 255 insertions(+), 7 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/92/21192/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 9a183bd..1dc8a2c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -24,6 +24,7 @@
import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static
org.apache.hyracks.control.common.config.OptionTypes.getAcceptedStringValuesOptionType;
import static
org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;
import org.apache.hyracks.api.config.IOption;
@@ -81,7 +82,18 @@
getRangedIntegerType(60, 3600),
900,
"GCS impersonating service account duration in seconds. "
- + "Range from 60 seconds (1 min) to 3600 seconds (1
hour)");
+ + "Range from 60 seconds (1 min) to 3600 seconds (1
hour)"),
+ AWS_S3_STREAM_TYPE(
+ getAcceptedStringValuesOptionType("auto", "analytics",
"classic"),
+ "auto",
+ "The stream type used to read from S3. Valid values are: auto,
analytics, classic. "
+ + "auto (default): uses analytics for AWS S3, classic
for custom endpoints."),
+ AWS_S3_CHANGE_DETECTION_MODE(
+ getAcceptedStringValuesOptionType("auto", "server", "none",
"client"),
+ "auto",
+ "The change detection mode used for S3. Valid values are:
auto, server, client, none. "
+ + "auto (default): uses none for classic streams with
custom endpoints; otherwise "
+ + "Hadoop's default is used.");
private final IOptionType type;
private final Object defaultValue;
@@ -114,6 +126,8 @@
case AWS_ASSUME_ROLE_PREFETCH_TIME:
case AWS_ASSUME_ROLE_ASYNC_REFRESH_ENABLED:
case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION:
+ case AWS_S3_STREAM_TYPE:
+ case AWS_S3_CHANGE_DETECTION_MODE:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -214,4 +228,12 @@
public int getGcpImpersonateServiceAccountDuration() {
return
accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION);
}
+
+ /**
+  * Returns the configured value of the {@code AWS_S3_STREAM_TYPE} option, read as a string through the
+  * property accessor. The option is declared with the accepted values {@code auto}, {@code analytics} and
+  * {@code classic}, and its default is {@code auto}.
+  *
+  * @return the stream type setting used to read from S3
+  */
+ public String getAwsS3StreamType() {
+ return accessor.getString(Option.AWS_S3_STREAM_TYPE);
+ }
+
+ /**
+  * Returns the configured value of the {@code AWS_S3_CHANGE_DETECTION_MODE} option, read as a string through
+  * the property accessor. The option is declared with the accepted values {@code auto}, {@code server},
+  * {@code none} and {@code client}, and its default is {@code auto}.
+  *
+  * @return the change detection mode setting used for S3
+  */
+ public String getAwsS3ChangeDetectionMode() {
+ return accessor.getString(Option.AWS_S3_CHANGE_DETECTION_MODE);
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 2e5d213..fb4627e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -52,6 +52,8 @@
public static final String HADOOP_INPUT_STREAM_TYPE =
"fs.s3a.input.stream.type";
public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC =
"classic";
public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS =
"analytics";
+ public static final String HADOOP_CHANGE_DETECTION_MODE =
"fs.s3a.change.detection.mode";
+ public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none";
/*
* Internal configurations
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 07424b1..c0ebb21 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -54,6 +54,7 @@
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
@@ -134,6 +135,7 @@
public class S3Utils {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final String AUTO_SETTING_VALUE = "auto";
private static final class StaticTrustManagersProvider implements
TlsTrustManagersProvider {
private final TrustManager[] trustManagers;
@@ -285,13 +287,37 @@
if (serviceEndpoint != null) {
// Validation of the URL should be done at hadoop-aws level
jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
+ }
- // The analytics-accelerator stream factory (default in Hadoop
3.4+) performs a HeadObject call during
- // stream initialization to fetch the ETag. Non-AWS S3-compatible
endpoints (e.g. mock servers) may not
- // return an ETag on HeadObject, which causes a
NullPointerException. Fall back to the classic stream
- // implementation when a custom service endpoint is in use.
- // TODO: make configurable
- jobConf.set(HADOOP_INPUT_STREAM_TYPE,
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+ String configuredInputStreamType =
normalizeConfigValue(appCtx.getExternalProperties().getAwsS3StreamType());
+ String resolvedInputStreamType;
+ if (isAuto(configuredInputStreamType)) {
+ // Auto mode: decide based on endpoint
+ if (serviceEndpoint != null) {
+ // The analytics-accelerator stream factory (default in Hadoop
3.4+) performs a HeadObject call during
+ // stream initialization to fetch the ETag. Non-AWS
S3-compatible endpoints may not return an ETag on
+ // HeadObject, which causes a NullPointerException. Fall back
to the classic stream
+ // implementation when a custom service endpoint is in use.
+ resolvedInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
+ } else {
+ resolvedInputStreamType =
S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
+ }
+ } else {
+ // Explicit override: use the user-specified stream type
+ resolvedInputStreamType = configuredInputStreamType;
+ }
+ jobConf.set(HADOOP_INPUT_STREAM_TYPE, resolvedInputStreamType);
+
+ String configuredChangeDetectionMode =
+
normalizeConfigValue(appCtx.getExternalProperties().getAwsS3ChangeDetectionMode());
+ if (isAuto(configuredChangeDetectionMode)) {
+ // If using a custom endpoint with classic streams, default to no
change detection as
+ // S3-compatible implementations may not support ETag/version
metadata consistently.
+ if (serviceEndpoint != null &&
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(resolvedInputStreamType)) {
+ jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE,
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE);
+ }
+ } else {
+ jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE,
configuredChangeDetectionMode);
}
boolean pathStyleAddressing =
@@ -743,4 +769,12 @@
"true, false");
}
}
+
+    /**
+     * Tells whether a setting should be resolved automatically rather than taken verbatim.
+     *
+     * @param value the setting value to inspect, may be {@code null}
+     * @return {@code true} if the value is {@code null}, blank, or equal to "auto" ignoring case
+     */
+    private static boolean isAuto(String value) {
+        if (value == null || value.isBlank()) {
+            // an unset setting is treated the same as an explicit "auto"
+            return true;
+        }
+        return AUTO_SETTING_VALUE.equalsIgnoreCase(value);
+    }
+
+    /**
+     * Normalizes a raw configuration value so it can be compared against the lower-case Hadoop setting values.
+     * Lower-casing is done with {@link java.util.Locale#ROOT} so the result does not depend on the JVM default
+     * locale; under a Turkish locale, for instance, "CLASSIC" would otherwise become "classıc" (dotless i)
+     * and no longer match "classic".
+     *
+     * @param value the raw value, may be {@code null}
+     * @return the trimmed, lower-cased value, or {@code null} if {@code value} is {@code null}
+     */
+    private static String normalizeConfigValue(String value) {
+        return value == null ? null : value.trim().toLowerCase(java.util.Locale.ROOT);
+    }
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
new file mode 100644
index 0000000..ea98c26
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
+import static
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
+import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests how {@code S3Utils.configureAwsS3HdfsJobConf} resolves the S3 input stream type and the change
+ * detection mode from the external properties and the presence of a custom service endpoint.
+ */
+public class S3UtilsTest {
+
+    private static final String CUSTOM_ENDPOINT = "http://localhost:9000";
+
+    @Test
+    public void autoModeWithCustomEndpointDefaultsToClassicAndNoneChangeDetection() throws Exception {
+        JobConf jobConf = configure("auto", "auto", CUSTOM_ENDPOINT);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoModeWithoutCustomEndpointUsesAnalyticsAndHadoopDefaultChangeDetection() throws Exception {
+        JobConf jobConf = configure("auto", "auto", null);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(hadoopDefaultChangeDetectionMode(), jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void explicitChangeDetectionModeOverridesAutoBehavior() throws Exception {
+        JobConf jobConf = configure("auto", "server", CUSTOM_ENDPOINT);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals("server", jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoChangeDetectionWithClassicOnAwsKeepsHadoopDefault() throws Exception {
+        JobConf jobConf = configure("classic", "auto", null);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(hadoopDefaultChangeDetectionMode(), jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoChangeDetectionWithClassicOnCustomEndpointDefaultsToNone() throws Exception {
+        JobConf jobConf = configure("classic", "auto", CUSTOM_ENDPOINT);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoChangeDetectionWithAnalyticsOnCustomEndpointKeepsHadoopDefault() throws Exception {
+        JobConf jobConf = configure("analytics", "auto", CUSTOM_ENDPOINT);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(hadoopDefaultChangeDetectionMode(), jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void explicitNoneChangeDetectionIsAlwaysRespected() throws Exception {
+        JobConf jobConf = configure("analytics", "none", null);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void normalizedClassicAndAutoValuesStillDefaultToNoneOnCustomEndpoint() throws Exception {
+        JobConf jobConf = configure(" CLASSIC ", " AUTO ", CUSTOM_ENDPOINT);
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    /**
+     * Runs {@code S3Utils.configureAwsS3HdfsJobConf} on a fresh {@link JobConf} with mocked external properties.
+     *
+     * @param streamType value returned by the mocked {@code getAwsS3StreamType()}
+     * @param changeDetectionMode value returned by the mocked {@code getAwsS3ChangeDetectionMode()}
+     * @param serviceEndpoint custom service endpoint, or {@code null} to leave it out of the configuration
+     * @return the job configuration after it has been configured
+     */
+    private static JobConf configure(String streamType, String changeDetectionMode, String serviceEndpoint)
+            throws Exception {
+        IApplicationContext appCtx = mockAppContext(streamType, changeDetectionMode);
+        JobConf jobConf = new JobConf();
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, getConfiguration(serviceEndpoint));
+        return jobConf;
+    }
+
+    /**
+     * Returns the change detection mode reported by an untouched {@link JobConf}. The "keeps Hadoop default"
+     * tests compare against this baseline instead of a hard-coded literal, so they check that the value was
+     * left alone rather than what a particular Hadoop release ships as its default.
+     */
+    private static String hadoopDefaultChangeDetectionMode() {
+        return new JobConf().get(HADOOP_CHANGE_DETECTION_MODE);
+    }
+
+    private static IApplicationContext mockAppContext(String streamType, String changeDetectionMode) {
+        IApplicationContext appCtx = Mockito.mock(IApplicationContext.class);
+        ExternalProperties externalProperties = Mockito.mock(ExternalProperties.class);
+        Mockito.when(appCtx.getExternalProperties()).thenReturn(externalProperties);
+        Mockito.when(externalProperties.getAwsS3StreamType()).thenReturn(streamType);
+        Mockito.when(externalProperties.getAwsS3ChangeDetectionMode()).thenReturn(changeDetectionMode);
+        return appCtx;
+    }
+
+    private static Map<String, String> getConfiguration(String serviceEndpoint) {
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put(REGION_FIELD_NAME, "us-east-1");
+        configuration.put(CROSS_REGION_FIELD_NAME, "false");
+        if (serviceEndpoint != null) {
+            configuration.put(SERVICE_END_POINT_FIELD_NAME, serviceEndpoint);
+        }
+        return configuration;
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index df2d291..f626851 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -20,9 +20,12 @@
import java.net.MalformedURLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Stream;
import org.apache.hyracks.api.config.IOptionType;
@@ -487,4 +490,43 @@
public static final IOptionType<Long> getRangedLongByteUnit(long min, long
max) {
return new LongByteUnit(min, max);
}
+
+    /**
+     * Creates an option type that accepts only the given string values. Input is trimmed and lower-cased
+     * before it is checked, and the lower-cased form is what {@code parse} returns.
+     *
+     * @param acceptedValues the values the option may take; they are matched exactly as declared, so they
+     *            should be declared in lower case
+     * @return an option type that rejects any other value with an {@link IllegalArgumentException}
+     */
+    public static IOptionType<String> getAcceptedStringValuesOptionType(String... acceptedValues) {
+        return new AcceptedStringValuesOptionType(acceptedValues);
+    }
+
+    /**
+     * String option type restricted to a fixed set of accepted values.
+     */
+    private static class AcceptedStringValuesOptionType implements IOptionType<String> {
+        private final Set<String> accepted;
+        private final String acceptedAsString;
+
+        private AcceptedStringValuesOptionType(String... acceptedValues) {
+            accepted = new HashSet<>(Arrays.asList(acceptedValues));
+            // kept in declaration order for use in the error message
+            acceptedAsString = String.join(", ", acceptedValues);
+        }
+
+        /**
+         * Parses a textual option value.
+         *
+         * @param value the raw value
+         * @return the trimmed, lower-cased value
+         * @throws IllegalArgumentException if the value is {@code null} or not one of the accepted values
+         */
+        @Override
+        public String parse(String value) {
+            // Locale.ROOT keeps lower-casing independent of the JVM default locale; under a Turkish locale
+            // "CLASSIC" would otherwise become "classıc" (dotless i) and be rejected.
+            String normalized = value == null ? null : value.trim().toLowerCase(java.util.Locale.ROOT);
+            if (normalized == null || !accepted.contains(normalized)) {
+                throw new IllegalArgumentException(
+                        "Invalid value '" + value + "'. Accepted values: " + acceptedAsString);
+            }
+            return normalized;
+        }
+
+        /**
+         * Parses a JSON node; a JSON null yields {@code null}, anything else is parsed from its text form.
+         */
+        @Override
+        public String parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
+        public Class<String> targetType() {
+            return String.class;
+        }
+
+        @Override
+        public void serializeJSONFieldUnsafe(String fieldName, Object value, ObjectNode node) {
+            node.put(fieldName, (String) value);
+        }
+    }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
Gerrit-Change-Number: 21192
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>