ilya-biryukov updated this revision to Diff 131941.
ilya-biryukov added a comment.

- Remove threading/ dir, moved everything to the top-level
- Rename ThreadPool.h to Threading.h


Repository:
  rCTE Clang Tools Extra

https://reviews.llvm.org/D42174

Files:
  clangd/CMakeLists.txt
  clangd/ClangdServer.cpp
  clangd/ClangdServer.h
  clangd/ClangdUnitStore.h
  clangd/TUScheduler.cpp
  clangd/TUScheduler.h
  clangd/Threading.cpp
  clangd/Threading.h

Index: clangd/Threading.h
===================================================================
--- /dev/null
+++ clangd/Threading.h
@@ -0,0 +1,81 @@
+//===--- Threading.h ---------------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_THREADPOOL_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_THREADPOOL_H
+
+#include "Function.h"
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace clang {
+namespace clangd {
+/// A simple fixed-size thread pool implementation.
+class ThreadPool {
+public:
+  /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
+  /// will be processed synchronously on the calling thread.
+  /// Otherwise, \p AsyncThreadsCount threads will be created to run the
+  /// requests.
+  ThreadPool(unsigned AsyncThreadsCount);
+  ~ThreadPool();
+
+  /// Add a new request to run function \p F with args \p As to the start of the
+  /// queue. Runs on a worker thread, or inline if the pool is synchronous.
+  template <class Func, class... Args>
+  void addToFront(Func &&F, Args &&... As) {
+    if (RunSynchronously) {
+      std::forward<Func>(F)(std::forward<Args>(As)...);
+      return;
+    }
+
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      RequestQueue.push_front(
+          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
+    }
+    RequestCV.notify_one();
+  }
+
+  /// Add a new request to run function \p F with args \p As to the end of the
+  /// queue. Runs on a worker thread, or inline if the pool is synchronous.
+  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
+    if (RunSynchronously) {
+      std::forward<Func>(F)(std::forward<Args>(As)...);
+      return;
+    }
+
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      RequestQueue.push_back(
+          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
+    }
+    RequestCV.notify_one();
+  }
+
+private:
+  bool RunSynchronously; // Set when constructed with 0 threads.
+  mutable std::mutex Mutex; // Guards Done and RequestQueue.
+  /// We run some tasks on separate threads (parsing, CppFile cleanup).
+  /// These threads look into RequestQueue to find requests to handle and
+  /// terminate when Done is set to true.
+  std::vector<std::thread> Workers;
+  /// Setting Done to true will make the worker threads terminate.
+  bool Done = false;
+  /// A queue of requests.
+  std::deque<UniqueFunction<void()>> RequestQueue;
+  /// Condition variable to wake up worker threads.
+  std::condition_variable RequestCV;
+};
+} // namespace clangd
+} // namespace clang
+#endif
Index: clangd/Threading.cpp
===================================================================
--- /dev/null
+++ clangd/Threading.cpp
@@ -0,0 +1,61 @@
+#include "Threading.h"
+#include "llvm/Support/FormatVariadic.h"
+#include "llvm/Support/Threading.h"
+
+namespace clang {
+namespace clangd {
+ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
+    : RunSynchronously(AsyncThreadsCount == 0) {
+  if (RunSynchronously) {
+    // Don't start any worker threads if we're running synchronously.
+    return;
+  }
+
+  Workers.reserve(AsyncThreadsCount);
+  for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
+    Workers.push_back(std::thread([this, I]() {
+      llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
+      while (true) {
+        UniqueFunction<void()> Request;
+
+        // Pick a request from the queue.
+        {
+          std::unique_lock<std::mutex> Lock(Mutex);
+          // Wait for more requests or for shutdown.
+          RequestCV.wait(Lock,
+                         [this] { return !RequestQueue.empty() || Done; });
+          if (Done)
+            return; // Shutting down: requests still queued are not run.
+
+          assert(!RequestQueue.empty() && "RequestQueue was empty");
+
+          // We process requests starting from the front of the queue. Users of
+          // ThreadPool have a way to prioritise their requests by putting
+          // them on either side of the queue (using either addToEnd or
+          // addToFront).
+          Request = std::move(RequestQueue.front());
+          RequestQueue.pop_front();
+        } // unlock Mutex
+
+        Request(); // Runs with Mutex released.
+      }
+    }));
+  }
+}
+
+ThreadPool::~ThreadPool() {
+  if (RunSynchronously)
+    return; // no worker threads are running in that case
+
+  {
+    std::lock_guard<std::mutex> Lock(Mutex);
+    // Tell the workers to stop; notify_all() below wakes the idle ones.
+    Done = true;
+  } // unlock Mutex
+  RequestCV.notify_all();
+
+  for (auto &Worker : Workers)
+    Worker.join(); // Waits for a request already running on it.
+}
+} // namespace clangd
+} // namespace clang
Index: clangd/TUScheduler.h
===================================================================
--- /dev/null
+++ clangd/TUScheduler.h
@@ -0,0 +1,94 @@
+//===--- TUScheduler.h -------------------------------------------*-C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_SCHEDULER_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_SCHEDULER_H
+
+#include "ClangdUnit.h"
+#include "ClangdUnitStore.h"
+#include "Function.h"
+#include "Threading.h"
+
+namespace clang {
+namespace clangd {
+/// Returns the default number of async threads to use for TUScheduler.
+/// The returned value is always >= 1 (i.e. it will not cause requests to be
+/// processed synchronously).
+unsigned getDefaultAsyncThreadsCount();
+
+struct InputsAndAST {
+  const ParseInputs &Inputs;
+  ParsedAST &AST;
+};
+
+struct InputsAndPreamble {
+  const ParseInputs &Inputs;
+  const PreambleData *Preamble;
+};
+
+/// Handles running tasks for ClangdServer and managing the resources (e.g.,
+/// preambles and ASTs) for opened files.
+/// TUScheduler is not thread-safe; only one thread should be providing updates
+/// and scheduling tasks.
+/// Callbacks are run on a threadpool and it's appropriate to do slow work in
+/// them.
+class TUScheduler {
+public:
+  TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
+              ASTParsedCallback ASTCallback);
+
+  /// Returns estimated memory usage for each of the currently open files.
+  /// The order of results is unspecified.
+  std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
+
+  /// Schedule an update for \p File. Adds \p File to a list of tracked files if
+  /// \p File was not part of it before.
+  /// FIXME(ibiryukov): remove the callback from this function.
+  void update(
+      Context Ctx, PathRef File, ParseInputs Inputs,
+      UniqueFunction<void(Context, llvm::Optional<std::vector<DiagWithFixIts>>)>
+          OnUpdated);
+
+  /// Remove \p File from the list of tracked files and schedule removal of its
+  /// resources. \p Action will be called when resources are freed.
+  /// If an error occurs during processing, it is forwarded to the \p Action
+  /// callback.
+  /// FIXME(ibiryukov): the callback passed to this function is not used, we
+  /// should remove it.
+  void remove(PathRef File, UniqueFunction<void(llvm::Error)> Action);
+
+  /// Schedule a read of the AST. \p Action will be called when the AST is
+  /// ready and sees the version of \p File tracked at the time of the call.
+  /// NOTE: the current implementation blocks the caller until the AST is
+  /// available and runs \p Action on the calling thread.
+  /// If an error occurs during processing, it is forwarded to the \p Action
+  /// callback.
+  void runWithAST(PathRef File,
+                  UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action);
+
+  /// Schedule an async read of the Preamble. Preamble passed to \p Action may
+  /// be built for any version of the file (or be null), callers must not rely
+  /// on it being consistent with the current version of the file.
+  /// If an error occurs during processing, it is forwarded to the \p Action
+  /// callback.
+  void runWithPreamble(
+      PathRef File,
+      UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
+
+private:
+  const ParseInputs &getInputs(PathRef File);
+
+  llvm::StringMap<ParseInputs> CachedInputs; // Last inputs of each file.
+  CppFileCollection Files;
+  ThreadPool Threads;
+};
+} // namespace clangd
+} // namespace clang
+
+#endif
Index: clangd/TUScheduler.cpp
===================================================================
--- /dev/null
+++ clangd/TUScheduler.cpp
@@ -0,0 +1,124 @@
+#include "TUScheduler.h"
+#include "clang/Frontend/PCHContainerOperations.h"
+#include "llvm/Support/Errc.h"
+
+namespace clang {
+namespace clangd {
+unsigned getDefaultAsyncThreadsCount() {
+  unsigned HardwareConcurrency = std::thread::hardware_concurrency();
+  // The C++ standard allows hardware_concurrency() to return 0 when the
+  // value is not computable; fall back to one worker thread so that the
+  // result is never 0.
+  if (HardwareConcurrency == 0)
+    return 1;
+  return HardwareConcurrency;
+}
+
+TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
+                         bool StorePreamblesInMemory,
+                         ASTParsedCallback ASTCallback)
+
+    : Files(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
+            std::move(ASTCallback)),
+      Threads(AsyncThreadsCount) {}
+
+void TUScheduler::update(
+    Context Ctx, PathRef File, ParseInputs Inputs,
+    UniqueFunction<void(Context Ctx,
+                        llvm::Optional<std::vector<DiagWithFixIts>>)>
+        OnUpdated) {
+  CachedInputs[File] = Inputs; // Kept for getInputs().
+
+  auto Resources = Files.getOrCreateFile(File);
+  auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
+
+  Threads.addToFront(
+      [](Context Ctx, decltype(OnUpdated) OnUpdated,
+         decltype(DeferredRebuild) DeferredRebuild) {
+        auto Diags = DeferredRebuild(Ctx);
+        OnUpdated(std::move(Ctx), std::move(Diags)); // Last use of Diags.
+      },
+      std::move(Ctx), std::move(OnUpdated), std::move(DeferredRebuild));
+}
+
+void TUScheduler::remove(PathRef File,
+                         UniqueFunction<void(llvm::Error)> Action) {
+  CachedInputs.erase(File);
+
+  auto Resources = Files.removeIfPresent(File);
+  if (!Resources) {
+    Action(llvm::make_error<llvm::StringError>(
+        "trying to remove non-added document", llvm::errc::invalid_argument));
+    return;
+  }
+
+  auto DeferredCancel = Resources->deferCancelRebuild();
+  Threads.addToFront(
+      [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
+        DeferredCancel();
+        Action(llvm::Error::success());
+      },
+      std::move(Action), std::move(DeferredCancel));
+}
+
+void TUScheduler::runWithAST(
+    PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
+  auto Resources = Files.getFile(File);
+  if (!Resources) {
+    Action(llvm::make_error<llvm::StringError>(
+        "trying to get AST for non-added document",
+        llvm::errc::invalid_argument));
+    return;
+  }
+
+  const ParseInputs &Inputs = getInputs(File);
+  // We currently block the calling thread until AST is available and run the
+  // action on the calling thread to avoid inconsistent states coming from
+  // subsequent updates.
+  // FIXME(ibiryukov): this should be moved to the worker threads.
+  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
+    if (AST)
+      Action(InputsAndAST{Inputs, *AST});
+    else
+      Action(llvm::make_error<llvm::StringError>(
+          "Could not build AST for the latest file update",
+          llvm::errc::invalid_argument));
+  });
+}
+
+void TUScheduler::runWithPreamble(
+    PathRef File,
+    UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
+  std::shared_ptr<CppFile> Resources = Files.getFile(File);
+  if (!Resources) {
+    Action(llvm::make_error<llvm::StringError>(
+        "trying to get preamble for non-added document",
+        llvm::errc::invalid_argument));
+    return;
+  }
+
+  const ParseInputs &Inputs = getInputs(File);
+  std::shared_ptr<const PreambleData> Preamble =
+      Resources->getPossiblyStalePreamble();
+  Threads.addToFront(
+      [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
+        if (!Preamble)
+          Preamble = Resources->getPossiblyStalePreamble();
+
+        Action(InputsAndPreamble{Inputs, Preamble.get()});
+      },
+      std::move(Action));
+}
+
+const ParseInputs &TUScheduler::getInputs(PathRef File) {
+  auto It = CachedInputs.find(File);
+  assert(It != CachedInputs.end());
+  return It->second;
+}
+
+std::vector<std::pair<Path, std::size_t>>
+TUScheduler::getUsedBytesPerFile() const {
+  return Files.getUsedBytesPerFile();
+}
+} // namespace clangd
+} // namespace clang
Index: clangd/ClangdUnitStore.h
===================================================================
--- clangd/ClangdUnitStore.h
+++ clangd/ClangdUnitStore.h
@@ -27,26 +27,26 @@
 public:
   /// \p ASTCallback is called when a file is parsed synchronously. This should
   /// not be expensive since it blocks diagnostics.
-  explicit CppFileCollection(ASTParsedCallback ASTCallback)
-      : ASTCallback(std::move(ASTCallback)) {}
+  explicit CppFileCollection(bool StorePreamblesInMemory,
+                             std::shared_ptr<PCHContainerOperations> PCHs,
+                             ASTParsedCallback ASTCallback)
+      : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
+        StorePreamblesInMemory(StorePreamblesInMemory) {}
 
-  std::shared_ptr<CppFile>
-  getOrCreateFile(PathRef File, bool StorePreamblesInMemory,
-                  std::shared_ptr<PCHContainerOperations> PCHs) {
+  std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
     std::lock_guard<std::mutex> Lock(Mutex);
     auto It = OpenedFiles.find(File);
     if (It == OpenedFiles.end()) {
       It = OpenedFiles
                .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
-                                                  std::move(PCHs), ASTCallback))
+                                                  PCHs, ASTCallback))
                .first;
     }
     return It->second;
   }
 
   std::shared_ptr<CppFile> getFile(PathRef File) const {
     std::lock_guard<std::mutex> Lock(Mutex);
-
     auto It = OpenedFiles.find(File);
     if (It == OpenedFiles.end())
       return nullptr;
@@ -64,6 +64,8 @@
   mutable std::mutex Mutex;
   llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
   ASTParsedCallback ASTCallback;
+  std::shared_ptr<PCHContainerOperations> PCHs;
+  bool StorePreamblesInMemory;
 };
 } // namespace clangd
 } // namespace clang
