commit: ef108a692f34464cd574a7857f9c2eda83a06265
Author: Michał Górny <mgorny <AT> gentoo <DOT> org>
AuthorDate: Fri Dec 19 19:55:19 2025 +0000
Commit: Michał Górny <mgorny <AT> gentoo <DOT> org>
CommitDate: Fri Dec 19 19:56:46 2025 +0000
URL: https://gitweb.gentoo.org/proj/steve.git/commit/?id=ef108a69
Report process cmdline in more messages, and add bounds checking
Signed-off-by: Michał Górny <mgorny <AT> gentoo.org>
steve.cxx | 154 +++++++++++++++++++++++++++++++++++---------------------------
1 file changed, 88 insertions(+), 66 deletions(-)
diff --git a/steve.cxx b/steve.cxx
index 433c25b..803a7fc 100644
--- a/steve.cxx
+++ b/steve.cxx
@@ -93,6 +93,7 @@ struct steve_process {
std::optional<char> extra_token;
running_job_map running_jobs;
bool warned_incorrect_token{false};
+ std::string cmdline;
~steve_process() {
if (pid_fd != -1)
@@ -236,47 +237,55 @@ static char steve_get_token_char(steve_process *process)
return job_num & 0xFF;
}
+static std::string steve_process_id(uint64_t pid, const steve_process
*process) {
+ if (process && !process->cmdline.empty())
+ return std::format("PID {} ({})", pid, process->cmdline);
+ return std::format("PID {}", pid);
+}
+
static void steve_give_token(steve_state *state, fuse_req_t req, uint64_t pid)
{
- char token = steve_get_token_char(&state->processes[pid]);
+ steve_process *process = &state->processes.at(pid);
+ char token = steve_get_token_char(process);
- if (state->processes[pid].token_reserved) {
- state->processes[pid].tokens_held++;
- state->processes[pid].token_reserved = false;
+ if (process->token_reserved) {
+ process->tokens_held++;
+ process->token_reserved = false;
if (state->verbose)
- std::print(stderr, "Giving reserved token 0x{:02x} to
PID {}, {} left, {} tokens held by process\n",
- token, pid, state->tokens,
state->processes[pid].tokens_held);
+ std::print(stderr, "Giving reserved token 0x{:02x} to
{}, {} left, {} tokens held by process\n",
+ token, steve_process_id(pid, process),
state->tokens, process->tokens_held);
fuse_reply_buf(req, &token, 1);
return;
}
state->tokens--;
- state->processes[pid].tokens_held++;
+ process->tokens_held++;
if (state->verbose) {
if (state->max_load_avg > 0)
- std::print(stderr, "Giving job token 0x{:02x} to PID
{}, {} left, {} tokens held by process, token reserved: {}, load average =
{:.3} (limit: {})\n",
- token, pid, state->tokens,
state->processes[pid].tokens_held, state->processes[pid].token_reserved,
state->load_avg, state->max_load_avg);
+ std::print(stderr, "Giving job token 0x{:02x} to {}, {}
left, {} tokens held by process, token reserved: {}, load average = {:.3}
(limit: {})\n",
+ token, steve_process_id(pid, process),
state->tokens, process->tokens_held, process->token_reserved, state->load_avg,
state->max_load_avg);
else
- std::print(stderr, "Giving job token 0x{:02x} to PID
{}, {} left, {} tokens held by process, token reserved: {}\n",
- token, pid, state->tokens,
state->processes[pid].tokens_held, state->processes[pid].token_reserved);
+ std::print(stderr, "Giving job token 0x{:02x} to {}, {}
left, {} tokens held by process, token reserved: {}\n",
+ token, steve_process_id(pid, process),
state->tokens, process->tokens_held, process->token_reserved);
}
fuse_reply_buf(req, &token, 1);
}
static void steve_reserve_token(steve_state *state, uint64_t pid)
{
- if (state->processes[pid].token_reserved)
+ steve_process *process = &state->processes.at(pid);
+ if (process->token_reserved)
return;
state->tokens--;
- state->processes[pid].token_reserved = true;
+ process->token_reserved = true;
if (state->verbose) {
if (state->max_load_avg > 0)
- std::print(stderr, "Reserving job token for PID {}, {}
left, {} tokens held by process, load average = {:.3} (limit: {})\n",
- pid, state->tokens,
state->processes[pid].tokens_held, state->load_avg, state->max_load_avg);
+ std::print(stderr, "Reserving job token for {}, {}
left, {} tokens held by process, load average = {:.3} (limit: {})\n",
+ steve_process_id(pid, process),
state->tokens, process->tokens_held, state->load_avg, state->max_load_avg);
else
- std::print(stderr, "Reserving job token for PID {}, {}
left, {} tokens held by process\n",
- pid, state->tokens,
state->processes[pid].tokens_held);
+ std::print(stderr, "Reserving job token for {}, {}
left, {} tokens held by process\n",
+ steve_process_id(pid, process),
state->tokens, process->tokens_held);
}
/* TODO: we need to handle expiring reservations if client doesn't read
*/
@@ -303,7 +312,8 @@ static void steve_wake_waiters(steve_state *state)
/* poll request */
steve_reserve_token(state, it->pid);
if (state->verbose)
- std::print(stderr, "Notifying PID {} about
POLLIN\n", it->pid);
+ std::print(stderr, "Notifying {} about
POLLIN\n",
+ steve_process_id(it->pid,
&state->processes.at(it->pid)));
fuse_lowlevel_notify_poll(*poll_handle);
} else
assert(0 && "invalid waiter");
@@ -330,12 +340,14 @@ static void steve_handle_pidfd(evutil_socket_t pid_fd,
short, void *userdata) {
/* can we even have read waiters at
this point? */
fuse_reply_err(*read_req, EPIPE);
if (state->verbose)
- std::print(stderr, "Cleaning up
read waiter for PID {}\n", wit->pid);
+ std::print(stderr, "Cleaning up
read waiter for {}\n",
+ steve_process_id(pid,
&it->second));
} else if (fuse_pollhandle **poll_handle =
std::get_if<fuse_pollhandle *>(&wit->handle)) {
/* notify the poller, just in case */
fuse_lowlevel_notify_poll(*poll_handle);
if (state->verbose)
- std::print(stderr, "Cleaning up
poll notification for PID {}\n", wit->pid);
+ std::print(stderr, "Cleaning up
poll notification for {}\n",
+ steve_process_id(pid,
&it->second));
} else
assert(0 && "invalid waiter");
@@ -347,16 +359,16 @@ static void steve_handle_pidfd(evutil_socket_t pid_fd,
short, void *userdata) {
if (it->second.token_reserved)
++state->tokens;
if (state->verbose || it->second.tokens_held > 0) {
- std::print(stderr, "Process {} exited while
holding {} tokens, token reserved: {}, "
+ std::print(stderr, "{} exited while holding {}
tokens, token reserved: {}, "
"{} tokens available after
returning them\n",
- pid, it->second.tokens_held,
it->second.token_reserved, state->tokens);
+ steve_process_id(pid,
&it->second), it->second.tokens_held, it->second.token_reserved, state->tokens);
}
/* remove the process */
state->processes.erase(it);
/* if we have new tokens, wake the waiters */
steve_wake_waiters(state);
- /* make sure the process was removed */
+ /* make sure the process wasn't readded */
assert(state->processes.find(pid) ==
state->processes.end());
return;
}
@@ -407,7 +419,8 @@ static void steve_open(fuse_req_t req, struct
fuse_file_info *fi)
auto process_it = state->processes.find(fi->fh);
if (process_it != state->processes.end()) {
if (state->verbose)
- std::print(stderr, "Device open again by PID {}\n",
fi->fh);
+ std::print(stderr, "Device open again by {}\n",
+ steve_process_id(fi->fh, &process_it->second));
assert(process_it->second.pid_fd != -1);
assert(process_it->second.pidfd_event);
fuse_reply_open(req, fi);
@@ -460,6 +473,7 @@ static void steve_open(fuse_req_t req, struct
fuse_file_info *fi)
steve_process *process = &state->processes[fi->fh];
process->pid_fd = pid_fd;
process->pidfd_event = std::move(pidfd_event);
+ process->cmdline = std::move(cmdline);
fuse_reply_open(req, fi);
}
@@ -468,7 +482,8 @@ static void steve_release(fuse_req_t req, struct
fuse_file_info *fi)
steve_state *state = static_cast<steve_state *>(fuse_req_userdata(req));
if (state->verbose)
- std::print(stderr, "Device closed by PID {}\n", fi->fh);
+ std::print(stderr, "Device closed by {}\n",
+ steve_process_id(fi->fh, &state->processes.at(fi->fh)));
fuse_reply_err(req, 0);
}
@@ -482,7 +497,8 @@ static void steve_interrupt(fuse_req_t req, void *userdata)
if (fuse_req_t *read_req =
std::get_if<fuse_req_t>(&it->handle)) {
if (*read_req == req) {
if (state->verbose)
- std::print(stderr, "Passed EINTR to PID
{}\n", it->pid);
+ std::print(stderr, "Passed EINTR to
{}\n",
+ steve_process_id(it->pid,
&state->processes.at(it->pid)));
state->waiters.erase(it);
break;
}
@@ -518,14 +534,15 @@ static void steve_read(
state->waiters.emplace_back(steve_waiter{req, fi->fh});
if (state->verbose) {
+ steve_process *process = &state->processes.at(fi->fh);
if (token_avail == steve_token_availability::load_exceeded) {
- std::print(stderr, "Load exceeded while PID {}
requested token, waiting, {} tokens free, "
+ std::print(stderr, "Load exceeded while {} requested
token, waiting, {} tokens free, "
"{} tokens held by process, load
average {:.3} >= {}\n",
- fi->fh, state->tokens,
state->processes[fi->fh].tokens_held,
+ steve_process_id(fi->fh, process),
state->tokens, process->tokens_held,
state->load_avg, state->max_load_avg);
} else
- std::print(stderr, "No free job token for PID {},
waiting, {} tokens held by process\n",
- fi->fh,
state->processes[fi->fh].tokens_held);
+ std::print(stderr, "No free job token for {}, waiting,
{} tokens held by process\n",
+ steve_process_id(fi->fh, process),
process->tokens_held);
}
fuse_req_interrupt_func(req, steve_interrupt, state);
}
@@ -558,51 +575,53 @@ static void steve_write(
struct fuse_file_info *fi)
{
steve_state *state = static_cast<steve_state *>(fuse_req_userdata(req));
+ steve_process *process = &state->processes.at(fi->fh);
if (off != 0) {
fuse_reply_err(req, EIO);
return;
}
if (size > SSIZE_MAX) {
- std::print(stderr, "Warning: process {} tried to return more
than SSIZE_MAX tokens\n",
- fi->fh);
+ std::print(stderr, "Warning: {} tried to return more than
SSIZE_MAX tokens\n",
+ steve_process_id(fi->fh, process));
fuse_reply_err(req, EFBIG);
return;
}
/* workaround for https://github.com/medek/nasm-rs/issues/44 */
- if (state->processes[fi->fh].tokens_held == 0 && size == 1) {
- assert(!state->processes[fi->fh].extra_token.has_value());
- assert(state->processes[fi->fh].running_jobs.empty());
- state->processes[fi->fh].extra_token = data[0];
- std::print(stderr, "Warning: process {} pre-released an
unacquired token 0x{:02x}, please report a bug upstream\n",
- fi->fh, data[0]);
+ if (process->tokens_held == 0 && size == 1) {
+ assert(!process->extra_token.has_value());
+ assert(process->running_jobs.empty());
+ process->extra_token = data[0];
+ std::print(stderr, "Warning: {} pre-released an unacquired
token 0x{:02x}, please report a bug upstream\n",
+ steve_process_id(fi->fh, process), data[0]);
} else {
- if (state->processes[fi->fh].tokens_held <
static_cast<ssize_t>(size)) {
- std::print(stderr, "Warning: process {} tried to return
{} tokens while holding only {} tokens, capping\n",
- fi->fh, size,
state->processes[fi->fh].tokens_held);
- if (state->processes[fi->fh].tokens_held < 0)
+ if (process->tokens_held < static_cast<ssize_t>(size)) {
+ std::print(stderr, "Warning: {} tried to return {}
tokens while holding only {} tokens, capping\n",
+ steve_process_id(fi->fh, process),
size, process->tokens_held);
+ if (process->tokens_held < 0)
size = 0;
else
- size = state->processes[fi->fh].tokens_held;
+ size = process->tokens_held;
}
/* Finish the running jobs */
std::chrono::time_point<std::chrono::steady_clock> current_time
=
std::chrono::steady_clock::now();
for (const char *token = data; token < data + size; ++token) {
- auto it =
steve_find_running_job(&state->processes[fi->fh], *token);
+ auto it = steve_find_running_job(process, *token);
- if (static_cast<uint8_t>(it->first & 0xFF) != *token &&
!state->processes[fi->fh].warned_incorrect_token) {
- std::print(stderr, "Warning: process {}
returned incorrect token value 0x{:02x}, please report a bug upstream\n",
- fi->fh, *token);
- state->processes[fi->fh].warned_incorrect_token
= true;
+ if (static_cast<uint8_t>(it->first & 0xFF) != *token &&
!process->warned_incorrect_token) {
+ std::print(stderr, "Warning: {} returned
incorrect token value 0x{:02x}, please report a bug upstream\n",
+ steve_process_id(fi->fh,
process), *token);
+ process->warned_incorrect_token = true;
}
if (state->verbose)
- std::print(stderr, "PID {} job 0x{:02x}
finished after {}\n", fi->fh, it->first,
+ std::print(stderr, "{} job 0x{:02x} finished
after {}\n",
+ steve_process_id(fi->fh, process),
it->first,
std::chrono::duration<double>(current_time - it->second.start_time));
- state->processes[fi->fh].running_jobs.erase(it);
+ process->running_jobs.erase(it);
}
}
if (size == 0) {
@@ -611,10 +630,10 @@ static void steve_write(
}
state->tokens += size;
- state->processes[fi->fh].tokens_held -= size;
+ process->tokens_held -= size;
if (state->verbose)
- std::print(stderr, "PID {} returned {} tokens, {} available
now, {} tokens held by process, token reserved: {}\n",
- fi->fh, size, state->tokens,
state->processes[fi->fh].tokens_held, state->processes[fi->fh].token_reserved);
+ std::print(stderr, "{} returned {} tokens, {} available now, {}
tokens held by process, token reserved: {}\n",
+ steve_process_id(fi->fh, process), size,
state->tokens, process->tokens_held, process->token_reserved);
fuse_reply_write(req, size);
/* Since we have jobs now, see if anyone's waiting */
@@ -635,16 +654,17 @@ static void steve_poll(
}
if (state->verbose) {
+ steve_process *process = &state->processes.at(fi->fh);
if (token_avail == steve_token_availability::load_exceeded) {
assert(state->max_load_avg > 0);
/* capped by load average */
- std::print(stderr, "Load exceeded while PID {}
requested token, waiting, {} tokens free, "
+ std::print(stderr, "Load exceeded while {} requested
token, waiting, {} tokens free, "
"{} tokens held by process, load
average {:.3} >= {}\n",
- fi->fh, state->tokens,
state->processes[fi->fh].tokens_held,
+ steve_process_id(fi->fh, process),
state->tokens, process->tokens_held,
state->load_avg, state->max_load_avg);
} else
- std::print(stderr, "PID {} requested poll, {} tokens
available, {} tokens held by process\n",
- fi->fh, state->tokens,
state->processes[fi->fh].tokens_held);
+ std::print(stderr, "{} requested poll, {} tokens
available, {} tokens held by process\n",
+ steve_process_id(fi->fh, process),
state->tokens, process->tokens_held);
}
fuse_reply_poll(req, events);
@@ -668,9 +688,10 @@ static void steve_ioctl(
return;
}
+ steve_process *process = &state->processes.at(fi->fh);
if (state->verbose)
- std::print(stderr, "PID {} requested ioctl 0x{:08x}\n",
- fi->fh, ioctl_num);
+ std::print(stderr, "{} requested ioctl 0x{:08x}\n",
+ steve_process_id(fi->fh, process), ioctl_num);
int64_t val;
double dval;
@@ -725,7 +746,7 @@ static void steve_ioctl(
val = sysconf(_SC_NPROCESSORS_ONLN);
state->tokens += val - state->jobs;
state->jobs = val;
- std::print(stderr, "PID {} set jobs to {}\n", fi->fh,
state->jobs);
+ std::print(stderr, "{} set jobs to {}\n",
steve_process_id(fi->fh, process), state->jobs);
if (state->verbose)
std::print(stderr, " new token availability:
{}\n", state->tokens);
if (state->min_jobs > state->jobs) {
@@ -742,13 +763,13 @@ static void steve_ioctl(
return;
}
state->min_jobs = val;
- std::print(stderr, "PID {} set min-jobs to {}\n",
fi->fh, state->min_jobs);
+ std::print(stderr, "{} set min-jobs to {}\n",
steve_process_id(fi->fh, process), state->min_jobs);
fuse_reply_ioctl(req, 0, nullptr, 0);
steve_wake_waiters(state);
break;
case STEVE_IOC_SET_PER_PROCESS_LIMIT:
state->per_process_limit = val;
- std::print(stderr, "PID {} set per-process limit to
{}\n", fi->fh, state->per_process_limit);
+ std::print(stderr, "{} set per-process limit to {}\n",
steve_process_id(fi->fh, process), state->per_process_limit);
fuse_reply_ioctl(req, 0, nullptr, 0);
steve_wake_waiters(state);
break;
@@ -758,7 +779,7 @@ static void steve_ioctl(
return;
}
state->max_load_avg = dval;
- std::print(stderr, "PID {} set load-average to {}\n",
fi->fh, state->max_load_avg);
+ std::print(stderr, "{} set load-average to {}\n",
steve_process_id(fi->fh, process), state->max_load_avg);
fuse_reply_ioctl(req, 0, nullptr, 0);
steve_wake_waiters(state);
break;
@@ -768,8 +789,8 @@ static void steve_ioctl(
return;
}
steve_timeout_to_timeval(&state->recheck_timeout, dval);
- std::print(stderr, "PID {} set load-recheck-timeout to
{} s {} us\n",
- fi->fh, state->recheck_timeout.tv_sec,
state->recheck_timeout.tv_usec);
+ std::print(stderr, "{} set load-recheck-timeout to {} s
{} us\n",
+ steve_process_id(fi->fh, process),
state->recheck_timeout.tv_sec, state->recheck_timeout.tv_usec);
fuse_reply_ioctl(req, 0, nullptr, 0);
/* TODO: reset the event? */
break;
@@ -800,7 +821,8 @@ static void steve_handle_sigusr1(evutil_socket_t, short,
void *userdata) {
std::print(stderr, "steve: currently {} tokens available out of {}\n",
state->tokens, state->jobs);
for (const auto &it : state->processes) {
- std::print(stderr, "PID {} holds {} tokens:\n", it.first,
it.second.tokens_held);
+ std::print(stderr, "{} holds {} tokens:\n",
+ steve_process_id(it.first, &it.second),
it.second.tokens_held);
for (const auto &jt : it.second.running_jobs)
std::print(stderr, " job 0x{:02x} running for {}\n",
jt.first,
std::chrono::duration<double>(current_time -
jt.second.start_time));