Hello,

The attached example opens a forwarding tunnel, waits for a client to connect
to a local socket, and joins the two together using the connector API. This
works fine until the remote server requests a key re-exchange after a
gigabyte of data has been transferred over the channel. At that point the
poll loop deadlocks.

The problem seems to be that the handling of the key re-exchange is initiated
from within the poll callback. Before invoking the callback,
ssh_poll_ctx_dopoll() clears the events of the file descriptor to prevent race
conditions on the poll. Then, with the events still cleared, the key
re-exchange handler waits for the remote server to transmit its new keys. But
since the events for the file descriptor are still empty, poll() will never
signal that a new message can be read from the file descriptor.

Please find a small example program attached that reproduces the problem. A 
test setup could look like the following:

# Listen on port 10000
$ nc -kl 10000 | pv -b > /dev/null

# Run the example program
$ ./examples/tunnel

# Send data to port 55555
$ dd if=/dev/zero bs=10M | pv -b | nc 127.0.0.1 55555

I also attached (the tail of) an strace capture of what is happening and the 
patch I applied to libssh to get a few more prints along the way.

I started looking into how to potentially fix the problem, but so far it's not
clear to me how the interaction between the key re-exchange handlers and the
poll callbacks was designed to work. It would be great to get some pointers on
what a fix should look like.

Best regards,
Moritz



Moritz Pflanzer
Senior Software Development Engineer

[cid:image001.png@01D3BF6E.185110B0]

Hexagon Geosystems Services AG
Räffelstrasse 24, 8045 Zürich
Lift A, 4. Stock
E: 
moritz.pflan...@hexagon.com<mailto:moritz.pflan...@hexagon.com><mailto:carolin.sieb...@hexagon.com>
W: http://www.hexagongeosystems.com
diff --git a/src/channels.c b/src/channels.c
index 0dc4260d..7e06d7bb 100644
--- a/src/channels.c
+++ b/src/channels.c
@@ -1469,11 +1469,16 @@ static int channel_write_common(ssh_channel channel,
     return SSH_ERROR;
   }
 
