>From Preetham Poluparthi <[email protected]>:
Preetham Poluparthi has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email )
Change subject: [ASTERIXDB-3707][EXT] Add file level split while reading
parquet files
......................................................................
[ASTERIXDB-3707][EXT] Add file level split while reading parquet files
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Introduces support for file-based splits during compilation, controlled by
'compiler.parquet.filesplits' (enable/disable parquet file splits), and adds
'compiler.hdfs.split.parallelism' to control the thread count used for file
listing to improve compilation time.
Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
M
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
M
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
M
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
10 files changed, 77 insertions(+), 9 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/58/20958/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index b6a9295..3835d92 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -97,7 +97,9 @@
CompilerProperties.COMPILER_COLUMN_FILTER_KEY,
CompilerProperties.COMPILER_BATCH_LOOKUP_KEY,
CompilerProperties.COMPILER_FRAMESIZE_KEY,
FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY,
- CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY,
FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+ CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY,
+ CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY,
+ CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY,
FuzzyUtils.SIM_FUNCTION_PROP_NAME,
FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
StartFeedStatement.WAIT_FOR_COMPLETION,
FeedActivityDetails.FEED_POLICY_NAME,
FeedActivityDetails.COLLECT_LOCATIONS,
SqlppQueryRewriter.INLINE_WITH_OPTION,
SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index ead02b7..8848a4f 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -49,6 +49,7 @@
"compiler.forcejoinorder" : false,
"compiler\.framesize" : 32768,
"compiler\.groupmemory" : 163840,
+ "compiler\.hdfs\.split\.parallelism" : \d+,
"compiler\.index.covering" : true,
"compiler\.internal\.sanitycheck" : true,
"compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
"compiler.min.windowmemory" : 524288,
"compiler.ordered.fields" : false,
"compiler\.parallelism" : 0,
+ "compiler\.parquet\.filesplits" : false,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
"compiler\.sort\.parallel" : false,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 0e9e7fb..262b20e 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -49,6 +49,7 @@
"compiler.forcejoinorder" : false,
"compiler\.framesize" : 32768,
"compiler\.groupmemory" : 163840,
+ "compiler\.hdfs\.split\.parallelism" : \d+,
"compiler\.index.covering" : true,
"compiler\.internal\.sanitycheck" : false,
"compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
"compiler.min.windowmemory" : 524288,
"compiler.ordered.fields" : false,
"compiler\.parallelism" : -1,
+ "compiler\.parquet\.filesplits" : false,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
"compiler\.sort\.parallel" : true,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e3c806..9a1c718 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -49,6 +49,7 @@
"compiler.forcejoinorder" : false,
"compiler\.framesize" : 32768,
"compiler\.groupmemory" : 163840,
+ "compiler\.hdfs\.split\.parallelism" : \d+,
"compiler\.index.covering" : true,
"compiler\.internal\.sanitycheck" : false,
"compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
"compiler.min.windowmemory" : 524288,
"compiler.ordered.fields" : false,
"compiler\.parallelism" : 3,
+ "compiler\.parquet\.filesplits" : false,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
"compiler\.sort\.parallel" : true,
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 62be4d0..115342a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -159,7 +159,12 @@
128,
"Maximum occurrences of a variable allowed in an expression
for inlining"),
COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS,
"Enable/disable select order list"),
- COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta
lake file splits");
+ COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta
lake file splits"),
+ COMPILER_PARQUET_FILESPLITS(BOOLEAN, false, "Enable/disable parquet
file splits"),
+ COMPILER_HDFS_SPLIT_PARALLELISM(
+ INTEGER,
+ Runtime.getRuntime().availableProcessors(),
+ "Number of threads to use for generating file splits for HDFS
files");
private final IOptionType type;
private final Object defaultValue;
@@ -250,6 +255,8 @@
public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
public static final String COMPILER_DELTALAKE_FILESPLITS_KEY =
Option.COMPILER_DELTALAKE_FILESPLITS.ini();
+ public static final String COMPILER_PARQUET_FILESPLITS_KEY =
Option.COMPILER_PARQUET_FILESPLITS.ini();
+ public static final String COMPILER_HDFS_SPLIT_PARALLELISM_KEY =
Option.COMPILER_HDFS_SPLIT_PARALLELISM.ini();
public CompilerProperties(PropertiesAccessor accessor) {
super(accessor);
@@ -396,4 +403,13 @@
public boolean isDeltaLakeFileSplitsEnabled() {
return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS);
}
+
+ /**
+ * @return the boolean value the property accessor holds for the
+ * COMPILER_PARQUET_FILESPLITS option
+ */
+ public boolean isParquetFileSplitsEnabled() {
+ final boolean parquetFileSplits = accessor.getBoolean(Option.COMPILER_PARQUET_FILESPLITS);
+ return parquetFileSplits;
+ }
+
+ /**
+ * @return the integer value the property accessor holds for the
+ * COMPILER_HDFS_SPLIT_PARALLELISM option
+ */
+ public int getHdfsSplitParallelism() {
+ final int splitParallelism = accessor.getInt(Option.COMPILER_HDFS_SPLIT_PARALLELISM);
+ return splitParallelism;
+ }
+
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 82653c2..8a894fb 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -33,6 +33,8 @@
import java.util.UUID;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -46,6 +48,7 @@
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import
org.apache.asterix.external.input.record.reader.hdfs.avro.AvroFileRecordReader;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.stream.HDFSInputStream;
@@ -56,11 +59,13 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -110,6 +115,7 @@
private transient Credentials credentials;
private byte[] serializedCredentials;
private transient UserGroupInformation ugi;
+ private boolean useFileSplits = false;
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
@@ -130,6 +136,14 @@
extractRequiredFiles(serviceCtx, configuration, warningCollector,
filterEvaluatorFactory, hdfsConf);
}
configureHdfsConf(hdfsConf, configuration);
+ useFileSplits =
+ getParquetFileSplitsConfig(configuration,
(ICcApplicationContext) serviceCtx.getApplicationContext());
+ }
+
+ /**
+ * Decides whether parquet file splits are enabled for this source. A value found in
+ * the configuration map under COMPILER_PARQUET_FILESPLITS_KEY wins; when the key is
+ * absent, the setting from the application context's compiler properties is used.
+ *
+ * @param configuration configuration map that may carry an override
+ * @param appCtx application context supplying the compiler properties
+ * @return true if parquet file splits are enabled
+ */
+ private boolean getParquetFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
+ String override = configuration.get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+ if (override == null) {
+ // no override in the map: use the compiler properties' setting
+ return appCtx.getCompilerProperties().isParquetFileSplitsEnabled();
+ }
+ return Boolean.parseBoolean(override);
}
private void extractRequiredFiles(IServiceContext serviceCtx, Map<String,
String> configuration,
@@ -177,7 +191,8 @@
this.configuration = configuration;
this.filterEvaluatorFactory = filterEvaluatorFactory;
init((ICCServiceContext) serviceCtx);
- return HDFSUtils.configureHDFSJobConf(configuration);
+ return HDFSUtils.configureHDFSJobConf(configuration,
+ (ICcApplicationContext) serviceCtx.getApplicationContext());
}
protected void configureHdfsConf(JobConf conf, Map<String, String>
configuration)
@@ -229,6 +244,14 @@
if (HDFSUtils.isEmpty(conf)) {
return Scheduler.EMPTY_INPUT_SPLITS;
}
+ if (!useFileSplits && conf.getInputFormat() instanceof
MapredParquetInputFormat) {
+ FileStatus[] fs = ((MapredParquetInputFormat)
conf.getInputFormat()).listStatus(conf);
+ List<InputSplit> splits = new ArrayList<>();
+ for (FileStatus file : fs) {
+ splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
conf));
+ }
+ return splits.toArray(new InputSplit[0]);
+ }
return conf.getInputFormat().getSplits(conf, numPartitions);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 9ac0043..e85fa75 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -167,7 +167,7 @@
LOGGER.info("Number of delta table parquet data files to scan: {}",
scanFiles.size());
configuration.put(ExternalDataConstants.KEY_PARSER,
ExternalDataConstants.FORMAT_DELTA);
try {
- usingSplits = getFileSplitsConfig(configuration, appCtx);
+ usingSplits = getDeltaFileSplitsConfig(configuration, appCtx);
if (usingSplits) {
distributeSplits(scanFiles, conf, numPartitions);
} else {
@@ -179,7 +179,7 @@
issueWarnings(warnings, warningCollector);
}
- private boolean getFileSplitsConfig(Map<String, String> configuration,
ICcApplicationContext appCtx) {
+ private boolean getDeltaFileSplitsConfig(Map<String, String>
configuration, ICcApplicationContext appCtx) {
String fileSplits =
configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
return fileSplits != null ? Boolean.parseBoolean(fileSplits)
:
appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..0321d20 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -26,6 +26,7 @@
import java.util.List;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -75,6 +76,11 @@
return resultSplits;
}
+ /**
+ * Returns the file statuses for the given job configuration by delegating to the
+ * superclass listing.
+ * NOTE(review): presumably overridden only so callers outside this package
+ * (HDFSDataSourceFactory invokes it) can reach the listing - confirm the visibility
+ * of the superclass method.
+ *
+ * @param job job configuration passed through to the superclass
+ * @return the statuses returned by the superclass
+ * @throws IOException if the superclass listing fails
+ */
+ @Override
+ public FileStatus[] listStatus(JobConf job) throws IOException {
+ final FileStatus[] statuses = super.listStatus(job);
+ return statuses;
+ }
+
public List<Footer> getFooters(JobConf job) throws IOException {
return realInputFormat.getFooters(job, asList(super.listStatus(job)));
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e57b4d3..61ce50e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,6 +21,7 @@
import static
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.ByteArrayInputStream;
@@ -48,6 +49,7 @@
import javax.security.auth.login.LoginException;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -222,7 +224,7 @@
}
}
- public static JobConf configureHDFSJobConf(Map<String, String>
configuration) {
+ public static JobConf configureHDFSJobConf(Map<String, String>
configuration, ICcApplicationContext serviceCtx) {
JobConf conf = new JobConf();
String localShortCircuitSocketPath =
configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
String formatClassName =
HDFSUtils.getInputFormatClassName(configuration);
@@ -243,6 +245,7 @@
}
conf.setClassLoader(HDFSInputStream.class.getClassLoader());
conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT,
formatClassName);
+ conf.set(LIST_STATUS_NUM_THREADS,
getHdfsFileSplitParallelism(configuration, serviceCtx));
// Enable local short circuit reads if user supplied the parameters
if (localShortCircuitSocketPath != null) {
@@ -270,6 +273,12 @@
return conf;
}
+ /**
+ * Resolves the thread count that is stored under LIST_STATUS_NUM_THREADS in the job
+ * configuration. A value found in the configuration map under
+ * COMPILER_HDFS_SPLIT_PARALLELISM_KEY takes precedence; when the key is absent, the
+ * value from the application context's compiler properties is used.
+ *
+ * @param configuration configuration map that may carry an override
+ * @param appCtx application context supplying the compiler properties
+ * @return the thread count as a decimal string, always at least 1
+ * @throws IllegalArgumentException if the effective value is not an integer or is less than 1
+ */
+ private static String getHdfsFileSplitParallelism(Map<String, String> configuration, ICcApplicationContext appCtx) {
+ String numThreads = configuration.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+ int parallelism;
+ if (numThreads == null) {
+ parallelism = appCtx.getCompilerProperties().getHdfsSplitParallelism();
+ } else {
+ try {
+ parallelism = Integer.parseInt(numThreads.trim());
+ } catch (NumberFormatException e) {
+ // fail here, naming the option, instead of letting the raw string reach the job conf
+ throw new IllegalArgumentException("Invalid value '" + numThreads + "' for " + CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY + ": expected a positive integer", e);
+ }
+ }
+ // NOTE(review): presumably a non-positive count would otherwise only fail later, when
+ // the file listing sizes its thread pool - confirm against the Hadoop version in use
+ if (parallelism < 1) {
+ throw new IllegalArgumentException("Invalid value '" + parallelism + "' for " + CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY + ": expected a positive integer");
+ }
+ return String.valueOf(parallelism);
+ }
+
public static Configuration configureHDFSwrite(Map<String, String>
configuration) {
Configuration conf = new Configuration();
String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f7bf0f6..cf5c004 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -228,10 +228,16 @@
+ /**
+ * Copies selected compiler options from the metadata provider's config into the
+ * supplied configuration map. Each option is copied only when the provider's config
+ * holds a value for it; otherwise the map is left untouched for that key.
+ *
+ * @param metadataProvider provider whose config holds the options to copy
+ * @param configuration map that receives the options
+ */
private void setExternalCollectionCompilerProperties(MetadataProvider
metadataProvider,
Map<String, String> configuration) {
- String fileSplits =
+ String deltaFileSplits =
(String)
metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
- if (fileSplits != null) {
-
configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY,
fileSplits);
+ if (deltaFileSplits != null) {
+
configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY,
deltaFileSplits);
}
+ String parquetFileSplits =
+ (String)
metadataProvider.getConfig().get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+ if (parquetFileSplits != null) {
+
configuration.put(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY,
parquetFileSplits);
+ }
+ // HDFSUtils looks this key up in the configuration map, so forward it as well;
+ // without this a value held in the provider's config is never seen there
+ String hdfsSplitParallelism =
+ (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+ if (hdfsSplitParallelism != null) {
+ configuration.put(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY, hdfsSplitParallelism);
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Gerrit-Change-Number: 20958
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>