cmcfarlen commented on code in PR #11871:
URL: https://github.com/apache/trafficserver/pull/11871#discussion_r1844188335
##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
return sock.close();
}
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 sequence with no gaps.
+
+class NUMASequencer
+{
+ std::mutex mutex;
+ std::condition_variable convar;
+ std::vector<int> thread_ids; // To store thread IDs
+ size_t cur_index = 0; // Index to track the current thread to execute
+ bool initialized = false; // Flag to ensure initialization happens once
+ bool ready_to_run = false; // Flag to ensure threads only start executing when all IDs are collected
+
+public:
+ template <class T>
+ bool
+ run_sequential(T func)
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ int my_thread_id = this_ethread()->id;
+ int my_numa_node = this_ethread()->get_numa_node();
+
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered
run_sequential.", my_thread_id, my_numa_node);
+
+ // Initialize and populate the thread IDs vector
+ if (!initialized) {
+ initialized = true;
+ thread_ids.reserve(eventProcessor.net_threads); // Preallocate space
+ Debug("numa_sequencer", "[NUMASequencer] Initialized thread ID vector
with capacity %d.", eventProcessor.net_threads);
+ }
+
+ // Add the current thread ID to the list if it's not already present
+ if (std::find(thread_ids.begin(), thread_ids.end(), my_thread_id) == thread_ids.end()) {
+ thread_ids.push_back(my_thread_id);
+ Debug("numa_sequencer", "[NUMASequencer] Added Thread %d to the thread
ID list. Total threads collected: %zu", my_thread_id,
+ thread_ids.size());
+ }
+
+ // If all threads have been added (assuming their number is equal to eventProcessor.net_threads), sort the thread IDs and set
+ // ready_to_run to true
+ if (thread_ids.size() == eventProcessor.net_threads) {
+ std::sort(thread_ids.begin(), thread_ids.end());
+ Debug("numa_sequencer", "[NUMASequencer] All thread IDs collected.
Sorting thread IDs...");
+ Debug("numa_sequencer", "[NUMASequencer] Thread IDs sorted. Execution
will follow this order:");
+ for (size_t i = 0; i < thread_ids.size(); ++i) {
+ Debug("numa_sequencer", "[NUMASequencer] Execution order %zu: Thread
ID %d", i + 1, thread_ids[i]);
+ }
+ ready_to_run = true;
+ convar.notify_all(); // Notify all threads that execution can begin
+ }
+
+ // Wait until all thread IDs are collected and ready_to_run is true
+ while (!ready_to_run) {
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d is waiting for all
thread IDs to be collected.", my_thread_id);
+ convar.wait(lock);
+ }
+
+ // Logging the current state before entering the wait loop
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) waiting
to execute. Current sequence index: %zu",
+ my_thread_id, my_numa_node, cur_index);
+
+ // Wait until it's this thread's turn based on sorted IDs
+ while (cur_index < thread_ids.size() && thread_ids[cur_index] != my_thread_id) {
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d is not yet in sequence. Waiting...", my_thread_id);
+ convar.wait(lock);
+ }
+
+ // Log when the thread has been awakened and is about to execute the function
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) awakened. About to execute function.", my_thread_id,
+ my_numa_node);
+
+ // Execute the function
+ bool result = func();
+
+ // More detailed logging for debugging
+ if (result) {
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d successfully executed
the function on NUMA node %d.", this_ethread()->id,
+ my_numa_node);
+ } else {
+ Error("[NUMASequencer] Thread %d failed to execute the function on NUMA
node %d.", this_ethread()->id, my_numa_node);
+ }
+
+ // Move to the next thread in the sequence
+ cur_index++;
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d completed execution.
Moving to next thread. New index: %zu.", my_thread_id,
+ cur_index);
+
+ // If we've completed one pass through all threads, reset the index and increment the repeat counter
+ if (cur_index >= thread_ids.size()) {
+ cur_index = 0;
+ Debug("numa_sequencer", "[NUMASequencer] Completed a full pass through
all threads. Resetting index.");
+ }
+
+ // Notify all threads about the change in sequence
+ convar.notify_all();
+
+ return result;
+ }
+};
+NUMASequencer numa_sequencer;
+#endif
+
int
Server::listen(bool non_blocking, const NetProcessor::AcceptOptions &opt)
{
ink_assert(!sock.is_ok());
int res = 0;
socklen_t namelen;
int prot = IPPROTO_TCP;
+#if TS_USE_NUMA
+ int use_ebpf = 0;
+ int affinity = 1;
+ bool success = false;
+
+ // Define the initialization function that will be run sequentially
+ auto init_func = [&]() -> bool {
+ // Additional setup after listen
+ Debug("numa", "[Server::listen] Attempting to set up fd after listen with
options: %d", opt);
+ if ((res = setup_fd_after_listen(opt)) < 0) {
+ Error("[Server::listen] Failed to setup fd after listen: %d", res);
+ return false;
+ }
+ Debug("numa", "[Server::listen] Thread %d successfully set up the
socket.", this_ethread()->id);
+ return true;
+ };
+#endif
+ // Set the IP address for binding
if (!ats_is_ip(&accept_addr)) {
ats_ip4_set(&addr, INADDR_ANY, 0);
} else {
ats_ip_copy(&addr, &accept_addr);
}
-
+ // Set protocol for MPTCP if enabled
if (opt.f_mptcp) {
Dbg(dbg_ctl_connection, "Define socket with MPTCP");
prot = IPPROTO_MPTCP;
}
+ // Create the socket
+ Debug("numa", "[Server::listen] Attempting to create socket with family: %d,
type: %d, protocol: %d", addr.sa.sa_family,
Review Comment:
These Debug lines look like they were left in from your own debugging? Could they be removed?
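If they were only needed while developing, the lambda could shrink to something like the sketch below (based on the hunk above; res, opt, setup_fd_after_listen, and the Error call are all from the PR, only the extra Debug output is dropped):

```cpp
#if TS_USE_NUMA
  // Sketch only: the same init_func as in the PR, with the ad-hoc Debug
  // calls removed and just the error reporting kept.
  auto init_func = [&]() -> bool {
    if ((res = setup_fd_after_listen(opt)) < 0) {
      Error("[Server::listen] Failed to setup fd after listen: %d", res);
      return false;
    }
    return true;
  };
#endif
```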
##########
src/cripts/Lulu.cc:
##########
@@ -163,6 +163,12 @@ cripts::Splitter(cripts::string_view input, char delim)
return details::Splitter<cripts::string_view>(input, delim);
}
+std::vector<Cript::string_view>
+Cript::splitter(Cript::string_view input, char delim)
+{
+ return details::splitter<Cript::string_view>(input, delim);
+}
+
Review Comment:
You can remove this chunk; it looks like a leftover from a merge conflict.
##########
src/iocore/net/Server.cc:
##########
@@ -88,49 +104,205 @@ Server::close()
return sock.close();
}
+#if TS_USE_NUMA
+// Assumes that threads can be assigned to NUMA zones as 0,1,2,3,0,1,2,3,0,1,2 sequence with no gaps.
+
+class NUMASequencer
+{
+ std::mutex mutex;
+ std::condition_variable convar;
+ std::vector<int> thread_ids; // To store thread IDs
+ size_t cur_index = 0; // Index to track the current thread to execute
+ bool initialized = false; // Flag to ensure initialization happens once
+ bool ready_to_run = false; // Flag to ensure threads only start executing when all IDs are collected
+
+public:
+ template <class T>
+ bool
+ run_sequential(T func)
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+
+ int my_thread_id = this_ethread()->id;
+ int my_numa_node = this_ethread()->get_numa_node();
+
+ Debug("numa_sequencer", "[NUMASequencer] Thread %d (NUMA node %d) entered
run_sequential.", my_thread_id, my_numa_node);
Review Comment:
Debug has been removed; please use the Dbg macro instead.
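This would follow the same DbgCtl/Dbg pattern Server.cc already uses for dbg_ctl_connection. A sketch of what the converted calls could look like (the control name below is just an illustration, pick whatever fits the file):

```cpp
// Sketch: mirrors the existing dbg_ctl_connection usage in Server.cc.
// dbg_ctl_numa_sequencer is a hypothetical name for this file's control.
namespace
{
DbgCtl dbg_ctl_numa_sequencer{"numa_sequencer"};
} // namespace

// ... and inside run_sequential():
Dbg(dbg_ctl_numa_sequencer, "[NUMASequencer] Thread %d (NUMA node %d) entered run_sequential.", my_thread_id, my_numa_node);
```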