westonpace commented on a change in pull request #10145:
URL: https://github.com/apache/arrow/pull/10145#discussion_r621391543



##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + 
range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, 
range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > 
b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + 
entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for 
caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're 
requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, 
entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};

Review comment:
       What fills this future?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + 
range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, 
range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > 
b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + 
entry.range.length) {
+          include = true;
+          ranges.pop_back();

Review comment:
       Is there no case where a range will span two entries?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + 
range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, 
range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > 
b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + 
entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for 
caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're 
requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, 
entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};
+  }
+
+  Status Cache(std::vector<ReadRange> ranges) override {
+    std::unique_lock<std::mutex> guard(entry_mutex);
+    return ReadRangeCache::Impl::Cache(std::move(ranges));

Review comment:
       The current `file->ReadAsync` has some leeway in it which allows the 
method to be synchronous if needed.  If that is the case, this could end up 
holding onto the lock for a while.  Actually, it looks like you have guards on 
the `Wait`/`WaitFor` methods as well, so perhaps this isn't intended to be 
consumed by multiple threads?
   
   Could you maybe add a short comment explaining how you expect this class to 
be used (e.g. first a thread does a bunch of cache calls and then a bunch of 
read calls, or maybe there are multiple threads calling cache or read)?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +174,111 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + 
range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, 
range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length 
== 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > 
b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      bool include = false;
+      while (!ranges.empty()) {
+        const auto& next = ranges.back();
+        if (next.offset >= entry.range.offset &&
+            next.offset + next.length <= entry.range.offset + 
entry.range.length) {
+          include = true;
+          ranges.pop_back();
+        } else {
+          break;
+        }
+      }
+      if (include) futures.emplace_back(MaybeRead(&entry));
+      if (ranges.empty()) break;
+    }
+    if (!ranges.empty()) {
+      return Status::Invalid("Given ranges were not previously requested for 
caching");
+    }
+    return AllComplete(futures);
+  }
+};
+
+// Don't read ranges when they're first added. Instead, wait until they're 
requested
+// (either through Read or WaitFor).
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  // Protect against concurrent modification of entries[i]->future
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+    if (!entry->future.is_valid()) {
+      entry->future = file->ReadAsync(ctx, entry->range.offset, 
entry->range.length);
+    }
+    return entry->future;
+  }
+
+  RangeCacheEntry Cache(const ReadRange& range) override {
+    return {range, Future<std::shared_ptr<Buffer>>()};
+  }
+
+  Status Cache(std::vector<ReadRange> ranges) override {
+    std::unique_lock<std::mutex> guard(entry_mutex);
+    return ReadRangeCache::Impl::Cache(std::move(ranges));

Review comment:
       Adding to this, could you create a simple test case around whatever type 
of multithreading you expect to guard against with these mutexes?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to