This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 07c09b586 [CH] add throttler to GlutenHDFSDisk (#6046)
07c09b586 is described below
commit 07c09b5868421b4195f3e666aa7950f0d2312213
Author: LiuNeng <[email protected]>
AuthorDate: Wed Jun 12 10:17:36 2024 +0800
[CH] add throttler to GlutenHDFSDisk (#6046)
[CH] add throttler to GlutenHDFSDisk
Co-authored-by: liuneng1994 <[email protected]>
---
cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp | 13 ++++++++++++-
cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h | 8 ++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index bff4108f2..cdbe6c728 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -17,6 +17,8 @@
#include "GlutenDiskHDFS.h"
#include <ranges>
+
+#include <Common/Throttler.h>
#include <Parser/SerializedPlanParser.h>
#if USE_HDFS
@@ -70,6 +72,15 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
config_prefix);
}
-
+// Opens `path` for writing on this HDFS-backed disk.
+//
+// Before delegating, one unit is charged to `throttler`, which the constructor
+// builds from the config key `<config_prefix>.write_speed` (default 450).
+// Because each call adds exactly 1, the limit counts writeFile() calls, i.e.
+// files opened for writing, not bytes written through the returned buffer.
+// NOTE(review): DB::Throttler presumably enforces this as a per-second rate and
+// may sleep the calling thread inside add(); confirm in Common/Throttler.h.
+//
+// All parameters are forwarded unchanged to DiskObjectStorage::writeFile, and
+// its result is returned as-is.
+std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
+ const String & path,
+ size_t buf_size,
+ DB::WriteMode mode,
+ const DB::WriteSettings & settings)
+{
+ // The constructor in this change always assigns `throttler`, so this guard is
+ // defensive: a null throttler would simply skip rate limiting.
+ if (throttler)
+ throttler->add(1);
+ return DiskObjectStorage::writeFile(path, buf_size, mode, settings);
+}
}
#endif
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 9caedaae8..4e375b283 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -19,6 +19,7 @@
#include <config.h>
+#include <Common/Throttler.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
@@ -43,6 +44,8 @@ public:
object_key_prefix = object_key_prefix_;
hdfs_object_storage =
dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
+ auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
+ throttler = std::make_shared<DB::Throttler>(max_speed);
}
void createDirectory(const String & path) override;
@@ -52,11 +55,16 @@ public:
void removeDirectory(const String & path) override;
DB::DiskObjectStoragePtr createDiskObjectStorage() override;
+
+ std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path,
size_t buf_size, DB::WriteMode mode,
+ const DB::WriteSettings& settings) override;
+
private:
String path2AbsPath(const String & path);
GlutenHDFSObjectStorage * hdfs_object_storage;
String object_key_prefix;
+ DB::ThrottlerPtr throttler;
};
#endif
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]