From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10783 )
Change subject: [NO ISSUE][EXT] Avoid duplicate open for streams + minor
refactoring
......................................................................
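[NO ISSUE][EXT] Avoid duplicate open for streams + minor refactoring
- AWS S3 and Azure Blob input streams: for gzip-compressed files, wrap
  the stream that was already opened in a GZIPInputStream instead of
  opening the object/blob a second time.
- Move CONTAINER_NAME_FIELD_NAME to the top level of ExternalDataConstants
  (shared by AWS S3 and Azure Blob) and remove the duplicated
  AwsS3/AzureBlob constants.
- Move the common configuration and partition-constraint setup into
  AbstractExternalInputStreamFactory.configure() and add
  getPartitionsCount().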
Change-Id: I405e84a30ee67b176c3389db6fd026c408ae1685
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
7 files changed, 34 insertions(+), 40 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/83/10783/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ca55b6f..0b215a0 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -30,6 +30,7 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
@@ -69,9 +70,17 @@
return partitionConstraint;
}
+ protected int getPartitionsCount() {
+ return getPartitionConstraint().getLocations().length;
+ }
+
@Override
- public abstract void configure(IServiceContext ctx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException;
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
+ throws AlgebricksException {
+ this.configuration = configuration;
+ this.partitionConstraint =
+ ((ICcApplicationContext)
ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
+ }
/**
* Finds the smallest workload and returns it
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index e3e53d5..48035f3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -42,15 +42,17 @@
public class AwsS3InputStream extends AbstractExternalInputStream {
private final S3Client s3Client;
+ private final String bucket;
public AwsS3InputStream(Map<String, String> configuration, List<String>
filePaths) throws HyracksDataException {
super(configuration, filePaths);
this.s3Client = buildAwsS3Client(configuration);
+ this.bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@Override
protected boolean getInputStream() throws IOException {
- String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String fileName = filePaths.get(nextFileIndex);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest =
getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
@@ -67,11 +69,10 @@
}
// Use gzip stream if needed
- String filename = filePaths.get(nextFileIndex).toLowerCase();
- if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
- in = new GZIPInputStream(s3Client.getObject(getObjectRequest),
ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ String lowerCaseFileName = fileName.toLowerCase();
+ if (lowerCaseFileName.endsWith(".gz") ||
lowerCaseFileName.endsWith(".gzip")) {
+ in = new GZIPInputStream(in,
ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
-
return true;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 5bab888..a1c577a 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,19 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -61,8 +59,7 @@
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
throws AlgebricksException {
- this.configuration = configuration;
- ICcApplicationContext ccApplicationContext = (ICcApplicationContext)
ctx.getApplicationContext();
+ super.configure(ctx, configuration, warningCollector);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
@@ -70,7 +67,7 @@
// Prepare to retrieve the objects
List<S3Object> filesOnly;
- String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
S3Client s3Client =
ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
try {
@@ -101,12 +98,8 @@
warningCollector.warn(warning);
}
- // Partition constraints
- partitionConstraint =
ccApplicationContext.getClusterStateManager().getClusterLocations();
- int partitionsCount = partitionConstraint.getLocations().length;
-
// Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, partitionsCount);
+ distributeWorkLoad(filesOnly, getPartitionsCount());
}
/**
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
index 358c412..3fb3395 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.azure;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -43,15 +41,17 @@
public class AzureBlobInputStream extends AbstractExternalInputStream {
private final BlobServiceClient client;
+ private final String container;
public AzureBlobInputStream(Map<String, String> configuration,
List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
this.client = buildAzureClient(configuration);
+ this.container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@Override
protected boolean getInputStream() throws IOException {
- String container =
configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ String fileName = filePaths.get(nextFileIndex);
BlobContainerClient blobContainerClient;
BlobClient blob;
try {
@@ -60,9 +60,9 @@
in = blob.openInputStream();
// Use gzip stream if needed
- String filename = filePaths.get(nextFileIndex).toLowerCase();
- if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
- in = new GZIPInputStream(in = blob.openInputStream(),
ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ String lowerCaseFileName = fileName.toLowerCase();
+ if (lowerCaseFileName.endsWith(".gz") ||
lowerCaseFileName.endsWith(".gzip")) {
+ in = new GZIPInputStream(in,
ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
} catch (BlobStorageException ex) {
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index 167e22a..ca064b1 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -18,19 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.azure;
-import static org.apache.asterix.external.util.ExternalDataConstants.*;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -56,10 +54,9 @@
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
throws AlgebricksException {
- this.configuration = configuration;
- ICcApplicationContext ccApplicationContext = (ICcApplicationContext)
ctx.getApplicationContext();
+ super.configure(ctx, configuration, warningCollector);
- String container =
configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
List<BlobItem> filesOnly = new ArrayList<>();
@@ -87,12 +84,8 @@
warningCollector.warn(warning);
}
- // Partition constraints
- partitionConstraint =
ccApplicationContext.getClusterStateManager().getClusterLocations();
- int partitionsCount = partitionConstraint.getLocations().length;
-
// Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, partitionsCount);
+ distributeWorkLoad(filesOnly, getPartitionsCount());
} catch (Exception ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex.getMessage());
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fd5b269..45d15df 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -288,6 +288,7 @@
public static final String INVALID_VAL = "invalid value";
public static final String DEFINITION_FIELD_NAME = "definition";
+ public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static class AwsS3 {
private AwsS3() {
@@ -298,7 +299,6 @@
public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
public static final String SECRET_ACCESS_KEY_FIELD_NAME =
"secretAccessKey";
public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
- public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static final String SERVICE_END_POINT_FIELD_NAME =
"serviceEndpoint";
}
@@ -307,8 +307,6 @@
throw new AssertionError("do not instantiate");
}
- public static final String CONTAINER_NAME_FIELD_NAME = "container";
- public static final String DEFINITION_FIELD_NAME = "definition";
public static final String CONNECTION_STRING_FIELD_NAME =
"connectionString";
public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3ac1116..d1bbe89 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -776,7 +776,7 @@
S3Client s3Client = buildAwsS3Client(configuration);;
S3Response response;
boolean useOldApi = false;
- String container =
configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
String prefix = getPrefix(configuration);
try {
@@ -943,7 +943,7 @@
// Check if the bucket is present
BlobServiceClient blobServiceClient;
try {
- String container =
configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
blobServiceClient = buildAzureClient(configuration);
BlobContainerClient blobContainer =
blobServiceClient.getBlobContainerClient(container);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10783
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: cheshire-cat
Gerrit-Change-Id: I405e84a30ee67b176c3389db6fd026c408ae1685
Gerrit-Change-Number: 10783
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange