liuneng1994 commented on code in PR #5529:
URL: https://github.com/apache/incubator-gluten/pull/5529#discussion_r1584317945
##########
cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h:
##########
@@ -40,63 +42,56 @@ struct PartInfo
size_t row_count;
std::unordered_map<String, String> partition_values;
String bucket_id;
+
+ bool operator<(const PartInfo & rhs) const { return disk_size <
rhs.disk_size; }
};
class SparkMergeTreeWriter
{
public:
static String partInfosToJson(const std::vector<PartInfo> & part_infos);
SparkMergeTreeWriter(
- DB::MergeTreeData & storage_,
+ CustomStorageMergeTreePtr storage_,
const DB::StorageMetadataPtr & metadata_snapshot_,
const DB::ContextPtr & context_,
const String & uuid_,
const String & partition_dir_ = "",
- const String & bucket_dir_ = "")
- : storage(storage_)
- , metadata_snapshot(metadata_snapshot_)
- , context(context_)
- , uuid(uuid_)
- , partition_dir(partition_dir_)
- , bucket_dir(bucket_dir_)
- {
- const DB::Settings & settings = context->getSettingsRef();
- squashing_transform
- =
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows,
settings.min_insert_block_size_bytes);
- if (!partition_dir.empty())
- {
- Poco::StringTokenizer partitions(partition_dir, "/");
- for (const auto & partition : partitions)
- {
- Poco::StringTokenizer key_value(partition, "=");
- chassert(key_value.count() == 2);
- partition_values.emplace(key_value[0], key_value[1]);
- }
- }
- header = metadata_snapshot->getSampleBlock();
- }
+ const String & bucket_dir_ = "");
void write(DB::Block & block);
void finalize();
std::vector<PartInfo> getAllPartInfo();
private:
- DB::MergeTreeDataWriter::TemporaryPart
- writeTempPart(DB::BlockWithPartition & block_with_partition, const
DB::StorageMetadataPtr & metadata_snapshot);
+ void
+ writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part,
DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr &
metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot);
+ void checkAndMerge(bool force = false);
+ void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
+ void safeAddPart(DB::MergeTreeDataPartPtr);
+ void manualFreeMemory(size_t before_write_memory);
+
Review Comment:
   Redundant blank line — please remove it.
##########
cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h:
##########
@@ -40,63 +42,56 @@ struct PartInfo
size_t row_count;
std::unordered_map<String, String> partition_values;
String bucket_id;
+
+ bool operator<(const PartInfo & rhs) const { return disk_size <
rhs.disk_size; }
};
class SparkMergeTreeWriter
{
public:
static String partInfosToJson(const std::vector<PartInfo> & part_infos);
SparkMergeTreeWriter(
- DB::MergeTreeData & storage_,
+ CustomStorageMergeTreePtr storage_,
const DB::StorageMetadataPtr & metadata_snapshot_,
const DB::ContextPtr & context_,
const String & uuid_,
const String & partition_dir_ = "",
- const String & bucket_dir_ = "")
- : storage(storage_)
- , metadata_snapshot(metadata_snapshot_)
- , context(context_)
- , uuid(uuid_)
- , partition_dir(partition_dir_)
- , bucket_dir(bucket_dir_)
- {
- const DB::Settings & settings = context->getSettingsRef();
- squashing_transform
- =
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows,
settings.min_insert_block_size_bytes);
- if (!partition_dir.empty())
- {
- Poco::StringTokenizer partitions(partition_dir, "/");
- for (const auto & partition : partitions)
- {
- Poco::StringTokenizer key_value(partition, "=");
- chassert(key_value.count() == 2);
- partition_values.emplace(key_value[0], key_value[1]);
- }
- }
- header = metadata_snapshot->getSampleBlock();
- }
+ const String & bucket_dir_ = "");
void write(DB::Block & block);
void finalize();
std::vector<PartInfo> getAllPartInfo();
private:
- DB::MergeTreeDataWriter::TemporaryPart
- writeTempPart(DB::BlockWithPartition & block_with_partition, const
DB::StorageMetadataPtr & metadata_snapshot);
+ void
+ writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part,
DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr &
metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot);
+ void checkAndMerge(bool force = false);
+ void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
+ void safeAddPart(DB::MergeTreeDataPartPtr);
+ void manualFreeMemory(size_t before_write_memory);
+
String uuid;
String partition_dir;
String bucket_dir;
- DB::MergeTreeData & storage;
+ CustomStorageMergeTreePtr storage;
DB::StorageMetadataPtr metadata_snapshot;
DB::ContextPtr context;
std::unique_ptr<DB::SquashingTransform> squashing_transform;
int part_num = 1;
- std::vector<DB::MergeTreeDataPartPtr> new_parts;
+ ConcurrentDeque<DB::MergeTreeDataPartPtr> new_parts;
+
+
Review Comment:
   Redundant blank line — please remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]