This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 00867cb8c7 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250729) (#10271)
00867cb8c7 is described below
commit 00867cb8c7539edc4deda9c165a0c8f8efa79684
Author: Kyligence Git <[email protected]>
AuthorDate: Tue Jul 29 08:32:05 2025 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250729) (#10271)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250729)
* Fix build due to https://github.com/ClickHouse/ClickHouse/pull/76802
* Fix build due to https://github.com/ClickHouse/ClickHouse/pull/81837
* Fix build due to https://github.com/ClickHouse/ClickHouse/pull/84011
* Fix gtest due to https://github.com/ClickHouse/ClickHouse/pull/83599
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang chen <[email protected]>
---
cpp-ch/clickhouse.version | 4 ++--
.../CompactObjectStorageDiskTransaction.cpp | 4 ++--
.../CompactObjectStorageDiskTransaction.h | 2 +-
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 2 +-
.../ObjectStorages/MetadataStorageFromRocksDB.cpp | 9 +++++++--
.../Disks/ObjectStorages/MetadataStorageFromRocksDB.h | 7 ++++++-
.../local-engine/Storages/Output/NormalFileWriter.h | 19 +++++++++++++++++--
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 2 +-
cpp-ch/local-engine/tests/gtest_write_pipeline.cpp | 2 +-
9 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 0344f3e2e5..38042d4254 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250728
-CH_COMMIT=49d80cdb519
+CH_BRANCH=rebase_ch/20250729
+CH_COMMIT=77ef0818976
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 148a43580b..1b5ff914aa 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -58,7 +58,7 @@ void TemporaryWriteBufferWrapper::nextImpl()
BufferBase::set(data_buffer->buffer().begin(),
data_buffer->buffer().size(), data_buffer->offset());
}
-void CompactObjectStorageDiskTransaction::commit()
+void CompactObjectStorageDiskTransaction::commit(const
DB::TransactionCommitOptionsVariant & options)
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
std::filesystem::path data_path = std::filesystem::path(prefix_path) /
PART_DATA_FILE_NAME;
@@ -105,7 +105,7 @@ void CompactObjectStorageDiskTransaction::commit()
merge_files(files | std::ranges::views::filter([](auto file) { return
!isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
merge_files(files | std::ranges::views::filter([](auto file) { return
isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path);
- metadata_tx->commit();
+ metadata_tx->commit(options);
files.clear();
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
index 0f95ae01ec..7219898907 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -69,7 +69,7 @@ class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
{
}
- void commit() override;
+ void commit(const DB::TransactionCommitOptionsVariant & options) override;
void undo() override
{
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index 0eb79541af..a21c700efc 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -90,7 +90,7 @@ std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
const DB::WriteSettings & settings)
{
if (throttler)
- throttler->add(1);
+ throttler->throttle(1);
return DiskObjectStorage::writeFile(path, buf_size, mode, settings);
}
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
index 7da8092ffb..212a998ddb 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
@@ -257,9 +257,14 @@ rocksdb::DB & MetadataStorageFromRocksDB::getRocksDB() const
return *rocksdb;
}
-void MetadataStorageFromRocksDBTransaction::commit()
+void MetadataStorageFromRocksDBTransaction::commit(const
DB::TransactionCommitOptionsVariant & options)
{
- commitImpl(metadata_storage.getMetadataMutex());
+ commitImpl(options, metadata_storage.getMetadataMutex());
+}
+
+std::optional<DB::StoredObjects>
MetadataStorageFromRocksDBTransaction::tryGetBlobsFromTransactionIfExists(const
std::string & path) const
+{
+ return metadata_storage.getStorageObjectsIfExist(path);
}
const DB::IMetadataStorage &
MetadataStorageFromRocksDBTransaction::getStorageForNonTransactionalReads()
const
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
index a3756c78ef..1609cf75f8 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
@@ -64,6 +64,10 @@ private:
DB::SharedMutex & getMetadataMutex() const;
rocksdb::DB & getRocksDB() const;
+public:
+ bool isReadOnly() const override {return false; };
+
+private:
using RocksDBPtr = rocksdb::DB *;
RocksDBPtr rocksdb = nullptr;
mutable DB::SharedMutex metadata_mutex;
@@ -80,7 +84,7 @@ class MetadataStorageFromRocksDBTransaction final : public DB::IMetadataTransact
public:
MetadataStorageFromRocksDBTransaction(const MetadataStorageFromRocksDB &
metadata_storage_) : metadata_storage(metadata_storage_) { }
- void commit() override;
+ void commit(const DB::TransactionCommitOptionsVariant & options) override;
const DB::IMetadataStorage & getStorageForNonTransactionalReads() const
override;
bool supportsChmod() const override;
void createEmptyMetadataFile(const std::string & path) override;
@@ -92,6 +96,7 @@ public:
void removeDirectory(const std::string &) override;
void removeRecursive(const std::string &) override;
void unlinkFile(const std::string &) override;
+ std::optional<DB::StoredObjects> tryGetBlobsFromTransactionIfExists(const
std::string &) const override;
private:
const MetadataStorageFromRocksDB & metadata_storage;
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index c6dff6df76..36b629c3e7 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -474,6 +474,21 @@ public:
return DB::makeASTFunction("concat", std::move(arguments));
}
+private:
+ static std::shared_ptr<DB::IPartitionStrategy>
+ make_partition_strategy(const DB::ContextPtr & context, const DB::Names &
partition_columns, const DB::Block & input_header)
+ {
+ DB::ASTPtr partition_by = make_partition_expression(partition_columns,
input_header);
+ return DB::PartitionStrategyFactory::get(
+ DB::PartitionStrategyFactory::StrategyType::WILDCARD,
+ partition_by,
+ input_header.getNamesAndTypesList(),
+ context,
+ "", // format_name => no need
+ false, // globbed_path => no need
+ true,
+ true);
+ }
DB::SinkPtr createSinkForPartition(const String & partition_id) override
{
if (bucketed_write_)
@@ -499,11 +514,11 @@ public:
const DB::Names & partition_by,
const DB::SharedHeader & input_header,
const std::shared_ptr<WriteStatsBase> & stats)
- : PartitionedSink(make_partition_expression(partition_by,
*input_header), context, input_header)
+ : PartitionedSink(make_partition_strategy(context, partition_by,
*input_header), context, input_header)
, context_(context)
, stats_(stats)
- , bucketed_write_(isBucketedWrite(*input_header))
, empty_delta_stats_(DeltaStats::create(*input_header, partition_by))
+ , bucketed_write_(isBucketedWrite(*input_header))
{
}
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 1aa5821033..be0b6c0cbc 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -598,7 +598,7 @@ private:
.use_environment_credentials = true,
.use_insecure_imds_request = false,
.role_arn = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_ROLE),
- .session_name = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
+ .role_session_name = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_SESSION_NAME),
.external_id = getSetting(settings, bucket_name,
BackendInitializerUtil::HADOOP_S3_ASSUMED_EXTERNAL_ID)});
//TODO: support online change config for cached per_bucket_clients
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
index 832a3f676e..01d3721924 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
@@ -108,7 +108,7 @@ TEST(LocalExecutor, StorageObjectStorageSink)
/// 1. Create ObjectStorageSink
auto config_cloned_ptr =
std::make_shared<StorageHDFSConfiguration>(config);
DB::StorageObjectStorageSink sink{
- config_cloned_ptr->getPaths().back(),
+ config_cloned_ptr->getPaths().back().path,
object_storage,
config_cloned_ptr,
{},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]