This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new cb637b79e PROTON-2748: Raw connection async close fix and tests. First part of pull 402
cb637b79e is described below

commit cb637b79ee2a4cb74c5332975c510f672c2c05fa
Author: Clifford Jansen <cliffjan...@apache.org>
AuthorDate: Mon Sep 11 23:00:12 2023 -0700

    PROTON-2748: Raw connection async close fix and tests. First part of pull 402
---
 c/src/proactor/epoll_raw_connection.c    |  96 ++++---
 c/src/proactor/raw_connection-internal.h |   2 +
 c/src/proactor/raw_connection.c          |  35 ++-
 c/tests/raw_wake_test.cpp                | 417 ++++++++++++++++++++++++++++++-
 4 files changed, 499 insertions(+), 51 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 9b85b15f1..c0e731066 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,8 @@ struct praw_connection_t {
   struct addrinfo *ai;               /* Current connect address */
   bool connected;
   bool disconnected;
-  bool batch_empty;
+  bool hup_detected;
+  bool read_check;
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -318,10 +319,7 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
   unlock(&rc->task.mutex);
   if (waking) pni_raw_wake(raw);
 
-  pn_event_t *e = pni_raw_event_next(raw);
-  if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
-    rc->batch_empty = true;
-  return e;
+  return pni_raw_event_next(raw);
 }
 
 task_t *pni_psocket_raw_task(psocket_t* ps) {
@@ -393,10 +391,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
     if (rc->disconnected) {
       pni_raw_connect_failed(&rc->raw_connection);
       unlock(&rc->task.mutex);
-      rc->batch_empty = false;
       return &rc->batch;
     }
     if (events & (EPOLLHUP | EPOLLERR)) {
+      // Continuation of praw_connection_maybe_connect_lh() logic.
      // A wake can be the first event.  Otherwise, wait for connection to complete.
      bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector);
       t->working = event_pending;
@@ -405,35 +403,46 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
     }
     if (events & EPOLLOUT)
       praw_connection_connected_lh(rc);
+    unlock(&rc->task.mutex);
+    return &rc->batch;
   }
   unlock(&rc->task.mutex);
 
-  if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+  if (events & EPOLLERR) {
+    // Read and write sides closed via RST.  Tear down immediately.
+    int soerr;
+    socklen_t soerrlen = sizeof(soerr);
+    int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen);
+    if (ec == 0 && soerr) {
+      psocket_error(rc, soerr, "async disconnect");
+    }
+    pni_raw_async_disconnect(&rc->raw_connection);
+    return &rc->batch;
+  }
+  if (events & EPOLLHUP) {
+    rc->hup_detected = true;
+  }
+
+  if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) {
+    pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+    rc->read_check = false;
+  }
   if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
-  rc->batch_empty = false;
   return &rc->batch;
 }
 
 void pni_raw_connection_done(praw_connection_t *rc) {
   bool notify = false;
   bool ready = false;
-  bool have_event = false;
-
-  // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary.
-  if (!rc->batch_empty) {
-    if (pn_collector_peek(rc->raw_connection.collector))
-      have_event = true;
-    else {
-      pn_event_t *e = pni_raw_batch_next(&rc->batch);
-      // State machine up to date.
-      if (e) {
-        have_event = true;
-        // Sole event.  Can put back without order issues.
-        // Edge case, performance not important.
-        pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e));
-      }
-    }
-  }
+  pn_raw_connection_t *raw = &rc->raw_connection;
+  int fd = rc->psocket.epoll_io.fd;
+
+  // Try write
+  if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error);
+  pni_raw_process_shutdown(raw, fd, shutr, shutw);
+
+  // Update state machine and check for possible pending event.
+  bool have_event = pni_raw_batch_has_events(raw);
 
   lock(&rc->task.mutex);
   pn_proactor_t *p = rc->task.proactor;
@@ -442,24 +451,31 @@ void pni_raw_connection_done(praw_connection_t *rc) {
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
  notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task);
-  ready = rc->task.ready;
+  ready = rc->task.ready;  // No need to poll.  Already scheduled.
   unlock(&rc->task.mutex);
 
-  pn_raw_connection_t *raw = &rc->raw_connection;
-  int fd = rc->psocket.epoll_io.fd;
-  pni_raw_process_shutdown(raw, fd, shutr, shutw);
-  int wanted =
-    (pni_raw_can_read(raw)  ? EPOLLIN : 0) |
-    (pni_raw_can_write(raw) ? EPOLLOUT : 0);
-  if (wanted) {
-    rc->psocket.epoll_io.wanted = wanted;
-    rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for error
+  bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
+  if (finished_disconnect) {
+    // If we're closed and we've sent the disconnect then close
+    pni_raw_finalize(raw);
+    praw_connection_cleanup(rc);
+  } else if (ready) {
+    // Already scheduled to run.  Skip poll.  Remember if we want a read.
+    rc->read_check = pni_raw_can_read(raw);
+  } else if (!rc->connected) {
+    // Connect logic has already armed the socket.
   } else {
-    bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
-    if (finished_disconnect) {
-      // If we're closed and we've sent the disconnect then close
-      pni_raw_finalize(raw);
-      praw_connection_cleanup(rc);
+    // Must poll for IO.
+    int wanted =
+      (pni_raw_can_read(raw)  ? (EPOLLIN | EPOLLRDHUP) : 0) |
+      (pni_raw_can_write(raw) ? EPOLLOUT : 0);
+
+    // wanted == 0 implies we block until either application wake() or EPOLLHUP | EPOLLERR.
+    // If wanted == 0 and hup_detected, blocking not possible, so skip arming until
+    // application provides read buffers.
+    if (wanted || !rc->hup_detected) {
+      rc->psocket.epoll_io.wanted = wanted;
+      rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for error
     }
   }
 
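
For context on the flag this file now adds to the read-interest mask: EPOLLRDHUP reports a peer half-close (a received FIN) separately from EPOLLHUP, which generally means the connection is down in both directions. A minimal standalone sketch of that behavior, not part of this commit, assuming Linux epoll and an AF_UNIX socketpair:

/* Sketch: EPOLLRDHUP fires when the peer shuts down its write side,
 * so a poller can tell "peer sent FIN" apart from plain readability.
 * Linux-specific; error handling elided for brevity. */
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
  int sv[2];
  socketpair(AF_UNIX, SOCK_STREAM, 0, sv);  /* sv[0] = peer, sv[1] = watched end */

  int epfd = epoll_create1(0);
  struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP, .data.fd = sv[1] };
  epoll_ctl(epfd, EPOLL_CTL_ADD, sv[1], &ev);

  shutdown(sv[0], SHUT_WR);                 /* peer half-close (FIN) */

  struct epoll_event out;
  if (epoll_wait(epfd, &out, 1, 1000) == 1 && (out.events & EPOLLRDHUP))
    printf("peer half-closed: events=0x%x\n", out.events);

  close(sv[0]); close(sv[1]); close(epfd);
  return 0;
}
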
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 47b0ea925..cdfd5a852 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -134,9 +134,11 @@ void pni_raw_write_close(pn_raw_connection_t *conn);
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
 void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
 void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int));
+void pni_raw_async_disconnect(pn_raw_connection_t *conn);
 bool pni_raw_can_read(pn_raw_connection_t *conn);
 bool pni_raw_can_write(pn_raw_connection_t *conn);
 pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
+bool pni_raw_batch_has_events(pn_raw_connection_t *conn);
 void pni_raw_initialize(pn_raw_connection_t *conn);
 void pni_raw_finalize(pn_raw_connection_t *conn);
 
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 3d8b976c6..89fbbd1dd 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -669,12 +669,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) {
   return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
 }
 
-pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
+bool pni_raw_batch_has_events(pn_raw_connection_t *conn) {
+  // If collector empty, advance state machine as necessary and generate next event.
+  // Return true if at least one event is available.
   assert(conn);
   do {
-    pn_event_t *event = pn_collector_next(conn->collector);
+    pn_event_t *event = pn_collector_peek(conn->collector);
     if (event) {
-      return pni_log_event(conn, event);
+      return true;
     } else if (conn->connectpending) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
       conn->connectpending = false;
@@ -716,11 +718,20 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
       conn->rrequestedbuffers = true;
     } else {
-      return NULL;
+      return false;
     }
   } while (true);
 }
 
+pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
+  if (pni_raw_batch_has_events(conn)) {
+    pn_event_t* event = pn_collector_next(conn->collector);
+    assert(event);
+    return pni_log_event(conn, event);
+  }
+  return NULL;
+}
+
 void pni_raw_read_close(pn_raw_connection_t *conn) {
   // If already fully closed nothing to do
   if (pni_raw_rwclosed(conn)) {
@@ -781,6 +792,22 @@ void pni_raw_close(pn_raw_connection_t *conn) {
   }
 }
 
+void pni_raw_async_disconnect(pn_raw_connection_t *conn) {
+  if (pni_raw_rwclosed(conn))
+    return;
+
+  if (!pni_raw_rclosed(conn)) {
+    conn->state = pni_raw_new_state(conn, conn_read_closed);
+    conn->rclosedpending = true;
+  }
+  if (!pni_raw_wclosed(conn)) {
+    pni_raw_release_buffers(conn);
+    conn->state = pni_raw_new_state(conn, conn_write_closed);
+    conn->wclosedpending = true;
+  }
+  pni_raw_disconnect(conn);
+}
+
 bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
   assert(conn);
   return pni_raw_rclosed(conn);
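
The refactor above splits event generation from event consumption: pni_raw_batch_has_events() drives the state machine until the collector holds an event and only peeks, so bookkeeping code such as pni_raw_connection_done() can test for pending work without disturbing event order, while pni_raw_event_next() actually pops. A toy sketch of the same peek-then-consume pattern, with invented names and a plain int queue standing in for pn_collector_t:

/* Toy sketch of the peek/consume split (invented names, not the Proton
 * API): has_events() lazily generates into a queue and only peeks, so it
 * is idempotent; event_next() consumes one event. */
#include <stdbool.h>
#include <stdio.h>

#define QCAP 8
static int q[QCAP];
static int qhead = 0, qtail = 0;
static int pending = 3;               /* stand-in for state-machine work */

static bool has_events(void) {
  if (qhead == qtail && pending > 0)  /* queue empty: advance the machine */
    q[qtail++ % QCAP] = pending--;    /* generate the next event */
  return qhead != qtail;              /* peek only; nothing consumed */
}

static int event_next(void) {         /* consume one event, or 0 if none */
  return has_events() ? q[qhead++ % QCAP] : 0;
}

int main(void) {
  printf("pending? %d\n", has_events());  /* true */
  printf("pending? %d\n", has_events());  /* still true: nothing consumed */
  for (int e; (e = event_next()); )
    printf("event %d\n", e);
  return 0;
}
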
diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp
index 4a5dc23d3..80ddc2028 100644
--- a/c/tests/raw_wake_test.cpp
+++ b/c/tests/raw_wake_test.cpp
@@ -28,11 +28,22 @@
 #include <sys/socket.h>
 #include <unistd.h>
 #include <errno.h>
+#include <arpa/inet.h>
 #endif
 
 #include <string.h>
 
-// WAKE tests require a running proactor.
+// Raw connection tests driven by a proactor.
+
+// These tests often cheat by directly calling API functions that
+// would normally be called in an event callback for thread safety
+// reasons.  This can usually work because the proactors and API calls
+// are all called from a single thread so there is no contention, but
+// the raw connection may require a wake so that the state machine and
+// polling mask can be updated.  Note that wakes stop working around
+// the time the raw connection thinks it is about to be fully closed,
+// so close operations may need to be done in event callbacks to
+// avoid wake uncertainty.
 
 #include "../src/proactor/proactor-internal.h"
 #include "./pn_test_proactor.hpp"
@@ -45,14 +56,32 @@ namespace {
 
 class common_handler : public handler {
   bool close_on_wake_;
+  bool write_close_on_wake_;
+  bool stop_on_wake_;
+  bool abort_on_wake_;
+  int closed_read_count_;
+  int closed_write_count_;
+  int disconnect_count_;
+  bool disconnect_error_;
   pn_raw_connection_t *last_server_;
+  pn_raw_buffer_t write_buff_;
 
 public:
-  explicit common_handler() : close_on_wake_(false), last_server_(0) {}
+  explicit common_handler() : close_on_wake_(false), write_close_on_wake_(false), stop_on_wake_(false),
+                              abort_on_wake_(false), closed_read_count_(0), closed_write_count_(0),
+                              disconnect_count_(0), disconnect_error_(false),
+                              last_server_(0), write_buff_({0}) {}
 
   void set_close_on_wake(bool b) { close_on_wake_ = b; }
-
+  void set_write_close_on_wake(bool b) { write_close_on_wake_ = b; }
+  void set_stop_on_wake(bool b) { stop_on_wake_ = b; }
+  void set_abort_on_wake(bool b) { abort_on_wake_ = b; }
+  int closed_read_count() { return closed_read_count_; }
+  int closed_write_count() { return closed_write_count_; }
+  int disconnect_count() { return disconnect_count_; }
+  bool disconnect_error() { return disconnect_error_; }
   pn_raw_connection_t *last_server() { return last_server_; }
+  void set_write_on_wake(pn_raw_buffer_t *b) { write_buff_ = *b; }
 
   bool handle(pn_event_t *e) override {
     switch (pn_event_type(e)) {
@@ -71,13 +100,41 @@ public:
     } break;
 
     case PN_RAW_CONNECTION_WAKE: {
-      if (close_on_wake_) {
-        pn_raw_connection_t *rc = pn_event_raw_connection(e);
+      if (abort_on_wake_) abort();
+      pn_raw_connection_t *rc = pn_event_raw_connection(e);
+
+      if (write_buff_.size) {
+        // Add the buff for writing before any close operation.
+        CHECK(pn_raw_connection_write_buffers(rc, &write_buff_, 1) == 1);
+        write_buff_.size = 0;
+      }
+      if (write_close_on_wake_)
+        pn_raw_connection_write_close(rc);
+      if (close_on_wake_)
         pn_raw_connection_close(rc);
+      return stop_on_wake_;
+    } break;
+
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+      disconnect_count_++;
+      pn_raw_connection_t *rc = pn_event_raw_connection(e);
+      pn_condition_t *cond = pn_raw_connection_condition(rc);
+      if (disconnect_count_ == 1 && pn_condition_is_set(cond)) {
+        const char *nm = pn_condition_get_name(cond);
+        const char *ds = pn_condition_get_description(cond);
+        if (nm && strlen(nm) > 0 && ds && strlen(ds) > 0)
+          disconnect_error_ = true;
       }
-      return true;
+      return false;
     } break;
 
+    case PN_RAW_CONNECTION_CLOSED_READ:
+      closed_read_count_++;
+      return false;
+
+    case PN_RAW_CONNECTION_CLOSED_WRITE:
+      closed_write_count_++;
+      return false;
 
     default:
       return false;
@@ -85,9 +142,127 @@ public:
   }
 };
 
+static const size_t buffsz = 128;
+
+// Basic test consisting of
+//   client is an OS socket.
+//   server is a pn_raw_connection_t with one shared read/write buffer.
+//   pn_listener_t used to put the two together.
+struct basic_test {
+  common_handler h;
+  proactor p;
+  pn_listener_t *l;
+  int sockfd; // client
+  pn_raw_connection_t *server_rc;
+  char buff[buffsz];
+  bool buff_in_use;
+
+  basic_test() : h(), p(&h) {
+    l = p.listen();
+    REQUIRE_RUN(p, PN_LISTENER_OPEN);
+    sockfd = socket(AF_INET, SOCK_STREAM, 0);
+    REQUIRE(sockfd >= 0);
+    struct sockaddr_in laddr;
+    memset(&laddr, 0, sizeof(laddr));
+    laddr.sin_family = AF_INET;
+    laddr.sin_port = htons(atoi(pn_test::listening_port(l).c_str()));
+    laddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+    connect(sockfd, (const struct sockaddr*) &laddr, sizeof(laddr));
+
+    REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
+    server_rc = h.last_server();
+    REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+    pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+    CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
+    buff_in_use = true;
+
+    pn_raw_connection_wake(server_rc);
+    REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+    CHECK(pn_proactor_get(p) == NULL); /* idle */
+  }
+
+  ~basic_test() {
+    pn_listener_close(l);
+    REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+    REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+    if (sockfd >= 0) close(sockfd);
+    bool sanity = h.closed_read_count() == 1 && h.closed_write_count() == 1 &&
+      h.disconnect_count() == 1;
+    REQUIRE(sanity == true);
+  }
+
+  void socket_write_close() {
+    if (sockfd < 0) return;
+    shutdown(sockfd, SHUT_WR);
+  }
+
+  void socket_graceful_close() {
+    if (sockfd < 0) return;
+    close(sockfd);
+    sockfd = -1;
+  }
+
+  bool socket_hard_close() {
+    // RST (not FIN), hard/abort close
+    if (sockfd < 0) return false;
+    struct linger lngr;
+    lngr.l_onoff  = 1;
+    lngr.l_linger = 0;
+    if (sockfd < 0) return false;
+    if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr)) == 0) {
+      if (close(sockfd) == 0) {
+        sockfd = -1;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void drain_read_buffer() {
+    assert(buff_in_use);
+    send(sockfd, "FOO", 3, 0);
+    REQUIRE_RUN(p, PN_RAW_CONNECTION_READ);
+    pn_raw_buffer_t rb = {0};
+    REQUIRE(pn_raw_connection_take_read_buffers(server_rc, &rb, 1) == 1);
+    REQUIRE(rb.size == 3);
+    buff_in_use = false;
+  }
+
+  void give_read_buffer() {
+    assert(!buff_in_use);
+    pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+    CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
+    buff_in_use = true;
+  }
+
+  void write_next_wake(const char *m) {
+    assert(!buff_in_use);
+    pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
+    size_t l = strlen(m);
+    assert(l < buffsz);
+    strcpy(rb.bytes, m);
+    rb.size = l;
+    h.set_write_on_wake(&rb);
+  }
+
+  int drain_events() {
+    int ec = 0;
+    pn_event_batch_t *batch = NULL;
+    while (batch = pn_proactor_get(p.get())) {
+      pn_event_t *e;
+      while (e = pn_event_batch_next(batch)) {
+        ec++;
+        h.dispatch(e);
+      }
+      pn_proactor_done(p.get(), batch);
+    }
+    return ec;
+  }
+};
 
 } // namespace
 
+
 // Test waking up a connection that is idle
 TEST_CASE("proactor_raw_connection_wake") {
   common_handler h;
@@ -104,7 +279,7 @@ TEST_CASE("proactor_raw_connection_wake") {
   REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
   REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
   CHECK(pn_proactor_get(p) == NULL); /* idle */
-    pn_raw_connection_wake(rc);
+  pn_raw_connection_wake(rc);
   REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
   CHECK(pn_proactor_get(p) == NULL); /* idle */
 
@@ -119,3 +294,231 @@ TEST_CASE("proactor_raw_connection_wake") {
   REQUIRE_RUN(p, PN_LISTENER_CLOSE);
   REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
 }
+
+// Normal close
+TEST_CASE("raw_connection_graceful_close") {
+  struct basic_test x;
+  x.socket_graceful_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == false);
+}
+
+// HARD close
+TEST_CASE("raw_connection_hardclose") {
+  struct basic_test x;
+  x.socket_hard_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close, no read buffer
+TEST_CASE("raw_connection_hardclose_nrb") {
+  struct basic_test x;
+  // Drain read buffer without replenishing
+  x.drain_read_buffer();
+  x.drain_events();
+  CHECK(pn_proactor_get(x.p) == NULL); /* idle */
+  x.socket_hard_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close after read close
+TEST_CASE("raw_connection_readclose_then_hardclose") {
+  struct basic_test x;
+  x.socket_write_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  x.drain_events();
+  REQUIRE(x.h.disconnect_count() == 0);
+  x.socket_hard_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == true);
+}
+
+// HARD close after read close, no read buffer
+TEST_CASE("raw_connection_readclose_then_hardclose_nrb") {
+  struct basic_test x;
+  // Drain read buffer without replenishing
+  x.drain_read_buffer();
+  x.drain_events();
+  CHECK(pn_proactor_get(x.p) == NULL); /* idle */
+  // Shutdown of the read side should be ignored with no read buffer.
+  x.socket_write_close();
+  CHECK(pn_proactor_get(x.p) == NULL); /* still idle */
+
+  // Confirm raw connection shuts down, even with no read buffer
+  x.socket_hard_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == true);
+}
+
+// Normal close on socket delays CLOSED_READ event until application makes read buffers available
+TEST_CASE("raw_connection_delay_readclose") {
+  struct basic_test x;
+  x.drain_read_buffer();
+  x.socket_graceful_close();
+  x.drain_events();
+  REQUIRE(x.h.closed_read_count() == 0);
+
+  x.give_read_buffer();
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
+  REQUIRE(x.h.closed_read_count() == 1);
+
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_rst_on_write") {
+  struct basic_test x;
+  x.drain_read_buffer();
+
+  // Send some data
+  x.write_next_wake("foo");
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WRITTEN);
+  pn_raw_buffer_t rb = {0};
+  CHECK(pn_raw_connection_take_written_buffers(x.server_rc, &rb, 1) == 1);
+  char b[buffsz];
+  REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
+
+  // Repeat, with closed peer socket.
+  x.socket_graceful_close();
+  x.write_next_wake("bar");
+  pn_raw_connection_wake(x.server_rc);
+  // Write or subsequent poll should fail EPIPE
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.disconnect_error() == true);
+}
+
+// One sided close.  No cooperation from peer.
+TEST_CASE("raw_connection_full_close") {
+  struct basic_test x;
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+// As above.  No read buffer.
+TEST_CASE("raw_connection_full_close_nrb") {
+  struct basic_test x;
+  x.drain_read_buffer();
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+// One sided close, pending write.
+TEST_CASE("raw_connection_close_wdrain") {
+  struct basic_test x;
+  x.drain_read_buffer();
+  // write and then close on next wake
+  x.write_next_wake("fubar");
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  // Now check fubar made it
+  char b[buffsz];
+  REQUIRE(recv(x.sockfd, b, buffsz, 0) == 5);
+  REQUIRE(strncmp("fubar", b, 5) == 0);
+}
+
+// One sided write_close then close.
+TEST_CASE("raw_connection_wclose_full_close") {
+  struct basic_test x;
+  x.h.set_write_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
+  x.drain_events();
+  REQUIRE(x.h.closed_read_count() == 0);
+
+  x.h.set_write_close_on_wake(false);
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_wclose_full_close_nrb") {
+  struct basic_test x;
+  x.drain_read_buffer();
+  x.h.set_write_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
+  x.drain_events();
+  REQUIRE(x.h.closed_read_count() == 0);
+
+  x.h.set_write_close_on_wake(false);
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+}
+
+TEST_CASE("raw_connection_wclose_full_close_wdrain") {
+  struct basic_test x;
+  x.drain_read_buffer();
+  // write and then wclose then close on next wake
+  x.write_next_wake("bar");
+  x.h.set_write_close_on_wake(true);
+  x.h.set_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
+  // No send/recv/close/shutdown activity from peer socket.
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  // Now check bar made it
+  char b[buffsz];
+  REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
+  REQUIRE(strncmp("bar", b, 3) == 0);
+}
+
+// Half closes each direction.  Raw connection then peer.
+TEST_CASE("raw_connection_wclose_then_rclose") {
+  struct basic_test x;
+  x.h.set_write_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  x.drain_events();
+  REQUIRE(x.h.closed_write_count() == 1);
+  REQUIRE(x.h.closed_read_count() == 0);
+
+  char b[buffsz];
+  REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
+  x.socket_write_close();
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  REQUIRE(x.h.closed_read_count() == 1);
+}
+
+// As above but peer first then raw connection.
+TEST_CASE("raw_connection_rclose_then_wclose") {
+  struct basic_test x;
+  x.socket_write_close();
+  x.drain_events();
+  REQUIRE(x.h.closed_read_count() == 1);
+  REQUIRE(x.h.closed_write_count() == 0);
+
+  x.h.set_write_close_on_wake(true);
+  pn_raw_connection_wake(x.server_rc);
+  REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
+  char b[buffsz];
+  REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
+  REQUIRE(x.h.closed_write_count() == 1);
+}
+
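
For reference, the RST path the new hard-close tests drive can be reproduced outside Proton. A standalone sketch, assuming Linux/POSIX sockets and eliding error handling: close() with SO_LINGER and a zero timeout sends RST instead of FIN, the surviving end polls POLLERR, and getsockopt(SO_ERROR) yields the pending error, the same retrieval the new EPOLLERR branch performs:

/* Sketch (not part of the commit): abortive close and SO_ERROR retrieval. */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
  /* Loopback listener on an ephemeral port. */
  int lfd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in a = {0};
  a.sin_family = AF_INET;
  a.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  bind(lfd, (struct sockaddr *)&a, sizeof(a));
  listen(lfd, 1);
  socklen_t alen = sizeof(a);
  getsockname(lfd, (struct sockaddr *)&a, &alen);

  int cfd = socket(AF_INET, SOCK_STREAM, 0);
  connect(cfd, (struct sockaddr *)&a, sizeof(a));
  int sfd = accept(lfd, NULL, NULL);

  /* Hard close: linger time zero turns close() into an abortive RST. */
  struct linger lngr = { .l_onoff = 1, .l_linger = 0 };
  setsockopt(cfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr));
  close(cfd);

  struct pollfd pfd = { .fd = sfd, .events = POLLIN };
  poll(&pfd, 1, 1000);
  if (pfd.revents & (POLLERR | POLLHUP)) {
    int soerr = 0;
    socklen_t soerrlen = sizeof(soerr);
    if (getsockopt(sfd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen) == 0 && soerr)
      printf("pending socket error: %s\n", strerror(soerr));  /* ECONNRESET */
  }
  close(sfd); close(lfd);
  return 0;
}
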

