From Preetham Poluparthi <[email protected]>: Preetham Poluparthi has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email )
Change subject: [ASTERIXDB-3707][EXT] Add file level split while reading parquet files ...................................................................... [ASTERIXDB-3707][EXT] Add file level split while reading parquet files - user model changes: no - storage format changes: no - interface changes: no Details: Introduces support for file-based splits during compilation and adds 'compiler.hdfs.split.parallelism' to control thread count for file listing to improve compilation time. Ext-ref : MB-70421 Change-Id: I288908499c90320f9fc497675ef3d671163c69f0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958 Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java 14 files changed, 114 insertions(+), 10 deletions(-) Approvals: Jenkins: Verified Ali Alsuliman: Looks good to me, approved diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java index 26fe54c..c54a531 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java @@ -112,6 +112,8 @@ StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", + CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY, + CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY, DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION, SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN, EquivalenceClassUtils.REWRITE_INTERNAL_QUERYUID_PK, SqlppQueryRewriter.SQL_COMPAT_OPTION, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 47034dd..10c1856 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -60,6 +60,7 @@ "compiler.forcejoinorder" : false, "compiler\.framesize" : 32768, "compiler\.groupmemory" : 163840, + "compiler\.hdfs\.split\.parallelism" : 4, "compiler\.index.covering" : true, "compiler\.internal\.sanitycheck" : true, "compiler\.joinmemory" : 262144, @@ -72,6 +73,7 @@ "compiler\.optimize\.expression.max\.args" : 100, "compiler.ordered.fields" : false, "compiler\.parallelism" : 0, + "compiler\.parquet\.filesplits" : false, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, "compiler\.sort\.parallel" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index f963801..22c4fc8 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -60,6 +60,7 @@ "compiler.forcejoinorder" : false, "compiler\.framesize" : 32768, "compiler\.groupmemory" : 163840, + "compiler\.hdfs\.split\.parallelism" : 4, "compiler\.index.covering" : true, "compiler\.internal\.sanitycheck" : false, "compiler\.joinmemory" : 262144, @@ -72,6 +73,7 @@ "compiler.optimize.expression.max.args" : 100, "compiler.ordered.fields" : false, "compiler\.parallelism" : -1, + "compiler\.parquet\.filesplits" : false, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, "compiler\.sort\.parallel" : true, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm 
index 824698e..a36d3b5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -60,6 +60,7 @@ "compiler.forcejoinorder" : false, "compiler\.framesize" : 32768, "compiler\.groupmemory" : 163840, + "compiler\.hdfs\.split\.parallelism" : 4, "compiler\.index.covering" : true, "compiler\.internal\.sanitycheck" : false, "compiler\.joinmemory" : 262144, @@ -72,6 +73,7 @@ "compiler.optimize.expression.max.args" : 100, "compiler.ordered.fields" : false, "compiler\.parallelism" : 3, + "compiler\.parquet\.filesplits" : false, "compiler.queryplanshape" : "zigzag", "compiler.runtime.memory.overhead" : 5, "compiler\.sort\.parallel" : true, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index ea2dddc..073bd43 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -175,7 +175,12 @@ COMPILER_DISJUNCTION_HASH_THRESHOLD( getRangedIntegerType(-1, Integer.MAX_VALUE), AlgebricksConfig.HASH_BASED_OR_THRESHOLD_DEFAULT, - "The number of disjunctions after which a hash-based approach is used for evaluating OR operation (-1 disables using the hash-based approach)"); + "The number of disjunctions after which a hash-based approach is used for evaluating OR operation (-1 disables using the hash-based approach)"), + COMPILER_PARQUET_FILESPLITS(BOOLEAN, false, "Enable/disable parquet file splits"), + COMPILER_HDFS_SPLIT_PARALLELISM( + INTEGER, + Runtime.getRuntime().availableProcessors(), + "Number of threads to use for generating file splits for HDFS files"); private final IOptionType 
type; private final Object defaultValue; @@ -276,6 +281,8 @@ public static final int COMPILER_PARALLELISM_AS_STORAGE = 0; public static final String COMPILER_DELTALAKE_FILESPLITS_KEY = Option.COMPILER_DELTALAKE_FILESPLITS.ini(); + public static final String COMPILER_PARQUET_FILESPLITS_KEY = Option.COMPILER_PARQUET_FILESPLITS.ini(); + public static final String COMPILER_HDFS_SPLIT_PARALLELISM_KEY = Option.COMPILER_HDFS_SPLIT_PARALLELISM.ini(); public CompilerProperties(PropertiesAccessor accessor) { super(accessor); @@ -434,4 +441,13 @@ public boolean isDeltaLakeFileSplitsEnabled() { return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS); } + + public boolean isParquetFileSplitsEnabled() { + return accessor.getBoolean(Option.COMPILER_PARQUET_FILESPLITS); + } + + public int getHdfsSplitParallelism() { + return accessor.getInt(Option.COMPILER_HDFS_SPLIT_PARALLELISM); + } + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java index fbd4779..2edfe4f1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java @@ -105,6 +105,9 @@ getCommonExpressionLimitSize(compilerProperties, querySpecificConfig, sourceLoc); boolean orderFields = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_ORDERED_FIELDS_KEY, compilerProperties.isOrderedFields()); + boolean useFileSplits = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY, + compilerProperties.isParquetFileSplitsEnabled()); + int hdfsSplitParallelism = getHdfsSplitParellelism(compilerProperties, querySpecificConfig, sourceLoc); PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig(); physOptConf.setFrameSize(frameSize); @@ -117,6 +120,8 @@ 
physOptConf.setSortSamples(sortNumSamples); physOptConf.setIndexOnly(indexOnly); physOptConf.setRewriteOrToJoin(rewriteOrToJoin); + physOptConf.setParquetFileSplit(useFileSplits); + physOptConf.setHdfsSplitParallelism(hdfsSplitParallelism); physOptConf.setSanityCheckEnabled(sanityCheck); physOptConf.setExternalFieldPushdown(externalFieldPushdown); physOptConf.setSubplanMerge(subplanMerge); @@ -283,4 +288,16 @@ throw AsterixException.create(ErrorCode.COMPILATION_ERROR, sourceLoc, e.getMessage()); } } + + private static int getHdfsSplitParellelism(CompilerProperties compilerProperties, + Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AsterixException { + String valueInQuery = (String) querySpecificConfig.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY); + try { + return valueInQuery == null ? compilerProperties.getHdfsSplitParallelism() + : OptionTypes.POSITIVE_INTEGER.parse(valueInQuery); + } catch (IllegalArgumentException e) { + throw AsterixException.create(ErrorCode.COMPILATION_ERROR, sourceLoc, e.getMessage()); + } + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index e08d025..fc7ce6f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -33,6 +33,8 @@ import java.util.UUID; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.config.CompilerProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.external.IExternalFilterEvaluator; @@ -46,6 +48,7 @@ 
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader; import org.apache.asterix.external.input.record.reader.hdfs.avro.AvroFileRecordReader; +import org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat; import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; import org.apache.asterix.external.input.stream.HDFSInputStream; @@ -56,11 +59,13 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.HDFSUtils; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -132,6 +137,12 @@ configureHdfsConf(hdfsConf, configuration); } + private boolean getParquetFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) { + String fileSplits = configuration.get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY); + return fileSplits != null ? 
Boolean.parseBoolean(fileSplits) + : appCtx.getCompilerProperties().isParquetFileSplitsEnabled(); + } + private void extractRequiredFiles(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory, JobConf hdfsConf) throws HyracksDataException, AlgebricksException { @@ -177,7 +188,8 @@ this.configuration = configuration; this.filterEvaluatorFactory = filterEvaluatorFactory; init((ICCServiceContext) serviceCtx); - return HDFSUtils.configureHDFSJobConf(configuration); + return HDFSUtils.configureHDFSJobConf(configuration, + (ICcApplicationContext) serviceCtx.getApplicationContext()); } protected void configureHdfsConf(JobConf conf, Map<String, String> configuration) @@ -230,6 +242,16 @@ if (HDFSUtils.isEmpty(conf)) { return Scheduler.EMPTY_INPUT_SPLITS; } + boolean useFileSplits = getParquetFileSplitsConfig(configuration, + (ICcApplicationContext) serviceCtx.getApplicationContext()); + if (useFileSplits && conf.getInputFormat() instanceof MapredParquetInputFormat) { + FileStatus[] fs = ((MapredParquetInputFormat) conf.getInputFormat()).listStatus(conf); + List<InputSplit> splits = new ArrayList<>(); + for (FileStatus file : fs) { + splits.add(new FileSplit(file.getPath(), 0, file.getLen(), conf)); + } + return splits.toArray(new InputSplit[0]); + } return conf.getInputFormat().getSplits(conf, numPartitions); } catch (IllegalArgumentException e) { throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 9ac0043..e85fa75 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -167,7 +167,7 @@ LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size()); configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); try { - usingSplits = getFileSplitsConfig(configuration, appCtx); + usingSplits = getDeltaFileSplitsConfig(configuration, appCtx); if (usingSplits) { distributeSplits(scanFiles, conf, numPartitions); } else { @@ -179,7 +179,7 @@ issueWarnings(warnings, warningCollector); } - private boolean getFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) { + private boolean getDeltaFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) { String fileSplits = configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY); return fileSplits != null ? Boolean.parseBoolean(fileSplits) : appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java index 7b10e09..0321d20 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -75,6 +76,11 @@ return resultSplits; } + @Override + 
public FileStatus[] listStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + public List<Footer> getFooters(JobConf job) throws IOException { return realInputFormat.getFooters(job, asList(super.listStatus(job))); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index e57b4d3..61ce50e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -21,6 +21,7 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; +import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.ByteArrayInputStream; @@ -48,6 +49,7 @@ import javax.security.auth.login.LoginException; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; @@ -222,7 +224,7 @@ } } - public static JobConf configureHDFSJobConf(Map<String, String> configuration) { + public static JobConf configureHDFSJobConf(Map<String, String> configuration, ICcApplicationContext serviceCtx) { JobConf conf = new JobConf(); String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH); String formatClassName = HDFSUtils.getInputFormatClassName(configuration); @@ 
-243,6 +245,7 @@ } conf.setClassLoader(HDFSInputStream.class.getClassLoader()); conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName); + conf.set(LIST_STATUS_NUM_THREADS, getHdfsFileSplitParallelism(configuration, serviceCtx)); // Enable local short circuit reads if user supplied the parameters if (localShortCircuitSocketPath != null) { @@ -270,6 +273,12 @@ return conf; } + private static String getHdfsFileSplitParallelism(Map<String, String> configuration, ICcApplicationContext appCtx) { + String numThreads = configuration.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY); + return numThreads != null ? numThreads + : String.valueOf(appCtx.getCompilerProperties().getHdfsSplitParallelism()); + } + public static Configuration configureHDFSwrite(Map<String, String> configuration) { Configuration conf = new Configuration(); String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index f7bf0f6..624b0b8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@ -140,7 +140,6 @@ addExternalProjectionInfo(projectionFiltrationInfo, edd.getProperties()); properties = addSubPath(externalDataSource.getProperties(), properties); properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize)); - setExternalCollectionCompilerProperties(metadataProvider, properties); IExternalFilterEvaluatorFactory filterEvaluatorFactory = metadataProvider .createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties); ITypedAdapterFactory adapterFactory = @@ -226,12 +225,22 @@ return dataset.getDatasetType() == 
DatasetType.EXTERNAL; } - private void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider, + public static void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider, Map<String, String> configuration) { - String fileSplits = + String deltaFileSplits = (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY); - if (fileSplits != null) { - configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, fileSplits); + if (deltaFileSplits != null) { + configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, deltaFileSplits); + } + String parquetFileSplits = + (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY); + if (parquetFileSplits != null) { + configuration.put(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY, parquetFileSplits); + } + String hdfsSplitParallelism = + (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY); + if (hdfsSplitParallelism != null) { + configuration.put(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY, hdfsSplitParallelism); } } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f2a6211..26060be 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -21,6 +21,7 @@ import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL; import static org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS; import static org.apache.asterix.common.utils.IdentifierUtil.dataset; +import static 
org.apache.asterix.metadata.declared.DatasetDataSource.setExternalCollectionCompilerProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -1022,6 +1023,7 @@ configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName()); configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, dataset.getDataverseName().getCanonicalForm()); + setExternalCollectionCompilerProperties(this, configuration); setExternalEntityId(configuration); setSourceType(configuration, adapterName); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java index 6fc11cb..b87cb92 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java @@ -52,4 +52,5 @@ public static final int HASH_BASED_OR_THRESHOLD_DEFAULT = 40; public static final int MAX_EXPRESSION_TREE_SIZE_DEFAULT = 100; public static final int COMMON_EXPRESSION_LIMIT_DEFAULT = 100; + public static final int HDFS_SPLIT_PARALLEL_DEFAULT = Runtime.getRuntime().availableProcessors(); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index 854f3ff..1aa821b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@ -46,6 +46,8 @@ private static final String SORT_SAMPLES = "SORT_SAMPLES"; private static final String INDEX_ONLY = "INDEX_ONLY"; private static final String REWRITE_OR_TO_JOIN = "REWRITE_OR_TO_JOIN"; + private static final String ENABLE_PARQUET_FILE_SPLIT = "ENABLE_PARQUET_FILE_SPLIT"; + private static final String HDFS_SPLIT_PARALLELISM = "HDFS_SPLIT_PARALLELISM"; private static final String SANITY_CHECK = "SANITY_CHECK"; private static final String EXTERNAL_FIELD_PUSHDOWN = "EXTERNAL_FIELD_PUSHDOWN"; private static final String SUBPLAN_MERGE = "SUBPLAN_MERGE"; @@ -277,6 +279,18 @@ return getBoolean(INDEX_ONLY, AlgebricksConfig.INDEX_ONLY_DEFAULT); } + public void setParquetFileSplit(boolean parquetFileSplit) { + setBoolean(ENABLE_PARQUET_FILE_SPLIT, parquetFileSplit); + } + + public void setHdfsSplitParallelism(int hdfsSplitParallelism) { + setInt(HDFS_SPLIT_PARALLELISM, hdfsSplitParallelism); + } + + public int getHdfsSplitParallelism() { + return getInt(HDFS_SPLIT_PARALLELISM, AlgebricksConfig.HDFS_SPLIT_PARALLEL_DEFAULT); + } + public void setSanityCheckEnabled(boolean sanityCheck) { setBoolean(SANITY_CHECK, sanityCheck); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0 Gerrit-Change-Number: 20958 Gerrit-PatchSet: 13 Gerrit-Owner: Preetham Poluparthi <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Preetham Poluparthi <[email protected]>
