This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f3de4e59bd40a1e1db351b2bf73aa5cf9da45118
Author: Andrew Stitcher <[email protected]>
AuthorDate: Thu Jun 12 18:42:40 2025 -0400

    PROTON-2898: [C] Introduce pn_proactor_import_socket API
    
    Implement functionality to import a socket that is already connected to
    the proactor.
    
    This can be for either the server or client end of the
    connection. So it can be used for both socket activation at the server
    end and protocols running on the connection prior to AMQP at the
    client end.
    
    The new pn_proactor_import_socket API is defined in a new header
    proactor_ext.h which will contain any other similarly OS dependent
    proactor APIs.
---
 c/CMakeLists.txt                |  1 +
 c/include/proton/proactor_ext.h | 70 +++++++++++++++++++++++++++++++++++++++++
 c/src/proactor/epoll.c          | 30 ++++++++++++++++--
 c/src/proactor/libuv.c          | 22 +++++++++++++
 c/src/proactor/win_iocp.cpp     | 52 +++++++++++++++++++++++++++---
 5 files changed, 169 insertions(+), 6 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index e4440981a..b214126a2 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -327,6 +327,7 @@ set (qpid-proton-include
   include/proton/netaddr.h
   include/proton/object.h
   include/proton/proactor.h
+  include/proton/proactor_ext.h
   include/proton/raw_connection.h
   include/proton/sasl.h
   include/proton/sasl_plugin.h
diff --git a/c/include/proton/proactor_ext.h b/c/include/proton/proactor_ext.h
new file mode 100644
index 000000000..27e5b4035
--- /dev/null
+++ b/c/include/proton/proactor_ext.h
@@ -0,0 +1,70 @@
+#ifndef PROTON_PROACTOR_EXT_H
+#define PROTON_PROACTOR_EXT_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/import_export.h>
+#include <proton/types.h>
+
+#ifdef _WIN32
+#include <stdint.h>
+typedef uintptr_t pn_socket_t;
+#else
+typedef int pn_socket_t;
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ * @brief Extended proactor functions for native or implementation-specific 
integration.
+ */
+
+/**
+ * Import an existing connected socket into the @p proactor and bind to a @p 
connection and @p transport.
+ * Errors are returned as  @ref PN_TRANSPORT_CLOSED events by 
pn_proactor_wait().
+ *
+ * The @p proactor *takes ownership* of @p connection and will
+ * automatically call pn_connection_free() after the final @ref
+ * PN_TRANSPORT_CLOSED event is handled, or when pn_proactor_free() is
+ * called. You can prevent the automatic free with
+ * pn_proactor_release_connection()
+ *
+ * The @p proactor *takes ownership* of @p transport, it will be freed even
+ * if pn_proactor_release_connection() is called.
+ *
+ * The @p proactor takes ownership of the socket and will close it when the 
connection is closed.
+ *
+ * @note Thread-safe
+ *
+ * @param[in] proactor   The proactor object.
+ * @param[in] connection The connection object, or NULL to create a new one.
+ * @param[in] transport  The transport object, or NULL to create a new one.
+ * @param[in] fd         The file descriptor or socket handle of the connected 
socket.
+ */
+PNP_EXTERN void pn_proactor_import_socket(pn_proactor_t *proactor, 
pn_connection_t *connection, pn_transport_t *transport, pn_socket_t fd);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROTON_PROACTOR_EXT_H */
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 7714c23fe..52068c59a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -71,11 +71,12 @@
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
-#include <proton/proactor.h>
-#include <proton/transport.h>
 #include <proton/listener.h>
 #include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/proactor_ext.h>
 #include <proton/raw_connection.h>
+#include <proton/transport.h>
 
 #include <assert.h>
 #include <stddef.h>
@@ -1470,6 +1471,31 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   if (notify) notify_poller(p);
 }
 
+void pn_proactor_import_socket(pn_proactor_t *p, pn_connection_t *c, 
pn_transport_t *t, pn_socket_t fd) {
+
+  pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t));
+  assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, t, false, "", 0);
+  if (err) {
+    PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_proactor_connect_fd 
failure: %s", err);
+    return;
+  }
+  // TODO: check case of proactor shutting down
+
+  proactor_add(&pc->task);
+  pn_connection_open(pc->driver.connection); /* Auto-open */
+
+  configure_socket(fd);
+
+  lock(&pc->task.mutex);
+  pconnection_start(pc, fd);
+  pconnection_connected_lh(pc);
+
+  bool notify = schedule(&pc->task);
+  unlock(&pc->task.mutex);
+  if (notify) notify_poller(p);
+}
+
 static void pconnection_tick(pconnection_t *pc) {
   pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || 
pn_transport_get_remote_idle_timeout(t)) {
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 3bef54f0d..25c5d94ec 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -34,6 +34,7 @@
 #include <proton/message.h>
 #include <proton/netaddr.h>
 #include <proton/proactor.h>
+#include <proton/proactor_ext.h>
 #include <proton/raw_connection.h>
 #include <proton/transport.h>
 
@@ -1195,6 +1196,27 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   work_start(&pc->work);
 }
 