+    SSH_LOG(SSH_LOG_PACKET, "before wait session");
   if (ssh_waitsession_unblocked(session) == 0){
+    SSH_LOG(SSH_LOG_PACKET, "not unblocked");
     rc = ssh_handle_packets_termination(session, SSH_TIMEOUT_DEFAULT,
             ssh_waitsession_unblocked, session);
-    if (rc == SSH_ERROR || !ssh_waitsession_unblocked(session))
+    SSH_LOG(SSH_LOG_PACKET, "handle termination");
+    if (rc == SSH_ERROR || !ssh_waitsession_unblocked(session)) {
+      SSH_LOG(SSH_LOG_PACKET, "goto out");
         goto out;
+    }
   }
   while (len > 0) {
     if (channel->remote_window < len) {
diff --git a/src/poll.c b/src/poll.c
index 82c9b18b..810d3e74 100644
--- a/src/poll.c
+++ b/src/poll.c
@@ -410,8 +410,10 @@ short ssh_poll_get_events(ssh_poll_handle p) {
  */
 void ssh_poll_set_events(ssh_poll_handle p, short events) {
   p->events = events;
+  fprintf(stderr, "Set events = %u for fd = %d\n", p->events, p->x.fd);
   if (p->ctx != NULL && !p->lock) {
     p->ctx->pollfds[p->x.idx].events = events;
+  fprintf(stderr, "Set events2 = %u for fd = %d\n", p->events, p->x.fd);
   }
 }
 
@@ -585,6 +587,7 @@ int ssh_poll_ctx_add(ssh_poll_ctx ctx, ssh_poll_handle p) {
   p->x.idx = ctx->polls_used++;
   ctx->pollptrs[p->x.idx] = p;
   ctx->pollfds[p->x.idx].fd = fd;
+  fprintf(stderr, "Init events = %u for fd = %d\n", p->events, p->x.fd);
   ctx->pollfds[p->x.idx].events = p->events;
   ctx->pollfds[p->x.idx].revents = 0;
   p->ctx = ctx;
@@ -695,9 +698,11 @@ int ssh_poll_ctx_dopoll(ssh_poll_ctx ctx, int timeout)
             fd = ctx->pollfds[i].fd;
             revents = ctx->pollfds[i].revents;
             /* avoid having any event caught during callback */
+            fprintf(stderr, "Clear events = %u for fd = %d\n", p->events, fd);
             ctx->pollfds[i].events = 0;
             p->lock = 1;
             if (p->cb && (ret = p->cb(p, fd, revents, p->cb_data)) < 0) {
+              fprintf(stderr, "CB failed with rc = %d for fd = %d\n", ret, fd);
                 if (ret == -2) {
                     return -1;
                 }
@@ -706,6 +711,8 @@ int ssh_poll_ctx_dopoll(ssh_poll_ctx ctx, int timeout)
                 i = 0;
             } else {
                 ctx->pollfds[i].revents = 0;
+
+                fprintf(stderr, "Restore events = %u for fd = %d\n", p->events, fd);
                 ctx->pollfds[i].events = p->events;
                 p->lock = 0;
                 i++;
commit cd8ad959aff0d081cf3f2b3c75ed2c93b7908660
Author: Moritz Pflanzer <moritz.pflan...@hexagon.com>
Date:   Sat Apr 30 19:19:46 2022 +0200

    Tunnel example

diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 683ac6ca..1e006d85 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -90,6 +90,9 @@ target_link_libraries(keygen ssh::ssh)
 
 find_package(Threads REQUIRED)
 
+add_executable(tunnel tunnel.cpp)
+target_link_libraries(tunnel ssh::ssh Threads::Threads)
+
 add_executable(libsshpp libsshpp.cpp)
 target_link_libraries(libsshpp ssh::ssh Threads::Threads)
 #target_compile_options(libsshpp PUBLIC -fsanitize=address)
diff --git a/examples/tunnel.cpp b/examples/tunnel.cpp
new file mode 100644
index 00000000..6c47443a
--- /dev/null
+++ b/examples/tunnel.cpp
@@ -0,0 +1,158 @@
+#include <iostream>
+#include <string>
+
+extern "C" {
+#include <libssh/libssh.h>
+#include <poll.h>
+}
+
+std::string host = "localhost";
+int port = 22;
+
+int remote_port = 10000;
+int local_port = 55555;
+
+ssh_event event = nullptr;
+ssh_session session = nullptr;
+ssh_channel forward_channel = nullptr;
+ssh_connector connector_in = nullptr;
+ssh_connector connector_out = nullptr;
+
+int forward_cb(socket_t fd, int revents, void *userdata) {
+  std::cerr << "Got events for fd " << fd << ": " << revents << "\n";
+
+  if (revents & POLLIN) {
+    struct sockaddr_in client_sin;
+    socklen_t client_sin_len = sizeof(client_sin);
+    int client_fd = accept(fd, reinterpret_cast<struct sockaddr *>(&client_sin), &client_sin_len);
+    if (client_fd < 0) {
+      std::cerr << "Failed to accept\n";
+      throw false;
+    }
+
+    forward_channel = ssh_channel_new(session);
+
+    if (forward_channel == nullptr) {
+      throw false;
+    }
+
+    for (;;) {
+      if (int rc = ssh_channel_open_forward(forward_channel, host.c_str(), remote_port, "localhost", local_port); rc == SSH_AGAIN) {
+        ssh_event_dopoll(event, 1000);
+        continue;
+      } else if (rc != SSH_OK) {
+        std::cerr << "Forward tunnel failed (rc = " << rc << ")\n";
+        throw std::runtime_error(ssh_get_error(session));
+      }
+
+      break;
+    }
+
+    connector_in = ssh_connector_new(session);
+    ssh_connector_set_out_channel(connector_in, forward_channel, SSH_CONNECTOR_STDINOUT);
+    ssh_connector_set_in_fd(connector_in, client_fd);
+    ssh_event_add_connector(event, connector_in);
+
+    connector_out = ssh_connector_new(session);
+    ssh_connector_set_out_fd(connector_out, client_fd);
+    ssh_connector_set_in_channel(connector_out, forward_channel, SSH_CONNECTOR_STDINOUT);
+    ssh_event_add_connector(event, connector_out);
+  }
+
+  std::cerr << "Handled events\n";
+
+  return SSH_OK;
+}
+
+int main() {
+
+  event = ssh_event_new();
+
+  if (event == nullptr) {
+    throw std::runtime_error("Failed to create event");
+  }
+
+  session = ssh_new();
+
+  if (session == nullptr) {
+    throw std::runtime_error("Failed to create session");
+  }
+
+  if (ssh_options_set(session, SSH_OPTIONS_HOST, host.c_str()) < 0) {
+    throw std::runtime_error("Failed to set host option");
+  }
+
+  if (ssh_options_set(session, SSH_OPTIONS_PORT, &port) < 0) {
+    throw std::runtime_error("Failed to set port option");
+  }
+
+  //ssh_set_blocking(session, 0);
+
+  for (;;) {
+    std::cerr << "Trying to connect: " << session << "\n";
+    if (int rc = ssh_connect(session); rc == SSH_AGAIN) {
+      //std::cerr << "AGAIN CONNECT\n";
+      ssh_event_dopoll(event, 1000);
+      continue;
+    } else if (rc != SSH_OK) {
+      std::cerr << "NOT OK\n";
+      throw std::runtime_error(ssh_get_error(session));
+    }
+
+    break;
+  }
+
+  std::cerr << "After ssh_connect\n";
+
+  int rc = 0;
+
+  do {
+    //std::cerr << "AGAIN KEY\n";
+    rc = ssh_userauth_publickey_auto(session, NULL, NULL);
+    ssh_event_dopoll(event, 1000);
+  } while(rc == SSH_AUTH_AGAIN);
+
+  if (rc == SSH_AUTH_SUCCESS) {
+    std::cerr << "After ssh_userauth\n";
+  } else {
+    std::cerr << "NOT OK\n";
+    throw std::runtime_error(ssh_get_error(session));
+  }
+
+  int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
+
+  if (socket_fd < 0) {
+    throw false;
+  }
+
+  struct sockaddr_in sin;
+  sin.sin_family = AF_INET;
+  sin.sin_addr.s_addr = htonl(INADDR_ANY);
+  sin.sin_port = htons(local_port);
+
+  if (int rc = bind(socket_fd, reinterpret_cast<struct sockaddr *>(&sin), sizeof sin); rc < 0) {
+    std::cerr << "Failed to bind to socket\n";
+    throw false;
+  }
+
+  socklen_t sin_len = sizeof(sin);
+  if (getsockname(socket_fd, reinterpret_cast<struct sockaddr *>(&sin), &sin_len) < 0) {
+    std::cerr << "Failed to get socket name\n";
+    throw false;
+  }
+
+  std::cerr << "Port: " << ntohs(sin.sin_port) << "\n";
+
+  if (int rc = listen(socket_fd, 1); rc < 0) {
+    std::cerr << "Listen failed\n";
+    throw false;
+  }
+
+  if (ssh_event_add_fd(event, socket_fd, POLLIN, forward_cb, nullptr) != SSH_OK) {
+    throw false;
+  }
+
+  for(;;) {
+    ssh_event_dopoll(event, 1000);
+  }
+}
poll([{fd=6, events=POLLIN}, {fd=7, events=POLLIN|POLLERR}, {fd=4, 
events=POLLIN|POLLOUT}, {fd=7, events=POLLERR}], 4, 1000) = 2 ([{fd=7, 
revents=POLLIN}, {fd=4, revents=POLLOUT}])
write(2, "Clear events = 9 for fd = 7\n", 28Clear events = 9 for fd = 7
) = 28
write(2, "Set events = 8 for fd = 1\n", 26Set events = 8 for fd = 1
) = 26
write(2, "Restore events = 8 for fd = 7\n", 30Restore events = 8 for fd = 7
) = 30
write(2, "Clear events = 5 for fd = 4\n", 28Clear events = 5 for fd = 4
) = 28
write(2, "Set events = 1 for fd = 2\n", 26Set events = 1 for fd = 2
) = 26
recvfrom(7, 
"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 4096, 0, 
NULL, NULL) = 4096
getpid()                                = 22448
sendto(4, 
"S\r\27qD\223\335Z\t\2\325o\256\335\232\352\253\263\276\264O\25Oh\311\364\330\374s\200\267\n"...,
 4132, MSG_NOSIGNAL, NULL, 0) = 4132
write(2, "Set events = 5 for fd = 2\n", 26Set events = 5 for fd = 2
) = 26
write(2, "Set events = 9 for fd = 1\n", 26Set events = 9 for fd = 1
) = 26
write(2, "Set events2 = 9 for fd = 1\n", 27Set events2 = 9 for fd = 1
) = 27
write(2, "Restore events = 5 for fd = 4\n", 30Restore events = 5 for fd = 4
) = 30
poll([{fd=6, events=POLLIN}, {fd=7, events=POLLIN|POLLERR}, {fd=4, 
events=POLLIN|POLLOUT}, {fd=7, events=POLLERR}], 4, 1000) = 2 ([{fd=7, 
revents=POLLIN}, {fd=4, revents=POLLOUT}])
write(2, "Clear events = 9 for fd = 7\n", 28Clear events = 9 for fd = 7
) = 28
write(2, "Set events = 8 for fd = 1\n", 26Set events = 8 for fd = 1
) = 26
write(2, "Restore events = 8 for fd = 7\n", 30Restore events = 8 for fd = 7
) = 30
write(2, "Clear events = 5 for fd = 4\n", 28Clear events = 5 for fd = 4
) = 28
write(2, "Set events = 1 for fd = 2\n", 26Set events = 1 for fd = 2
) = 26
recvfrom(7, 
"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 4096, 0, 
NULL, NULL) = 4096
getpid()                                = 22448
sendto(4, 
"\222\223\331\375\225\252p\nm\321\262\340\216\25\370)\235m8!I\340\262_\350\307,\237\214\204\301\34"...,
 4132, MSG_NOSIGNAL, NULL, 0) = 4132
write(2, "Set events = 5 for fd = 2\n", 26Set events = 5 for fd = 2
) = 26
write(2, "Set events = 9 for fd = 1\n", 26Set events = 9 for fd = 1
) = 26
write(2, "Set events2 = 9 for fd = 1\n", 27Set events2 = 9 for fd = 1
) = 27
write(2, "Restore events = 5 for fd = 4\n", 30Restore events = 5 for fd = 4
) = 30
poll([{fd=6, events=POLLIN}, {fd=7, events=POLLIN|POLLERR}, {fd=4, 
events=POLLIN|POLLOUT}, {fd=7, events=POLLERR}], 4, 1000) = 2 ([{fd=7, 
revents=POLLIN}, {fd=4, revents=POLLIN|POLLOUT}])
write(2, "Clear events = 9 for fd = 7\n", 28Clear events = 9 for fd = 7
) = 28
write(2, "Set events = 8 for fd = 1\n", 26Set events = 8 for fd = 1
) = 26
write(2, "Restore events = 8 for fd = 7\n", 30Restore events = 8 for fd = 7
) = 30
write(2, "Clear events = 5 for fd = 4\n", 28Clear events = 5 for fd = 4
) = 28
recvfrom(4, 
"\310u\210?\26:41\215\316\33\273\6\35\236\250\"\\\340l\352\353L\355\277\202\2402\326MU\321"...,
 4096, 0, NULL, NULL) = 1136
getpid()                                = 22448
openat(AT_FDCWD, "/home/p...@lgs-net.com/.ssh/known_hosts", O_RDONLY) = 8
fstat(8, {st_mode=S_IFREG|0600, st_size=53738, ...}) = 0
read(8, "|1|7GoD8gno0X1MhRhWzqUwC9EMLvY=|"..., 4096) = 4096
read(8, "LvKD3L5jkbb3UKRuoDZqIq9ZWyctwd4k"..., 4096) = 4096
read(8, "O6VqNEBxKvJJelCq0dTXWT5pbO2gDXC6"..., 4096) = 4096
read(8, "RGXyBfADo+cPlkLEpiqftRU46eynx81q"..., 4096) = 4096
read(8, "AAABBBMf7NHbfNkAfNIRrfenWXJFoZp6"..., 4096) = 4096
read(8, "chQjIXEmRDJDugfaGBBWnr/3sc=\n|1|O"..., 4096) = 4096
read(8, "BBE7C/CYjru8/GE/3yl5MLWi9wxoDiNN"..., 4096) = 4096
read(8, "G69I3s=\n|1|9lRFsXzUrtM//v5kBHfU0"..., 4096) = 4096
read(8, "YPU9QkRFhYPCsKeyBIJy0Pb9U= ecdsa"..., 4096) = 4096
read(8, "k5pDzsl4g==\n|1|8ttE1WsYtxprKwLr9"..., 4096) = 4096
read(8, "AAABBBOvNNNEYQKESjz/aXEsCps79oqa"..., 4096) = 4096
read(8, "DbPcZEP+BUNGmLiaFP+TA0zFNuX6V5TX"..., 4096) = 4096
read(8, "HNhLXNoYTItbmlzdHAyNTYAAAAIbmlzd"..., 4096) = 4096
read(8, "CjS2UHJdpkOh5POPYojWbbyt+AfYXXhG"..., 4096) = 490
read(8, "", 4096)                       = 0
close(8)                                = 0
openat(AT_FDCWD, "/etc/ssh/ssh_known_hosts", O_RDONLY) = -1 ENOENT (No such 
file or directory)
getpid()                                = 22448
write(2, "Set events = 5 for fd = 2\n", 26Set events = 5 for fd = 2
) = 26
getpid()                                = 22448
getpid()                                = 22448
write(2, "Set events = 5 for fd = 2\n", 26Set events = 5 for fd = 2
) = 26
sendto(4, 
".r\210}7Y[\265\26\235\270\27*FSmTi\333-\340&\321\245[\277\34\31\203p\270\211"...,
 928, MSG_NOSIGNAL, NULL, 0) = 928
write(2, "Set events = 5 for fd = 2\n", 26Set events = 5 for fd = 2
) = 26
write(2, "Restore events = 5 for fd = 4\n", 30Restore events = 5 for fd = 4
) = 30
poll([{fd=6, events=POLLIN}, {fd=7, events=POLLERR}, {fd=4, 
events=POLLIN|POLLOUT}, {fd=7, events=POLLERR}], 4, 1000) = 1 ([{fd=4, 
revents=POLLOUT}])
write(2, "Clear events = 5 for fd = 4\n", 28Clear events = 5 for fd = 4
) = 28
write(2, "Set events = 1 for fd = 2\n", 26Set events = 1 for fd = 2
) = 26
recvfrom(7, 
"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 4096, 0, 
NULL, NULL) = 4096
write(2, "Set events = 1 for fd = 2\n", 26Set events = 1 for fd = 2
) = 26
poll([{fd=6, events=POLLIN}, {fd=7, events=POLLERR}, {fd=4, events=0}, {fd=7, 
events=POLLERR}], 4, -1strace: Process 22448 detached
 <detached ...>

Reply via email to