This is an automated email from the git hooks/post-receive script.

civodul pushed a commit to branch main
in repository guix-cuirass.

The following commit(s) were added to refs/heads/main by this push:
     new 9a1452e  remote-server: Guard against invalid zmq messages.
9a1452e is described below

commit 9a1452ee021c9f773424961cfeef47ca0b7c5c5a
Author: Ludovic Courtès <l...@gnu.org>
AuthorDate: Wed Apr 3 10:55:15 2024 +0200

    remote-server: Guard against invalid zmq messages.
    
    Fixes <https://issues.guix.gnu.org/67629>.
    
    This allows ‘cuirass remote-server’ to gracefully handle situations
    where it receives valid zmq messages that do not have the expected
    number of parts.
    
    Previously the receiving fiber would crash with a ‘match-error’ and
    ‘cuirass remote-server’ would thus stop doing its work (it wouldn’t exit
    because other fibers would still be running).
    
    * src/cuirass/remote.scm (&invalid-message-error): New condition type.
    (receive-message): Add catch-all clause and raise to
    ‘&invalid-message-error’.
    * src/cuirass/scripts/remote-server.scm (serve-build-requests): Guard
    body against ‘invalid-message-error?’.
    * tests/remote.scm ("send zmq garbage on the wire"): New test.
---
 src/cuirass/remote.scm                |  34 ++++++-
 src/cuirass/scripts/remote-server.scm | 172 ++++++++++++++++++----------------
 tests/remote.scm                      |  16 ++++
 3 files changed, 135 insertions(+), 87 deletions(-)

diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 7676cb0..fa46c6c 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -1,6 +1,6 @@
 ;;; remote.scm -- Build on remote machines.
 ;;; Copyright © 2020 Mathieu Othacehe <othac...@gnu.org>
-;;; Copyright © 2023 Ludovic Courtès <l...@gnu.org>
+;;; Copyright © 2023-2024 Ludovic Courtès <l...@gnu.org>
 ;;;
 ;;; This file is part of Cuirass.
 ;;;
@@ -37,6 +37,8 @@
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-11)
   #:use-module (srfi srfi-26)
+  #:use-module (srfi srfi-34)
+  #:use-module (srfi srfi-35)
   #:use-module (ice-9 match)
   #:use-module (ice-9 rdelim)
   #:use-module (ice-9 suspendable-ports)
@@ -87,6 +89,10 @@
             send-message
             receive-message
 
+            invalid-message-error?
+            invalid-message-parts
+            invalid-message-sender-address
+
             remote-server-service-type))
 
 
@@ -436,13 +442,22 @@ the message."
                  (hashq-remove! table socket)
                  (zmq-socket->port socket)))))))))
 
