This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ba3b6c66f [opt](FileCache) use modification time to determine whether
the file is changed (#18906)
3ba3b6c66f is described below
commit 3ba3b6c66f9db38d980cafc4290c9217721b4e37
Author: Ashin Gau <[email protected]>
AuthorDate: Thu May 11 07:50:39 2023 +0800
[opt](FileCache) use modification time to determine whether the file is
changed (#18906)
Get the last modification time from the file status, and use the combination of
the path and the modification time to generate the cache identifier.
When a file is changed, its modification time changes as well, so the
former cache path becomes invalid.
---
be/src/io/cache/block/cached_remote_file_reader.cpp | 13 +++++++++----
be/src/io/cache/block/cached_remote_file_reader.h | 5 +++--
be/src/io/fs/file_reader_options.h | 2 ++
be/src/io/fs/remote_file_system.cpp | 6 ++++--
be/src/vec/exec/format/csv/csv_reader.cpp | 4 ++++
be/src/vec/exec/format/json/new_json_reader.cpp | 2 ++
be/src/vec/exec/format/orc/vorc_reader.cpp | 2 ++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 2 ++
.../doris/catalog/HiveMetaStoreClientHelper.java | 1 +
.../org/apache/doris/common/util/BrokerUtil.java | 1 +
.../doris/datasource/hive/HiveMetaStoreCache.java | 3 +++
.../apache/doris/fs/remote/BrokerFileSystem.java | 2 +-
.../java/org/apache/doris/fs/remote/RemoteFile.java | 21 ++++++++++++++++++---
.../apache/doris/fs/remote/RemoteFileSystem.java | 4 ++--
.../org/apache/doris/fs/remote/S3FileSystem.java | 2 +-
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 2 +-
.../doris/planner/external/FileGroupInfo.java | 1 +
.../doris/planner/external/FileQueryScanNode.java | 1 +
.../apache/doris/planner/external/FileScanNode.java | 10 ++++++----
.../apache/doris/planner/external/FileSplit.java | 9 ++++++++-
.../apache/doris/planner/external/HiveScanNode.java | 2 +-
.../apache/doris/planner/external/TVFScanNode.java | 13 +------------
.../planner/external/iceberg/IcebergSplit.java | 1 +
.../ExternalFileTableValuedFunction.java | 1 +
.../apache/doris/broker/hdfs/FileSystemManager.java | 1 +
gensrc/thrift/PaloBrokerService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 2 ++
27 files changed, 80 insertions(+), 34 deletions(-)
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index f2c4c80173..81268ec5ad 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -42,17 +42,22 @@ namespace doris {
namespace io {
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
- const std::string& cache_path)
+ const std::string& cache_path,
+ const long modification_time)
: _remote_file_reader(std::move(remote_file_reader)) {
- _cache_key = IFileCache::hash(cache_path);
+ // Use path and modification time to build cache key
+ std::string unique_path = fmt::format("{}:{}", cache_path,
modification_time);
+ _cache_key = IFileCache::hash(unique_path);
_cache = FileCacheFactory::instance().get_by_path(_cache_key);
}
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const std::string&
cache_base_path,
- const std::string& cache_path)
+ const std::string& cache_path,
+ const long modification_time)
: _remote_file_reader(std::move(remote_file_reader)) {
- _cache_key = IFileCache::hash(cache_path);
+ std::string unique_path = fmt::format("{}:{}", cache_path,
modification_time);
+ _cache_key = IFileCache::hash(unique_path);
_cache = FileCacheFactory::instance().get_by_path(cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " << cache_base_path
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h
b/be/src/io/cache/block/cached_remote_file_reader.h
index 68b80e0ce4..5e66f9970a 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.h
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -39,10 +39,11 @@ struct FileCacheStatistics;
class CachedRemoteFileReader final : public FileReader {
public:
- CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
std::string& cache_path);
+ CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
std::string& cache_path,
+ const long modification_time);
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
std::string& cache_base_path,
- const std::string& cache_path);
+ const std::string& cache_path, const long
modification_time);
~CachedRemoteFileReader() override;
diff --git a/be/src/io/fs/file_reader_options.h
b/be/src/io/fs/file_reader_options.h
index 66629f54de..744db5e1f6 100644
--- a/be/src/io/fs/file_reader_options.h
+++ b/be/src/io/fs/file_reader_options.h
@@ -80,6 +80,8 @@ public:
int64_t file_size = -1;
bool has_cache_base_path = false;
std::string cache_base_path;
+ // Use modification time to determine whether the file is changed
+ int64_t modification_time = 0;
void specify_cache_path(const std::string& base_path) {
has_cache_base_path = true;
diff --git a/be/src/io/fs/remote_file_system.cpp
b/be/src/io/fs/remote_file_system.cpp
index 63246a3c87..611f7cf894 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -97,9 +97,11 @@ Status RemoteFileSystem::open_file_impl(const Path& path,
const FileReaderOption
if (reader_options.has_cache_base_path) {
// from query session variable: file_cache_base_path
*reader = std::make_shared<CachedRemoteFileReader>(
- std::move(raw_reader), reader_options.cache_base_path,
cache_path);
+ std::move(raw_reader), reader_options.cache_base_path,
cache_path,
+ reader_options.modification_time);
} else {
- *reader =
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path);
+ *reader =
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path,
+
reader_options.modification_time);
}
break;
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index cff15c88bb..7cb36e94c7 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -170,6 +170,8 @@ Status CsvReader::init_reader(bool is_load) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader));
} else {
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state);
+ reader_options.modification_time =
+ _range.__isset.modification_time ? _range.modification_time :
0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description,
&_file_system, &_file_reader,
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options,
_io_ctx,
@@ -657,6 +659,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool*
is_parse_name) {
_file_description.start_offset = start_offset;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state);
+ reader_options.modification_time =
+ _range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_system_properties, _file_description,
&_file_system,
&_file_reader, reader_options));
if (_file_reader->size() == 0 && _params.file_type !=
TFileType::FILE_STREAM &&
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 9643e48ab6..daa549776b 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -375,6 +375,8 @@ Status NewJsonReader::_open_file_reader() {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader));
} else {
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state);
+ reader_options.modification_time =
+ _range.__isset.modification_time ? _range.modification_time :
0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description,
&_file_system, &_file_reader,
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options,
_io_ctx,
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index ea36eec962..0f13b4d191 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -209,6 +209,8 @@ Status OrcReader::_create_file_reader() {
if (_file_input_stream == nullptr) {
io::FileReaderSPtr inner_reader;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state);
+ reader_options.modification_time =
+ _scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description,
&_file_system, &inner_reader,
io::DelegateReader::AccessMode::RANDOM, reader_options,
_io_ctx));
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 40d3492229..a9bf8dc6c8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -209,6 +209,8 @@ Status ParquetReader::_open_file() {
SCOPED_RAW_TIMER(&_statistics.open_file_time);
++_statistics.open_file_num;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state);
+ reader_options.modification_time =
+ _scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description,
&_file_system, &_file_reader,
io::DelegateReader::AccessMode::RANDOM, reader_options,
_io_ctx));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 938f659159..184e61b1d1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -233,6 +233,7 @@ public class HiveMetaStoreClientHelper {
brokerFileStatus.setIsDir(fileLocation.isDirectory());
brokerFileStatus.setIsSplitable(true);
brokerFileStatus.setSize(fileLocation.getSize());
+
brokerFileStatus.setModificationTime(fileLocation.getModificationTime());
// filePath.toUri().getPath() =
"/path/to/partition/file_name"
// eg:
/home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse
// + /dae.db/customer/state=CA/city=SanJose/000000_0
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 7fa0beda8d..0444abf443 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -102,6 +102,7 @@ public class BrokerUtil {
if (r.isFile()) {
TBrokerFileStatus status = new TBrokerFileStatus(r.getName(),
!r.isFile(), r.getSize(), r.isFile());
status.setBlockSize(r.getBlockSize());
+ status.setModificationTime(r.getModificationTime());
fileStatuses.add(status);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 3bd7daade5..f6382b80da 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -324,6 +324,7 @@ public class HiveMetaStoreCache {
// Convert the hadoop split to Doris Split.
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs =
((org.apache.hadoop.mapred.FileSplit) splits[i]);
+ // todo: get modification time
result.addSplit(new FileSplit(fs.getPath(),
fs.getStart(), fs.getLength(), -1, null, null));
}
}
@@ -802,6 +803,7 @@ public class HiveMetaStoreCache {
status.setPath(file.getPath());
status.length = file.getSize();
status.blockSize = file.getBlockSize();
+ status.modificationTime = file.getModificationTime();
files.add(status);
}
@@ -823,6 +825,7 @@ public class HiveMetaStoreCache {
Path path;
long length;
long blockSize;
+ long modificationTime;
}
@Data
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index 14befa5c16..a2eb5560a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -581,7 +581,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
List<TBrokerFileStatus> fileStatus = rep.getFiles();
for (TBrokerFileStatus tFile : fileStatus) {
- RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir,
tFile.size, 0);
+ RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir,
tFile.size, 0, tFile.getModificationTime());
result.add(file);
}
LOG.info("finished to list remote path {}. get files: {}",
remotePath, result);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index cdfcf451c3..a8d918cffa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -33,25 +33,36 @@ public class RemoteFile {
// A large file will split into multiple blocks. The blocks are
transparent to the user.
// Default block size for HDFS 2.x is 128M.
private final long blockSize;
+ private long modificationTime;
private Path path;
BlockLocation[] blockLocations;
public RemoteFile(String name, boolean isFile, long size, long blockSize) {
- this(name, null, isFile, !isFile, size, blockSize, null);
+ this(name, null, isFile, !isFile, size, blockSize, 0, null);
+ }
+
+ public RemoteFile(String name, boolean isFile, long size, long blockSize,
long modificationTime) {
+ this(name, null, isFile, !isFile, size, blockSize, modificationTime,
null);
}
public RemoteFile(Path path, boolean isDirectory, long size, long
blockSize, BlockLocation[] blockLocations) {
- this(path.getName(), path, !isDirectory, isDirectory, size, blockSize,
blockLocations);
+ this(path.getName(), path, !isDirectory, isDirectory, size, blockSize,
0, blockLocations);
+ }
+
+ public RemoteFile(Path path, boolean isDirectory, long size, long
blockSize, long modificationTime,
+ BlockLocation[] blockLocations) {
+ this(path.getName(), path, !isDirectory, isDirectory, size, blockSize,
modificationTime, blockLocations);
}
public RemoteFile(String name, Path path, boolean isFile, boolean
isDirectory,
- long size, long blockSize, BlockLocation[]
blockLocations) {
+ long size, long blockSize, long modificationTime, BlockLocation[]
blockLocations) {
Preconditions.checkState(!Strings.isNullOrEmpty(name));
this.name = name;
this.isFile = isFile;
this.isDirectory = isDirectory;
this.size = size;
this.blockSize = blockSize;
+ this.modificationTime = modificationTime;
this.path = path;
this.blockLocations = blockLocations;
}
@@ -80,6 +91,10 @@ public class RemoteFile {
return blockSize;
}
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
public BlockLocation[] getBlockLocations() {
return blockLocations;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 35710f8785..7d87993733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -58,8 +58,8 @@ public abstract class RemoteFileSystem extends
PersistentFileSystem {
List<RemoteFile> locations = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus fileStatus = locatedFiles.next();
- RemoteFile location = new RemoteFile(fileStatus.getPath(),
fileStatus.isDirectory(),
- fileStatus.getLen(), fileStatus.getBlockSize(),
fileStatus.getBlockLocations());
+ RemoteFile location = new RemoteFile(fileStatus.getPath(),
fileStatus.isDirectory(), fileStatus.getLen(),
+ fileStatus.getBlockSize(),
fileStatus.getModificationTime(), fileStatus.getBlockLocations());
locations.add(location);
}
return new RemoteFiles(locations);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index c6cfc0ba02..0d09037c81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -79,7 +79,7 @@ public class S3FileSystem extends ObjFileSystem {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() :
fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ?
-1 : fileStatus.getLen(),
- fileStatus.getBlockSize());
+ fileStatus.getBlockSize(),
fileStatus.getModificationTime());
result.add(remoteFile);
}
} catch (FileNotFoundException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 26478782a7..d8df27a5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -431,7 +431,7 @@ public class DFSFileSystem extends RemoteFileSystem {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() :
fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ?
-1 : fileStatus.getLen(),
- fileStatus.getBlockSize());
+ fileStatus.getBlockSize(),
fileStatus.getModificationTime());
result.add(remoteFile);
}
} catch (FileNotFoundException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index a1e3499382..7808c35ac6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -337,6 +337,7 @@ public class FileGroupInfo {
rangeDesc.setSize(fileStatus.size);
rangeDesc.setFileSize(fileStatus.size);
}
+ rangeDesc.setModificationTime(fileStatus.getModificationTime());
return rangeDesc;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 86e3dab30f..a36369325e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -306,6 +306,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
// need full path
rangeDesc.setPath(fileSplit.getPath().toString());
}
+ rangeDesc.setModificationTime(fileSplit.getModificationTime());
return rangeDesc;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 3a24ba3319..aa0e923203 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -212,7 +212,7 @@ public class FileScanNode extends ExternalScanNode {
}
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[]
blockLocations, long length,
- boolean splittable, List<String>
partitionValues) throws IOException {
+ long modificationTime, boolean splittable, List<String>
partitionValues) throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
@@ -226,7 +226,7 @@ public class FileScanNode extends ExternalScanNode {
if (!splittable) {
LOG.debug("Path {} is not splittable.", path);
String[] hosts = blockLocations.length == 0 ? null :
blockLocations[0].getHosts();
- result.add(new FileSplit(path, 0, length, length, hosts,
partitionValues));
+ result.add(new FileSplit(path, 0, length, length,
modificationTime, hosts, partitionValues));
return result;
}
long bytesRemaining;
@@ -234,12 +234,14 @@ public class FileScanNode extends ExternalScanNode {
bytesRemaining -= splitSize) {
int location = getBlockIndex(blockLocations, length -
bytesRemaining);
String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(new FileSplit(path, length - bytesRemaining, splitSize,
length, hosts, partitionValues));
+ result.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length -
bytesRemaining);
String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining, length, hosts, partitionValues));
+ result.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
+ length, modificationTime, hosts, partitionValues));
}
LOG.debug("Path {} includes {} splits.", path, result.size());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 03f1ab3b60..21b88f9c2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -34,6 +34,7 @@ public class FileSplit implements Split {
// -1 means unset.
// If the file length is not set, the file length will be fetched from the
file system.
protected long fileLength;
+ protected long modificationTime;
protected String[] hosts;
protected TableFormatType tableFormatType;
// The values of partitions.
@@ -42,15 +43,21 @@ public class FileSplit implements Split {
protected List<String> partitionValues;
public FileSplit(Path path, long start, long length, long fileLength,
- String[] hosts, List<String> partitionValues) {
+ long modificationTime, String[] hosts, List<String>
partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
+ this.modificationTime = modificationTime;
this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
}
+ public FileSplit(Path path, long start, long length, long fileLength,
+ String[] hosts, List<String> partitionValues) {
+ this(path, start, length, fileLength, 0, hosts, partitionValues);
+ }
+
public String[] getHosts() {
return hosts;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 96c1872010..87d0e55796 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -170,7 +170,7 @@ public class HiveScanNode extends FileQueryScanNode {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
allFiles.addAll(splitFile(status.getPath(),
status.getBlockSize(),
- status.getBlockLocations(), status.getLength(),
+ status.getBlockLocations(), status.getLength(),
status.getModificationTime(),
isSplittable,
fileCacheValue.getPartitionValues()));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index a8ede0d843..67008f5d7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -111,7 +111,7 @@ public class TVFScanNode extends FileQueryScanNode {
Path path = new Path(fileStatus.getPath());
try {
splits.addAll(splitFile(path, fileStatus.getBlockSize(), null,
fileStatus.getSize(),
- fileStatus.isSplitable, null));
+ fileStatus.getModificationTime(),
fileStatus.isSplitable, null));
} catch (IOException e) {
LOG.warn("get file split failed for TVF: {}", path, e);
throw new UserException(e);
@@ -119,15 +119,4 @@ public class TVFScanNode extends FileQueryScanNode {
}
return splits;
}
-
- private void addFileSplits(Path path, long fileSize, long splitSize,
List<Split> splits) {
- long bytesRemaining;
- for (bytesRemaining = fileSize; (double) bytesRemaining / (double)
splitSize > 1.1D;
- bytesRemaining -= splitSize) {
- splits.add(new FileSplit(path, fileSize - bytesRemaining,
splitSize, fileSize, new String[0], null));
- }
- if (bytesRemaining != 0L) {
- splits.add(new FileSplit(path, fileSize - bytesRemaining,
bytesRemaining, fileSize, new String[0], null));
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 896b4968b4..9064017088 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -26,6 +26,7 @@ import java.util.List;
@Data
public class IcebergSplit extends FileSplit {
+ // File path will be changed if the file is modified, so there's no need
to get modification time.
public IcebergSplit(Path file, long start, long length, long fileLength,
String[] hosts) {
super(file, start, length, fileLength, hosts, null);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 47adc717a5..74ee6a855d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -457,6 +457,7 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
fileRangeDesc.setStartOffset(0);
fileRangeDesc.setSize(firstFile.getSize());
fileRangeDesc.setFileSize(firstFile.getSize());
+ fileRangeDesc.setModificationTime(firstFile.getModificationTime());
// set TFileScanRange
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.addToRanges(fileRangeDesc);
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 17f4d13216..1b0b94b923 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -1036,6 +1036,7 @@ public class FileSystemManager {
brokerFileStatus.setSize(fileStatus.getLen());
brokerFileStatus.setIsSplitable(true);
}
+
brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
if (fileNameOnly) {
// return like this: file.txt
brokerFileStatus.setPath(fileStatus.getPath().getName());
diff --git a/gensrc/thrift/PaloBrokerService.thrift
b/gensrc/thrift/PaloBrokerService.thrift
index 1d4d2e876a..308c606544 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -56,6 +56,7 @@ struct TBrokerFileStatus {
//and the entire file must be imported as a
complete map task.
//the return value of the compressed file is
false
5: optional i64 blockSize; //Block size in FS. e.g. HDFS and S3
+ 6: optional i64 modificationTime = 0; // Last modification time
}
struct TBrokerFD {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 3b332744a9..b2ac960a10 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -348,6 +348,8 @@ struct TFileRangeDesc {
7: optional list<string> columns_from_path_keys;
// For data lake table format
8: optional TTableFormatFileDesc table_format_params
+ // Use modification time to determine whether the file is changed
+ 9: optional i64 modification_time
}
// TFileScanRange represents a set of descriptions of a file and the rules for
reading and converting it.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]