Index: clangd/ClangdServer.h
===================================================================
--- clangd/ClangdServer.h
+++ clangd/ClangdServer.h
@@ -18,17 +18,15 @@
 #include "Function.h"
 #include "GlobalCompilationDatabase.h"
 #include "Protocol.h"
+#include "TUScheduler.h"
 #include "index/FileIndex.h"
 #include "clang/Tooling/CompilationDatabase.h"
 #include "clang/Tooling/Core/Replacement.h"
 #include "llvm/ADT/IntrusiveRefCntPtr.h"
 #include "llvm/ADT/Optional.h"
 #include "llvm/ADT/StringRef.h"
-#include <condition_variable>
 #include <functional>
-#include <mutex>
 #include <string>
-#include <thread>
 #include <type_traits>
 #include <utility>
 
@@ -99,71 +97,6 @@
 
 class ClangdServer;
 
-/// Returns a number of a default async threads to use for ClangdScheduler.
-/// Returned value is always >= 1 (i.e. will not cause requests to be processed
-/// synchronously).
-unsigned getDefaultAsyncThreadsCount();
-
-/// Handles running WorkerRequests of ClangdServer on a number of worker
-/// threads.
-class ClangdScheduler {
-public:
-  /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
-  /// will be processed synchronously on the calling thread.
-  // Otherwise, \p AsyncThreadsCount threads will be created to schedule the
-  // requests.
-  ClangdScheduler(unsigned AsyncThreadsCount);
-  ~ClangdScheduler();
-
-  /// Add a new request to run function \p F with args \p As to the start of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args>
-  void addToFront(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
-
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_front(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
-    }
-    RequestCV.notify_one();
-  }
-
-  /// Add a new request to run function \p F with args \p As to the end of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
-
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.push_back(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...));
-    }
-    RequestCV.notify_one();
-  }
-
-private:
-  bool RunSynchronously;
-  std::mutex Mutex;
-  /// We run some tasks on separate threads(parsing, CppFile cleanup).
-  /// These threads looks into RequestQueue to find requests to handle and
-  /// terminate when Done is set to true.
-  std::vector<std::thread> Workers;
-  /// Setting Done to true will make the worker threads terminate.
-  bool Done = false;
-  /// A queue of requests. Elements of this vector are async computations (i.e.
-  /// results of calling std::async(std::launch::deferred, ...)).
-  std::deque<UniqueFunction<void()>> RequestQueue;
-  /// Condition variable to wake up worker threads.
-  std::condition_variable RequestCV;
-};
-
 /// Provides API to manage ASTs for a collection of C++ files and request
 /// various language features.
 /// Currently supports async diagnostics, code completion, formatting and goto
@@ -221,17 +154,23 @@
   /// constructor will receive onDiagnosticsReady callback.
   /// \return A future that will become ready when the rebuild (including
   /// diagnostics) is finished.
