This is an automated email from the git hooks/post-receive script. civodul pushed a commit to branch main in repository guix-cuirass.
The following commit(s) were added to refs/heads/main by this push:

     new 9a1452e  remote-server: Guard against invalid zmq messages.

9a1452e is described below

commit 9a1452ee021c9f773424961cfeef47ca0b7c5c5a
Author: Ludovic Courtès <l...@gnu.org>
AuthorDate: Wed Apr 3 10:55:15 2024 +0200

    remote-server: Guard against invalid zmq messages.

    Fixes <https://issues.guix.gnu.org/67629>.

    This allows ‘cuirass remote-server’ to gracefully handle situations where
    it receives valid zmq messages that do not have the expected number of
    parts. Previously the receiving fiber would crash with a ‘match-error’
    and ‘cuirass remote-server’ would thus stop doing its work (it wouldn’t
    exit because other fibers would still be running).

    * src/cuirass/remote.scm (&invalid-message-error): New condition type.
    (receive-message): Add catch-all clause and raise an
    ‘&invalid-message-error’ condition.
    * src/cuirass/scripts/remote-server.scm (serve-build-requests): Guard
    body against ‘invalid-message-error?’.
    * tests/remote.scm ("send zmq garbage on the wire"): New test.
---
 src/cuirass/remote.scm                |  34 ++++++-
 src/cuirass/scripts/remote-server.scm | 172 ++++++++++++++++++----------------
 tests/remote.scm                      |  16 ++++
 3 files changed, 135 insertions(+), 87 deletions(-)

diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm index 7676cb0..fa46c6c 100644 --- a/src/cuirass/remote.scm +++ b/src/cuirass/remote.scm @@ -1,6 +1,6 @@ ;;; remote.scm -- Build on remote machines. ;;; Copyright © 2020 Mathieu Othacehe <othac...@gnu.org> -;;; Copyright © 2023 Ludovic Courtès <l...@gnu.org> +;;; Copyright © 2023-2024 Ludovic Courtès <l...@gnu.org> ;;; ;;; This file is part of Cuirass. ;;; @@ -37,6 +37,8 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) #:use-module (srfi srfi-26) + #:use-module (srfi srfi-34) + #:use-module (srfi srfi-35) #:use-module (ice-9 match) #:use-module (ice-9 rdelim) #:use-module (ice-9 suspendable-ports) @@ -87,6 +89,10 @@ send-message receive-message + invalid-message-error? 
+ invalid-message-parts + invalid-message-sender-address + remote-server-service-type)) @@ -436,13 +442,22 @@ the message." (hashq-remove! table socket) (zmq-socket->port socket))))))))) +;; Error raised when receiving a malformed message. +(define-condition-type &invalid-message-error &error + invalid-message-error? + (message-parts invalid-message-parts) + (sender-address invalid-message-sender-address)) + (define* (receive-message socket #:key router?) "Read an sexp from SOCKET, a ZMQ socket, and return it. Return the unspecified value when reading a message without payload. When ROUTER? is true, assume messages received start with a routing prefix (the identity of the peer, as a bytevector), and return three values: -the payload, the peer's identity (a bytevector), and the peer address." +the payload, the peer's identity (a bytevector), and the peer address. + +Raise to '&invalid-message-error' when receiving a message with unexpected +parts." (let ((port (zmq-socket->port socket))) (let wait () ;; Events are edge-triggered so before waiting, check whether there are @@ -473,13 +488,24 @@ the payload, the peer's identity (a bytevector), and the peer address." ((sender (and message (= zmq-message-size 0))) (values *unspecified* (bytevector-copy (zmq-message-content sender)) - (zmq-message-gets message "Peer-Address")))) + (zmq-message-gets message "Peer-Address"))) + (lst + (let ((address (any (lambda (message) + (false-if-exception + (zmq-message-gets message "Peer-Address"))) + lst))) + (raise (condition + (&invalid-message-error (message-parts lst) + (sender-address address))))))) (match (zmq-get-msg-parts-bytevector socket '()) ((#vu8() data) (call-with-input-string (bv->string data) read)) ((#vu8()) - *unspecified*)))) + *unspecified*) + (lst + (raise (condition + (&invalid-message-error (message-parts lst)))))))) ;; ZMQ Messages. 
(define* (build-request-message drv diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm index 51c8ef2..49b1869 100644 --- a/src/cuirass/scripts/remote-server.scm +++ b/src/cuirass/scripts/remote-server.scm @@ -52,6 +52,7 @@ #:use-module (srfi srfi-37) #:use-module (srfi srfi-71) #:use-module (ice-9 atomic) + #:use-module (ice-9 format) #:use-module ((ice-9 ftw) #:select (scandir)) #:use-module (ice-9 match) #:use-module ((ice-9 threads) @@ -426,91 +427,96 @@ FETCH-WORKER to download the build's output(s)." ;; Do not use the built-in zmq-proxy as we want to edit the envelope of ;; frontend messages before forwarding them to the backend. (let loop () - (let* ((command sender sender-address - (receive-message build-socket #:router? #t)) - (reply-worker (lambda (message) - (send-message build-socket message - #:recipient sender)))) - (match command - (`(build-succeeded (drv ,drv) ,_ ,...) - (log-debug "fetching required for ~a (success)" drv) - (put-message fetch-worker command) - #t) - (`(build-failed (drv ,drv) ,_ ,...) - (log-debug "fetching required for ~a (fail)" drv) - (put-message fetch-worker command) - #t) - (`(worker-ready ,worker) - (update-worker! worker)) - (`(worker-request-info) - (catch 'zmq-error - (lambda () - (reply-worker - (server-info-message sender-address - (%log-port) (%publish-port)))) - (const #f))) - (`(worker-request-work ,name) - (let ((worker (db-get-worker name))) - (when worker - (log-debug "~a (~a): request work." - (worker-address worker) - (worker-name worker))) - (let ((build (pop-build name))) - (if build - (let ((derivation (build-derivation build)) - (priority (build-priority build)) - (timeout (build-timeout build)) - (max-silent (build-max-silent-time build))) - (when worker - (log-debug "~a (~a): build ~a submitted." - (worker-address worker) - (worker-name worker) - derivation)) - (db-update-build-worker! derivation name) - (db-update-build-status! 
derivation (build-status submitted)) - (catch 'zmq-error - (lambda () - (reply-worker - (build-request-message derivation - #:priority priority - #:timeout timeout - #:max-silent max-silent - #:system (build-system - build)))) - (lambda (key errno message . _) - (log-error "while submitting ~a to ~a (~a): ~a" - derivation + (guard (c ((invalid-message-error? c) + (log-error "received an invalid message from ~a:~{ ~a~}" + (invalid-message-sender-address c) + (map zmq-message-size + (invalid-message-parts c))))) + (let* ((command sender sender-address + (receive-message build-socket #:router? #t)) + (reply-worker (lambda (message) + (send-message build-socket message + #:recipient sender)))) + (match command + (`(build-succeeded (drv ,drv) ,_ ,...) + (log-debug "fetching required for ~a (success)" drv) + (put-message fetch-worker command) + #t) + (`(build-failed (drv ,drv) ,_ ,...) + (log-debug "fetching required for ~a (fail)" drv) + (put-message fetch-worker command) + #t) + (`(worker-ready ,worker) + (update-worker! worker)) + (`(worker-request-info) + (catch 'zmq-error + (lambda () + (reply-worker + (server-info-message sender-address + (%log-port) (%publish-port)))) + (const #f))) + (`(worker-request-work ,name) + (let ((worker (db-get-worker name))) + (when worker + (log-debug "~a (~a): request work." + (worker-address worker) + (worker-name worker))) + (let ((build (pop-build name))) + (if build + (let ((derivation (build-derivation build)) + (priority (build-priority build)) + (timeout (build-timeout build)) + (max-silent (build-max-silent-time build))) + (when worker + (log-debug "~a (~a): build ~a submitted." + (worker-address worker) (worker-name worker) + derivation)) + (db-update-build-worker! derivation name) + (db-update-build-status! 
derivation (build-status submitted)) + (catch 'zmq-error + (lambda () + (reply-worker + (build-request-message derivation + #:priority priority + #:timeout timeout + #:max-silent max-silent + #:system (build-system + build)))) + (lambda (key errno message . _) + (log-error "while submitting ~a to ~a (~a): ~a" + derivation + (worker-name worker) + (worker-address worker) + message) + (db-update-build-status! derivation + (build-status scheduled))))) + (begin + (when worker + (log-debug "~a (~a): no available build." (worker-address worker) - message) - (db-update-build-status! derivation - (build-status scheduled))))) - (begin - (when worker - (log-debug "~a (~a): no available build." - (worker-address worker) - (worker-name worker))) - (catch 'zmq-error - (lambda () - (reply-worker (no-build-message))) - (const #f))))))) - (`(worker-ping ,worker) - (update-worker! worker)) - (`(build-started (drv ,drv) (worker ,name)) - (let ((log-file (log-path (%cache-directory) drv)) - (worker (db-get-worker name))) - (when worker - (log-info "~a (~a): build started: '~a'." - (worker-address worker) - (worker-name worker) - drv)) - (db-update-build-worker! drv name) - (db-update-build-status! drv (build-status started) - #:log-file log-file))) - (_ - (log-warning "ignoring unrecognized message: ~s" command))) - - (loop))))) + (worker-name worker))) + (catch 'zmq-error + (lambda () + (reply-worker (no-build-message))) + (const #f))))))) + (`(worker-ping ,worker) + (update-worker! worker)) + (`(build-started (drv ,drv) (worker ,name)) + (let ((log-file (log-path (%cache-directory) drv)) + (worker (db-get-worker name))) + (when worker + (log-info "~a (~a): build started: '~a'." + (worker-address worker) + (worker-name worker) + drv)) + (db-update-build-worker! drv name) + (db-update-build-status! 
drv (build-status started) + #:log-file log-file))) + (_ + (log-warning "ignoring unrecognized message: ~s" command))))) + + (loop)))) ;;; diff --git a/tests/remote.scm b/tests/remote.scm index c089045..104349b 100644 --- a/tests/remote.scm +++ b/tests/remote.scm @@ -35,6 +35,9 @@ (avahi) (avahi client) (squee) + (simple-zmq) + (rnrs bytevectors) + (ice-9 binary-ports) (srfi srfi-34) (srfi srfi-64) (ice-9 match) @@ -180,6 +183,19 @@ (start-server) #t)) + (test-assert "send zmq garbage on the wire" + ;; 'cuirass remote-server' should be able to recover when it receives + ;; something that's a valid zmq message but has an unexpected number of + ;; parts. See <https://issues.guix.gnu.org/67629>. + (let* ((context (zmq-create-context)) + (socket (zmq-create-socket context ZMQ_DEALER))) + (zmq-connect socket "tcp://127.0.0.1:5555") + (zmq-send-msg-parts-bytevector socket + (list #vu8(0 77 77 77 77) + #vu8(4 5 6 7 8) + #vu8(4 5 6 7 8))) + (zmq-close-socket socket))) + (test-assert "remote-worker" (begin (start-worker)