I've managed to reproduce this issue when many hosts hit apt-cacher-ng
at the same time.
Attached is a patch which fixes the issue for me — note that it is a
quick and hacky patch, not intended for merging as-is!
I have sent the maintainer debug logs directly: one from a run that
reproduces this problem, and one from a run with this patch applied.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 024f6a0..011ab42 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 3.1)
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
# try to set the best C++ language level
set(CMAKE_CXX_STANDARD 20)
# let it take the lowest version, we need some precursor of C++14x
diff --git a/src/job.cc b/src/job.cc
index a2025cc..5c003a9 100644
--- a/src/job.cc
+++ b/src/job.cc
@@ -662,6 +662,18 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
else
m_sFileLoc=theUrl.sHost+theUrl.sPath;
+ // Here we serialize multiple clients trying to download the
+ // same file. Only one thread at a time per URL is allowed to
+ // proceed further in this function.
+ Lockstuff g{h.getRequestUrl()};
+
+ // Check if another job is running. If so link to that.
+ if(g.stuff->otherThread) {
+ m_pItem = m_pParentCon.GetItemRegistry()->Create(m_sFileLoc, ESharingHow::ALWAYS_TRY_SHARING, fileitem::tSpecialPurposeAttr{});
+ USRDBG("Linked to other job");
+ return;
+ }
+
fileitem::tSpecialPurposeAttr attr {
! cfg::offlinemode && data_type == FILE_VOLATILE,
m_bIsHeadOnly,
@@ -697,8 +709,14 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
if(cfg::trackfileuse && fistate >= fileitem::FIST_DLGOTHEAD && fistate < fileitem::FIST_DLERROR)
m_pItem.get()->UpdateHeadTimestamp();
- if(fistate==fileitem::FIST_COMPLETE)
+ if(fistate==fileitem::FIST_COMPLETE) {
+ // Tell everybody waiting for this thread to complete
+ // where to get their m_pItem and register a cleanup
+ // when this job completes.
+ g.setReturnValue(m_pItem.get());
+ m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl());
return; // perfect, done here
+ }
if(cfg::offlinemode) { // make sure there will be no problems later in SendData or prepare a user message
// error or needs download but freshness check was disabled, so it's really not complete.
@@ -759,6 +777,11 @@ void job::Prepare(const header &h, string_view headBuf, cmstring& callerHostname
USRERR("PANIC! Error creating download job for " << m_sFileLoc);
return report_overload(__LINE__);
}
+ // Tell everybody waiting for this thread to complete
+ // where to get their m_pItem and register a cleanup
+ // when this job completes.
+ g.setReturnValue(m_pItem.get());
+ m_ipc = std::make_unique<inProgressCleanup>(h.getRequestUrl());
}
}
catch (const std::bad_alloc&) // OOM, may this ever happen here?
@@ -1190,4 +1213,61 @@ void job::AppendMetaHeaders()
 	<< "\r\nServer: Debian Apt-Cacher NG/" ACVERSION "\r\n"
 	"\r\n";
 }
+
+job::Lockstuff::Lockstuff(const std::string& url_): url(url_) {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	while(true) {
+		auto [it, ins] = inProgress.insert({url, nullptr});
+		if(!ins) {
+			stuff = it->second;
+			if (stuff->otherThread) {
+				break;
+			}
+			// Someone is already downloading this. Add ourselves to the waiters.
+			stuff->cv.wait(g._guard);
+		} else {
+			stuff = it->second = std::make_shared<Stuff>();
+			owner = true;
+			break;
+		}
+	}
+}
+
+void job::Lockstuff::setReturnValue(tFileItemPtr tfip) {
+	// Must hold inProgressLock: otherThread is read by other threads in
+	// the Lockstuff constructor and destructor while holding this lock.
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	if (const auto& it = inProgress.find(url); it != inProgress.end()) {
+		stuff->otherThread = tfip;
+	}
+}
+
+job::Lockstuff::~Lockstuff() {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	if(owner) {
+		stuff->cv.notify_all();
+		// After notify_all, any waiting threads are guaranteed to
+		// be blocked on inProgressLock, not on the condition so
+		// it's safe to delete it. However we have to use shared
+		// pointers because we don't know how long it will take the
+		// waiters to read the tFileItemPtr;
+		if (!stuff->otherThread) {
+			inProgress.erase(url);
+		}
+	}
+}
+
+job::inProgressCleanup::~inProgressCleanup() {
+	lockuniq g{inProgressLock};
+	LOGSTARTFUNC;
+	if (const auto& it = inProgress.find(url); it != inProgress.end()) {
+		inProgress.erase(it);
+	}
+}
+
+std::map<std::string, std::shared_ptr<job::Stuff>> job::inProgress;
+base_with_mutex job::inProgressLock;
 }
diff --git a/src/job.h b/src/job.h
index cb162a6..97446e2 100644
--- a/src/job.h
+++ b/src/job.h
@@ -16,6 +16,39 @@ class header;
class job
{
+private:
+ // Lock controlling access to inProgress
+ static base_with_mutex inProgressLock;
+
+ // The data that we store in inProgress
+ struct Stuff {
+ std::condition_variable cv;
+ tFileItemPtr otherThread = 0;
+ };
+
+ // Map from URL to Stuff for in progress jobs that are requesting this file.
+ // The entry is "owned" by the job that added it and it is deleted when the job completes.
+ static std::map<std::string, std::shared_ptr<Stuff>> inProgress;
+
+ // Where all the real work is done.
+ struct Lockstuff {
+ const std::string url;
+ std::shared_ptr<Stuff> stuff;
+ bool owner = false;
+ Lockstuff(const std::string& url_);
+ void setReturnValue(tFileItemPtr tfip);
+ ~Lockstuff();
+
+ };
+
+ // Simple class which is destroyed when the job is destroyed. It deletes the entry from inProgress.
+ struct inProgressCleanup {
+ const std::string url;
+ inProgressCleanup(const std::string& url_) : url(url_) { }
+ ~inProgressCleanup();
+ };
+
+ std::unique_ptr<inProgressCleanup> m_ipc;
public:
enum eJobResult : short