+  /// FIXME: don't return futures here, LSP does not require a response for this
+  /// request.
   std::future<Context> addDocument(Context Ctx, PathRef File,
                                    StringRef Contents);
   /// Remove \p File from list of tracked files, schedule a request to free
   /// resources associated with it.
   /// \return A future that will become ready when the file is removed and all
   /// associated resources are freed.
+  /// FIXME: don't return futures here, LSP does not require a response for this
+  /// request.
   std::future<Context> removeDocument(Context Ctx, PathRef File);
   /// Force \p File to be reparsed using the latest contents.
   /// Will also check if CompileCommand, provided by GlobalCompilationDatabase
   /// for \p File has changed. If it has, will remove currently stored Preamble
   /// and AST and rebuild them from scratch.
+  /// FIXME: don't return futures here, LSP does not require a response for this
+  /// request.
   std::future<Context> forceReparse(Context Ctx, PathRef File);
 
   /// DEPRECATED. Please use a callback-based version, this API is deprecated
@@ -340,12 +279,8 @@
 
   std::future<Context>
   scheduleReparseAndDiags(Context Ctx, PathRef File, VersionedDraft Contents,
-                          std::shared_ptr<CppFile> Resources,
                           Tagged<IntrusiveRefCntPtr<vfs::FileSystem>> TaggedFS);
 
-  std::future<Context>
-  scheduleCancelRebuild(Context Ctx, std::shared_ptr<CppFile> Resources);
-
   CompileArgsCache CompileArgs;
   DiagnosticsConsumer &DiagConsumer;
   FileSystemProvider &FSProvider;
@@ -360,21 +295,19 @@
   std::unique_ptr<FileIndex> FileIdx;
   // If present, a merged view of FileIdx and an external index. Read via Index.
   std::unique_ptr<SymbolIndex> MergedIndex;
-  CppFileCollection Units;
   // If set, this represents the workspace path.
   llvm::Optional<std::string> RootPath;
   std::shared_ptr<PCHContainerOperations> PCHs;
-  bool StorePreamblesInMemory;
   /// Used to serialize diagnostic callbacks.
   /// FIXME(ibiryukov): get rid of an extra map and put all version counters
   /// into CppFile.
   std::mutex DiagnosticsMutex;
   /// Maps from a filename to the latest version of reported diagnostics.
   llvm::StringMap<DocVersion> ReportedDiagnosticVersions;
   // WorkScheduler has to be the last member, because its destructor has to be
   // called before all other members to stop the worker thread that references
-  // ClangdServer
-  ClangdScheduler WorkScheduler;
+  // ClangdServer.
+  TUScheduler WorkScheduler;
 };
 
 } // namespace clangd
Index: clangd/ClangdServer.cpp
===================================================================
--- clangd/ClangdServer.cpp
+++ clangd/ClangdServer.cpp
@@ -22,8 +22,6 @@
 #include "llvm/ADT/ScopeExit.h"
 #include "llvm/Support/Errc.h"
 #include "llvm/Support/FileSystem.h"
-#include "llvm/Support/FormatProviders.h"
-#include "llvm/Support/FormatVariadic.h"
 #include "llvm/Support/Path.h"
 #include "llvm/Support/raw_ostream.h"
 #include <future>
@@ -33,6 +31,26 @@
 
 namespace {
 
+// Issues an async read of AST and waits for results.
+template <class Ret, class Func>
+Ret blockingASTRead(TUScheduler &S, PathRef File, Func &&F) {
+  std::packaged_task<Ret(llvm::Expected<InputsAndAST>)> Task(
+      std::forward<Func>(F));
+  auto Future = Task.get_future();
+  S.runWithAST(File, std::move(Task));
+  return Future.get();
+}
+
+// Issues an async read of preamble and waits for results.
+template <class Ret, class Func>
+Ret blockingPreambleRead(TUScheduler &S, PathRef File, Func &&F) {
+  std::packaged_task<Ret(llvm::Expected<InputsAndPreamble>)> Task(
+      std::forward<Func>(F));
+  auto Future = Task.get_future();
+  S.runWithPreamble(File, std::move(Task));
+  return Future.get();
+}
+
 std::string getStandardResourceDir() {
   static int Dummy; // Just an address in this process.
   return CompilerInvocation::GetResourcesPath("clangd", (void *)&Dummy);
@@ -67,70 +85,6 @@
   return make_tagged(vfs::getRealFileSystem(), VFSTag());
 }
 
-unsigned clangd::getDefaultAsyncThreadsCount() {
-  unsigned HardwareConcurrency = std::thread::hardware_concurrency();
-  // C++ standard says that hardware_concurrency()
-  // may return 0, fallback to 1 worker thread in
-  // that case.
-  if (HardwareConcurrency == 0)
-    return 1;
-  return HardwareConcurrency;
-}
-
-ClangdScheduler::ClangdScheduler(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
-  if (RunSynchronously) {
-    // Don't start the worker thread if we're running synchronously
-    return;
-  }
-
-  Workers.reserve(AsyncThreadsCount);
-  for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
-    Workers.push_back(std::thread([this, I]() {
-      llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
-      while (true) {
-        UniqueFunction<void()> Request;
-
-        // Pick request from the queue
-        {
-          std::unique_lock<std::mutex> Lock(Mutex);
-          // Wait for more requests.
-          RequestCV.wait(Lock,
-                         [this] { return !RequestQueue.empty() || Done; });
-          if (Done)
-            return;
-
-          assert(!RequestQueue.empty() && "RequestQueue was empty");
-
-          // We process requests starting from the front of the queue. Users of
-          // ClangdScheduler have a way to prioritise their requests by putting
-          // them to the either side of the queue (using either addToEnd or
-          // addToFront).
-          Request = std::move(RequestQueue.front());
-          RequestQueue.pop_front();
-        } // unlock Mutex
-
-        Request();
-      }
-    }));
-  }
-}
-
-ClangdScheduler::~ClangdScheduler() {
-  if (RunSynchronously)
-    return; // no worker thread is running in that case
-
-  {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    // Wake up the worker thread
-    Done = true;
-  } // unlock Mutex
-  RequestCV.notify_all();
-
-  for (auto &Worker : Workers)
-    Worker.join();
-}
-
 ClangdServer::ClangdServer(GlobalCompilationDatabase &CDB,
                            DiagnosticsConsumer &DiagConsumer,
                            FileSystemProvider &FSProvider,
@@ -142,18 +96,17 @@
                   ResourceDir ? ResourceDir->str() : getStandardResourceDir()),
       DiagConsumer(DiagConsumer), FSProvider(FSProvider),
       FileIdx(BuildDynamicSymbolIndex ? new FileIndex() : nullptr),
