> > - let connections = > > proxmox_backup::tools::async_io::HyperAccept(connections); > > + let connections = accept_connections(listener, acceptor); > > + let connections = > > hyper::server::accept::from_stream(connections); > > If we move the `from_stream` into the function below...
I have tried to do that for 2 hours, then gave up... So please tell me how to make that work! > > > > Ok(ready > > - .and_then(|_| hyper::Server::builder(connections) > > + .and_then(|_| hyper::Server::builder(connections) > > .serve(rest_server) > > .with_graceful_shutdown(server::shutdown_future()) > > .map_err(Error::from) > > @@ -170,6 +157,66 @@ async fn run() -> Result<(), Error> { > > Ok(()) > > } > > > > +fn accept_connections( > > + mut listener: tokio::net::TcpListener, > > + acceptor: Arc<openssl::ssl::SslAcceptor>, > > +) -> > > tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, > > Error>> { > > ... then this could probably be shortened to > > ) -> impl Accept { > > shortens the line by 80 ;-) > > > + > > + let (sender, receiver) = tokio::sync::mpsc::channel(100); > > + > > + let accept_counter = Arc::new(AtomicUsize::new(0)); > > + > > + const MAX_PENDING_ACCEPTS: usize = 100; > > + > > + tokio::spawn(async move { > > + loop { > > + match listener.accept().await { > > + Err(err) => { > > + eprintln!("error accepting tcp connection: {}", err); > > + } > > + Ok((sock, _addr)) => { > > + sock.set_nodelay(true).unwrap(); > > + let _ = set_tcp_keepalive(sock.as_raw_fd(), > > PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); > > + let acceptor = Arc::clone(&acceptor); > > + let mut sender = sender.clone(); > > + > > + if accept_counter.load(Ordering::SeqCst) > > > MAX_PENDING_ACCEPTS { > > + eprintln!("connection rejected - to many open > > connections"); > > + continue; > > + } > > + accept_counter.fetch_add(1, Ordering::SeqCst); > > We should think about making a counter guard for this sort of thing, > because from this point onward we're not allowed to use `?` anywhere, > which is quite annoying. 
yes > > > + > > + let accept_counter = accept_counter.clone(); > > + tokio::spawn(async move { > > + let accept_future = tokio::time::timeout( > > + Duration::new(10, 0), > > tokio_openssl::accept(&acceptor, sock)); > > + > > + let result = accept_future.await; > > + > > + match result { > > + Ok(Ok(connection)) => { > > + if let Err(_) = > > sender.send(Ok(connection)).await { > > + eprintln!("detect closed connection > > channel"); > > + } > > + } > > + Ok(Err(err)) => { > > + eprintln!("https handshakeX failed - {}", > > err); > > + } > > + Err(_) => { > > + eprintln!("https handshake timeout"); > > + } > > + } > > which is why I'd rather have the part above in its own `async fn` > followed by the `fetch_sub` below, followed by the `eprintln!()`s. > > > + > > + accept_counter.fetch_sub(1, Ordering::SeqCst); > > + }); > > + } > > + } > > + } > > + }); > > + > > + receiver > > +} > > + > > fn start_stat_generator() { > > let abort_future = server::shutdown_future(); > > let future = Box::pin(run_stat_generator()); > > -- > > 2.20.1 _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel