This is an automated email from the ASF dual-hosted git repository.
maskit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new b9779fd8a4 Refactor the read and write functions in Unix and SSL NetVC
(#11794)
b9779fd8a4 is described below
commit b9779fd8a4ece23783fff59f96b725979b7d340d
Author: Masakazu Kitajo <[email protected]>
AuthorDate: Mon Nov 4 14:30:07 2024 -0700
Refactor the read and write functions in Unix and SSL NetVC (#11794)
---
include/iocore/net/NetEvent.h | 6 +-
src/iocore/net/NetHandler.cc | 4 +-
src/iocore/net/P_QUICNetVConnection.h | 2 +-
src/iocore/net/P_SSLNetVConnection.h | 4 +-
src/iocore/net/P_UnixNetVConnection.h | 14 +-
src/iocore/net/QUICNetVConnection.cc | 6 +-
src/iocore/net/SSLNetVConnection.cc | 16 +-
src/iocore/net/UnixNetVConnection.cc | 998 +++++++++++++++++-----------------
8 files changed, 508 insertions(+), 542 deletions(-)
diff --git a/include/iocore/net/NetEvent.h b/include/iocore/net/NetEvent.h
index 08b2ff8c8a..35c7dabd22 100644
--- a/include/iocore/net/NetEvent.h
+++ b/include/iocore/net/NetEvent.h
@@ -55,9 +55,9 @@ class NetEvent
public:
NetEvent() = default;
virtual ~NetEvent() {}
- virtual void net_read_io(NetHandler *nh, EThread *lthread) = 0;
- virtual void net_write_io(NetHandler *nh, EThread *lthread) = 0;
- virtual void free_thread(EThread *t) = 0;
+ virtual void net_read_io(NetHandler *nh) = 0;
+ virtual void net_write_io(NetHandler *nh) = 0;
+ virtual void free_thread(EThread *t) = 0;
// since we want this class to be independent from VConnection,
Continutaion. There should be
// a pure virtual function which connect sub class and NetHandler.
diff --git a/src/iocore/net/NetHandler.cc b/src/iocore/net/NetHandler.cc
index 208728636a..0f7d519d2b 100644
--- a/src/iocore/net/NetHandler.cc
+++ b/src/iocore/net/NetHandler.cc
@@ -283,7 +283,7 @@ NetHandler::process_ready_list()
if (ne->closed) {
free_netevent(ne);
} else if (ne->read.enabled && ne->read.triggered) {
- ne->net_read_io(this, this->thread);
+ ne->net_read_io(this);
} else if (!ne->read.enabled) {
read_ready_list.remove(ne);
}
@@ -293,7 +293,7 @@ NetHandler::process_ready_list()
if (ne->closed) {
free_netevent(ne);
} else if (ne->write.enabled && ne->write.triggered) {
- ne->net_write_io(this, this->thread);
+ ne->net_write_io(this);
} else if (!ne->write.enabled) {
write_ready_list.remove(ne);
}
diff --git a/src/iocore/net/P_QUICNetVConnection.h
b/src/iocore/net/P_QUICNetVConnection.h
index 9797c24984..353d3aed22 100644
--- a/src/iocore/net/P_QUICNetVConnection.h
+++ b/src/iocore/net/P_QUICNetVConnection.h
@@ -108,7 +108,7 @@ public:
bool getSSLHandShakeComplete() const override;
// NetEvent
- virtual void net_read_io(NetHandler *nh, EThread *lthread) override;
+ virtual void net_read_io(NetHandler *nh) override;
// NetVConnection
int populate_protocol(std::string_view *results, int n) const
override;
diff --git a/src/iocore/net/P_SSLNetVConnection.h
b/src/iocore/net/P_SSLNetVConnection.h
index 9b437a6d20..2babaeb0d6 100644
--- a/src/iocore/net/P_SSLNetVConnection.h
+++ b/src/iocore/net/P_SSLNetVConnection.h
@@ -140,7 +140,7 @@ public:
int sslServerHandShakeEvent(int &err);
int sslClientHandShakeEvent(int &err);
- void net_read_io(NetHandler *nh, EThread *lthread) override;
+ void net_read_io(NetHandler *nh) override;
int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf,
int64_t &total_written, int &needs) override;
void do_io_close(int lerrno = -1) override;
@@ -376,7 +376,7 @@ private:
UnixNetVConnection *_migrateFromSSL();
void _propagateHandShakeBuffer(UnixNetVConnection *target,
EThread *t);
- int _ssl_read_from_net(EThread *lthread, int64_t &ret);
+ int _ssl_read_from_net(int64_t &ret);
ssl_error_t _ssl_read_buffer(void *buf, int64_t nbytes, int64_t &nread);
ssl_error_t _ssl_write_buffer(const void *buf, int64_t nbytes, int64_t
&nwritten);
ssl_error_t _ssl_connect();
diff --git a/src/iocore/net/P_UnixNetVConnection.h
b/src/iocore/net/P_UnixNetVConnection.h
index a6ce3ab89e..5ff97aee16 100644
--- a/src/iocore/net/P_UnixNetVConnection.h
+++ b/src/iocore/net/P_UnixNetVConnection.h
@@ -150,8 +150,8 @@ public:
}
// NetEvent
- virtual void net_read_io(NetHandler *nh, EThread *lthread) override;
- virtual void net_write_io(NetHandler *nh, EThread *lthread) override;
+ virtual void net_read_io(NetHandler *nh) override;
+ virtual void net_write_io(NetHandler *nh) override;
virtual void free_thread(EThread *t) override;
virtual int
close() override
@@ -195,7 +195,7 @@ public:
int readSignalAndUpdate(int event);
void readReschedule(NetHandler *nh);
void writeReschedule(NetHandler *nh);
- void netActivity(EThread *lthread);
+ void netActivity();
/**
* If the current object's thread does not match the t argument, create a new
* NetVC in the thread t context based on the socket and ssl information in
the
@@ -234,8 +234,6 @@ public:
int set_tcp_congestion_control(int side) override;
void apply_options() override;
- friend void write_to_net_io(NetHandler *, UnixNetVConnection *, EThread *);
-
// set_context() should be called before calling this member function.
void mark_as_tunnel_endpoint() override;
@@ -376,9 +374,3 @@ UnixNetVConnection::get_action() const
{
return &action_;
}
-
-// declarations for local use (within the net module)
-
-void write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);
-void write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);
-void net_activity(UnixNetVConnection *vc, EThread *thread);
diff --git a/src/iocore/net/QUICNetVConnection.cc
b/src/iocore/net/QUICNetVConnection.cc
index 2fd225df24..5154c34950 100644
--- a/src/iocore/net/QUICNetVConnection.cc
+++ b/src/iocore/net/QUICNetVConnection.cc
@@ -401,7 +401,7 @@ QUICNetVConnection::handle_received_packet(UDPPacket
*packet)
{
size_t buf_len{0};
uint8_t *buf = packet->get_entire_chain_buffer(&buf_len);
- net_activity(this, this_ethread());
+ this->netActivity();
quiche_recv_info recv_info = {
&packet->from.sa,
static_cast<socklen_t>(packet->from.isIp4() ? sizeof(packet->from.sin) :
sizeof(packet->from.sin6)),
@@ -522,7 +522,7 @@ QUICNetVConnection::is_handshake_completed() const
}
void
-QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */, EThread * /*
lthread ATS_UNUSED */)
+QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */)
{
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr);
@@ -714,7 +714,7 @@ QUICNetVConnection::_handle_write_ready()
segment_size = max_udp_payload_size;
}
this->_packet_handler->send_packet(this->_udp_con, this->con.addr,
udp_payload, segment_size, &send_at_hint);
- net_activity(this, this_ethread());
+ this->netActivity();
}
}
diff --git a/src/iocore/net/SSLNetVConnection.cc
b/src/iocore/net/SSLNetVConnection.cc
index d49c79ec28..88805cf32e 100644
--- a/src/iocore/net/SSLNetVConnection.cc
+++ b/src/iocore/net/SSLNetVConnection.cc
@@ -178,7 +178,7 @@ debug_certificate_name(const char *msg, X509_NAME *name)
}
int
-SSLNetVConnection::_ssl_read_from_net(EThread *lthread, int64_t &ret)
+SSLNetVConnection::_ssl_read_from_net(int64_t &ret)
{
NetState *s = &this->read;
MIOBufferAccessor &buf = s->vio.buffer;
@@ -221,7 +221,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread,
int64_t &ret)
bytes_read += nread;
if (nread > 0) {
buf.writer()->fill(nread); // Tell the buffer, we've used the bytes
- this->netActivity(lthread);
+ this->netActivity();
}
break;
case SSL_ERROR_WANT_WRITE:
@@ -275,7 +275,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread,
int64_t &ret)
Dbg(dbg_ctl_ssl, "bytes_read=%" PRId64, bytes_read);
s->vio.ndone += bytes_read;
- this->netActivity(lthread);
+ this->netActivity();
ret = bytes_read;
@@ -454,7 +454,7 @@ SSLNetVConnection::update_rbio(bool move_to_socket)
// changed by YTS Team, yamsat
void
-SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
+SSLNetVConnection::net_read_io(NetHandler *nh)
{
int ret;
int64_t r = 0;
@@ -462,11 +462,11 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread
*lthread)
NetState *s = &this->read;
if (HttpProxyPort::TRANSPORT_BLIND_TUNNEL == this->attributes) {
- this->super::net_read_io(nh, lthread);
+ this->super::net_read_io(nh);
return;
}
- MUTEX_TRY_LOCK(lock, s->vio.mutex, lthread);
+ MUTEX_TRY_LOCK(lock, s->vio.mutex, nh->thread);
if (!lock.is_locked()) {
readReschedule(nh);
return;
@@ -475,7 +475,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread
*lthread)
// The closed flag should be stable once we get the s->vio.mutex in that case
// (the global session pool mutex).
if (this->closed) {
- this->super::net_read_io(nh, lthread);
+ this->super::net_read_io(nh);
return;
}
// If the key renegotiation failed it's over, just signal the error and
finish.
@@ -614,7 +614,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread
*lthread)
// this comment if you know
int ssl_read_errno = 0;
do {
- ret = this->_ssl_read_from_net(lthread, r);
+ ret = this->_ssl_read_from_net(r);
if (ret == SSL_READ_READY || ret == SSL_READ_ERROR_NONE) {
bytes += r;
}
diff --git a/src/iocore/net/UnixNetVConnection.cc
b/src/iocore/net/UnixNetVConnection.cc
index 46b568999b..6bd0331d54 100644
--- a/src/iocore/net/UnixNetVConnection.cc
+++ b/src/iocore/net/UnixNetVConnection.cc
@@ -75,18 +75,6 @@ write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
}
}
-void
-net_activity(UnixNetVConnection *vc, EThread *thread)
-{
- Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ",
NetVC=%p", vc->inactivity_timeout_in, vc);
- (void)thread;
- if (vc->inactivity_timeout_in) {
- vc->next_inactivity_timeout_at = ink_get_hrtime() +
vc->inactivity_timeout_in;
- } else {
- vc->next_inactivity_timeout_at = 0;
- }
-}
-
//
// Signal an event
//
@@ -188,394 +176,12 @@ read_signal_error(NetHandler *nh, UnixNetVConnection
*vc, int lerrno)
vc->lerrno = lerrno;
return read_signal_done(VC_EVENT_ERROR, nh, vc);
}
-
-static inline int
-write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
-{
- vc->lerrno = lerrno;
- return write_signal_done(VC_EVENT_ERROR, nh, vc);
-}
-
-// Read the data for a UnixNetVConnection.
-// Rescheduling the UnixNetVConnection by moving the VC
-// onto or off of the ready_list.
-// Had to wrap this function with net_read_io for SSL.
-static void
-read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
- NetState *s = &vc->read;
- int64_t r = 0;
-
- MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
-
- if (!lock.is_locked()) {
- read_reschedule(nh, vc);
- return;
- }
-
- // It is possible that the closed flag got set from HttpSessionManager in the
- // global session pool case. If so, the closed flag should be stable once
we get the
- // s->vio.mutex (the global session pool mutex).
- if (vc->closed) {
- vc->nh->free_netevent(vc);
- return;
- }
- // if it is not enabled.
- if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
- read_disable(nh, vc);
- return;
- }
-
- MIOBufferAccessor &buf = s->vio.buffer;
- ink_assert(buf.writer());
-
- // if there is nothing to do, disable connection
- int64_t ntodo = s->vio.ntodo();
- if (ntodo <= 0) {
- read_disable(nh, vc);
- return;
- }
- int64_t toread = buf.writer()->write_avail();
- if (toread > ntodo) {
- toread = ntodo;
- }
-
- // read data
- int64_t rattempted = 0, total_read = 0;
- unsigned niov = 0;
- IOVec tiovec[NET_MAX_IOV];
- if (toread) {
- IOBufferBlock *b = buf.writer()->first_write_block();
- do {
- niov = 0;
- rattempted = 0;
- while (b && niov < NET_MAX_IOV) {
- int64_t a = b->write_avail();
- if (a > 0) {
- tiovec[niov].iov_base = b->_end;
- int64_t togo = toread - total_read - rattempted;
- if (a > togo) {
- a = togo;
- }
- tiovec[niov].iov_len = a;
- rattempted += a;
- niov++;
- if (a >= togo) {
- break;
- }
- }
- b = b->next.get();
- }
-
- ink_assert(niov > 0);
- ink_assert(niov <= countof(tiovec));
- struct msghdr msg;
-
- ink_zero(msg);
- msg.msg_name = const_cast<sockaddr *>(vc->get_remote_addr());
- msg.msg_namelen = ats_ip_size(vc->get_remote_addr());
- msg.msg_iov = &tiovec[0];
- msg.msg_iovlen = niov;
- r = vc->con.sock.recvmsg(&msg, 0);
-
- Metrics::Counter::increment(net_rsb.calls_to_read);
-
- total_read += rattempted;
- } while (rattempted && r == rattempted && total_read < toread);
-
- // if we have already moved some bytes successfully, summarize in r
- if (total_read != rattempted) {
- if (r <= 0) {
- r = total_read - rattempted;
- } else {
- r = total_read - rattempted + r;
- }
- }
- // check for errors
- if (r <= 0) {
- if (r == -EAGAIN || r == -ENOTCONN) {
- Metrics::Counter::increment(net_rsb.calls_to_read_nodata);
- vc->read.triggered = 0;
- nh->read_ready_list.remove(vc);
- return;
- }
-
- if (!r || r == -ECONNRESET) {
- vc->read.triggered = 0;
- nh->read_ready_list.remove(vc);
- read_signal_done(VC_EVENT_EOS, nh, vc);
- return;
- }
- vc->read.triggered = 0;
- read_signal_error(nh, vc, static_cast<int>(-r));
- return;
- }
- Metrics::Counter::increment(net_rsb.read_bytes, r);
- Metrics::Counter::increment(net_rsb.read_bytes_count);
-
- // Add data to buffer and signal continuation.
- buf.writer()->fill(r);
-#ifdef DEBUG
- if (buf.writer()->write_avail() <= 0) {
- Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full");
- }
-#endif
- s->vio.ndone += r;
- net_activity(vc, thread);
- } else {
- r = 0;
- }
-
- // Signal read ready, check if user is not done
- if (r) {
- // If there are no more bytes to read, signal read complete
- ink_assert(ntodo >= 0);
- if (s->vio.ntodo() <= 0) {
- read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
- Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done");
- return;
- } else {
- if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
- return;
- }
-
- // change of lock... don't look at shared variables!
- if (lock.get_mutex() != s->vio.mutex.get()) {
- read_reschedule(nh, vc);
- return;
- }
- }
- }
-
- // If here are is no more room, or nothing to do, disable the connection
- if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
- read_disable(nh, vc);
- return;
- }
-
- read_reschedule(nh, vc);
-}
-
-//
-// Write the data for a UnixNetVConnection.
-// Rescheduling the UnixNetVConnection when necessary.
-//
-void
-write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
- Metrics::Counter::increment(net_rsb.calls_to_writetonet);
- write_to_net_io(nh, vc, thread);
-}
-
-void
-write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
-{
- NetState *s = &vc->write;
- Continuation *c = vc->write.vio.cont;
-
- MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
-
- if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
- write_reschedule(nh, vc);
- return;
- }
-
- if (vc->has_error()) {
- vc->lerrno = vc->error;
- write_signal_and_update(VC_EVENT_ERROR, vc);
- return;
- }
-
- // This function will always return true unless
- // vc is an SSLNetVConnection.
- if (!vc->getSSLHandShakeComplete()) {
- if (vc->trackFirstHandshake()) {
- // Eat the first write-ready. Until the TLS handshake is complete,
- // we should still be under the connect timeout and shouldn't bother
- // the state machine until the TLS handshake is complete
- vc->write.triggered = 0;
- nh->write_ready_list.remove(vc);
- }
-
- int err, ret;
-
- if (vc->get_context() == NET_VCONNECTION_OUT) {
- ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
- } else {
- ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
- }
-
- if (ret == EVENT_ERROR) {
- vc->write.triggered = 0;
- write_signal_error(nh, vc, err);
- } else if (ret == SSL_HANDSHAKE_WANT_READ || ret ==
SSL_HANDSHAKE_WANT_ACCEPT) {
- vc->read.triggered = 0;
- nh->read_ready_list.remove(vc);
- read_reschedule(nh, vc);
- } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret ==
SSL_HANDSHAKE_WANT_WRITE) {
- vc->write.triggered = 0;
- nh->write_ready_list.remove(vc);
- write_reschedule(nh, vc);
- } else if (ret == EVENT_DONE) {
- vc->write.triggered = 1;
- if (vc->write.enabled) {
- nh->write_ready_list.in_or_enqueue(vc);
- }
- // If this was driven by a zero length read, signal complete when
- // the handshake is complete. Otherwise set up for continuing read
- // operations.
- if (s->vio.ntodo() <= 0) {
- vc->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh);
- }
- } else {
- write_reschedule(nh, vc);
- }
-
- return;
- }
-
- // If it is not enabled,add to WaitList.
- if (!s->enabled || s->vio.op != VIO::WRITE) {
- write_disable(nh, vc);
- return;
- }
-
- // If there is nothing to do, disable
- int64_t ntodo = s->vio.ntodo();
- if (ntodo <= 0) {
- write_disable(nh, vc);
- return;
- }
-
- MIOBufferAccessor &buf = s->vio.buffer;
- ink_assert(buf.writer());
-
- // Calculate the amount to write.
- int64_t towrite = buf.reader()->read_avail();
- if (towrite > ntodo) {
- towrite = ntodo;
- }
-
- int signalled = 0;
-
- // signal write ready to allow user to fill the buffer
- if (towrite != ntodo && !buf.writer()->high_water()) {
- if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
- return;
- } else if (c != s->vio.cont) { /* The write vio was updated in the handler
*/
- write_reschedule(nh, vc);
- return;
- }
-
- ntodo = s->vio.ntodo();
- if (ntodo <= 0) {
- write_disable(nh, vc);
- return;
- }
-
- signalled = 1;
-
- // Recalculate amount to write
- towrite = buf.reader()->read_avail();
- if (towrite > ntodo) {
- towrite = ntodo;
- }
- }
-
- // if there is nothing to do, disable
- ink_assert(towrite >= 0);
- if (towrite <= 0) {
- write_disable(nh, vc);
- return;
- }
-
- int needs = 0;
- int64_t total_written = 0;
- int64_t r = vc->load_buffer_and_write(towrite, buf,
total_written, needs);
-
- if (total_written > 0) {
- Metrics::Counter::increment(net_rsb.write_bytes, total_written);
- Metrics::Counter::increment(net_rsb.write_bytes_count);
- s->vio.ndone += total_written;
- net_activity(vc, thread);
- }
-
- // A write of 0 makes no sense since we tried to write more than 0.
- ink_assert(r != 0);
- // Either we wrote something or got an error.
- // check for errors
- if (r < 0) { // if the socket was not ready, add to WaitList
- if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
- Metrics::Counter::increment(net_rsb.calls_to_write_nodata);
- if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
- vc->write.triggered = 0;
- nh->write_ready_list.remove(vc);
- write_reschedule(nh, vc);
- }
-
- if ((needs & EVENTIO_READ) == EVENTIO_READ) {
- vc->read.triggered = 0;
- nh->read_ready_list.remove(vc);
- read_reschedule(nh, vc);
- }
-
- return;
- }
-
- vc->write.triggered = 0;
- write_signal_error(nh, vc, static_cast<int>(-r));
- return;
- } else { // Wrote data. Finished
without error
- int wbe_event = vc->write_buffer_empty_event; // save so we can clear if
needed.
-
- // If the empty write buffer trap is set, clear it.
- if (!(buf.reader()->is_read_avail_more_than(0))) {
- vc->write_buffer_empty_event = 0;
- }
-
- // If there are no more bytes to write, signal write complete,
- ink_assert(ntodo >= 0);
- if (s->vio.ntodo() <= 0) {
- write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
- return;
- }
-
- int e = 0;
- if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) {
- e = VC_EVENT_WRITE_READY;
- } else if (wbe_event != vc->write_buffer_empty_event) {
- // @a signalled means we won't send an event, and the event values
differing means we
- // had a write buffer trap and cleared it, so we need to send it now.
- e = wbe_event;
- }
-
- if (e) {
- if (write_signal_and_update(e, vc) != EVENT_CONT) {
- return;
- }
-
- // change of lock... don't look at shared variables!
- if (lock.get_mutex() != s->vio.mutex.get()) {
- write_reschedule(nh, vc);
- return;
- }
- }
-
- if ((needs & EVENTIO_READ) == EVENTIO_READ) {
- read_reschedule(nh, vc);
- }
-
- if (!(buf.reader()->is_read_avail_more_than(0))) {
- write_disable(nh, vc);
- return;
- }
-
- if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
- write_reschedule(nh, vc);
- }
-
- return;
- }
+
+static inline int
+write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
+{
+ vc->lerrno = lerrno;
+ return write_signal_done(VC_EVENT_ERROR, nh, vc);
}
bool
@@ -739,141 +345,504 @@ UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t
howto)
}
//
-// Function used to reenable the VC for reading or
-// writing.
+// Function used to reenable the VC for reading or
+// writing.
+//
+void
+UnixNetVConnection::reenable(VIO *vio)
+{
+ if (STATE_FROM_VIO(vio)->enabled) {
+ return;
+ }
+ set_enabled(vio);
+ if (!thread) {
+ return;
+ }
+ EThread *t = vio->mutex->thread_holding;
+ ink_assert(t == this_ethread());
+ ink_release_assert(!closed);
+ if (nh->mutex->thread_holding == t) {
+ if (vio == &read.vio) {
+ ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
+ if (read.triggered) {
+ nh->read_ready_list.in_or_enqueue(this);
+ } else {
+ nh->read_ready_list.remove(this);
+ }
+ } else {
+ ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
+ if (write.triggered) {
+ nh->write_ready_list.in_or_enqueue(this);
+ } else {
+ nh->write_ready_list.remove(this);
+ }
+ }
+ } else {
+ MUTEX_TRY_LOCK(lock, nh->mutex, t);
+ if (!lock.is_locked()) {
+ if (vio == &read.vio) {
+ int isin = ink_atomic_swap(&read.in_enabled_list, 1);
+ if (!isin) {
+ nh->read_enable_list.push(this);
+ }
+ } else {
+ int isin = ink_atomic_swap(&write.in_enabled_list, 1);
+ if (!isin) {
+ nh->write_enable_list.push(this);
+ }
+ }
+ if (likely(nh->thread)) {
+ nh->thread->tail_cb->signalActivity();
+ } else if (nh->trigger_event) {
+ nh->trigger_event->ethread->tail_cb->signalActivity();
+ }
+ } else {
+ if (vio == &read.vio) {
+ ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
+ if (read.triggered) {
+ nh->read_ready_list.in_or_enqueue(this);
+ } else {
+ nh->read_ready_list.remove(this);
+ }
+ } else {
+ ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
+ if (write.triggered) {
+ nh->write_ready_list.in_or_enqueue(this);
+ } else {
+ nh->write_ready_list.remove(this);
+ }
+ }
+ }
+ }
+}
+
+void
+UnixNetVConnection::reenable_re(VIO *vio)
+{
+ if (!thread) {
+ return;
+ }
+ EThread *t = vio->mutex->thread_holding;
+ ink_assert(t == this_ethread());
+ if (nh->mutex->thread_holding == t) {
+ set_enabled(vio);
+ if (vio == &read.vio) {
+ ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
+ if (read.triggered) {
+ net_read_io(nh);
+ } else {
+ nh->read_ready_list.remove(this);
+ }
+ } else {
+ ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
+ if (write.triggered) {
+ this->net_write_io(nh);
+ } else {
+ nh->write_ready_list.remove(this);
+ }
+ }
+ } else {
+ reenable(vio);
+ }
+}
+
+UnixNetVConnection::UnixNetVConnection()
+{
+ SET_HANDLER(&UnixNetVConnection::startEvent);
+}
+
+// Private methods
+
+void
+UnixNetVConnection::set_enabled(VIO *vio)
+{
+ ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
+ ink_release_assert(!closed);
+ STATE_FROM_VIO(vio)->enabled = 1;
+ if (!next_inactivity_timeout_at && inactivity_timeout_in) {
+ next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
+ }
+}
+
+// Read the data for a UnixNetVConnection.
+// Rescheduling the UnixNetVConnection by moving the VC
+// onto or off of the ready_list.
+void
+UnixNetVConnection::net_read_io(NetHandler *nh)
+{
+ NetState *s = &this->read;
+ int64_t r = 0;
+
+ MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
+
+ if (!lock.is_locked()) {
+ read_reschedule(nh, this);
+ return;
+ }
+
+ // It is possible that the closed flag got set from HttpSessionManager in the
+ // global session pool case. If so, the closed flag should be stable once
we get the
+ // s->vio.mutex (the global session pool mutex).
+ if (this->closed) {
+ this->nh->free_netevent(this);
+ return;
+ }
+ // if it is not enabled.
+ if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) {
+ read_disable(nh, this);
+ return;
+ }
+
+ MIOBufferAccessor &buf = s->vio.buffer;
+ ink_assert(buf.writer());
+
+ // if there is nothing to do, disable connection
+ int64_t ntodo = s->vio.ntodo();
+ if (ntodo <= 0) {
+ read_disable(nh, this);
+ return;
+ }
+ int64_t toread = buf.writer()->write_avail();
+ if (toread > ntodo) {
+ toread = ntodo;
+ }
+
+ // read data
+ int64_t rattempted = 0, total_read = 0;
+ unsigned niov = 0;
+ IOVec tiovec[NET_MAX_IOV];
+ if (toread) {
+ IOBufferBlock *b = buf.writer()->first_write_block();
+ do {
+ niov = 0;
+ rattempted = 0;
+ while (b && niov < NET_MAX_IOV) {
+ int64_t a = b->write_avail();
+ if (a > 0) {
+ tiovec[niov].iov_base = b->_end;
+ int64_t togo = toread - total_read - rattempted;
+ if (a > togo) {
+ a = togo;
+ }
+ tiovec[niov].iov_len = a;
+ rattempted += a;
+ niov++;
+ if (a >= togo) {
+ break;
+ }
+ }
+ b = b->next.get();
+ }
+
+ ink_assert(niov > 0);
+ ink_assert(niov <= countof(tiovec));
+ struct msghdr msg;
+
+ ink_zero(msg);
+ msg.msg_name = const_cast<sockaddr *>(this->get_remote_addr());
+ msg.msg_namelen = ats_ip_size(this->get_remote_addr());
+ msg.msg_iov = &tiovec[0];
+ msg.msg_iovlen = niov;
+ r = this->con.sock.recvmsg(&msg, 0);
+
+ Metrics::Counter::increment(net_rsb.calls_to_read);
+
+ total_read += rattempted;
+ } while (rattempted && r == rattempted && total_read < toread);
+
+ // if we have already moved some bytes successfully, summarize in r
+ if (total_read != rattempted) {
+ if (r <= 0) {
+ r = total_read - rattempted;
+ } else {
+ r = total_read - rattempted + r;
+ }
+ }
+ // check for errors
+ if (r <= 0) {
+ if (r == -EAGAIN || r == -ENOTCONN) {
+ Metrics::Counter::increment(net_rsb.calls_to_read_nodata);
+ this->read.triggered = 0;
+ nh->read_ready_list.remove(this);
+ return;
+ }
+
+ if (!r || r == -ECONNRESET) {
+ this->read.triggered = 0;
+ nh->read_ready_list.remove(this);
+ read_signal_done(VC_EVENT_EOS, nh, this);
+ return;
+ }
+ this->read.triggered = 0;
+ read_signal_error(nh, this, static_cast<int>(-r));
+ return;
+ }
+ Metrics::Counter::increment(net_rsb.read_bytes, r);
+ Metrics::Counter::increment(net_rsb.read_bytes_count);
+
+ // Add data to buffer and signal continuation.
+ buf.writer()->fill(r);
+#ifdef DEBUG
+ if (buf.writer()->write_avail() <= 0) {
+ Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full");
+ }
+#endif
+ s->vio.ndone += r;
+ this->netActivity();
+ } else {
+ r = 0;
+ }
+
+ // Signal read ready, check if user is not done
+ if (r) {
+ // If there are no more bytes to read, signal read complete
+ ink_assert(ntodo >= 0);
+ if (s->vio.ntodo() <= 0) {
+ read_signal_done(VC_EVENT_READ_COMPLETE, nh, this);
+ Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done");
+ return;
+ } else {
+ if (read_signal_and_update(VC_EVENT_READ_READY, this) != EVENT_CONT) {
+ return;
+ }
+
+ // change of lock... don't look at shared variables!
+ if (lock.get_mutex() != s->vio.mutex.get()) {
+ read_reschedule(nh, this);
+ return;
+ }
+ }
+ }
+
+  // If there is no more room, or nothing to do, disable the connection
+ if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
+ read_disable(nh, this);
+ return;
+ }
+
+ read_reschedule(nh, this);
+}
+
+//
+// Write the data for a UnixNetVConnection.
+// Rescheduling the UnixNetVConnection when necessary.
//
void
-UnixNetVConnection::reenable(VIO *vio)
+UnixNetVConnection::net_write_io(NetHandler *nh)
{
- if (STATE_FROM_VIO(vio)->enabled) {
+ Metrics::Counter::increment(net_rsb.calls_to_writetonet);
+ NetState *s = &this->write;
+ Continuation *c = this->write.vio.cont;
+
+ MUTEX_TRY_LOCK(lock, s->vio.mutex, thread);
+
+ if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) {
+ write_reschedule(nh, this);
return;
}
- set_enabled(vio);
- if (!thread) {
+
+ if (this->has_error()) {
+ this->lerrno = this->error;
+ write_signal_and_update(VC_EVENT_ERROR, this);
return;
}
- EThread *t = vio->mutex->thread_holding;
- ink_assert(t == this_ethread());
- ink_release_assert(!closed);
- if (nh->mutex->thread_holding == t) {
- if (vio == &read.vio) {
- ep.modify(EVENTIO_READ);
- ep.refresh(EVENTIO_READ);
- if (read.triggered) {
- nh->read_ready_list.in_or_enqueue(this);
- } else {
- nh->read_ready_list.remove(this);
- }
+
+ // This function will always return true unless
+ // this vc is an SSLNetVConnection.
+ if (!this->getSSLHandShakeComplete()) {
+ if (this->trackFirstHandshake()) {
+ // Eat the first write-ready. Until the TLS handshake is complete,
+ // we should still be under the connect timeout and shouldn't bother
+ // the state machine until the TLS handshake is complete
+ this->write.triggered = 0;
+ nh->write_ready_list.remove(this);
+ }
+
+ int err, ret;
+
+ if (this->get_context() == NET_VCONNECTION_OUT) {
+ ret = this->sslStartHandShake(SSL_EVENT_CLIENT, err);
} else {
- ep.modify(EVENTIO_WRITE);
- ep.refresh(EVENTIO_WRITE);
- if (write.triggered) {
- nh->write_ready_list.in_or_enqueue(this);
- } else {
- nh->write_ready_list.remove(this);
- }
+ ret = this->sslStartHandShake(SSL_EVENT_SERVER, err);
}
- } else {
- MUTEX_TRY_LOCK(lock, nh->mutex, t);
- if (!lock.is_locked()) {
- if (vio == &read.vio) {
- int isin = ink_atomic_swap(&read.in_enabled_list, 1);
- if (!isin) {
- nh->read_enable_list.push(this);
- }
- } else {
- int isin = ink_atomic_swap(&write.in_enabled_list, 1);
- if (!isin) {
- nh->write_enable_list.push(this);
- }
+
+ if (ret == EVENT_ERROR) {
+ this->write.triggered = 0;
+ write_signal_error(nh, this, err);
+ } else if (ret == SSL_HANDSHAKE_WANT_READ || ret ==
SSL_HANDSHAKE_WANT_ACCEPT) {
+ this->read.triggered = 0;
+ nh->read_ready_list.remove(this);
+ read_reschedule(nh, this);
+ } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret ==
SSL_HANDSHAKE_WANT_WRITE) {
+ this->write.triggered = 0;
+ nh->write_ready_list.remove(this);
+ write_reschedule(nh, this);
+ } else if (ret == EVENT_DONE) {
+ this->write.triggered = 1;
+ if (this->write.enabled) {
+ nh->write_ready_list.in_or_enqueue(this);
}
- if (likely(nh->thread)) {
- nh->thread->tail_cb->signalActivity();
- } else if (nh->trigger_event) {
- nh->trigger_event->ethread->tail_cb->signalActivity();
+ // If this was driven by a zero length read, signal complete when
+ // the handshake is complete. Otherwise set up for continuing read
+ // operations.
+ if (s->vio.ntodo() <= 0) {
+ this->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh);
}
} else {
- if (vio == &read.vio) {
- ep.modify(EVENTIO_READ);
- ep.refresh(EVENTIO_READ);
- if (read.triggered) {
- nh->read_ready_list.in_or_enqueue(this);
- } else {
- nh->read_ready_list.remove(this);
- }
- } else {
- ep.modify(EVENTIO_WRITE);
- ep.refresh(EVENTIO_WRITE);
- if (write.triggered) {
- nh->write_ready_list.in_or_enqueue(this);
- } else {
- nh->write_ready_list.remove(this);
- }
- }
+ write_reschedule(nh, this);
}
+
+ return;
}
-}
-void
-UnixNetVConnection::reenable_re(VIO *vio)
-{
- if (!thread) {
+  // If it is not enabled, add to WaitList.
+ if (!s->enabled || s->vio.op != VIO::WRITE) {
+ write_disable(nh, this);
return;
}
- EThread *t = vio->mutex->thread_holding;
- ink_assert(t == this_ethread());
- if (nh->mutex->thread_holding == t) {
- set_enabled(vio);
- if (vio == &read.vio) {
- ep.modify(EVENTIO_READ);
- ep.refresh(EVENTIO_READ);
- if (read.triggered) {
- net_read_io(nh, t);
- } else {
- nh->read_ready_list.remove(this);
- }
- } else {
- ep.modify(EVENTIO_WRITE);
- ep.refresh(EVENTIO_WRITE);
- if (write.triggered) {
- write_to_net(nh, this, t);
- } else {
- nh->write_ready_list.remove(this);
- }
+
+ // If there is nothing to do, disable
+ int64_t ntodo = s->vio.ntodo();
+ if (ntodo <= 0) {
+ write_disable(nh, this);
+ return;
+ }
+
+ MIOBufferAccessor &buf = s->vio.buffer;
+ ink_assert(buf.writer());
+
+ // Calculate the amount to write.
+ int64_t towrite = buf.reader()->read_avail();
+ if (towrite > ntodo) {
+ towrite = ntodo;
+ }
+
+ int signalled = 0;
+
+ // signal write ready to allow user to fill the buffer
+ if (towrite != ntodo && !buf.writer()->high_water()) {
+ if (write_signal_and_update(VC_EVENT_WRITE_READY, this) != EVENT_CONT) {
+ return;
+ } else if (c != s->vio.cont) { /* The write vio was updated in the handler
*/
+ write_reschedule(nh, this);
+ return;
+ }
+
+ ntodo = s->vio.ntodo();
+ if (ntodo <= 0) {
+ write_disable(nh, this);
+ return;
+ }
+
+ signalled = 1;
+
+ // Recalculate amount to write
+ towrite = buf.reader()->read_avail();
+ if (towrite > ntodo) {
+ towrite = ntodo;
}
- } else {
- reenable(vio);
}
-}
-UnixNetVConnection::UnixNetVConnection()
-{
- SET_HANDLER(&UnixNetVConnection::startEvent);
-}
+ // if there is nothing to do, disable
+ ink_assert(towrite >= 0);
+ if (towrite <= 0) {
+ write_disable(nh, this);
+ return;
+ }
-// Private methods
+ int needs = 0;
+ int64_t total_written = 0;
+ int64_t r = this->load_buffer_and_write(towrite, buf,
total_written, needs);
-void
-UnixNetVConnection::set_enabled(VIO *vio)
-{
- ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
- ink_release_assert(!closed);
- STATE_FROM_VIO(vio)->enabled = 1;
- if (!next_inactivity_timeout_at && inactivity_timeout_in) {
- next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
+ if (total_written > 0) {
+ Metrics::Counter::increment(net_rsb.write_bytes, total_written);
+ Metrics::Counter::increment(net_rsb.write_bytes_count);
+ s->vio.ndone += total_written;
+ this->netActivity();
}
-}
-void
-UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
-{
- read_from_net(nh, this, lthread);
-}
+ // A write of 0 makes no sense since we tried to write more than 0.
+ ink_assert(r != 0);
+ // Either we wrote something or got an error.
+ // check for errors
+ if (r < 0) { // if the socket was not ready, add to WaitList
+ if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) {
+ Metrics::Counter::increment(net_rsb.calls_to_write_nodata);
+ if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
+ this->write.triggered = 0;
+ nh->write_ready_list.remove(this);
+ write_reschedule(nh, this);
+ }
-void
-UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread)
-{
- write_to_net(nh, this, lthread);
+ if ((needs & EVENTIO_READ) == EVENTIO_READ) {
+ this->read.triggered = 0;
+ nh->read_ready_list.remove(this);
+ read_reschedule(nh, this);
+ }
+
+ return;
+ }
+
+ this->write.triggered = 0;
+ write_signal_error(nh, this, static_cast<int>(-r));
+ return;
+ } else { // Wrote data. Finished
without error
+ int wbe_event = this->write_buffer_empty_event; // save so we can clear if
needed.
+
+ // If the empty write buffer trap is set, clear it.
+ if (!(buf.reader()->is_read_avail_more_than(0))) {
+ this->write_buffer_empty_event = 0;
+ }
+
+ // If there are no more bytes to write, signal write complete,
+ ink_assert(ntodo >= 0);
+ if (s->vio.ntodo() <= 0) {
+ write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, this);
+ return;
+ }
+
+ int e = 0;
+ if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) {
+ e = VC_EVENT_WRITE_READY;
+ } else if (wbe_event != this->write_buffer_empty_event) {
+ // @a signalled means we won't send an event, and the event values
differing means we
+ // had a write buffer trap and cleared it, so we need to send it now.
+ e = wbe_event;
+ }
+
+ if (e) {
+ if (write_signal_and_update(e, this) != EVENT_CONT) {
+ return;
+ }
+
+ // change of lock... don't look at shared variables!
+ if (lock.get_mutex() != s->vio.mutex.get()) {
+ write_reschedule(nh, this);
+ return;
+ }
+ }
+
+ if ((needs & EVENTIO_READ) == EVENTIO_READ) {
+ read_reschedule(nh, this);
+ }
+
+ if (!(buf.reader()->is_read_avail_more_than(0))) {
+ write_disable(nh, this);
+ return;
+ }
+
+ if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
+ write_reschedule(nh, this);
+ }
+
+ return;
+ }
}
// This code was pulled out of write_to_net so
@@ -1005,9 +974,14 @@ UnixNetVConnection::writeReschedule(NetHandler *nh)
}
void
-UnixNetVConnection::netActivity(EThread *lthread)
+UnixNetVConnection::netActivity()
{
- net_activity(this, lthread);
+ Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ",
NetVC=%p", this->inactivity_timeout_in, this);
+ if (this->inactivity_timeout_in) {
+ this->next_inactivity_timeout_at = ink_get_hrtime() +
this->inactivity_timeout_in;
+ } else {
+ this->next_inactivity_timeout_at = 0;
+ }
}
int