From Preetham Poluparthi <[email protected]>:

Preetham Poluparthi has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email )

Change subject: [ASTERIXDB-3707][EXT] Add file level split while reading parquet files
......................................................................

[ASTERIXDB-3707][EXT] Add file level split while reading parquet files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
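Introduces support for file-level splits when compiling queries over Parquet
files read through the HDFS data source. When the new
'compiler.parquet.filesplits' option is enabled (default: false), one input
split is generated per file instead of delegating split computation to the
input format. Also adds 'compiler.hdfs.split.parallelism' (default: the number
of available processors) to control the thread count used for file listing,
to improve compilation time. Both options can also be set per query.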
Ext-ref : MB-70421
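
A minimal sketch of what the file-level split path does, assuming only what the
HDFSDataSourceFactory hunk below shows: list the input files, then emit one
split per file that starts at offset 0 and covers the whole file length.
FileLevelSplits, FileEntry and Split are hypothetical stand-ins for Hadoop's
FileStatus and FileSplit, so the sketch runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free model of the per-file split loop added to
// HDFSDataSourceFactory. FileEntry and Split are stand-ins, not Hadoop classes.
final class FileLevelSplits {

    // Stand-in for org.apache.hadoop.fs.FileStatus: only path and length.
    static final class FileEntry {
        final String path;
        final long length;

        FileEntry(String path, long length) {
            this.path = path;
            this.length = length;
        }
    }

    // Stand-in for org.apache.hadoop.mapred.FileSplit: path, start offset, length.
    static final class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    // One split per listed file, covering the file from offset 0 to its end.
    static List<Split> oneSplitPerFile(List<FileEntry> files) {
        List<Split> splits = new ArrayList<>(files.size());
        for (FileEntry file : files) {
            splits.add(new Split(file.path, 0L, file.length));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<FileEntry> files = List.of(
                new FileEntry("/data/part-0.parquet", 512L * 1024 * 1024),
                new FileEntry("/data/part-1.parquet", 4096L));
        for (Split s : oneSplitPerFile(files)) {
            System.out.println(s.path + " start=" + s.start + " length=" + s.length);
        }
    }
}
```

The likely trade-off, assuming tasks are scheduled per split: listing files is
cheaper at compile time than computing finer-grained splits, but each file,
however large, then becomes a single unit of work.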

Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
14 files changed, 114 insertions(+), 10 deletions(-)

Approvals:
  Jenkins: Verified
  Ali Alsuliman: Looks good to me, approved




diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index 26fe54c..c54a531 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -112,6 +112,8 @@
                 StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
                 FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
                 SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
+                CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY,
+                CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY,
                 DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION,
                 SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN,
                 EquivalenceClassUtils.REWRITE_INTERNAL_QUERYUID_PK, SqlppQueryRewriter.SQL_COMPAT_OPTION,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 47034dd..10c1856 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -60,6 +60,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : true,
     "compiler\.joinmemory" : 262144,
@@ -72,6 +73,7 @@
     "compiler\.optimize\.expression.max\.args" : 100,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : 0,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index f963801..22c4fc8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -60,6 +60,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
@@ -72,6 +73,7 @@
     "compiler.optimize.expression.max.args" : 100,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : -1,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 824698e..a36d3b5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -60,6 +60,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
@@ -72,6 +73,7 @@
     "compiler.optimize.expression.max.args" : 100,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : 3,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : true,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index ea2dddc..073bd43 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -175,7 +175,12 @@
         COMPILER_DISJUNCTION_HASH_THRESHOLD(
                 getRangedIntegerType(-1, Integer.MAX_VALUE),
                 AlgebricksConfig.HASH_BASED_OR_THRESHOLD_DEFAULT,
-                "The number of disjunctions after which a hash-based approach is used for evaluating OR operation (-1 disables using the hash-based approach)");
+                "The number of disjunctions after which a hash-based approach is used for evaluating OR operation (-1 disables using the hash-based approach)"),
+        COMPILER_PARQUET_FILESPLITS(BOOLEAN, false, "Enable/disable parquet file splits"),
+        COMPILER_HDFS_SPLIT_PARALLELISM(
+                INTEGER,
+                Runtime.getRuntime().availableProcessors(),
+                "Number of threads to use for generating file splits for HDFS files");

         private final IOptionType type;
         private final Object defaultValue;
@@ -276,6 +281,8 @@

     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
     public static final String COMPILER_DELTALAKE_FILESPLITS_KEY = Option.COMPILER_DELTALAKE_FILESPLITS.ini();
+    public static final String COMPILER_PARQUET_FILESPLITS_KEY = Option.COMPILER_PARQUET_FILESPLITS.ini();
+    public static final String COMPILER_HDFS_SPLIT_PARALLELISM_KEY = Option.COMPILER_HDFS_SPLIT_PARALLELISM.ini();

     public CompilerProperties(PropertiesAccessor accessor) {
         super(accessor);
@@ -434,4 +441,13 @@
     public boolean isDeltaLakeFileSplitsEnabled() {
         return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS);
     }
+
+    public boolean isParquetFileSplitsEnabled() {
+        return accessor.getBoolean(Option.COMPILER_PARQUET_FILESPLITS);
+    }
+
+    public int getHdfsSplitParallelism() {
+        return accessor.getInt(Option.COMPILER_HDFS_SPLIT_PARALLELISM);
+    }
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index fbd4779..2edfe4f1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -105,6 +105,9 @@
                 getCommonExpressionLimitSize(compilerProperties, querySpecificConfig, sourceLoc);
         boolean orderFields = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_ORDERED_FIELDS_KEY,
                 compilerProperties.isOrderedFields());
+        boolean useFileSplits = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY,
+                compilerProperties.isParquetFileSplitsEnabled());
+        int hdfsSplitParallelism = getHdfsSplitParellelism(compilerProperties, querySpecificConfig, sourceLoc);

         PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -117,6 +120,8 @@
         physOptConf.setSortSamples(sortNumSamples);
         physOptConf.setIndexOnly(indexOnly);
         physOptConf.setRewriteOrToJoin(rewriteOrToJoin);
+        physOptConf.setParquetFileSplit(useFileSplits);
+        physOptConf.setHdfsSplitParallelism(hdfsSplitParallelism);
         physOptConf.setSanityCheckEnabled(sanityCheck);
         physOptConf.setExternalFieldPushdown(externalFieldPushdown);
         physOptConf.setSubplanMerge(subplanMerge);
@@ -283,4 +288,16 @@
             throw AsterixException.create(ErrorCode.COMPILATION_ERROR, sourceLoc, e.getMessage());
         }
     }
+
+    private static int getHdfsSplitParellelism(CompilerProperties compilerProperties,
+            Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AsterixException {
+        String valueInQuery = (String) querySpecificConfig.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+        try {
+            return valueInQuery == null ? compilerProperties.getHdfsSplitParallelism()
+                    : OptionTypes.POSITIVE_INTEGER.parse(valueInQuery);
+        } catch (IllegalArgumentException e) {
+            throw AsterixException.create(ErrorCode.COMPILATION_ERROR, sourceLoc, e.getMessage());
+        }
+    }
+
 }
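
getHdfsSplitParellelism lets a query-level value override the cluster-wide
compiler.hdfs.split.parallelism setting and turns a value that is not a
positive integer into a compilation error. The sketch below reproduces that
control flow with JDK types only. SplitParallelismResolver and
parsePositiveInt are hypothetical; parsePositiveInt stands in for
OptionTypes.POSITIVE_INTEGER.parse, and IllegalStateException stands in for
AsterixException.

```java
import java.util.Map;

// Hypothetical sketch of the resolution logic in getHdfsSplitParellelism.
final class SplitParallelismResolver {

    static final String KEY = "compiler.hdfs.split.parallelism";

    // Hypothetical stand-in for OptionTypes.POSITIVE_INTEGER.parse.
    static int parsePositiveInt(String value) {
        int parsed;
        try {
            parsed = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("not an integer: " + value, e);
        }
        if (parsed <= 0) {
            throw new IllegalArgumentException("must be a positive integer: " + value);
        }
        return parsed;
    }

    // A query-level setting wins; otherwise fall back to the cluster default.
    static int resolve(Map<String, Object> querySpecificConfig, int clusterDefault) {
        Object valueInQuery = querySpecificConfig.get(KEY);
        if (valueInQuery == null) {
            return clusterDefault;
        }
        try {
            return parsePositiveInt(String.valueOf(valueInQuery));
        } catch (IllegalArgumentException e) {
            // The real code wraps this in an AsterixException (COMPILATION_ERROR).
            throw new IllegalStateException("compilation error: " + e.getMessage(), e);
        }
    }
}
```

One deliberate difference: the sketch converts the query value with
String.valueOf rather than a (String) cast, so a non-string value would be
parsed instead of raising ClassCastException.
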
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index e08d025..fc7ce6f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -33,6 +33,8 @@
 import java.util.UUID;

 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -46,6 +48,7 @@
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.external.input.record.reader.hdfs.avro.AvroFileRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat;
 import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
@@ -56,11 +59,13 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -132,6 +137,12 @@
         configureHdfsConf(hdfsConf, configuration);
     }

+    private boolean getParquetFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
+        String fileSplits = configuration.get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+        return fileSplits != null ? Boolean.parseBoolean(fileSplits)
+                : appCtx.getCompilerProperties().isParquetFileSplitsEnabled();
+    }
+
     private void extractRequiredFiles(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory,
             JobConf hdfsConf) throws HyracksDataException, AlgebricksException {
@@ -177,7 +188,8 @@
         this.configuration = configuration;
         this.filterEvaluatorFactory = filterEvaluatorFactory;
         init((ICCServiceContext) serviceCtx);
-        return HDFSUtils.configureHDFSJobConf(configuration);
+        return HDFSUtils.configureHDFSJobConf(configuration,
+                (ICcApplicationContext) serviceCtx.getApplicationContext());
     }
 
     protected void configureHdfsConf(JobConf conf, Map<String, String> configuration)
@@ -230,6 +242,16 @@
             if (HDFSUtils.isEmpty(conf)) {
                 return Scheduler.EMPTY_INPUT_SPLITS;
             }
+            boolean useFileSplits = getParquetFileSplitsConfig(configuration,
+                    (ICcApplicationContext) serviceCtx.getApplicationContext());
+            if (useFileSplits && conf.getInputFormat() instanceof MapredParquetInputFormat) {
+                FileStatus[] fs = ((MapredParquetInputFormat) conf.getInputFormat()).listStatus(conf);
+                List<InputSplit> splits = new ArrayList<>();
+                for (FileStatus file : fs) {
+                    splits.add(new FileSplit(file.getPath(), 0, file.getLen(), conf));
+                }
+                return splits.toArray(new InputSplit[0]);
+            }
             return conf.getInputFormat().getSplits(conf, numPartitions);
         } catch (IllegalArgumentException e) {
             throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)
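
The new branch in getSplits takes the file-level path only when the option
resolves to true and the job's input format is MapredParquetInputFormat; all
other formats still call getInputFormat().getSplits(conf, numPartitions). A
hypothetical, Hadoop-free sketch of that decision follows (SplitStrategy and
its Kind enum are illustrative names only). Note that Boolean.parseBoolean, as
used by getParquetFileSplitsConfig, returns true only for the string "true"
(ignoring case), so a value such as "yes" silently disables the feature rather
than failing.

```java
import java.util.Map;

// Hypothetical, Hadoop-free model of the split-strategy choice in getSplits.
final class SplitStrategy {

    static final String PARQUET_FILESPLITS_KEY = "compiler.parquet.filesplits";

    enum Kind { ONE_SPLIT_PER_FILE, INPUT_FORMAT_SPLITS }

    // Mirrors getParquetFileSplitsConfig: a query-level string, when present,
    // overrides the cluster default. parseBoolean is true only for "true" (any case).
    static boolean fileSplitsEnabled(Map<String, String> configuration, boolean clusterDefault) {
        String fileSplits = configuration.get(PARQUET_FILESPLITS_KEY);
        return fileSplits != null ? Boolean.parseBoolean(fileSplits) : clusterDefault;
    }

    // File-level splits apply only when enabled AND the input format is the
    // Parquet one; every other case keeps the input format's own getSplits().
    static Kind choose(Map<String, String> configuration, boolean clusterDefault,
            boolean isParquetInputFormat) {
        return fileSplitsEnabled(configuration, clusterDefault) && isParquetInputFormat
                ? Kind.ONE_SPLIT_PER_FILE
                : Kind.INPUT_FORMAT_SPLITS;
    }
}
```
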
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 9ac0043..e85fa75 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -167,7 +167,7 @@
         LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size());
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
         try {
-            usingSplits = getFileSplitsConfig(configuration, appCtx);
+            usingSplits = getDeltaFileSplitsConfig(configuration, appCtx);
             if (usingSplits) {
                 distributeSplits(scanFiles, conf, numPartitions);
             } else {
@@ -179,7 +179,7 @@
         issueWarnings(warnings, warningCollector);
     }

-    private boolean getFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
+    private boolean getDeltaFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
         String fileSplits = configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
         return fileSplits != null ? Boolean.parseBoolean(fileSplits)
                 : appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..0321d20 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -26,6 +26,7 @@
 import java.util.List;

 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -75,6 +76,11 @@
         return resultSplits;
     }

+    @Override
+    public FileStatus[] listStatus(JobConf job) throws IOException {
+        return super.listStatus(job);
+    }
+
     public List<Footer> getFooters(JobConf job) throws IOException {
         return realInputFormat.getFooters(job, asList(super.listStatus(job)));
     }
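
The listStatus override adds no behavior. In
org.apache.hadoop.mapred.FileInputFormat the method is protected, and Java
permits an overriding method to widen access, so re-declaring it public lets
HDFSDataSourceFactory (in a different package) call it directly. The toy
classes below are hypothetical and only demonstrate that language rule.

```java
// Hypothetical classes; they only illustrate the Java rule that an
// overriding method may widen access (protected -> public) but never narrow it.
abstract class BaseInputFormat {

    // Like FileInputFormat.listStatus: visible to subclasses and the same package.
    protected String[] listStatus(String dir) {
        return new String[] {dir + "/part-0.parquet", dir + "/part-1.parquet"};
    }
}

final class PublicListingInputFormat extends BaseInputFormat {

    // Same behavior as the parent; the only change is the wider access
    // modifier, so callers in any package can now invoke it.
    @Override
    public String[] listStatus(String dir) {
        return super.listStatus(dir);
    }
}
```
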
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e57b4d3..61ce50e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,6 +21,7 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.ByteArrayInputStream;
@@ -48,6 +49,7 @@
 import javax.security.auth.login.LoginException;

 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -222,7 +224,7 @@
         }
     }

-    public static JobConf configureHDFSJobConf(Map<String, String> configuration) {
+    public static JobConf configureHDFSJobConf(Map<String, String> configuration, ICcApplicationContext serviceCtx) {
         JobConf conf = new JobConf();
         String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
         String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
@@ -243,6 +245,7 @@
         }
         conf.setClassLoader(HDFSInputStream.class.getClassLoader());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName);
+        conf.set(LIST_STATUS_NUM_THREADS, getHdfsFileSplitParallelism(configuration, serviceCtx));

         // Enable local short circuit reads if user supplied the parameters
         if (localShortCircuitSocketPath != null) {
@@ -270,6 +273,12 @@
         return conf;
     }

+    private static String getHdfsFileSplitParallelism(Map<String, String> configuration, ICcApplicationContext appCtx) {
+        String numThreads = configuration.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+        return numThreads != null ? numThreads
+                : String.valueOf(appCtx.getCompilerProperties().getHdfsSplitParallelism());
+    }
+
     public static Configuration configureHDFSwrite(Map<String, String> configuration) {
         Configuration conf = new Configuration();
         String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
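
configureHDFSJobConf now copies the resolved thread count into
LIST_STATUS_NUM_THREADS (the mapreduce.input.fileinputformat.list-status.num-threads
property), which Hadoop's FileInputFormat.listStatus reads when listing input
paths; a value above 1 switches it from single-threaded to multi-threaded
listing. The sketch below is not Hadoop's implementation: ParallelLister is a
hypothetical class that uses java.nio.file and a fixed-size ExecutorService to
show the same idea of bounding listing work by a configured thread count.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of thread-bounded listing; not Hadoop code.
final class ParallelLister {

    // Lists the regular files directly under each input directory using at
    // most numThreads threads. Results keep the order of inputDirs.
    static List<Path> listAll(List<Path> inputDirs, int numThreads)
            throws InterruptedException, ExecutionException {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<List<Path>>> futures = new ArrayList<>();
            for (Path dir : inputDirs) {
                futures.add(pool.submit(() -> listOne(dir)));
            }
            List<Path> result = new ArrayList<>();
            for (Future<List<Path>> f : futures) {
                result.addAll(f.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    // Lists one directory; entries are sorted so the output is deterministic.
    private static List<Path> listOne(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```
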
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f7bf0f6..624b0b8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -140,7 +140,6 @@
                         addExternalProjectionInfo(projectionFiltrationInfo, edd.getProperties());
                 properties = addSubPath(externalDataSource.getProperties(), properties);
                 properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
-                setExternalCollectionCompilerProperties(metadataProvider, properties);
                 IExternalFilterEvaluatorFactory filterEvaluatorFactory = metadataProvider
                         .createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties);
                 ITypedAdapterFactory adapterFactory =
@@ -226,12 +225,22 @@
         return dataset.getDatasetType() == DatasetType.EXTERNAL;
     }

-    private void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider,
+    public static void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider,
             Map<String, String> configuration) {
-        String fileSplits =
+        String deltaFileSplits =
                 (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
-        if (fileSplits != null) {
-            configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, fileSplits);
+        if (deltaFileSplits != null) {
+            configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, deltaFileSplits);
+        }
+        String parquetFileSplits =
+                (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+        if (parquetFileSplits != null) {
+            configuration.put(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY, parquetFileSplits);
+        }
+        String hdfsSplitParallelism =
+                (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+        if (hdfsSplitParallelism != null) {
+            configuration.put(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY, hdfsSplitParallelism);
         }
     }
 }
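
setExternalCollectionCompilerProperties copies each explicitly set
query-level option into the adapter configuration, one if-block per key, and
its call moves from DatasetDataSource into MetadataProvider, next to where the
external dataset's configuration map is populated. A table-driven variant is
sketched below. CompilerOptionPropagation is hypothetical; its key list holds
only the two key strings visible in this change's cluster_state output (the
Delta Lake key's string value is not shown here), and String.valueOf replaces
the original (String) cast.

```java
import java.util.List;
import java.util.Map;

// Hypothetical table-driven variant of setExternalCollectionCompilerProperties.
final class CompilerOptionPropagation {

    // Key strings as they appear in the cluster_state output of this change.
    static final List<String> PROPAGATED_KEYS =
            List.of("compiler.parquet.filesplits", "compiler.hdfs.split.parallelism");

    // Copies each query-level option that was explicitly set into the external
    // data source configuration. Unset options are skipped, so the reader side
    // falls back to the cluster-wide default.
    static void propagate(Map<String, Object> queryConfig, Map<String, String> configuration) {
        for (String key : PROPAGATED_KEYS) {
            Object value = queryConfig.get(key);
            if (value != null) {
                configuration.put(key, String.valueOf(value));
            }
        }
    }
}
```
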
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f2a6211..26060be 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -21,6 +21,7 @@
 import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL;
 import static org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import static org.apache.asterix.metadata.declared.DatasetDataSource.setExternalCollectionCompilerProperties;

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -1022,6 +1023,7 @@
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
+            setExternalCollectionCompilerProperties(this, configuration);
             setExternalEntityId(configuration);
             setSourceType(configuration, adapterName);

diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 6fc11cb..b87cb92 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -52,4 +52,5 @@
     public static final int HASH_BASED_OR_THRESHOLD_DEFAULT = 40;
     public static final int MAX_EXPRESSION_TREE_SIZE_DEFAULT = 100;
     public static final int COMMON_EXPRESSION_LIMIT_DEFAULT = 100;
+    public static final int HDFS_SPLIT_PARALLEL_DEFAULT = Runtime.getRuntime().availableProcessors();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 854f3ff..1aa821b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -46,6 +46,8 @@
     private static final String SORT_SAMPLES = "SORT_SAMPLES";
     private static final String INDEX_ONLY = "INDEX_ONLY";
     private static final String REWRITE_OR_TO_JOIN = "REWRITE_OR_TO_JOIN";
+    private static final String ENABLE_PARQUET_FILE_SPLIT = "ENABLE_PARQUET_FILE_SPLIT";
+    private static final String HDFS_SPLIT_PARALLELISM = "HDFS_SPLIT_PARALLELISM";
     private static final String SANITY_CHECK = "SANITY_CHECK";
     private static final String EXTERNAL_FIELD_PUSHDOWN = "EXTERNAL_FIELD_PUSHDOWN";
     private static final String SUBPLAN_MERGE = "SUBPLAN_MERGE";
@@ -277,6 +279,18 @@
         return getBoolean(INDEX_ONLY, AlgebricksConfig.INDEX_ONLY_DEFAULT);
     }

+    public void setParquetFileSplit(boolean parquetFileSplit) {
+        setBoolean(ENABLE_PARQUET_FILE_SPLIT, parquetFileSplit);
+    }
+
+    public void setHdfsSplitParallelism(int hdfsSplitParallelism) {
+        setInt(HDFS_SPLIT_PARALLELISM, hdfsSplitParallelism);
+    }
+
+    public int getHdfsSplitParallelism() {
+        return getInt(HDFS_SPLIT_PARALLELISM, AlgebricksConfig.HDFS_SPLIT_PARALLEL_DEFAULT);
+    }
+
     public void setSanityCheckEnabled(boolean sanityCheck) {
         setBoolean(SANITY_CHECK, sanityCheck);
     }

--

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Gerrit-Change-Number: 20958
Gerrit-PatchSet: 13
Gerrit-Owner: Preetham Poluparthi <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Preetham Poluparthi <[email protected]>