+void pn_proactor_import_socket(pn_proactor_t* p, pn_connection_t* c, 
pn_transport_t* t, pn_socket_t fd) {
+  pconnection_t* pc = pconnection(p, c, t, false);
+  assert(pc);
+  add_active(p);
+  pn_connection_open(pc->driver.connection);
+  int err = pconnection_init(pc);
+  if (err) goto error;
+
+  err = uv_tcp_open(&pc->tcp, fd);
+  if (err) {
+    pconnection_error(pc, err, "uv_tcp_open");
+    goto error;
+  }
+
+  pc->connected = 1;
+  pconnection_addresses(pc);
+
+error:
+  work_start(&pc->work);
+}
+
 void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, 
int backlog) {
   work_init(&l->work, p, T_LISTENER);
   parse_addr(&l->addr, addr);
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 6f67ed204..85d5821c5 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -27,9 +27,9 @@
 #include <proton/netaddr.h>
 #include <proton/object.h>
 #include <proton/proactor.h>
+#include <proton/proactor_ext.h>
 #include <proton/transport.h>
 #include <proton/listener.h>
-#include <proton/proactor.h>
 #include <proton/raw_connection.h>
 
 #include <assert.h>
@@ -209,9 +209,6 @@ iocpdesc_t *pni_deadline_desc(iocp_t *);
 void pni_iocpdesc_start(iocpdesc_t *iocpd);
 void pni_iocp_drain_completions(iocp_t *);
 int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
-void pni_iocp_start_accepting(iocpdesc_t *iocpd);
-pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t 
*addrlen, bool *would_block, pn_error_t *error);
-pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo 
*addr, pn_error_t *error);
 ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, 
pn_error_t *);
 ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool 
*would_block, pn_error_t *error);
 void pni_iocp_begin_close(iocpdesc_t *iocpd);
@@ -2748,6 +2745,53 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   }
 }
 
+void pn_proactor_import_socket(pn_proactor_t* proactor, pn_connection_t* 
connection, pn_transport_t* transport, pn_socket_t fd)
+{
+    // Step 1: Setup the pconnection_t structure
+    pconnection_t* pc = (pconnection_t*)pn_class_new(&pconnection_class, 
sizeof(pconnection_t));
+    if (!pc) {
+        // Handle allocation failure
+        PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, 
"pn_proactor_import_socket: allocation failure");
+        return;
+    }
+    const char* err = pconnection_setup(pc, proactor, connection, transport, 
false, "");
+    if (err) {
+        PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, 
"pn_proactor_import_socket: %s", err);
+        return;
+    }
+
+    // Step 2: Initialize the context and add to proactor
+    csguard g(&pc->context.cslock);
+    proactor_add(&pc->context);
+    pc->started = true; // Mark as started, since the socket is already 
connected
+
+    // Step 3: Wrap the socket in an iocpdesc_t and bind to IOCP
+    pni_configure_sock_2(fd); // Set non-blocking, TCP_NODELAY, etc.
+    pc->psocket.iocpd = pni_iocpdesc_create(proactor->iocp, fd);
+    if (!pc->psocket.iocpd) {
+        PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, 
"pn_proactor_import_socket: failed to create iocpdesc");
+        return;
+    }
+    pc->psocket.iocpd->active_completer = &pc->psocket;
+    wakeup(&pc->psocket);
+    // Mark as not closing, not connecting
+    pc->psocket.iocpd->write_closed = false;
+    pc->psocket.iocpd->read_closed = false;
+    pc->connecting = false;
+
+    // Step 4: Bind the socket to the IOCP completion port and start IO
+    pni_iocpdesc_start(pc->psocket.iocpd);
+
+    // Step 5: Set local/remote addresses for the connection
+    socklen_t len = sizeof(pc->local.ss);
+    getsockname(fd, (struct sockaddr*)&pc->local.ss, &len);
+    len = sizeof(pc->remote.ss);
+    getpeername(fd, (struct sockaddr*)&pc->remote.ss, &len);
+
+    // Step 6: Auto-open the connection
+    pn_connection_open(pc->driver.connection);
+}
+
 void pn_proactor_release_connection(pn_connection_t *c) {
   bool notify = false;
   pconnection_t *pc = get_pconnection(c);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to