This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 02220560c5 [Improvement](multi catalog)Hive splitter. Get HDFS/S3 
splits by using FileSystem api  (#17706)
02220560c5 is described below

commit 02220560c59bdb18cdf924b62ae9574c5bedee84
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Mar 15 00:25:00 2023 +0800

    [Improvement](multi catalog)Hive splitter. Get HDFS/S3 splits by using 
FileSystem api  (#17706)
    
    Use FileSystem API to get splits for files in HDFS/S3 instead of calling
    InputFormat.getSplits.
    The splits are based on blocks in HDFS/S3.
---
 .../main/java/org/apache/doris/common/Config.java  |  4 ++
 .../doris/datasource/HMSExternalCatalog.java       |  1 +
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 36 +++++++---
 .../apache/doris/external/hive/util/HiveUtil.java  | 32 +++++++++
 .../doris/planner/external/FileSplitStrategy.java  |  2 +-
 .../doris/planner/external/HiveSplitter.java       | 84 ++++++++++++++++++++--
 6 files changed, 144 insertions(+), 15 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1f7561c8cb..b2f33c0dea 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1714,6 +1714,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static long file_scan_node_split_num = 128;
 
+    @ConfField(mutable = true, masterOnly = false)
+    public static long file_split_size = 0; // 0 means use the block size in 
HDFS/S3 as split size
+
     /**
      * If set to TRUE, FE will:
      * 1. divide BE into high load and low load(no mid load) to force 
triggering tablet scheduling;
@@ -2057,6 +2060,7 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = false)
     public static String mysql_load_server_secure_path = "";
 
+
     @ConfField(mutable = false, masterOnly = false)
     public static int mysql_load_thread_pool = 4;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 0cf7afdc53..80443ca8b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -53,6 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     protected PooledHiveMetaStoreClient client;
     // Record the latest synced event id when processing hive events
     private long lastSyncedEventId;
+    public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
 
     /**
      * Default constructor for HMSExternalCatalog.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index c2313eb885..156f6eba66 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.HiveSplitter;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -50,6 +51,7 @@ import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -249,16 +251,20 @@ public class HiveMetaStoreCache {
             try {
                 InputFormat<?, ?> inputFormat = 
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 InputSplit[] splits;
-                String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
-
-                // TODO: Implement getSplits logic by ourselves, don't call 
inputFormat.getSplits anymore.
-                if (!Strings.isNullOrEmpty(remoteUser)) {
-                    UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(remoteUser);
-                    splits = ugi.doAs(
-                            (PrivilegedExceptionAction<InputSplit[]>) () -> 
inputFormat.getSplits(jobConf, 0));
+                // TODO: This is a temp config, will remove it after the 
HiveSplitter is stable.
+                if (key.useSelfSplitter) {
+                    splits = HiveSplitter.getHiveSplits(new 
Path(finalLocation), inputFormat, jobConf);
                 } else {
-                    splits = inputFormat.getSplits(jobConf, 0 /* use hdfs 
block size as default */);
+                    String remoteUser = 
jobConf.get(HdfsResource.HADOOP_USER_NAME);
+                    if (!Strings.isNullOrEmpty(remoteUser)) {
+                        UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(remoteUser);
+                        splits = ugi.doAs(
+                            (PrivilegedExceptionAction<InputSplit[]>) () -> 
inputFormat.getSplits(jobConf, 0));
+                    } else {
+                        splits = inputFormat.getSplits(jobConf, 0 /* use hdfs 
block size as default */);
+                    }
                 }
+
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("load #{} files for {} in catalog {}", 
splits.length, key, catalog.getName());
                 }