+;; Error raised when receiving a malformed message.
+(define-condition-type &invalid-message-error &error
+  invalid-message-error?
+  (message-parts  invalid-message-parts)
+  (sender-address invalid-message-sender-address))
+
 (define* (receive-message socket #:key router?)
   "Read an sexp from SOCKET, a ZMQ socket, and return it.  Return the
 unspecified value when reading a message without payload.
 
 When ROUTER? is true, assume messages received start with a routing
 prefix (the identity of the peer, as a bytevector), and return three values:
-the payload, the peer's identity (a bytevector), and the peer address."
+the payload, the peer's identity (a bytevector), and the peer address.
+
+Raise to '&invalid-message-error' when receiving a message with unexpected
+parts."
   (let ((port (zmq-socket->port socket)))
     (let wait ()
       ;; Events are edge-triggered so before waiting, check whether there are
@@ -473,13 +488,24 @@ the payload, the peer's identity (a bytevector), and the 
peer address."
         ((sender (and message (= zmq-message-size 0)))
          (values *unspecified*
                  (bytevector-copy (zmq-message-content sender))
-                 (zmq-message-gets message "Peer-Address"))))
+                 (zmq-message-gets message "Peer-Address")))
+        (lst
+         (let ((address (any (lambda (message)
+                               (false-if-exception
+                                (zmq-message-gets message "Peer-Address")))
+                             lst)))
+           (raise (condition
+                   (&invalid-message-error (message-parts lst)
+                                           (sender-address address)))))))
       (match (zmq-get-msg-parts-bytevector socket '())
         ((#vu8() data)
          (call-with-input-string (bv->string data)
            read))
         ((#vu8())
-         *unspecified*))))
+         *unspecified*)
+        (lst
+         (raise (condition
+                 (&invalid-message-error (message-parts lst))))))))
 
 ;; ZMQ Messages.
 (define* (build-request-message drv
diff --git a/src/cuirass/scripts/remote-server.scm 
b/src/cuirass/scripts/remote-server.scm
index 51c8ef2..49b1869 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -52,6 +52,7 @@
   #:use-module (srfi srfi-37)
   #:use-module (srfi srfi-71)
   #:use-module (ice-9 atomic)
+  #:use-module (ice-9 format)
   #:use-module ((ice-9 ftw) #:select (scandir))
   #:use-module (ice-9 match)
   #:use-module ((ice-9 threads)
@@ -426,91 +427,96 @@ FETCH-WORKER to download the build's output(s)."
     ;; Do not use the built-in zmq-proxy as we want to edit the envelope of
     ;; frontend messages before forwarding them to the backend.
     (let loop ()
-      (let* ((command sender sender-address
-                      (receive-message build-socket #:router? #t))
-             (reply-worker (lambda (message)
-                             (send-message build-socket message
-                                           #:recipient sender))))
-        (match command
-          (`(build-succeeded (drv ,drv) ,_ ,...)
-           (log-debug "fetching required for ~a (success)" drv)
-           (put-message fetch-worker command)
-           #t)
-          (`(build-failed (drv ,drv) ,_ ,...)
-           (log-debug "fetching required for ~a (fail)" drv)
-           (put-message fetch-worker command)
-           #t)
-          (`(worker-ready ,worker)
-           (update-worker! worker))
-          (`(worker-request-info)
-           (catch 'zmq-error
-             (lambda ()
-               (reply-worker
-                (server-info-message sender-address
-                                     (%log-port) (%publish-port))))
-             (const #f)))
-          (`(worker-request-work ,name)
-           (let ((worker (db-get-worker name)))
-             (when worker
-               (log-debug "~a (~a): request work."
-                          (worker-address worker)
-                          (worker-name worker)))
-             (let ((build (pop-build name)))
-               (if build
-                   (let ((derivation (build-derivation build))
-                         (priority (build-priority build))
-                         (timeout (build-timeout build))
-                         (max-silent (build-max-silent-time build)))
-                     (when worker
-                       (log-debug "~a (~a): build ~a submitted."
-                                  (worker-address worker)
-                                  (worker-name worker)
-                                  derivation))
-                     (db-update-build-worker! derivation name)
-                     (db-update-build-status! derivation (build-status 
submitted))
-                     (catch 'zmq-error
-                       (lambda ()
-                         (reply-worker
-                          (build-request-message derivation
-                                                 #:priority priority
-                                                 #:timeout timeout
-                                                 #:max-silent max-silent
-                                                 #:system (build-system
-                                                            build))))
-                       (lambda (key errno message . _)
-                         (log-error "while submitting ~a to ~a (~a): ~a"
-                                    derivation
+      (guard (c ((invalid-message-error? c)
+                 (log-error "received an invalid message from ~a:~{ ~a~}"
+                            (invalid-message-sender-address c)
+                            (map zmq-message-size
+                                 (invalid-message-parts c)))))
+        (let* ((command sender sender-address
+                        (receive-message build-socket #:router? #t))
+               (reply-worker (lambda (message)
+                               (send-message build-socket message
+                                             #:recipient sender))))
+          (match command
+            (`(build-succeeded (drv ,drv) ,_ ,...)
+             (log-debug "fetching required for ~a (success)" drv)
+             (put-message fetch-worker command)
+             #t)
+            (`(build-failed (drv ,drv) ,_ ,...)
+             (log-debug "fetching required for ~a (fail)" drv)
+             (put-message fetch-worker command)
+             #t)
+            (`(worker-ready ,worker)
+             (update-worker! worker))
+            (`(worker-request-info)
+             (catch 'zmq-error
+               (lambda ()
+                 (reply-worker
+                  (server-info-message sender-address
+                                       (%log-port) (%publish-port))))
+               (const #f)))
+            (`(worker-request-work ,name)
+             (let ((worker (db-get-worker name)))
+               (when worker
+                 (log-debug "~a (~a): request work."
+                            (worker-address worker)
+                            (worker-name worker)))
+               (let ((build (pop-build name)))
+                 (if build
+                     (let ((derivation (build-derivation build))
+                           (priority (build-priority build))
+                           (timeout (build-timeout build))
+                           (max-silent (build-max-silent-time build)))
+                       (when worker
+                         (log-debug "~a (~a): build ~a submitted."
+                                    (worker-address worker)
                                     (worker-name worker)
+                                    derivation))
+                       (db-update-build-worker! derivation name)
+                       (db-update-build-status! derivation (build-status 
submitted))
+                       (catch 'zmq-error
+                         (lambda ()
+                           (reply-worker
+                            (build-request-message derivation
+                                                   #:priority priority
+                                                   #:timeout timeout
+                                                   #:max-silent max-silent
+                                                   #:system (build-system
+                                                              build))))
+                         (lambda (key errno message . _)
+                           (log-error "while submitting ~a to ~a (~a): ~a"
+                                      derivation
+                                      (worker-name worker)
+                                      (worker-address worker)
+                                      message)
+                           (db-update-build-status! derivation
+                                                    (build-status 
scheduled)))))
+                     (begin
+                       (when worker
+                         (log-debug "~a (~a): no available build."
                                     (worker-address worker)
-                                    message)
-                         (db-update-build-status! derivation
-                                                  (build-status scheduled)))))
-                   (begin
-                     (when worker
-                       (log-debug "~a (~a): no available build."
-                                  (worker-address worker)
-                                  (worker-name worker)))
-                     (catch 'zmq-error
-                       (lambda ()
-                         (reply-worker (no-build-message)))
-                       (const #f)))))))
-          (`(worker-ping ,worker)
-           (update-worker! worker))
-          (`(build-started (drv ,drv) (worker ,name))
-           (let ((log-file (log-path (%cache-directory) drv))
-                 (worker (db-get-worker name)))
-             (when worker
-               (log-info "~a (~a): build started: '~a'."
-                         (worker-address worker)
-                         (worker-name worker)
-                         drv))
-             (db-update-build-worker! drv name)
-             (db-update-build-status! drv (build-status started)
-                                      #:log-file log-file)))
-          (_
-           (log-warning "ignoring unrecognized message: ~s" command)))
-
-        (loop)))))
+                                    (worker-name worker)))
+                       (catch 'zmq-error
+                         (lambda ()
+                           (reply-worker (no-build-message)))
+                         (const #f)))))))
+            (`(worker-ping ,worker)
+             (update-worker! worker))
+            (`(build-started (drv ,drv) (worker ,name))
+             (let ((log-file (log-path (%cache-directory) drv))
+                   (worker (db-get-worker name)))
+               (when worker
+                 (log-info "~a (~a): build started: '~a'."
+                           (worker-address worker)
+                           (worker-name worker)
+                           drv))
+               (db-update-build-worker! drv name)
+               (db-update-build-status! drv (build-status started)
+                                        #:log-file log-file)))
+            (_
+             (log-warning "ignoring unrecognized message: ~s" command)))))
+
+      (loop))))
 
 
 ;;;
diff --git a/tests/remote.scm b/tests/remote.scm
index c089045..104349b 100644
--- a/tests/remote.scm
+++ b/tests/remote.scm
@@ -35,6 +35,9 @@
              (avahi)
              (avahi client)
              (squee)
+             (simple-zmq)
+             (rnrs bytevectors)
+             (ice-9 binary-ports)
              (srfi srfi-34)
              (srfi srfi-64)
              (ice-9 match)
@@ -180,6 +183,19 @@
       (start-server)
       #t))
 
+  (test-assert "send zmq garbage on the wire"
+    ;; 'cuirass remote-server' should be able to recover when it receives
+    ;; something that's a valid zmq message but has an unexpected number of
+    ;; parts.  See <https://issues.guix.gnu.org/67629>.
+    (let* ((context (zmq-create-context))
+           (socket (zmq-create-socket context ZMQ_DEALER)))
+      (zmq-connect socket "tcp://127.0.0.1:5555")
+      (zmq-send-msg-parts-bytevector socket
+                                     (list #vu8(0 77 77 77 77)
+                                           #vu8(4 5 6 7 8)
+                                           #vu8(4 5 6 7 8)))
+      (zmq-close-socket socket)))
+
   (test-assert "remote-worker"
     (begin
       (start-worker)

Reply via email to