This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new f579eaebd [LBM] append CREATE/DELETE records in batch to metadata f579eaebd is described below commit f579eaebdae9b63a6e028cbf07c49e62b827def9 Author: Yingchun Lai <laiyingc...@apache.org> AuthorDate: Mon Feb 20 17:23:15 2023 +0800 [LBM] append CREATE/DELETE records in batch to metadata Minor code refactor to append CREATE/DELETE records in batch to metadata, without any functional changes. This is a sub-task of KUDU-3371; updating metadata in batch will be beneficial for recording metadata in RocksDB. Since there are no functional changes, this patch doesn't add new tests, but makes sure all existing tests pass. The simple benchmark LogBlockManagerTest.StartupBenchmark shows that the time for the 'create blocks' and 'delete blocks' stages is reduced by about 5%. Change-Id: Ie24adab3b1dbbea55108a1f525093136fcf5a726 Reviewed-on: http://gerrit.cloudera.org:8080/19519 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/fs/log_block_manager.cc | 157 +++++++++++++++++++++++++-------------- 1 file changed, 103 insertions(+), 54 deletions(-) diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index 8f71b98bb..bc1852d42 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -23,6 +23,7 @@ #include <cstddef> #include <cstdint> #include <functional> +#include <iterator> #include <map> #include <memory> #include <mutex> @@ -377,13 +378,12 @@ class LogWritableBlock final : public WritableBlock { // Starts an asynchronous flush of dirty block data to disk. Status FlushDataAsync(); - // Write this block's metadata to disk. - // - // Does not synchronize the written data; that takes place in Close(). 
- Status AppendMetadata(); - LogBlockContainer* container() const { return container_.get(); } + int64_t block_offset() const { return block_offset_; } + + int64_t block_length() const { return block_length_; } + private: // The owning container. LogBlockContainerRefPtr container_; @@ -458,7 +458,8 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // Frees the space associated with a block or a group of blocks at 'offset' // and 'length'. This is a physical operation, not a logical one; a separate - // AppendMetadata() is required to record the deletion in container metadata. + // AppendMetadataForDeleteRecords() is required to record the deletions in + // container metadata. // // The on-disk effects of this call are made durable only after SyncData(). Status PunchHole(int64_t offset, int64_t length); @@ -487,10 +488,18 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // See RWFile::ReadV(). Status ReadVData(int64_t offset, ArrayView<Slice> results) const; - // Appends 'pb' to this container's metadata file. + // Appends block ids to delete to this container's metadata file according to 'lbs', + // the block ids deleted successfully are returned by 'deleted_block_ids', even if + // returning non-OK status. // // The on-disk effects of this call are made durable only after SyncMetadata(). - Status AppendMetadata(const BlockRecordPB& pb); + Status AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs, + vector<BlockId>* deleted_block_ids); + + // Appends 'blocks' to add to this container's metadata file. + // + // The on-disk effects of this call are made durable only after SyncMetadata(). + Status AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& blocks); // Asynchronously flush this container's data file from 'offset' through // to 'length'. 
@@ -1307,13 +1316,10 @@ Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks, // Append metadata only after data is synced so that there's // no chance of metadata landing on the disk before the data. - for (auto* block : blocks) { - RETURN_NOT_OK_PREPEND(block->AppendMetadata(), - "unable to append block's metadata during close"); - } + RETURN_NOT_OK_PREPEND(AppendMetadataForCreateRecords(blocks), + "unable to append creation record(s) to block metadata during close"); if (mode == SYNC) { - VLOG(3) << "Syncing metadata file " << metadata_file_->filename(); RETURN_NOT_OK(SyncMetadata()); } @@ -1378,21 +1384,13 @@ Status LogBlockContainer::ReadData(int64_t offset, Slice result) const { RETURN_NOT_OK_HANDLE_ERROR(data_file_->Read(offset, result)); return Status::OK(); } + Status LogBlockContainer::ReadVData(int64_t offset, ArrayView<Slice> results) const { DCHECK_GE(offset, 0); RETURN_NOT_OK_HANDLE_ERROR(data_file_->ReadV(offset, results)); return Status::OK(); } -Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) { - RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); - // Note: We don't check for sufficient disk space for metadata writes in - // order to allow for block deletion on full disks. - shared_lock<RWMutex> l(metadata_compact_lock_); - RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(pb)); - return Status::OK(); -} - Status LogBlockContainer::FlushData(int64_t offset, int64_t length) { RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); DCHECK_GE(offset, 0); @@ -1410,7 +1408,63 @@ Status LogBlockContainer::SyncData() { return Status::OK(); } +Status LogBlockContainer::AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs, + vector<BlockId>* deleted_block_ids) { + RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); + // Note: We don't check for sufficient disk space for metadata writes in + // order to allow for block deletion on full disks. 
+ DCHECK(deleted_block_ids); + deleted_block_ids->reserve(lbs.size()); + vector<BlockRecordPB> records; + records.reserve(lbs.size()); + BlockRecordPB record; + record.set_op_type(DELETE); + record.set_timestamp_us(GetCurrentTimeMicros()); + for (const auto& lb : lbs) { + lb->block_id().CopyToPB(record.mutable_block_id()); + records.emplace_back(record); + deleted_block_ids->emplace_back(lb->block_id()); + } + + int deleted_count = 0; + SCOPED_CLEANUP({ + // Purge the blocks that failed to delete. + deleted_block_ids->resize(deleted_count); + }); + + shared_lock<RWMutex> l(metadata_compact_lock_); + for (const auto& r : records) { + RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(r)); + ++deleted_count; + } + + return Status::OK(); +} + +Status LogBlockContainer::AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& blocks) { + RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); + vector<BlockRecordPB> records; + records.reserve(blocks.size()); + BlockRecordPB record; + record.set_op_type(CREATE); + record.set_timestamp_us(GetCurrentTimeMicros()); + for (const auto* block : blocks) { + block->id().CopyToPB(record.mutable_block_id()); + record.set_offset(block->block_offset()); + record.set_length(block->block_length()); + records.emplace_back(record); + } + + shared_lock<RWMutex> l(metadata_compact_lock_); + for (const auto& r : records) { + RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(r)); + } + + return Status::OK(); +} + Status LogBlockContainer::SyncMetadata() { + VLOG(3) << "Syncing metadata file " << metadata_file_->filename(); RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); if (FLAGS_enable_data_block_fsync) { if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment(); @@ -2037,16 +2091,6 @@ void LogWritableBlock::DoClose() { state_ = CLOSED; } -Status LogWritableBlock::AppendMetadata() { - BlockRecordPB record; - id().CopyToPB(record.mutable_block_id()); - record.set_op_type(CREATE); - record.set_timestamp_us(GetCurrentTimeMicros()); 
- record.set_offset(block_offset_); - record.set_length(block_length_); - return container_->AppendMetadata(record); -} - //////////////////////////////////////////////////////////// // LogReadableBlock //////////////////////////////////////////////////////////// @@ -2662,6 +2706,8 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, DCHECK(log_blocks); Status first_failure; vector<LogBlockRefPtr> lbs; + lbs.reserve(block_ids.size()); + log_blocks->reserve(block_ids.size()); if (deleted) { deleted->reserve(block_ids.size()); } @@ -2692,18 +2738,23 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, metrics()->bytes_under_management->DecrementBy(blocks_length); } + unordered_map<LogBlockContainer*, vector<LogBlockRefPtr>> lbs_by_containers; for (auto& lb : lbs) { - VLOG(3) << "Deleting block " << lb->block_id(); - lb->container()->BlockDeleted(lb); + auto& lbs_by_container = LookupOrInsert(&lbs_by_containers, lb->container(), {}); + lbs_by_container.emplace_back(std::move(lb)); + } + for (auto& [container, clbs] : lbs_by_containers) { + for (const auto& lb : clbs) { + VLOG(3) << "Deleting block " << lb->block_id(); + // TODO(yingchun): BlockDeleted in batch + container->BlockDeleted(lb); + } // Record the on-disk deletion. // // TODO(unknown): what if this fails? Should we restore the in-memory block? - BlockRecordPB record; - lb->block_id().CopyToPB(record.mutable_block_id()); - record.set_op_type(DELETE); - record.set_timestamp_us(GetCurrentTimeMicros()); - Status s = lb->container()->AppendMetadata(record); + vector<BlockId> deleted_block_ids; + Status s = container->AppendMetadataForDeleteRecords(clbs, &deleted_block_ids); // We don't bother fsyncing the metadata append for deletes in order to avoid // the disk overhead. Even if we did fsync it, we'd still need to account for @@ -2711,28 +2762,26 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, // fsync). 
// // TODO(KUDU-829): Implement GC of orphaned blocks. - + // TODO(yingchun): Add some metrics to track the number of orphaned blocks. if (!s.ok()) { if (first_failure.ok()) { - first_failure = s.CloneAndPrepend( - "Unable to append deletion record to block metadata"); + first_failure = s.CloneAndPrepend("Unable to append deletion record(s) to block metadata"); } + // Purge the blocks that failed to delete. + clbs.resize(deleted_block_ids.size()); } else { // Metadata files of containers with very few live blocks will be compacted. - if (!lb->container()->read_only() && - FLAGS_log_container_metadata_runtime_compact && - lb->container()->ShouldCompact()) { - scoped_refptr<LogBlockContainer> self(lb->container()); - lb->container()->ExecClosure([self]() { - self->CompactMetadata(); - }); + if (!container->read_only() && FLAGS_log_container_metadata_runtime_compact && + container->ShouldCompact()) { + scoped_refptr<LogBlockContainer> self(container); + container->ExecClosure([self]() { self->CompactMetadata(); }); } + } - if (deleted) { - deleted->emplace_back(lb->block_id()); - } - log_blocks->emplace_back(std::move(lb)); + if (deleted) { + std::move(deleted_block_ids.begin(), deleted_block_ids.end(), std::back_inserter(*deleted)); } + std::move(clbs.begin(), clbs.end(), std::back_inserter(*log_blocks)); } return first_failure; @@ -2746,7 +2795,7 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& block_id, auto it = blocks_by_block_id->find(block_id); if (it == blocks_by_block_id->end()) { - return Status::NotFound("Can't find block", block_id.ToString()); + return Status::NotFound("cannot find block to remove", block_id.ToString()); } LogBlockContainer* container = it->second->container();