From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730 )
Change subject: [ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing ...................................................................... [ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java 6 files changed, 82 insertions(+), 44 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, approved Hussain Towaileb: Looks good to me, but someone else must approve Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 50502da..ce906f6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -233,6 +233,7 @@ <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/accounting/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. 
Computed Field Value: 'accounting'. Reason: 'For input string: "accounting"'</expected-warn> <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/engineering/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'engineering'. Reason: 'For input string: "engineering"'</expected-warn> <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/hr/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'hr'. Reason: 'For input string: "hr"'</expected-warn> + <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn> </compilation-unit> </test-case> <test-case FilePath="external-dataset/s3/filter"> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 7ae992a..045b746 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -56,39 +55,22 @@ // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); + IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector); - //Get a list of S3 objects + // prepare prefix for computed field calculations ExternalDataPrefix externalDataPrefix = new 
ExternalDataPrefix(configuration, warningCollector); configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); // TODO(htowaileb): Since we're using the root to load the files then start filtering, it might end up being // very expensive since at the root of the prefix we might load millions of files, we should consider (when // possible) to get the value and add it - List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector); - filesOnly = filterPrefixes(externalDataPrefix, filesOnly, filterEvaluatorFactory.create(ctx, warningCollector)); + List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector, + externalDataPrefix, evaluator); // Distribute work load amongst the partitions distributeWorkLoad(filesOnly, getPartitionsCount()); } - private List<S3Object> filterPrefixes(ExternalDataPrefix prefix, List<S3Object> filesOnly, - IExternalFilterEvaluator evaluator) throws HyracksDataException { - - // if no computed fields or empty files list, return the original list - if (filesOnly.isEmpty() || !prefix.hasComputedFields() || evaluator.isEmpty()) { - return filesOnly; - } - - List<S3Object> filteredList = new ArrayList<>(); - for (S3Object file : filesOnly) { - if (prefix.evaluate(file.key(), evaluator)) { - filteredList.add(file); - } - } - - return filteredList; - } - /** * To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file * size. 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java index 66312bb..bccb6f8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java @@ -28,10 +28,12 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.aws.s3.S3Constants; import org.apache.asterix.external.util.aws.s3.S3Utils; @@ -56,9 +58,16 @@ public void configure(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException { + + // prepare prefix for computed field calculations + ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector); + IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector); + 
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); + //Get path String path = configuration.containsKey(ExternalDataConstants.KEY_PATH) - ? configuration.get(ExternalDataConstants.KEY_PATH) : buildPathURIs(configuration, warningCollector); + ? configuration.get(ExternalDataConstants.KEY_PATH) + : buildPathURIs(configuration, warningCollector, externalDataPrefix, evaluator); //Put S3 configurations to AsterixDB's Hadoop configuration putS3ConfToHadoopConf(configuration, path); @@ -108,11 +117,13 @@ * @return Comma-delimited paths (e.g., "s3a://bucket/file1.parquet,s3a://bucket/file2.parquet") * @throws CompilationException Compilation exception */ - private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector) - throws CompilationException { + private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector, + ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) + throws CompilationException, HyracksDataException { String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector); + List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector, + externalDataPrefix, evaluator); StringBuilder builder = new StringBuilder(); if (!filesOnly.isEmpty()) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java index 9d45fed..f6973ee 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java @@ -250,6 +250,11 @@ // TODO provide the List to avoid array creation List<String> keySegments = extractPrefixSegments(key); + // no computed fields filter, accept path + if (!hasComputedFields() || evaluator.isEmpty()) { + return true; + } + // segments of object key have to be larger than segments of the prefix if (keySegments.size() <= segments.size()) { return false; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index e60190b..318716f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiPredicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -54,6 +55,7 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.common.library.ILibrary; import org.apache.asterix.common.library.ILibraryManager; @@ -980,4 +982,20 @@ argHolder.getDataOutput().writeByte(ARRAY16); argHolder.getDataOutput().writeShort((short) 0); } + + /** + * Tests the provided key against all the provided predicates/evaluators and return true if they all pass. 
+ * + * @param key key + * @param predicate predicate + * @param matchers matchers + * @param externalDataPrefix external data prefix + * @param evaluator evaluator + * + * @return true if key passes all tests, false otherwise + */ + public static boolean evaluate(String key, BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, + ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException { + return !key.endsWith("/") && predicate.test(matchers, key) && externalDataPrefix.evaluate(key, evaluator); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index a4f3a1e..1436e55 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -56,13 +56,16 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; +import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.HDFSUtils; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.exceptions.Warning; @@ -360,7 +363,8 @@ */ public static 
List<S3Object> listS3Objects(Map<String, String> configuration, AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, - IWarningCollector warningCollector) throws CompilationException { + IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix, + IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException { // Prepare to retrieve the objects List<S3Object> filesOnly; String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); @@ -368,13 +372,15 @@ String prefix = getPrefix(configuration); try { - filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher); + filesOnly = + listS3Objects(s3Client, container, prefix, includeExcludeMatcher, externalDataPrefix, evaluator); } catch (S3Exception ex) { // New API is not implemented, try falling back to old API try { // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { - filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher); + filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher, + externalDataPrefix, evaluator); } else { throw ex; } @@ -407,7 +413,8 @@ * @param includeExcludeMatcher include/exclude matchers to apply */ private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix, - AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) { + AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, + ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException { String newMarker = null; List<S3Object> filesOnly = new ArrayList<>(); @@ -425,7 +432,7 @@ // Collect the paths to files only collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), 
filesOnly); + includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator); // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request if (!listObjectsResponse.isTruncated()) { @@ -447,7 +454,8 @@ * @param includeExcludeMatcher include/exclude matchers to apply */ private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix, - AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) { + AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, + ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException { String newMarker = null; List<S3Object> filesOnly = new ArrayList<>(); @@ -465,7 +473,7 @@ // Collect the paths to files only collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); + includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator); // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request if (!listObjectsResponse.isTruncated()) { @@ -479,21 +487,20 @@ } /** - * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered - * a file if it does not end up with a "/" which is the separator in a folder structure. 
+ * Collects only files that pass all tests * - * @param s3Objects List of returned objects + * @param s3Objects s3 objects + * @param predicate predicate + * @param matchers matchers + * @param filesOnly filtered files + * @param externalDataPrefix external data prefix + * @param evaluator evaluator */ private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate, - List<Matcher> matchers, List<S3Object> filesOnly) { + List<Matcher> matchers, List<S3Object> filesOnly, ExternalDataPrefix externalDataPrefix, + IExternalFilterEvaluator evaluator) throws HyracksDataException { for (S3Object object : s3Objects) { - // skip folders - if (object.key().endsWith("/")) { - continue; - } - - // No filter, add file - if (predicate.test(matchers, object.key())) { + if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, externalDataPrefix, evaluator)) { filesOnly.add(object); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67 Gerrit-Change-Number: 17730 Gerrit-PatchSet: 2 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-CC: Anon. E. Moose #1000171 Gerrit-MessageType: merged
