This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 02220560c5 [Improvement](multi catalog)Hive splitter. Get HDFS/S3 splits by using FileSystem api (#17706)
02220560c5 is described below
commit 02220560c59bdb18cdf924b62ae9574c5bedee84
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Mar 15 00:25:00 2023 +0800
[Improvement](multi catalog)Hive splitter. Get HDFS/S3 splits by using FileSystem api (#17706)

Use the FileSystem API to get splits for files in HDFS/S3 instead of calling InputFormat.getSplits.
The splits are based on blocks in HDFS/S3.
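
In short, instead of asking each InputFormat for splits, the splitter now lists files through Hadoop's FileSystem API and derives splits from the reported block locations. A minimal standalone sketch of that idea (illustrative only, not code from this commit; the path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class BlockSplitSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical table location; any HDFS/S3 path reachable from the conf works.
            Path path = new Path("hdfs://nameservice1/user/hive/warehouse/t1");
            FileSystem fs = path.getFileSystem(new Configuration());
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, true);
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                // Block locations already carry offset, length and hosts, which is
                // all a split needs; no call to InputFormat.getSplits is involved.
                for (BlockLocation block : status.getBlockLocations()) {
                    System.out.printf("%s offset=%d len=%d hosts=%s%n",
                            status.getPath(), block.getOffset(), block.getLength(),
                            String.join(",", block.getHosts()));
                }
            }
        }
    }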
---
.../main/java/org/apache/doris/common/Config.java | 4 ++
.../doris/datasource/HMSExternalCatalog.java | 1 +
.../doris/datasource/hive/HiveMetaStoreCache.java | 36 +++++++---
.../apache/doris/external/hive/util/HiveUtil.java | 32 +++++++++
.../doris/planner/external/FileSplitStrategy.java | 2 +-
.../doris/planner/external/HiveSplitter.java | 84 ++++++++++++++++++++--
6 files changed, 144 insertions(+), 15 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1f7561c8cb..b2f33c0dea 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1714,6 +1714,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static long file_scan_node_split_num = 128;
 
+    @ConfField(mutable = true, masterOnly = false)
+    public static long file_split_size = 0; // 0 means use the block size in HDFS/S3 as split size
+
     /**
      * If set to TRUE, FE will:
      *  1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
@@ -2057,6 +2060,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = false)
     public static String mysql_load_server_secure_path = "";
+
     @ConfField(mutable = false, masterOnly = false)
     public static int mysql_load_thread_pool = 4;
 }
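
The new file_split_size knob is consumed in HiveSplitter below; 0 (the default) defers to the per-file block size. The selection logic amounts to the following sketch (status is a hypothetical LocatedFileStatus for the file being split):

    // 0 means: use the block size reported by HDFS/S3 for this file.
    long splitSize = Config.file_split_size > 0
            ? Config.file_split_size     // fixed size configured by the operator
            : status.getBlockSize();     // per-file block size from the FileSystem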
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 0cf7afdc53..80443ca8b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -53,6 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     protected PooledHiveMetaStoreClient client;
     // Record the latest synced event id when processing hive events
     private long lastSyncedEventId;
+    public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
 
     /**
      * Default constructor for HMSExternalCatalog.
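
ENABLE_SELF_SPLITTER is looked up in the catalog's property map (see the HiveSplitter hunk below), so a catalog whose properties contain enable.self.splitter=true opts in. A hedged sketch of the check, assuming properties is the map returned by the catalog's getCatalogProperty().getProperties():

    // null-safe: a missing property leaves the flag false.
    boolean useSelfSplitter = "true".equalsIgnoreCase(
            properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER));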
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index c2313eb885..156f6eba66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.HiveSplitter;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -50,6 +51,7 @@ import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -249,16 +251,20 @@ public class HiveMetaStoreCache {
         try {
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             InputSplit[] splits;
-            String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
-
-            // TODO: Implement getSplits logic by ourselves, don't call inputFormat.getSplits anymore.
-            if (!Strings.isNullOrEmpty(remoteUser)) {
-                UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-                splits = ugi.doAs(
-                    (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+            // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
+            if (key.useSelfSplitter) {
+                splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
             } else {
-                splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+                String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+                if (!Strings.isNullOrEmpty(remoteUser)) {
+                    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
+                    splits = ugi.doAs(
+                        (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+                } else {
+                    splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+                }
             }
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
             }
@@ -310,10 +316,10 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions) {
+    public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat())));
+        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
 
         Stream<FileCacheKey> stream;
         if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@@ -601,10 +607,20 @@ public class HiveMetaStoreCache {
         private String location;
         // not in key
         private String inputFormat;
+        // Temp variable, use self file splitter or use InputFormat.getSplits.
+        // Will remove after self splitter is stable.
+        private boolean useSelfSplitter;
 
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
+            this.useSelfSplitter = false;
+        }
+
+        public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
+            this.location = location;
+            this.inputFormat = inputFormat;
+            this.useSelfSplitter = useSelfSplitter;
         }
 
         @Override
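
The two-argument constructor keeps existing callers on the InputFormat.getSplits path (useSelfSplitter defaults to false), while getFilesByPartitions passes the flag explicitly. For illustration (location and format strings are hypothetical):

    FileCacheKey legacy = new FileCacheKey("hdfs://ns1/warehouse/t1/dt=2023-03-14",
            "org.apache.hadoop.mapred.TextInputFormat");          // useSelfSplitter == false
    FileCacheKey selfSplit = new FileCacheKey("hdfs://ns1/warehouse/t1/dt=2023-03-14",
            "org.apache.hadoop.mapred.TextInputFormat", true);    // block-based splitting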
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 1586788a30..f3a617d3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -40,6 +42,8 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.List;
/**
@@ -178,4 +182,32 @@ public final class HiveUtil {
         }
     }
 
+    public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to get isSplitable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                LOG.warn("Class {} doesn't contain isSplitable method.", clazz);
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, fileSystem, path);
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
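
FileInputFormat.isSplitable (single "t" in Hadoop's spelling) is protected, which is why the reflection walk up the class hierarchy is needed. A hedged usage sketch, assuming jobConf, fs and path are in scope and set up the way HiveMetaStoreCache does before splitting:

    InputFormat<?, ?> format = HiveUtil.getInputFormat(jobConf,
            "org.apache.hadoop.mapred.TextInputFormat", false);
    boolean splittable = HiveUtil.isSplittable(format, fs, path);
    // Plain text reports true; OrcInputFormat is special-cased to true; a format
    // that answers false (e.g. gzip-compressed text) gets one split per file.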
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index e574aeb9d2..8fd7f2d16a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -34,7 +34,7 @@ public class FileSplitStrategy {
     }
 
     public boolean hasNext() {
-        return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num;
+        return totalSplitSize >= Config.file_scan_node_split_size || splitNum >= Config.file_scan_node_split_num;
     }
 
     public void next() {
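
The switch from > to >= fixes an off-by-one: a batch could previously grow to file_scan_node_split_num + 1 splits (or run one split past file_scan_node_split_size bytes) before hasNext() reported it full. For example, with the default file_scan_node_split_num = 128:

    // old: splitNum == 128 -> 128 > 128 is false, so a 129th split still joined
    // new: splitNum == 128 -> 128 >= 128 is true, the batch is cut at exactly 128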
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index a49935b9ee..3dc2253f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -23,11 +23,13 @@ import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.Split;
@@ -35,12 +37,20 @@ import org.apache.doris.planner.Splitter;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -73,6 +83,13 @@ public class HiveSplitter implements Splitter {
             hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), partitionColumnTypes);
         }
 
+        Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
+        boolean useSelfSplitter = false;
+        if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
+                && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
+            LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
+            useSelfSplitter = true;
+        }
         List<Split> allFiles = Lists.newArrayList();
         if (hivePartitionValues != null) {
@@ -99,13 +116,13 @@ public class HiveSplitter implements Splitter {
                 List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
                         partitionValuesList);
                 // 4. get all files of partitions
-                getFileSplitByPartitions(cache, partitions, allFiles);
+                getFileSplitByPartitions(cache, partitions, allFiles, useSelfSplitter);
             } else {
                 // unpartitioned table, create a dummy partition to save location and inputformat,
                 // so that we can unify the interface.
                 HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
                         hmsTable.getRemoteTable().getSd().getLocation(), null);
-                getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
+                getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles, useSelfSplitter);
                 this.totalPartitionNum = 1;
                 this.readPartitionNum = 1;
             }
@@ -121,8 +138,8 @@ public class HiveSplitter implements Splitter {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-                                          List<Split> allFiles) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions);
+                                          List<Split> allFiles, boolean useSelfSplitter) {
+        List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
                     Joiner.on(",")
@@ -152,4 +169,63 @@ public class HiveSplitter implements Splitter {
     public int getReadPartitionNum() {
         return readPartitionNum;
     }
+
+    // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
+    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
+                                             JobConf jobConf) throws IOException {
+        FileSystem fs = path.getFileSystem(jobConf);
+        boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
+        List<InputSplit> splits = Lists.newArrayList();
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
+        if (!locatedFileStatusRemoteIterator.hasNext()) {
+            LOG.debug("File status for path {} is empty.", path);
+            return new InputSplit[0];
+        }
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", path);
+            while (locatedFileStatusRemoteIterator.hasNext()) {
+                LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
+                BlockLocation block = status.getBlockLocations()[0];
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
+            }
+            return splits.toArray(new InputSplit[splits.size()]);
+        }
+        long splitSize = Config.file_split_size;
+        boolean useBlockSize = (splitSize <= 0);
+        while (locatedFileStatusRemoteIterator.hasNext()) {
+            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
+            if (useBlockSize) {
+                splitSize = status.getBlockSize();
+            }
+            BlockLocation[] blockLocations = status.getBlockLocations();
+            long length = status.getLen();
+            long bytesRemaining;
+            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
+                    bytesRemaining -= splitSize) {
+                int location = getBlockIndex(blockLocations, length - bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                        splitSize, blockLocations[location].getHosts()));
+            }
+            if (bytesRemaining != 0L) {
+                int location = getBlockIndex(blockLocations, length - bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                        bytesRemaining, blockLocations[location].getHosts()));
+            }
+        }
+
+        LOG.debug("Path {} includes {} splits.", path, splits.size());
+        return splits.toArray(new InputSplit[splits.size()]);
+    }
+
+    private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+        for (int i = 0; i < blkLocations.length; ++i) {
+            if (blkLocations[i].getOffset() <= offset
+                    && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1L;
+        throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
+    }
 }
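
To make the loop's arithmetic concrete, here is the same split computation run on made-up numbers (sizes in MB for readability; the real code works in bytes):

    long length = 300;      // hypothetical file size
    long splitSize = 128;   // hypothetical block size
    long bytesRemaining;
    for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
            bytesRemaining -= splitSize) {
        System.out.printf("split offset=%d size=%d%n", length - bytesRemaining, splitSize);
    }
    if (bytesRemaining != 0) {
        System.out.printf("tail  offset=%d size=%d%n", length - bytesRemaining, bytesRemaining);
    }
    // Prints: offsets 0 and 128 with size 128, then a tail at offset 256 with size 44.
    // The 1.1 slack factor avoids runt splits: a 140 MB file stays one 140 MB split
    // (140/128 is about 1.09), instead of a 128 MB split plus a 12 MB tail.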
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]