-      // Pass a callback into `Units` to extract symbols from a newly parsed
-      // file and rebuild the file index synchronously each time an AST is
-      // parsed.
+      PCHs(std::make_shared<PCHContainerOperations>()),
+      // Pass a callback into `WorkScheduler` to extract symbols from a newly
+      // parsed file and rebuild the file index synchronously each time an AST
+      // is parsed.
       // FIXME(ioeric): this can be slow and we may be able to index on less
       // critical paths.
-      Units(FileIdx
-                ? [this](const Context &Ctx, PathRef Path,
-                         ParsedAST *AST) { FileIdx->update(Ctx, Path, AST); }
-                : ASTParsedCallback()),
-      PCHs(std::make_shared<PCHContainerOperations>()),
-      StorePreamblesInMemory(StorePreamblesInMemory),
-      WorkScheduler(AsyncThreadsCount) {
+      WorkScheduler(
+          AsyncThreadsCount, StorePreamblesInMemory,
+          FileIdx ? [this](const Context &Ctx, PathRef Path,
+                           ParsedAST *AST) { FileIdx->update(Ctx, Path, AST); }
+                  : ASTParsedCallback()) {
   if (FileIdx && StaticIdx) {
     MergedIndex = mergeIndex(FileIdx.get(), StaticIdx);
     Index = MergedIndex.get();
@@ -175,21 +128,28 @@
 std::future<Context> ClangdServer::addDocument(Context Ctx, PathRef File,
                                                StringRef Contents) {
   DocVersion Version = DraftMgr.updateDraft(File, Contents);
-
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
-  std::shared_ptr<CppFile> Resources =
-      Units.getOrCreateFile(File, StorePreamblesInMemory, PCHs);
   return scheduleReparseAndDiags(std::move(Ctx), File,
                                  VersionedDraft{Version, Contents.str()},
-                                 std::move(Resources), std::move(TaggedFS));
+                                 std::move(TaggedFS));
 }
 
 std::future<Context> ClangdServer::removeDocument(Context Ctx, PathRef File) {
   DraftMgr.removeDraft(File);
   CompileArgs.invalidate(File);
 
-  std::shared_ptr<CppFile> Resources = Units.removeIfPresent(File);
-  return scheduleCancelRebuild(std::move(Ctx), std::move(Resources));
+  std::promise<Context> DonePromise;
+  std::future<Context> DoneFuture = DonePromise.get_future();
+
+  auto Callback = BindWithForward(
+      [](Context Ctx, std::promise<Context> DonePromise, llvm::Error Err) {
+        llvm::consumeError(std::move(Err)); // Failure is deliberately ignored.
+        DonePromise.set_value(std::move(Ctx));
+      },
+      std::move(Ctx), std::move(DonePromise));
+
+  WorkScheduler.remove(File, std::move(Callback));
+  return DoneFuture;
 }
 
 std::future<Context> ClangdServer::forceReparse(Context Ctx, PathRef File) {
@@ -202,10 +162,8 @@
   CompileArgs.invalidate(File);
 
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
-  std::shared_ptr<CppFile> Resources =
-      Units.getOrCreateFile(File, StorePreamblesInMemory, PCHs);
-  return scheduleReparseAndDiags(std::move(Ctx), File, FileContents,
-                                 std::move(Resources), std::move(TaggedFS));
+  return scheduleReparseAndDiags(std::move(Ctx), File, std::move(FileContents),
+                                 std::move(TaggedFS));
 }
 
 std::future<std::pair<Context, Tagged<CompletionList>>>
@@ -237,97 +195,83 @@
     IntrusiveRefCntPtr<vfs::FileSystem> *UsedFS) {
   using CallbackType = UniqueFunction<void(Context, Tagged<CompletionList>)>;
 
-  std::string Contents;
-  if (OverridenContents) {
-    Contents = *OverridenContents;
-  } else {
-    auto FileContents = DraftMgr.getDraft(File);
-    assert(FileContents.Draft &&
-           "codeComplete is called for non-added document");
-
-    Contents = std::move(*FileContents.Draft);
-  }
-
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
   if (UsedFS)
     *UsedFS = TaggedFS.Value;
 
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  assert(Resources && "Calling completion on non-added file");
-
-  // Remember the current Preamble and use it when async task starts executing.
-  // At the point when async task starts executing, we may have a different
-  // Preamble in Resources. However, we assume the Preamble that we obtain here
-  // is reusable in completion more often.
-  std::shared_ptr<const PreambleData> Preamble =
-      Resources->getPossiblyStalePreamble();
   // Copy completion options for passing them to async task handler.
   auto CodeCompleteOpts = Opts;
   if (!CodeCompleteOpts.Index) // Respect overridden index.
     CodeCompleteOpts.Index = Index;
 
-  // Copy File, as it is a PathRef that will go out of scope before Task is
-  // executed.
-  Path FileStr = File;
+  std::string Contents;
+  if (OverridenContents) {
+    Contents = OverridenContents->str();
+  } else {
+    VersionedDraft Latest = DraftMgr.getDraft(File);
+    assert(Latest.Draft && "codeComplete called for non-added document");
+    Contents = *Latest.Draft;
+  }
+
   // Copy PCHs to avoid accessing this->PCHs concurrently
   std::shared_ptr<PCHContainerOperations> PCHs = this->PCHs;
-  tooling::CompileCommand CompileCommand = CompileArgs.getCompileCommand(File);
-  // A task that will be run asynchronously.
-  auto Task =
-      // 'mutable' to reassign Preamble variable.
-      [FileStr, Preamble, Resources, Contents, Pos, CodeCompleteOpts, TaggedFS,
-       PCHs, CompileCommand](Context Ctx, CallbackType Callback) mutable {
-        if (!Preamble) {
-          // Maybe we built some preamble before processing this request.
-          Preamble = Resources->getPossiblyStalePreamble();
-        }
-        // FIXME(ibiryukov): even if Preamble is non-null, we may want to check
-        // both the old and the new version in case only one of them matches.
-        CompletionList Result = clangd::codeComplete(
-            Ctx, FileStr, CompileCommand,
-            Preamble ? &Preamble->Preamble : nullptr, Contents, Pos,
-            TaggedFS.Value, PCHs, CodeCompleteOpts);
-
-        Callback(std::move(Ctx),
-                 make_tagged(std::move(Result), std::move(TaggedFS.Tag)));
-      };
-
-  WorkScheduler.addToFront(std::move(Task), std::move(Ctx),
-                           std::move(Callback));
+  auto Task = [PCHs, Pos, TaggedFS, CodeCompleteOpts](
+                  Context Ctx, std::string Contents, Path File,
+                  CallbackType Callback, llvm::Expected<InputsAndPreamble> IP) {
+    assert(IP && "error when trying to read preamble for codeComplete");
+    auto PreambleData = IP->Preamble;
+    auto &Command = IP->Inputs.CompileCommand;
+
+    // FIXME(ibiryukov): even if Preamble is non-null, we may want to check
+    // both the old and the new version in case only one of them matches.
+    CompletionList Result = clangd::codeComplete(
+        Ctx, File, Command, PreambleData ? &PreambleData->Preamble : nullptr,
+        Contents, Pos, TaggedFS.Value, PCHs, CodeCompleteOpts);
+
+    Callback(std::move(Ctx),
+             make_tagged(std::move(Result), std::move(TaggedFS.Tag)));
+  };
+
+  WorkScheduler.runWithPreamble(
+      File, BindWithForward(Task, std::move(Ctx), std::move(Contents),
+                            File.str(), std::move(Callback)));
 }
 
 llvm::Expected<Tagged<SignatureHelp>>
 ClangdServer::signatureHelp(const Context &Ctx, PathRef File, Position Pos,
                             llvm::Optional<StringRef> OverridenContents,
                             IntrusiveRefCntPtr<vfs::FileSystem> *UsedFS) {
-  std::string DraftStorage;
-  if (!OverridenContents) {
-    auto FileContents = DraftMgr.getDraft(File);
-    if (!FileContents.Draft)
-      return llvm::make_error<llvm::StringError>(
-          "signatureHelp is called for non-added document",
-          llvm::errc::invalid_argument);
-
-    DraftStorage = std::move(*FileContents.Draft);
-    OverridenContents = DraftStorage;
-  }
-
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
   if (UsedFS)
     *UsedFS = TaggedFS.Value;
 
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  if (!Resources)
-    return llvm::make_error<llvm::StringError>(
-        "signatureHelp is called for non-added document",
-        llvm::errc::invalid_argument);
+  std::string Contents;
+  if (OverridenContents) {
+    Contents = OverridenContents->str();
+  } else {
+    VersionedDraft Latest = DraftMgr.getDraft(File);
+    if (!Latest.Draft)
+      return llvm::make_error<llvm::StringError>(
+          "signatureHelp is called for non-added document",
+          llvm::errc::invalid_argument);
+    Contents = std::move(*Latest.Draft);
+  }
 
-  auto Preamble = Resources->getPossiblyStalePreamble();
-  auto Result =
-      clangd::signatureHelp(Ctx, File, CompileArgs.getCompileCommand(File),
-                            Preamble ? &Preamble->Preamble : nullptr,
-                            *OverridenContents, Pos, TaggedFS.Value, PCHs);
-  return make_tagged(std::move(Result), TaggedFS.Tag);
+  auto Action = [=, &Ctx](llvm::Expected<InputsAndPreamble> IP)
+      -> Expected<Tagged<SignatureHelp>> {
+    if (!IP)
+      return IP.takeError();
+    auto PreambleData = IP->Preamble;
+    auto &Command = IP->Inputs.CompileCommand;
+
+    return make_tagged(
+        clangd::signatureHelp(Ctx, File, Command,
+                              PreambleData ? &PreambleData->Preamble : nullptr,
+                              Contents, Pos, TaggedFS.Value, PCHs),
+        TaggedFS.Tag);
+  };
+  return blockingPreambleRead<Expected<Tagged<SignatureHelp>>>(WorkScheduler,
+                                                               File, Action);
 }
 
 llvm::Expected<tooling::Replacements>
@@ -359,49 +303,55 @@
 Expected<std::vector<tooling::Replacement>>
 ClangdServer::rename(const Context &Ctx, PathRef File, Position Pos,
                      llvm::StringRef NewName) {
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  RefactoringResultCollector ResultCollector;
-  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
-    const SourceManager &SourceMgr = AST->getASTContext().getSourceManager();
+  using RetType = Expected<std::vector<tooling::Replacement>>;
+  auto Action = [=](Expected<InputsAndAST> InpAST) -> RetType {
+    if (!InpAST)
+      return InpAST.takeError();
+    auto &AST = InpAST->AST;
+
+    RefactoringResultCollector ResultCollector;
+    const SourceManager &SourceMgr = AST.getASTContext().getSourceManager();
     const FileEntry *FE =
         SourceMgr.getFileEntryForID(SourceMgr.getMainFileID());
     if (!FE)
-      return;
+      return llvm::make_error<llvm::StringError>(
+          "rename called for non-added document", llvm::errc::invalid_argument);
     SourceLocation SourceLocationBeg =
-        clangd::getBeginningOfIdentifier(*AST, Pos, FE);
+        clangd::getBeginningOfIdentifier(AST, Pos, FE);
     tooling::RefactoringRuleContext Context(
-        AST->getASTContext().getSourceManager());
-    Context.setASTContext(AST->getASTContext());
+        AST.getASTContext().getSourceManager());
+    Context.setASTContext(AST.getASTContext());
     auto Rename = clang::tooling::RenameOccurrences::initiate(
         Context, SourceRange(SourceLocationBeg), NewName.str());
-    if (!Rename) {
-      ResultCollector.Result = Rename.takeError();
-      return;
-    }
+    if (!Rename)
+      return Rename.takeError();
+
     Rename->invoke(ResultCollector, Context);
-  });
-  assert(ResultCollector.Result.hasValue());
-  if (!ResultCollector.Result.getValue())
-    return ResultCollector.Result->takeError();
-
-  std::vector<tooling::Replacement> Replacements;
-  for (const tooling::AtomicChange &Change : ResultCollector.Result->get()) {
-    tooling::Replacements ChangeReps = Change.getReplacements();
-    for (const auto &Rep : ChangeReps) {
-      // FIXME: Right now we only support renaming the main file, so we drop
-      // replacements not for the main file. In the future, we might consider to
-      // support:
-      //   * rename in any included header
-      //   * rename only in the "main" header
-      //   * provide an error if there are symbols we won't rename (e.g.
-      //     std::vector)
-      //   * rename globally in project
-      //   * rename in open files
-      if (Rep.getFilePath() == File)
-        Replacements.push_back(Rep);
+
+    assert(ResultCollector.Result.hasValue());
+    if (!ResultCollector.Result.getValue())
+      return ResultCollector.Result->takeError();
+
+    std::vector<tooling::Replacement> Replacements;
+    for (const tooling::AtomicChange &Change : ResultCollector.Result->get()) {
+      tooling::Replacements ChangeReps = Change.getReplacements();
+      for (const auto &Rep : ChangeReps) {
+        // FIXME: Right now we only support renaming the main file, so we
+        // drop replacements not for the main file. In the future, we might
+        // consider supporting:
+        //   * rename in any included header
+        //   * rename only in the "main" header
+        //   * provide an error if there are symbols we won't rename (e.g.
+        //     std::vector)
+        //   * rename globally in project
+        //   * rename in open files
+        if (Rep.getFilePath() == File)
+          Replacements.push_back(Rep);
+      }
     }
-  }
-  return Replacements;
+    return Replacements;
+  };
+  return blockingASTRead<RetType>(WorkScheduler, File, std::move(Action));
 }
 
 llvm::Optional<std::string> ClangdServer::getDocument(PathRef File) {
@@ -412,40 +362,35 @@
 }
 
 std::string ClangdServer::dumpAST(PathRef File) {
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  if (!Resources)
-    return "<non-added file>";
+  auto Action = [](llvm::Expected<InputsAndAST> InpAST) -> std::string {
+    if (!InpAST) {
+      (void)(bool) InpAST.takeError();
+      return "<no-ast>";
+    }
+
+    std::string Result;
 
-  std::string Result;
-  Resources->getAST().get()->runUnderLock([&Result](ParsedAST *AST) {
     llvm::raw_string_ostream ResultOS(Result);
-    if (AST) {
-      clangd::dumpAST(*AST, ResultOS);
-    } else {
-      ResultOS << "<no-ast>";
-    }
+    clangd::dumpAST(InpAST->AST, ResultOS);
     ResultOS.flush();
-  });
-  return Result;
+
+    return Result;
+  };
+  return blockingASTRead<std::string>(WorkScheduler, File, std::move(Action));
 }
 
 llvm::Expected<Tagged<std::vector<Location>>>
 ClangdServer::findDefinitions(const Context &Ctx, PathRef File, Position Pos) {
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
 
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  if (!Resources)
-    return llvm::make_error<llvm::StringError>(
-        "findDefinitions called on non-added file",
-        llvm::errc::invalid_argument);
-
-  std::vector<Location> Result;
-  Resources->getAST().get()->runUnderLock([Pos, &Result, &Ctx](ParsedAST *AST) {
-    if (!AST)
-      return;
-    Result = clangd::findDefinitions(Ctx, *AST, Pos);
-  });
-  return make_tagged(std::move(Result), TaggedFS.Tag);
+  using RetType = llvm::Expected<Tagged<std::vector<Location>>>;
+  auto Action = [=, &Ctx](llvm::Expected<InputsAndAST> InpAST) -> RetType {
+    if (!InpAST)
+      return InpAST.takeError();
+    auto Result = clangd::findDefinitions(Ctx, InpAST->AST, Pos);
+    return make_tagged(std::move(Result), TaggedFS.Tag);
+  };
+  return blockingASTRead<RetType>(WorkScheduler, File, Action);
 }
 
 llvm::Optional<Path> ClangdServer::switchSourceHeader(PathRef Path) {
@@ -532,59 +477,35 @@
 
   auto TaggedFS = FSProvider.getTaggedFileSystem(File);
 
-  std::shared_ptr<CppFile> Resources = Units.getFile(File);
-  if (!Resources)
-    return llvm::make_error<llvm::StringError>(
-        "findDocumentHighlights called on non-added file",
-        llvm::errc::invalid_argument);
-
-  std::vector<DocumentHighlight> Result;
-  llvm::Optional<llvm::Error> Err;
-  Resources->getAST().get()->runUnderLock([Pos, &Ctx, &Err,
-                                           &Result](ParsedAST *AST) {
-    if (!AST) {
-      Err = llvm::make_error<llvm::StringError>("Invalid AST",
-                                                llvm::errc::invalid_argument);
-      return;
-    }
-    Result = clangd::findDocumentHighlights(Ctx, *AST, Pos);
-  });
-
-  if (Err)
-    return std::move(*Err);
-  return make_tagged(Result, TaggedFS.Tag);
+  using RetType = llvm::Expected<Tagged<std::vector<DocumentHighlight>>>;
+  auto Action = [=, &Ctx](llvm::Expected<InputsAndAST> InpAST) -> RetType {
+    if (!InpAST)
+      return InpAST.takeError();
+    auto Result = clangd::findDocumentHighlights(Ctx, InpAST->AST, Pos);
+    return make_tagged(std::move(Result), TaggedFS.Tag);
+  };
+  return blockingASTRead<RetType>(WorkScheduler, File, Action);
 }
 
 std::future<Context> ClangdServer::scheduleReparseAndDiags(
     Context Ctx, PathRef File, VersionedDraft Contents,
-    std::shared_ptr<CppFile> Resources,
     Tagged<IntrusiveRefCntPtr<vfs::FileSystem>> TaggedFS) {
-  assert(Contents.Draft && "Draft must have contents");
-  ParseInputs Inputs = {CompileArgs.getCompileCommand(File),
-                        std::move(TaggedFS.Value), *std::move(Contents.Draft)};
+  tooling::CompileCommand Command = CompileArgs.getCompileCommand(File);
+
+  using OptDiags = llvm::Optional<std::vector<DiagWithFixIts>>;
+
+  DocVersion Version = Contents.Version;
+  Path FileStr = File.str();
+  VFSTag Tag = std::move(TaggedFS.Tag);
 
-  UniqueFunction<llvm::Optional<std::vector<DiagWithFixIts>>(const Context &)>
-      DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
   std::promise<Context> DonePromise;
   std::future<Context> DoneFuture = DonePromise.get_future();
 
-  DocVersion Version = Contents.Version;
-  Path FileStr = File;
-  VFSTag Tag = TaggedFS.Tag;
-  auto ReparseAndPublishDiags =
-      [this, FileStr, Version,
-       Tag](UniqueFunction<llvm::Optional<std::vector<DiagWithFixIts>>(
-                const Context &)>
-                DeferredRebuild,
-            std::promise<Context> DonePromise, Context Ctx) -> void {
+  auto Callback = [this, Version, FileStr,
+                   Tag](std::promise<Context> DonePromise, Context Ctx,
+                        OptDiags Diags) {
     auto Guard =
         llvm::make_scope_exit([&]() { DonePromise.set_value(std::move(Ctx)); });
-
-    auto CurrentVersion = DraftMgr.getVersion(FileStr);
-    if (CurrentVersion != Version)
-      return; // This request is outdated
-
-    auto Diags = DeferredRebuild(Ctx);
     if (!Diags)
       return; // A new reparse was requested before this one completed.
 
@@ -600,36 +521,15 @@
       return;
     LastReportedDiagsVersion = Version;
 
-    DiagConsumer.onDiagnosticsReady(Ctx, FileStr,
-                                    make_tagged(std::move(*Diags), Tag));
+    DiagConsumer.onDiagnosticsReady(
+        Ctx, FileStr, make_tagged(std::move(*Diags), std::move(Tag)));
   };
 
-  WorkScheduler.addToFront(std::move(ReparseAndPublishDiags),
-                           std::move(DeferredRebuild), std::move(DonePromise),
-                           std::move(Ctx));
-  return DoneFuture;
-}
-
-std::future<Context>
-ClangdServer::scheduleCancelRebuild(Context Ctx,
-                                    std::shared_ptr<CppFile> Resources) {
-  std::promise<Context> DonePromise;
-  std::future<Context> DoneFuture = DonePromise.get_future();
-  if (!Resources) {
-    // No need to schedule any cleanup.
-    DonePromise.set_value(std::move(Ctx));
-    return DoneFuture;
-  }
-
-  UniqueFunction<void()> DeferredCancel = Resources->deferCancelRebuild();
-  auto CancelReparses = [Resources](std::promise<Context> DonePromise,
-                                    UniqueFunction<void()> DeferredCancel,
-                                    Context Ctx) {
-    DeferredCancel();
-    DonePromise.set_value(std::move(Ctx));
-  };
-  WorkScheduler.addToFront(std::move(CancelReparses), std::move(DonePromise),
-                           std::move(DeferredCancel), std::move(Ctx));
+  WorkScheduler.update(std::move(Ctx), File,
+                       ParseInputs{std::move(Command),
+                                   std::move(TaggedFS.Value),
+                                   std::move(*Contents.Draft)},
+                       BindWithForward(Callback, std::move(DonePromise)));
   return DoneFuture;
 }
 
@@ -640,5 +540,5 @@
 
 std::vector<std::pair<Path, std::size_t>>
 ClangdServer::getUsedBytesPerFile() const {
-  return Units.getUsedBytesPerFile();
+  return WorkScheduler.getUsedBytesPerFile();
 }
Index: clangd/CMakeLists.txt
===================================================================
--- clangd/CMakeLists.txt
+++ clangd/CMakeLists.txt
@@ -21,7 +21,9 @@
   Protocol.cpp
   ProtocolHandlers.cpp
   SourceCode.cpp
+  Threading.cpp
   Trace.cpp
+  TUScheduler.cpp
   URI.cpp
   XRefs.cpp
   index/FileIndex.cpp
_______________________________________________
cfe-commits mailing list
cfe-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits

Reply via email to