This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f579eaebd [LBM] append CREATE/DELETE records in batch to metadata
f579eaebd is described below

commit f579eaebdae9b63a6e028cbf07c49e62b827def9
Author: Yingchun Lai <laiyingc...@apache.org>
AuthorDate: Mon Feb 20 17:23:15 2023 +0800

    [LBM] append CREATE/DELETE records in batch to metadata
    
    Minor code refactor to append CREATE/DELETE records
    in batch to metadata, without any functional changes.
    
    This is a sub-task of KUDU-3371; updating metadata in
    batch will be beneficial for recording metadata in RocksDB.
    
    Since there are no functional changes, this patch
    doesn't add new tests, but all existing tests were verified to pass.
    
    The simple benchmark LogBlockManagerTest.StartupBenchmark
    shows about a 5% reduction in time for the
    'create blocks' and 'delete blocks' stages.
    
    Change-Id: Ie24adab3b1dbbea55108a1f525093136fcf5a726
    Reviewed-on: http://gerrit.cloudera.org:8080/19519
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/fs/log_block_manager.cc | 157 +++++++++++++++++++++++++--------------
 1 file changed, 103 insertions(+), 54 deletions(-)

diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 8f71b98bb..bc1852d42 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <functional>
+#include <iterator>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -377,13 +378,12 @@ class LogWritableBlock final : public WritableBlock {
   // Starts an asynchronous flush of dirty block data to disk.
   Status FlushDataAsync();
 
-  // Write this block's metadata to disk.
-  //
-  // Does not synchronize the written data; that takes place in Close().
-  Status AppendMetadata();
-
   LogBlockContainer* container() const { return container_.get(); }
 
+  int64_t block_offset() const { return block_offset_; }
+
+  int64_t block_length() const { return block_length_; }
+
  private:
   // The owning container.
   LogBlockContainerRefPtr container_;
@@ -458,7 +458,8 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
 
   // Frees the space associated with a block or a group of blocks at 'offset'
   // and 'length'. This is a physical operation, not a logical one; a separate
-  // AppendMetadata() is required to record the deletion in container metadata.
+  // AppendMetadataForDeleteRecords() is required to record the deletions in
+  // container metadata.
   //
   // The on-disk effects of this call are made durable only after SyncData().
   Status PunchHole(int64_t offset, int64_t length);
@@ -487,10 +488,18 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // See RWFile::ReadV().
   Status ReadVData(int64_t offset, ArrayView<Slice> results) const;
 
-  // Appends 'pb' to this container's metadata file.
+  // Appends block ids to delete to this container's metadata file according 
to 'lbs',
+  // the block ids deleted successfully are returned by 'deleted_block_ids', 
even if
+  // returning non-OK status.
   //
   // The on-disk effects of this call are made durable only after 
SyncMetadata().
-  Status AppendMetadata(const BlockRecordPB& pb);
+  Status AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs,
+                                        vector<BlockId>* deleted_block_ids);
+
+  // Appends 'blocks' to add to this container's metadata file.
+  //
+  // The on-disk effects of this call are made durable only after 
SyncMetadata().
+  Status AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& 
blocks);
 
   // Asynchronously flush this container's data file from 'offset' through
   // to 'length'.
@@ -1307,13 +1316,10 @@ Status LogBlockContainer::DoCloseBlocks(const 
vector<LogWritableBlock*>& blocks,
 
     // Append metadata only after data is synced so that there's
     // no chance of metadata landing on the disk before the data.
-    for (auto* block : blocks) {
-      RETURN_NOT_OK_PREPEND(block->AppendMetadata(),
-                            "unable to append block's metadata during close");
-    }
+    RETURN_NOT_OK_PREPEND(AppendMetadataForCreateRecords(blocks),
+                          "unable to append creation record(s) to block 
metadata during close");
 
     if (mode == SYNC) {
-      VLOG(3) << "Syncing metadata file " << metadata_file_->filename();
       RETURN_NOT_OK(SyncMetadata());
     }
 
@@ -1378,21 +1384,13 @@ Status LogBlockContainer::ReadData(int64_t offset, 
Slice result) const {
   RETURN_NOT_OK_HANDLE_ERROR(data_file_->Read(offset, result));
   return Status::OK();
 }
+
 Status LogBlockContainer::ReadVData(int64_t offset, ArrayView<Slice> results) 
const {
   DCHECK_GE(offset, 0);
   RETURN_NOT_OK_HANDLE_ERROR(data_file_->ReadV(offset, results));
   return Status::OK();
 }
 
-Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
-  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
-  // Note: We don't check for sufficient disk space for metadata writes in
-  // order to allow for block deletion on full disks.
-  shared_lock<RWMutex> l(metadata_compact_lock_);
-  RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(pb));
-  return Status::OK();
-}
-
 Status LogBlockContainer::FlushData(int64_t offset, int64_t length) {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   DCHECK_GE(offset, 0);
@@ -1410,7 +1408,63 @@ Status LogBlockContainer::SyncData() {
   return Status::OK();
 }
 
+Status LogBlockContainer::AppendMetadataForDeleteRecords(const 
vector<LogBlockRefPtr>& lbs,
+                                                         vector<BlockId>* 
deleted_block_ids) {
+  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
+  // Note: We don't check for sufficient disk space for metadata writes in
+  // order to allow for block deletion on full disks.
+  DCHECK(deleted_block_ids);
+  deleted_block_ids->reserve(lbs.size());
+  vector<BlockRecordPB> records;
+  records.reserve(lbs.size());
+  BlockRecordPB record;
+  record.set_op_type(DELETE);
+  record.set_timestamp_us(GetCurrentTimeMicros());
+  for (const auto& lb : lbs) {
+    lb->block_id().CopyToPB(record.mutable_block_id());
+    records.emplace_back(record);
+    deleted_block_ids->emplace_back(lb->block_id());
+  }
+
+  int deleted_count = 0;
+  SCOPED_CLEANUP({
+    // Purge the blocks that failed to delete.
+    deleted_block_ids->resize(deleted_count);
+  });
+
+  shared_lock<RWMutex> l(metadata_compact_lock_);
+  for (const auto& r : records) {
+    RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(r));
+    ++deleted_count;
+  }
+
+  return Status::OK();
+}
+
+Status LogBlockContainer::AppendMetadataForCreateRecords(const 
vector<LogWritableBlock*>& blocks) {
+  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
+  vector<BlockRecordPB> records;
+  records.reserve(blocks.size());
+  BlockRecordPB record;
+  record.set_op_type(CREATE);
+  record.set_timestamp_us(GetCurrentTimeMicros());
+  for (const auto* block : blocks) {
+    block->id().CopyToPB(record.mutable_block_id());
+    record.set_offset(block->block_offset());
+    record.set_length(block->block_length());
+    records.emplace_back(record);
+  }
+
+  shared_lock<RWMutex> l(metadata_compact_lock_);
+  for (const auto& r : records) {
+    RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(r));
+  }
+
+  return Status::OK();
+}
+
 Status LogBlockContainer::SyncMetadata() {
+  VLOG(3) << "Syncing metadata file " << metadata_file_->filename();
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   if (FLAGS_enable_data_block_fsync) {
     if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
@@ -2037,16 +2091,6 @@ void LogWritableBlock::DoClose() {
   state_ = CLOSED;
 }
 
-Status LogWritableBlock::AppendMetadata() {
-  BlockRecordPB record;
-  id().CopyToPB(record.mutable_block_id());
-  record.set_op_type(CREATE);
-  record.set_timestamp_us(GetCurrentTimeMicros());
-  record.set_offset(block_offset_);
-  record.set_length(block_length_);
-  return container_->AppendMetadata(record);
-}
-
 ////////////////////////////////////////////////////////////
 // LogReadableBlock
 ////////////////////////////////////////////////////////////
@@ -2662,6 +2706,8 @@ Status LogBlockManager::RemoveLogBlocks(const 
vector<BlockId>& block_ids,
   DCHECK(log_blocks);
   Status first_failure;
   vector<LogBlockRefPtr> lbs;
+  lbs.reserve(block_ids.size());
+  log_blocks->reserve(block_ids.size());
   if (deleted) {
     deleted->reserve(block_ids.size());
   }
@@ -2692,18 +2738,23 @@ Status LogBlockManager::RemoveLogBlocks(const 
vector<BlockId>& block_ids,
     metrics()->bytes_under_management->DecrementBy(blocks_length);
   }
 
+  unordered_map<LogBlockContainer*, vector<LogBlockRefPtr>> lbs_by_containers;
   for (auto& lb : lbs) {
-    VLOG(3) << "Deleting block " << lb->block_id();
-    lb->container()->BlockDeleted(lb);
+    auto& lbs_by_container = LookupOrInsert(&lbs_by_containers, 
lb->container(), {});
+    lbs_by_container.emplace_back(std::move(lb));
+  }
 
+  for (auto& [container, clbs] : lbs_by_containers) {
+    for (const auto& lb : clbs) {
+      VLOG(3) << "Deleting block " << lb->block_id();
+      // TODO(yingchun): BlockDeleted in batch
+      container->BlockDeleted(lb);
+    }
     // Record the on-disk deletion.
     //
     // TODO(unknown): what if this fails? Should we restore the in-memory 
block?
-    BlockRecordPB record;
-    lb->block_id().CopyToPB(record.mutable_block_id());
-    record.set_op_type(DELETE);
-    record.set_timestamp_us(GetCurrentTimeMicros());
-    Status s = lb->container()->AppendMetadata(record);
+    vector<BlockId> deleted_block_ids;
+    Status s = container->AppendMetadataForDeleteRecords(clbs, 
&deleted_block_ids);
 
     // We don't bother fsyncing the metadata append for deletes in order to 
avoid
     // the disk overhead. Even if we did fsync it, we'd still need to account 
for
@@ -2711,28 +2762,26 @@ Status LogBlockManager::RemoveLogBlocks(const 
vector<BlockId>& block_ids,
     // fsync).
     //
     // TODO(KUDU-829): Implement GC of orphaned blocks.
-
+    // TODO(yingchun): Add some metrics to track the number of orphaned blocks.
     if (!s.ok()) {
       if (first_failure.ok()) {
-        first_failure = s.CloneAndPrepend(
-            "Unable to append deletion record to block metadata");
+        first_failure = s.CloneAndPrepend("Unable to append deletion record(s) 
to block metadata");
       }
+      // Purge the blocks that failed to delete.
+      clbs.resize(deleted_block_ids.size());
     } else {
       // Metadata files of containers with very few live blocks will be 
compacted.
-      if (!lb->container()->read_only() &&
-          FLAGS_log_container_metadata_runtime_compact &&
-          lb->container()->ShouldCompact()) {
-        scoped_refptr<LogBlockContainer> self(lb->container());
-        lb->container()->ExecClosure([self]() {
-          self->CompactMetadata();
-        });
+      if (!container->read_only() && 
FLAGS_log_container_metadata_runtime_compact &&
+          container->ShouldCompact()) {
+        scoped_refptr<LogBlockContainer> self(container);
+        container->ExecClosure([self]() { self->CompactMetadata(); });
       }
+    }
 
-      if (deleted) {
-        deleted->emplace_back(lb->block_id());
-      }
-      log_blocks->emplace_back(std::move(lb));
+    if (deleted) {
+      std::move(deleted_block_ids.begin(), deleted_block_ids.end(), 
std::back_inserter(*deleted));
     }
+    std::move(clbs.begin(), clbs.end(), std::back_inserter(*log_blocks));
   }
 
   return first_failure;
@@ -2746,7 +2795,7 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& 
block_id,
 
   auto it = blocks_by_block_id->find(block_id);
   if (it == blocks_by_block_id->end()) {
-    return Status::NotFound("Can't find block", block_id.ToString());
+    return Status::NotFound("cannot find block to remove", 
block_id.ToString());
   }
 
   LogBlockContainer* container = it->second->container();

Reply via email to