This is an automated email from the ASF dual-hosted git repository. zhangzc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new 300c3fae3 [Gluten-5152][CH] fix core dump issues when running in parallel (#5235) 300c3fae3 is described below commit 300c3fae301b104bf009d7db636c66b305e734bd Author: Hongbin Ma <mahong...@apache.org> AuthorDate: Mon Apr 1 20:33:42 2024 +0800 [Gluten-5152][CH] fix core dump issues when running in parallel (#5235) [CH] fix core dump issues when running in parallel --- .../spark/sql/delta/ClickhouseSnapshot.scala | 4 +-- .../sql/delta/catalog/ClickHouseTableV2.scala | 3 ++- .../utils/MergeTreePartsPartitionsUtil.scala | 4 +-- cpp-ch/local-engine/Operator/ExpandTransform.cpp | 1 - cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 2 +- .../Storages/CustomStorageMergeTree.cpp | 10 ++++++- .../Storages/StorageMergeTreeFactory.cpp | 31 +++++++++++++++++++--- 7 files changed, 44 insertions(+), 11 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala index d88f437e5..0f53ad478 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala @@ -88,7 +88,7 @@ object ClickhouseSnapshot { .build() val addFileToAddMTPCache: LoadingCache[AddFileAsKey, AddMergeTreeParts] = CacheBuilder.newBuilder - .maximumSize(100000) + .maximumSize(1000000) .expireAfterAccess(3600L, TimeUnit.SECONDS) .recordStats .build[AddFileAsKey, AddMergeTreeParts](new CacheLoader[AddFileAsKey, AddMergeTreeParts]() { @@ -99,7 +99,7 @@ object ClickhouseSnapshot { }) val pathToAddMTPCache: Cache[String, AddMergeTreeParts] = CacheBuilder.newBuilder - .maximumSize(100000) + .maximumSize(1000000) .expireAfterAccess(3600L, TimeUnit.SECONDS) .recordStats() .build() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 92d12c05f..11b2b18e9 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -314,7 +314,8 @@ object ClickHouseTableV2 extends Logging { } else if (temporalThreadLocalCHTable.get() != null) { temporalThreadLocalCHTable.get() } else { - throw new IllegalStateException("Can not find ClickHouseTableV2") + throw new IllegalStateException( + s"Can not find ClickHouseTableV2 for deltalog ${deltaLog.dataPath}") } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala index e6861b145..e71405a31 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala @@ -175,7 +175,7 @@ object MergeTreePartsPartitionsUtil extends Logging { "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " + path + ". This happens when too many new entries are added to " + "pathToAddMTPCache during current query. " + - "Try rerun current query. KeySample: " + keySample + "Try rerun current query. Existing KeySample: " + keySample ) } ret @@ -307,7 +307,7 @@ object MergeTreePartsPartitionsUtil extends Logging { "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " + path + ". This happens when too many new entries are added to " + "pathToAddMTPCache during current query. " + - "Try rerun current query. KeySample: " + keySample) + "Try rerun current query. Existing KeySample: " + keySample) } ret })) diff --git a/cpp-ch/local-engine/Operator/ExpandTransform.cpp b/cpp-ch/local-engine/Operator/ExpandTransform.cpp index b8cff215b..d48d48439 100644 --- a/cpp-ch/local-engine/Operator/ExpandTransform.cpp +++ b/cpp-ch/local-engine/Operator/ExpandTransform.cpp @@ -16,7 +16,6 @@ */ #include <memory> -#include "Columns/ColumnSparse.h" #include <Columns/ColumnNullable.h> #include <Columns/ColumnsNumber.h> #include <Columns/IColumn.h> diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 22702d9a9..811eaa6d9 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -66,7 +66,7 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage( const substrait::ReadRel::ExtensionTable & extension_table, - ContextMutablePtr context, UUID uuid) + ContextMutablePtr context, UUID uuid) { google::protobuf::StringValue table; table.ParseFromString(extension_table.detail().value()); diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index 02123aac9..c8f0b1f32 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -108,7 +108,15 @@ DataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_se auto res = loadDataPart(part_info, name, disk, MergeTreeDataPartState::Active); data_parts.emplace_back(res.part); } - calculateColumnAndSecondaryIndexSizesImpl(); // without it "test mergetree optimize partitioned by one low card column" will log ERROR + + if(getStorageID().hasUUID()) + { + // the following lines will modify storage's member. + // So when current storage is shared (when UUID is default Nil value), + // we should avoid modify because we don't have locks here + + calculateColumnAndSecondaryIndexSizesImpl(); // without it "test mergetree optimize partitioned by one low card column" will log ERROR + } return data_parts; } diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp index f35d8a8a9..d5b23ffbd 100644 --- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp +++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp @@ -27,11 +27,36 @@ StorageMergeTreeFactory & StorageMergeTreeFactory::instance() void StorageMergeTreeFactory::freeStorage(StorageID id) { auto table_name = id.database_name + "." + id.table_name + "@" + toString(id.uuid); - std::lock_guard lock(storage_map_mutex); - if (storage_map.contains(table_name)) + + { - storage_map.erase(table_name); + std::lock_guard lock(storage_map_mutex); + if (storage_map.contains(table_name)) + { + storage_map.erase(table_name); + } + if (storage_columns_map.contains(table_name)) + { + storage_columns_map.erase(table_name); + } } + + { + std::lock_guard lock(datapart_mutex); + if (datapart_map.contains(table_name)) + { + datapart_map.erase(table_name); + } + } + + { + std::lock_guard lock(metadata_map_mutex); + if (metadata_map.contains(table_name)) + { + metadata_map.erase(table_name); + } + } + } CustomStorageMergeTreePtr --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org