@@ -310,10 +316,10 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<InputSplit> getFilesByPartitions(List<HivePartition> 
partitions) {
+    public List<InputSplit> getFilesByPartitions(List<HivePartition> 
partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = 
Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> keys.add(new 
FileCacheKey(p.getPath(), p.getInputFormat())));
+        partitions.stream().forEach(p -> keys.add(new 
FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
 
         Stream<FileCacheKey> stream;
         if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@@ -601,10 +607,20 @@ public class HiveMetaStoreCache {
         private String location;
         // not in key
         private String inputFormat;
+        // Temp variable, use self file splitter or use InputFormat.getSplits.
+        // Will remove after self splitter is stable.
+        private boolean useSelfSplitter;
 
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
+            this.useSelfSplitter = false;
+        }
+
+        public FileCacheKey(String location, String inputFormat, boolean 
useSelfSplitter) {
+            this.location = location;
+            this.inputFormat = inputFormat;
+            this.useSelfSplitter = useSelfSplitter;
         }
 
         @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 1586788a30..f3a617d3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -40,6 +42,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.List;
 
 /**
@@ -178,4 +182,32 @@ public final class HiveUtil {
         }
     }
 
+    public static boolean isSplittable(InputFormat<?, ?> inputFormat, 
FileSystem fileSystem, Path path) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to get isSplittable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = 
clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", 
FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                LOG.warn("Class {} doesn't contain isSplitable method.", 
clazz);
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, fileSystem, path);
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index e574aeb9d2..8fd7f2d16a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -34,7 +34,7 @@ public class FileSplitStrategy {
     }
 
     public boolean hasNext() {
-        return totalSplitSize > Config.file_scan_node_split_size || splitNum > 
Config.file_scan_node_split_num;
+        return totalSplitSize >= Config.file_scan_node_split_size || splitNum 
>= Config.file_scan_node_split_num;
     }
 
     public void next() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index a49935b9ee..3dc2253f8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -23,11 +23,13 @@ import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.Split;
@@ -35,12 +37,20 @@ import org.apache.doris.planner.Splitter;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +83,13 @@ public class HiveSplitter implements Splitter {
                 hivePartitionValues = 
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
                     partitionColumnTypes);
             }
+            Map<String, String> properties = 
hmsTable.getCatalog().getCatalogProperty().getProperties();
+            boolean useSelfSplitter = false;
+            if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
+                    && 
properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true"))
 {
+                LOG.debug("Using self splitter for hmsTable {}", 
hmsTable.getName());
+                useSelfSplitter = true;
+            }
 
             List<Split> allFiles = Lists.newArrayList();
             if (hivePartitionValues != null) {
@@ -99,13 +116,13 @@ public class HiveSplitter implements Splitter {
                 List<HivePartition> partitions = 
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
                         partitionValuesList);
                 // 4. get all files of partitions
-                getFileSplitByPartitions(cache, partitions, allFiles);
+                getFileSplitByPartitions(cache, partitions, allFiles, 
useSelfSplitter);
             } else {
                 // unpartitioned table, create a dummy partition to save 
location and inputformat,
                 // so that we can unify the interface.
                 HivePartition dummyPartition = new 
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
                         hmsTable.getRemoteTable().getSd().getLocation(), null);
-                getFileSplitByPartitions(cache, 
Lists.newArrayList(dummyPartition), allFiles);
+                getFileSplitByPartitions(cache, 
Lists.newArrayList(dummyPartition), allFiles, useSelfSplitter);
                 this.totalPartitionNum = 1;
                 this.readPartitionNum = 1;
             }
@@ -121,8 +138,8 @@ public class HiveSplitter implements Splitter {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
-                                          List<Split> allFiles) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions);
+                                          List<Split> allFiles, boolean 
useSelfSplitter) {
+        List<InputSplit> files = cache.getFilesByPartitions(partitions, 
useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), 
partitions.size(),
                     Joiner.on(",")
@@ -152,4 +169,63 @@ public class HiveSplitter implements Splitter {
     public int getReadPartitionNum() {
         return readPartitionNum;
     }
+
+    // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 
like storage system.
+    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> 
inputFormat,
+                                             JobConf jobConf) throws 
IOException {
+        FileSystem fs = path.getFileSystem(jobConf);
+        boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
+        List<InputSplit> splits = Lists.newArrayList();
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = 
fs.listFiles(path, true);
+        if (!locatedFileStatusRemoteIterator.hasNext()) {
+            LOG.debug("File status for path {} is empty.", path);
+            return new InputSplit[0];
+        }
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", path);
+            while (locatedFileStatusRemoteIterator.hasNext()) {
+                LocatedFileStatus status = 
locatedFileStatusRemoteIterator.next();
+                BlockLocation block = status.getBlockLocations()[0];
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), 
block.getHosts()));
+            }
+            return splits.toArray(new InputSplit[splits.size()]);
+        }
+        long splitSize = Config.file_split_size;
+        boolean useBlockSize = (splitSize <= 0);
+        while (locatedFileStatusRemoteIterator.hasNext()) {
+            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
+            if (useBlockSize) {
+                splitSize = status.getBlockSize();
+            }
+            BlockLocation[] blockLocations = status.getBlockLocations();
+            long length = status.getLen();
+            long bytesRemaining;
+            for (bytesRemaining = length; (double) bytesRemaining / (double) 
splitSize > 1.1D;
+                    bytesRemaining -= splitSize) {
+                int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - 
bytesRemaining,
+                        splitSize, blockLocations[location].getHosts()));
+            }
+            if (bytesRemaining != 0L) {
+                int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - 
bytesRemaining,
+                        bytesRemaining, blockLocations[location].getHosts()));
+            }
+        }
+
+        LOG.debug("Path {} includes {} splits.", path, splits.size());
+        return splits.toArray(new InputSplit[splits.size()]);
+    }
+
+    private static int getBlockIndex(BlockLocation[] blkLocations, long 
offset) {
+        for (int i = 0; i < blkLocations.length; ++i) {
+            if (blkLocations[i].getOffset() <= offset
+                    && offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength()) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1L;
+        throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to