Hi Lorenzo,

Responding for Erlon since he is out this week, comments inline.

On 9/12/25 5:46 PM, Lorenzo Bianconi via dev wrote:
On Sep 03, Erlon R. Cruz wrote:
Currently, OVN cannot handle fragmented UDP traffic
for ACLs in the userspace datapath. Just like in the case of LB (commit
20a96b9), the kernel DP will try to reassemble the fragments during CT
lookup; however, userspace won't reassemble them. One way to solve that
is to use the ct_udp field to match on ct.new connections. Today,
OVN takes the CMS rules and writes them verbatim
to SB, which generates similar rules in OVS. So, this workaround
replaces L4 protocol matches with connection tracking equivalents.
For example:

    "outport == "server" && udp && udp.dst == 4242"
becomes:
    "outport == "server" && ct.new && udp && ct_udp.dst == 4242"

This behavior is optional and disabled by default. To
enable it, the user needs to set an NB_Global option:

ovn-nbctl set NB_Global . options:acl_udp_ct_translation=true

Signed-off-by: Erlon R. Cruz <[email protected]>
---
  NEWS                      |   4 +
  northd/en-global-config.c |   5 +
  northd/northd.c           | 239 +++++++++++++++++++++-
  ovn-nb.xml                |  11 +
  tests/automake.mk         |   4 +-
  tests/system-ovn.at       | 420 ++++++++++++++++++++++++++++++++++++++
  tests/tcp_simple.py       | 338 ++++++++++++++++++++++++++++++
  tests/udp_client.py       | 113 ++++++++++
  8 files changed, 1129 insertions(+), 5 deletions(-)
  create mode 100755 tests/tcp_simple.py
  create mode 100755 tests/udp_client.py

diff --git a/NEWS b/NEWS
index 932e173af..96aa6afda 100644
--- a/NEWS
+++ b/NEWS
@@ -31,6 +31,10 @@ OVN v25.09.0 - xxx xx xxxx
       with stateless ACL to work with load balancer.
     - Added new ovn-nbctl command 'pg-get-ports' to get the ports assigned to
       the port group.
+   - Added new NB_Global option 'acl_udp_ct_translation' to control whether
+     stateful ACL rules that match on UDP port fields are rewritten to use
+     connection tracking fields to properly handle IP fragments. Default is
+     false.
     - The ovn-controller option ovn-ofctrl-wait-before-clear is no longer
       supported.  It will be ignored if used. ovn-controller will
       automatically care about proper delay before clearing lflow.
diff --git a/northd/en-global-config.c b/northd/en-global-config.c
index 76046c265..48586c69b 100644
--- a/northd/en-global-config.c
+++ b/northd/en-global-config.c
@@ -650,6 +650,11 @@ check_nb_options_out_of_sync(
          return true;
      }
+ if (config_out_of_sync(&nb->options, &config_data->nb_options,
+                           "acl_udp_ct_translation", false)) {
+        return true;
+    }
+
      return false;
  }
diff --git a/northd/northd.c b/northd/northd.c
index e0a329a17..2eb6e4924 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -71,6 +71,7 @@
  #include "uuid.h"
  #include "ovs-thread.h"
  #include "openvswitch/vlog.h"
+#include <ctype.h>
VLOG_DEFINE_THIS_MODULE(northd); @@ -88,6 +89,12 @@ static bool use_common_zone = false;
   * Otherwise, it will avoid using it.  The default is true. */
  static bool use_ct_inv_match = true;
+/* If this option is 'true' northd will rewrite stateful ACL rules that match
+ * on UDP port fields to use connection tracking fields to properly handle IP
+ * fragments. By default this option is set to 'false'.
+ */

Should we add a comment about HW offload here and in ovn-nb.xml?

I can add a comment in a follow-up patch.

+static bool acl_udp_ct_translation = false;
+
  /* If this option is 'true' northd will implicitly add a lowest-priority
   * drop rule in the ACL stage of logical switches that have at least one
   * ACL.
@@ -6789,6 +6796,201 @@ build_acl_sample_default_flows(const struct 
ovn_datapath *od,
                    "next;", lflow_ref);
  }
+/* Port extraction result structure */
+struct port_extract_result {
+    bool found;
+    char *operator;  /* "==", ">=", "<=" */
+    char port_value[16];
+};
+
+/* Extracts port number from a match string with support for exact matches and
+ * ranges.
+ * Examples of match strings and extracted values:
+ * - "udp.dst == 4242" -> operator="==", port_value="4242"
+ * - "udp.dst >= 3002" -> operator=">=", port_value="3002"
+ * - "udp.dst <= 3006" -> operator="<=", port_value="3006"
+ * - "outport == \"server\" && udp && udp.dst == 4242" -> operator="==",
+ *    port_value="4242"
+ *
+ * Fills the caller-allocated result struct with extracted values.
+ * Returns true if port was extracted, false otherwise.
+ */
+static bool
+extract_port_value(const char *match_str, const char *field,
+                   struct port_extract_result *result)
+{
+    char *str_copy = xstrdup(match_str);
+    char *token;
+    char *saveptr;
+
+    /* Initialize result struct */
+    if (result) {
+        result->found = false;
+        result->operator = NULL;
+        result->port_value[0] = '\0';
+    } else {
+        return false;

I think there is a leak here since you do not free the str_copy pointer. Moreover,
this branch is never used, so I think we can drop it.

Yes, looks like result is always passed, will remove the else.

+    }
+
+    /* Tokenize by && to find the specific field condition */
+    token = strtok_r(str_copy, "&&", &saveptr);
+
+    while (token) {
+        /* Skip leading spaces */
+        while (*token == ' ') {
+            token++;
+        }
+
+        /* Check if this token contains our field */
+        if (strstr(token, field)) {
+            char *field_pos = strstr(token, field);
+            field_pos += strlen(field);
+
+            /* Skip spaces */
+            while (*field_pos == ' ') {
+                field_pos++;
+            }
+
+            /* Determine the operator and extract port */
+            if (strncmp(field_pos, ">=", 2) == 0) {
+                result->operator = ">=";
+                field_pos += 2;
+            } else if (strncmp(field_pos, "<=", 2) == 0) {
+                result->operator = "<=";
+                field_pos += 2;
+            } else if (strncmp(field_pos, "==", 2) == 0) {
+                result->operator = "==";
+                field_pos += 2;
+            } else {

nit: I would prefer to jump to the last instruction of the while loop, but I do not
have a strong opinion on it.

Yes, I see what you mean, will change it.

+                /* No recognized operator found */
+                token = strtok_r(NULL, "&&", &saveptr);
+                continue;
+            }
+
+            /* Skip spaces after operator */
+            while (*field_pos == ' ') {
+                field_pos++;
+            }
+
+            /* Extract the port number */
+            size_t i = 0;
+            while (*field_pos && isdigit(*field_pos) &&
+                   i < (sizeof(result->port_value) - 1)) {
+                result->port_value[i++] = *field_pos++;
+            }
+            result->port_value[i] = '\0';
+
+            if (i > 0) {
+                result->found = true;
+                break;
+            }
+        }
+
+        token = strtok_r(NULL, "&&", &saveptr);
+    }
+
+    free(str_copy);
+    return result->found;
+}
+
+/* This function implements a workaround for stateful ACLs with UDP matches
+ * that need to handle IP fragments properly. The issue is that UDP L4 headers
+ * are only present in the first fragment of a fragmented packet. Subsequent
+ * fragments don't have L4 headers, so they won't match ACL rules that look for
+ * UDP fields.
+ *
+ * The workaround replaces UDP protocol matches with connection tracking
+ * equivalents. For example:
+ *   "outport == "server" && udp && udp.dst == 4242"
+ * becomes:
+ *   "outport == "server" && udp && ct.new && ct_udp.dst == 4242"
+ */
+static char *
+rewrite_match_for_fragments(const char *match_str)
+{
+    VLOG_DBG("rewrite_match_for_fragments called with: %s", match_str);

I guess you can drop the log message above since you already log the result
when returning from the routine.

True, this can be removed or consolidated since there is only one return.

+    struct ds new_match = DS_EMPTY_INITIALIZER;
+    bool has_udp = false;
+    bool has_udp_dst = false;
+    bool has_udp_src = false;
+
+    char *str_copy = xstrdup(match_str);
+    char *token;
+    char *saveptr;
+
+    /* First token */
+    token = strtok_r(str_copy, "&&", &saveptr);
+
+    while (token) {
+        /* Skip leading spaces */
+        while (*token == ' ') {
+            token++;
+        }
+
+        /* Check what kind of token this is */
+        if (strstr(token, "udp") && !strstr(token, "udp.")) {
+            /* This is the UDP protocol marker */
+            has_udp = true;
+        } else if (strstr(token, "udp.dst")) {
+            /* This is a UDP destination port condition */
+            has_udp_dst = true;
+        } else if (strstr(token, "udp.src")) {
+            /* This is a UDP source port condition */
+            has_udp_src = true;
+        } else {
+            /* This is a non-UDP condition, keep it */
+            if (new_match.length > 0) {
+                ds_put_cstr(&new_match, " && ");
+            }
+            ds_put_cstr(&new_match, token);
+        }
+
+        /* Get next token */
+        token = strtok_r(NULL, "&&", &saveptr);
+    }
+
+    /* Free the string copy */
+    free(str_copy);
+
+    /* If we found UDP, always preserve it */
+    if (has_udp) {
+        if (new_match.length > 0) {
+            ds_put_cstr(&new_match, " && ");
+        }
+        ds_put_cstr(&new_match, "udp");
+
+        /* Handle destination port conditions */
+        if (has_udp_dst) {
+            struct port_extract_result dst_result;
+            if (extract_port_value(match_str, "udp.dst", &dst_result)) {
+                ds_put_format(&new_match, " && ct_udp.dst %s %s",
+                              dst_result.operator, dst_result.port_value);
+            }
+        }
+
+        /* Handle source port conditions */
+        if (has_udp_src) {
+            struct port_extract_result src_result;
+            if (extract_port_value(match_str, "udp.src", &src_result)) {
+                ds_put_format(&new_match, " && ct_udp.src %s %s",
+                              src_result.operator, src_result.port_value);
+            }
+        }
+
+        /* Add !ct.inv condition */
+        if (new_match.length > 0) {
+            ds_put_cstr(&new_match, " && ");
+        }
+        ds_put_cstr(&new_match, "!ct.inv && ct_proto == 17");
+    }
+
+    /* Return the result */
+    char *result = xstrdup(ds_cstr(&new_match));
+    ds_destroy(&new_match);
+    VLOG_DBG("rewrite_match_for_fragments returning: %s", result);
+    return result;
+}
+
  static void
  consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
               const struct nbrec_acl *acl, bool has_stateful,
@@ -6802,6 +7004,8 @@ consider_acl(struct lflow_table *lflows, const struct 
ovn_datapath *od,
      enum ovn_stage stage;
      enum acl_observation_stage obs_stage;
+ VLOG_DBG("consider_acl: ingress=%d, acl=%s", ingress, acl->match);
+
      if (ingress && smap_get_bool(&acl->options, "apply-after-lb", false)) {
          stage = S_SWITCH_IN_ACL_AFTER_LB_EVAL;
          obs_stage = ACL_OBS_FROM_LPORT_AFTER_LB;
@@ -6839,6 +7043,23 @@ consider_acl(struct lflow_table *lflows, const struct 
ovn_datapath *od,
          match_tier_len = match->length;
      }
+ /* Check if this ACL has L4 matches that need fragment handling */
+    bool has_udp_match = strstr(acl->match, "udp") != NULL;
+
+    char *modified_match = NULL;
+    /* For stateful ACLs with L4 matches, rewrite the match string to handle
+     * fragments, but only if acl_udp_ct_translation is enabled */
+    if (has_stateful && has_udp_match && acl_udp_ct_translation) {
+        modified_match = rewrite_match_for_fragments(acl->match);
+
+        VLOG_DBG("Rewriting ACL match for L4 fragment handling: "
+                "original='%s' modified='%s'", acl->match, modified_match);
+    }
+
+    /* Use the original or modified match string based on whether UDP L4
+     * matches were detected */
+    const char *match_to_use = modified_match ? modified_match : acl->match;
+
      if (!has_stateful
          || !strcmp(acl->action, "pass")
          || !strcmp(acl->action, "allow-stateless")) {
@@ -6877,7 +7098,7 @@ consider_acl(struct lflow_table *lflows, const struct 
ovn_datapath *od,
           */
          ds_truncate(match, match_tier_len);
          ds_put_format(match, REGBIT_ACL_HINT_ALLOW_NEW " == 1 && (%s)",
-                      acl->match);
+                      match_to_use);
ds_truncate(actions, log_verdict_len); @@ -6922,7 +7143,7 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
          ds_truncate(match, match_tier_len);
          ds_truncate(actions, log_verdict_len);
          ds_put_format(match, REGBIT_ACL_HINT_ALLOW " == 1 && (%s)",
-                      acl->match);
+                      match_to_use);
          if (acl->label || acl->sample_est) {
              ds_put_cstr(actions, REGBIT_CONNTRACK_COMMIT" = 1; ");
          }
@@ -6944,7 +7165,7 @@ consider_acl(struct lflow_table *lflows, const struct 
ovn_datapath *od,
           * connection, then we can simply reject/drop it. */
          ds_truncate(match, match_tier_len);
          ds_put_cstr(match, REGBIT_ACL_HINT_DROP " == 1");
-        ds_put_format(match, " && (%s)", acl->match);
+        ds_put_format(match, " && (%s)", match_to_use);
ds_truncate(actions, log_verdict_len); @@ -6968,7 +7189,7 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
           */
          ds_truncate(match, match_tier_len);
          ds_put_cstr(match, REGBIT_ACL_HINT_BLOCK " == 1");
-        ds_put_format(match, " && (%s)", acl->match);
+        ds_put_format(match, " && (%s)", match_to_use);
ds_truncate(actions, log_verdict_len); @@ -6983,6 +7204,12 @@ consider_acl(struct lflow_table *lflows, const struct ovn_datapath *od,
                                  ds_cstr(match), ds_cstr(actions),
                                  &acl->header_, lflow_ref);
      }
+
+    /* Free the modified match string if it was created */
+    if (modified_match) {

you can drop the 'if' check

Will do.

+        free(modified_match);
+    }
+    VLOG_DBG("consider_acl done");
  }
static void
@@ -19172,6 +19399,10 @@ ovnnb_db_run(struct northd_input *input_data,
      use_common_zone = smap_get_bool(input_data->nb_options, "use_common_zone",
                                      false);
+ acl_udp_ct_translation = smap_get_bool(input_data->nb_options,
+                                           "acl_udp_ct_translation",
+                                       false);
+
      vxlan_mode = input_data->vxlan_mode;
build_datapaths(input_data->synced_lses,
diff --git a/ovn-nb.xml b/ovn-nb.xml
index 3f4398afb..498b6c993 100644
--- a/ovn-nb.xml
+++ b/ovn-nb.xml
@@ -429,6 +429,17 @@
          </p>
        </column>
+ <column name="options" key="acl_udp_ct_translation">
+        <p>
+          If set to <code>true</code>, <code>ovn-northd</code> will rewrite
+          stateful ACL rules that match on UDP port fields to use connection
+          tracking fields (ct_udp.dst, ct_udp.src) to properly handle IP
+          fragments. This ensures that fragmented UDP packets
+          match ACLs correctly. By default this option is set to
+          <code>false</code>.
+        </p>
+      </column>
+
        <column name="options" key="enable_chassis_nb_cfg_update">
          <p>
            If set to <code>false</code>, ovn-controllers will no longer update
diff --git a/tests/automake.mk b/tests/automake.mk
index adfa19503..edb370b99 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -333,7 +333,9 @@ CHECK_PYFILES = \
        tests/check_acl_log.py \
        tests/scapy-server.py \
        tests/client.py \
-       tests/server.py
+       tests/server.py \
+       tests/udp_client.py \
+       tests/tcp_simple.py
EXTRA_DIST += $(CHECK_PYFILES)
  PYCOV_CLEAN_FILES += $(CHECK_PYFILES:.py=.py,cover) .coverage
diff --git a/tests/system-ovn.at b/tests/system-ovn.at
index 8e356df6f..805e471df 100644
--- a/tests/system-ovn.at
+++ b/tests/system-ovn.at
@@ -18252,6 +18252,7 @@ OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port 
patch-.*/d
  AT_CLEANUP
  ])
+<<<<<<< HEAD

I think this is a leftover of a previous rebase.

Will cleanup.

  OVN_FOR_EACH_NORTHD([
  AT_SETUP([dynamic-routing - EVPN])
  AT_KEYWORDS([dynamic-routing])
@@ -18484,3 +18485,422 @@ OVS_TRAFFIC_VSWITCHD_STOP(["/failed to query port 
patch-.*/d
  /connection dropped.*/d"])
  AT_CLEANUP
  ])
+
+OVN_FOR_EACH_NORTHD([
+AT_SETUP([LB correctly handles fragmented traffic])
+AT_KEYWORDS([ovnlb])
+
+CHECK_CONNTRACK()
+CHECK_CONNTRACK_NAT()
+
+ovn_start
+OVS_TRAFFIC_VSWITCHD_START()
+ADD_BR([br-int])
+ADD_BR([br-ext])
+
+# Logical network:
+# 2 logical switches "public" (192.168.1.0/24) and "internal" (172.16.1.0/24)
+# connected to a router lr.
+# internal has a server.
+# client is connected through localnet.
+
+check ovs-ofctl add-flow br-ext action=normal
+# Set external-ids in br-int needed for ovn-controller
+check ovs-vsctl \
+        -- set Open_vSwitch . external-ids:system-id=hv1 \
+        -- set Open_vSwitch . 
external-ids:ovn-remote=unix:$ovs_base/ovn-sb/ovn-sb.sock \
+        -- set Open_vSwitch . external-ids:ovn-encap-type=geneve \
+        -- set Open_vSwitch . external-ids:ovn-encap-ip=169.0.0.1 \
+        -- set bridge br-int fail-mode=secure 
other-config:disable-in-band=true \
+        -- set Open_vSwitch . external-ids:ovn-bridge-mappings=phynet:br-ext
+
+
+# Start ovn-controller
+start_daemon ovn-controller
+
+# Set the minimal fragment size for userspace DP.
+# Note that this call will fail for system DP as this setting is not supported 
there.
+ovs-appctl dpctl/ipf-set-min-frag v4 500
+
+check ovn-nbctl lr-add lr
+check ovn-nbctl ls-add internal
+check ovn-nbctl ls-add public
+
+check ovn-nbctl lrp-add lr lr-pub 00:00:01:01:02:03 192.168.1.1/24
+check ovn-nbctl lsp-add  public pub-lr -- set Logical_Switch_Port pub-lr \
+    type=router options:router-port=lr-pub addresses=\"00:00:01:01:02:03\"
+
+check ovn-nbctl lrp-add lr lr-internal 00:00:01:01:02:04 172.16.1.1/24
+check ovn-nbctl lsp-add internal internal-lr -- set Logical_Switch_Port 
internal-lr \
+    type=router options:router-port=lr-internal addresses=\"00:00:01:01:02:04\"
+
+check ovn-nbctl lsp-add public ln_port \
+                -- lsp-set-addresses ln_port unknown \

[...]

--- /dev/null
+++ b/tests/tcp_simple.py
@@ -0,0 +1,338 @@
+#!/usr/bin/env python3
+"""
+Simple TCP client/server for testing network connectivity and data integrity.
+
+This module provides TCP echo server and client functionality for testing
+network connections, data transmission, and MD5 checksum verification.
+"""
+import socket
+import time
+import argparse
+import sys
+import hashlib
+import random
+
+DEFAULT_SRC_IP = "0.0.0.0"
+DEFAULT_DST_IP = "0.0.0.0"
+DEFAULT_PORT = 8080
+
+
+def calculate_md5(data):
+    """Calculate MD5 checksum of data."""
+    if isinstance(data, str):
+        data = data.encode('utf-8')
+    return hashlib.md5(data).hexdigest()
+
+
+def generate_random_bytes(size):
+    """Generate random bytes that are valid UTF-8."""
+    # Generate random printable ASCII characters (32-126) which are valid UTF-8
+    return bytes([random.randint(32, 126) for _ in range(size)])
+
+
+class TCPServer:
+    """Simple TCP echo server using standard sockets."""
+
+    def __init__(self, bind_ip, port):
+        self.bind_ip = bind_ip
+        self.port = port
+        self.running = False
+        self.server_socket = None
+
+    def start(self):
+        """Start the TCP server."""
+        exit_code = 0
+        self.running = True
+        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.server_socket.setsockopt(socket.SOL_SOCKET,
+                                      socket.SO_REUSEADDR, 1)
+
+        try:
+            self.server_socket.bind((self.bind_ip, self.port))
+            self.server_socket.listen(5)
+            print(f"TCP Server listening on {self.bind_ip}:{self.port}")
+
+            while self.running:
+                try:
+                    client_socket, client_addr = self.server_socket.accept()
+                    print(f"Connection from {client_addr[0]}:{client_addr[1]}")
+
+                    # Handle client directly (single-threaded)
+                    self._handle_client(client_socket, client_addr)
+
+                except socket.error as e:
+                    if self.running:
+                        print(f"Socket error: {e}")
+
+        except OSError as e:
+            if e.errno == 99:  # Cannot assign requested address
+                print(f"Error: Cannot bind to {self.bind_ip}:{self.port}")
+            elif e.errno == 98:  # Address already in use
+                print(f"Error: Port {self.port} is already in use.")
+            else:
+                print(f"Error binding to {self.bind_ip}:{self.port}: {e}")
+        except KeyboardInterrupt:
+            print("\nServer shutting down...")
+            exit_code = 0
+        except Exception as e:
+            print(f"Unexpected server error: {e}")
+            exit_code = 1
+        finally:
+            self.running = False
+            if self.server_socket:
+                self.server_socket.close()
+        sys.exit(exit_code)
+
+    def _handle_client(self, client_socket, client_addr):
+        """Handle individual client connection."""
+        total_bytes_received = 0
+        try:
+            while self.running:
+                data = client_socket.recv(4096)
+                if not data:
+                    break
+                client_socket.send(data)
+                total_bytes_received += len(data)
+
+            print(f"Total bytes received: {total_bytes_received}")
+        except socket.error as e:
+            print(f"Client {client_addr[0]}:{client_addr[1]} error: {e}")
+        finally:
+            client_socket.close()
+            print(f"Connection closed with {client_addr[0]}:{client_addr[1]}")
+
+
+class TCPClient:
+    """Simple TCP client using standard sockets."""
+
+    def __init__(self, src_ip, dst_ip, dst_port):
+        self.src_ip = src_ip
+        self.dst_ip = dst_ip
+        self.dst_port = dst_port
+
+    def connect_and_send(self, data_len, iterations=1, interval=0.1,
+                         unique_data=False):
+        """Connect to server and send data."""
+        print(f"TCP Client connecting to {self.dst_ip}:{self.dst_port}")
+
+        success_count = 0
+        correct_responses = 0
+
+        # Generate data once if not using unique data per iteration
+        shared_data_bytes = None
+        shared_md5 = None
+        if not unique_data:
+            shared_data_bytes = generate_random_bytes(data_len)
+            shared_md5 = calculate_md5(shared_data_bytes)
+            print(f"Generated shared buffer: {len(shared_data_bytes)} bytes, "
+                  f"MD5: {shared_md5}")
+
+        return_code = 0
+        for i in range(iterations):
+            iteration_result = self._process_iteration(
+                i, data_len, unique_data, shared_data_bytes, shared_md5)
+            if iteration_result['success']:
+                success_count += 1
+                if iteration_result['checksum_match']:
+                    correct_responses += 1
+            else:
+                return_code = 1
+
+            if i < iterations - 1:
+                time.sleep(interval)
+
+        print(f"Client completed: {success_count}/{iterations} "
+              f"successful connections")
+        print(f"MD5 checksum verification: "
+              f"{correct_responses}/{success_count} correct")
+
+        return_code = 0 if (correct_responses ==
+                           success_count and success_count > 0) else 1
+        return return_code
+
+    def _process_iteration(self, iteration_idx, data_len, unique_data,
+                          shared_data_bytes, shared_md5):
+        """Process a single iteration of the client test."""
+        try:
+            # Create socket and connect
+            client_socket = socket.socket(socket.AF_INET,
+                                          socket.SOCK_STREAM)
+
+            # Bind to specific source IP if provided
+            if self.src_ip != "0.0.0.0":
+                client_socket.bind((self.src_ip, 0))
+
+            client_socket.connect((self.dst_ip, self.dst_port))
+            print(f"Iteration {iteration_idx + 1}: Connected to server")
+
+            # Prepare data and calculate MD5 checksum
+            if unique_data:
+                data_bytes = generate_random_bytes(data_len)
+                original_md5 = calculate_md5(data_bytes)
+                print(f"Iteration {iteration_idx + 1}: Generated unique data, "
+                      f"MD5: {original_md5}")
+            else:
+                data_bytes = shared_data_bytes
+                original_md5 = shared_md5
+                print(f"Iteration {iteration_idx + 1}: Using shared buffer, "
+                      f"MD5: {original_md5}")
+
+            client_socket.send(data_bytes)
+            print(f"Iteration {iteration_idx + 1}: Sent {len(data_bytes)} "
+                  f"bytes")
+
+            # Receive echo response
+            response = self._receive_response(client_socket, len(data_bytes))
+            print(f"Iteration {iteration_idx + 1}: Received {len(response)} "
+                  f"bytes")
+
+            # Calculate MD5 of received data
+            received_md5 = calculate_md5(response)
+            print(f"Iteration {iteration_idx + 1}: Received MD5: "
+                  f"{received_md5}")
+
+            # Verify checksum
+            checksum_match = original_md5 == received_md5
+
+            if checksum_match:
+                print(f"Iteration {iteration_idx + 1}: ✓ MD5 checksum "
+                      f"verified correctly")
+            else:
+                print(f"Iteration {iteration_idx + 1}: ✗ MD5 checksum "
+                      f"mismatch!")
+
+            client_socket.close()
+            return {'success': True, 'checksum_match': checksum_match}
+
+        except ConnectionRefusedError:
+            print(f"Iteration {iteration_idx + 1}: Connection refused to "
+                  f"{self.dst_ip}:{self.dst_port}")
+            return {'success': False, 'checksum_match': False}
+        except socket.timeout:
+            print(f"Iteration {iteration_idx + 1}: Connection timeout to "
+                  f"{self.dst_ip}:{self.dst_port}")
+            return {'success': False, 'checksum_match': False}
+        except OSError as os_error:
+            self._handle_os_error(iteration_idx, os_error)
+            return {'success': False, 'checksum_match': False}
+        except socket.error as sock_error:
+            print(f"Iteration {iteration_idx + 1}: Socket error: {sock_error}")
+            return {'success': False, 'checksum_match': False}
+        except Exception as general_error:
+            print(f"Iteration {iteration_idx + 1}: Unexpected error: "
+                  f"{general_error}")
+            return {'success': False, 'checksum_match': False}
+
+    def _receive_response(self, client_socket, bytes_to_receive):
+        """Receive response data from server."""
+        response = b""
+        while len(response) < bytes_to_receive:
+            chunk = client_socket.recv(
+                min(4096, bytes_to_receive - len(response)))
+            if not chunk:
+                break
+            response += chunk
+        return response
+
+    def _handle_os_error(self, iteration_idx, os_error):
+        """Handle OS-specific errors."""
+        if os_error.errno == 99:  # Cannot assign requested address
+            print(f"Iteration {iteration_idx + 1}: Cannot bind to source IP "
+                  f"{self.src_ip}")
+        elif os_error.errno == 101:  # Network is unreachable
+            print(f"Iteration {iteration_idx + 1}: Network unreachable to "
+                  f"{self.dst_ip}:{self.dst_port}")
+        else:
+            print(f"Iteration {iteration_idx + 1}: Network error: {os_error}")
+
+
+def main():
+    """Main function to parse arguments and run TCP client or server."""
+    parser = argparse.ArgumentParser(
+        description="Simple TCP client/server for testing",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+COMMON OPTIONS:
+  --mode {client,server}  Run in client or server mode (required)
+  -p, --port PORT         TCP port (default: 8080)
+
+CLIENT MODE OPTIONS:
+  -s, --src-ip IP         Source IPv4 address (default: 0.0.0.0)
+  -d, --dst-ip IP         Destination IPv4 address (default: 0.0.0.0)
+  -n, --iterations N      Number of connections to make (default: 1)
+  -I, --interval SECS     Seconds between connections (default: 0.1)
+  -B, --payload-bytes N   Total TCP payload bytes (minimum: 500)
+  --unique-data           Generate unique random data for each iteration
+
+SERVER MODE OPTIONS:
+  --bind-ip IP            Server bind IP address (default: 172.16.1.2)
+
+""")
+
+    # Mode selection (required)
+    parser.add_argument("--mode", choices=['client', 'server'], required=True,
+                       help=argparse.SUPPRESS)
+
+    # Common arguments
+    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
+                       help=argparse.SUPPRESS)
+
+    # Client mode arguments
+    parser.add_argument("-B", "--payload-bytes", type=int, default=None,
+                       help=argparse.SUPPRESS)
+    parser.add_argument("-s", "--src-ip", default=DEFAULT_SRC_IP,
+                       help=argparse.SUPPRESS)
+    parser.add_argument("-d", "--dst-ip", default=DEFAULT_DST_IP,
+                       help=argparse.SUPPRESS)
+    parser.add_argument("-I", "--interval", type=float, default=0.1,
+                       help=argparse.SUPPRESS)
+    parser.add_argument("-n", "--iterations", type=int, default=1,
+                       help=argparse.SUPPRESS)
+    parser.add_argument("--unique-data", action="store_true",
+                       help=argparse.SUPPRESS)
+
+    # Server mode arguments
+    parser.add_argument("--bind-ip", default="0.0.0.0",
+                       help=argparse.SUPPRESS)
+
+    args = parser.parse_args()
+
+    # Validate arguments
+    if args.port < 1 or args.port > 65535:
+        print(f"Error: Port {args.port} is out of valid range (1-65535)")
+        sys.exit(1)
+
+    if args.mode == 'client':
+        if args.iterations < 1:
+            print(f"Error: Iterations must be at least 1, "
+                  f"got {args.iterations}")
+            sys.exit(1)
+        if args.interval < 0:
+            print(f"Error: Interval cannot be negative, "
+                  f"got {args.interval}")
+            sys.exit(1)
+        if args.payload_bytes is not None and args.payload_bytes < 500:
+            print(f"Error: Payload bytes must be at least 500, "
+                  f"got {args.payload_bytes}")
+            sys.exit(1)
+
+    if args.mode == 'server':
+        # Run server
+        server = TCPServer(args.bind_ip, args.port)
+        server.start()
+    elif args.mode == 'client':
+        # Run client
+        client = TCPClient(args.src_ip, args.dst_ip, args.port)
+        success = client.connect_and_send(
+            max(0, int(args.payload_bytes)), args.iterations,
+            args.interval, args.unique_data)
+
+        # Summary
+        print(f"Client summary: payload_bytes={args.payload_bytes} "
+              f"iterations={args.iterations} success={success}")
+
+        sys.exit(success)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/udp_client.py b/tests/udp_client.py
new file mode 100755
index 000000000..cc67275f8
--- /dev/null
+++ b/tests/udp_client.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python3

I was wondering if we really need this code, since we can trigger UDP
fragmentation by setting the 'gateway_mtu' option and letting nc fragment the
traffic. Am I missing something? We can probably do something similar for TCP
too.

I would have to look into this further; I don't know offhand.

Thanks for the review,

-Brian

Regards,
Lorenzo

+"""
+Scapy UDP client with MTU/payload control for testing fragmented UDP traffic.
+
+This module provides functionality to send fragmented UDP packets using Scapy,
+allowing control over MTU, payload size, and fragmentation behavior.
+"""
+import time
+import argparse
+import sys
+from scapy.all import IP, UDP, Raw, send, fragment
+
+# Defaults for same logical switch topology
+DEFAULT_SRC_IP = "172.16.1.3"
+DEFAULT_DST_IP = "172.16.1.2"
+
+
+def read_iface_mtu(ifname: str):
+    """Return MTU of interface or None if not available."""
+    try:
+        path = f"/sys/class/net/{ifname}/mtu"
+        with open(path, "r", encoding="utf-8") as f:
+            return int(f.read().strip())
+    except Exception:
+        return None
+
+
+def main():
+    """Main function to parse arguments and send fragmented UDP packets."""
+    parser = argparse.ArgumentParser(
+        description="Scapy UDP client with MTU/payload control")
+    parser.add_argument("-M", "--mtu", type=int, default=None,
+        help="Target MTU; payload ~ mtu-28 (IPv4+UDP).")
+    parser.add_argument("-B", "--payload-bytes", type=int, default=None,
+        help="Total UDP payload bytes (override default).")
+    parser.add_argument("-S", "--sport", type=int, default=4242,
+        help="UDP source port")
+    parser.add_argument("-D", "--dport", type=int, default=4242,
+        help="UDP destination port")
+    parser.add_argument("-s", "--src-ip", default=DEFAULT_SRC_IP,
+        help="Source IPv4 address")
+    parser.add_argument("-d", "--dst-ip", default=DEFAULT_DST_IP,
+        help="Destination IPv4 address")
+    parser.add_argument("-i", "--iface", default="client",
+        help="Egress interface (default: 'client').")
+    parser.add_argument("-I", "--interval", type=float, default=0.1,
+        help="Seconds between sends.")
+    parser.add_argument("-n", "--iterations", type=int, default=1,
+        help="Number of datagrams to send.")
+    args = parser.parse_args()
+
+    # Derive fragment size: for link MTU M, fragsize should typically be M-20
+    # (IP header)
+    if args.mtu is not None:
+        fragsize = max(68, int(args.mtu) - 20)
+    else:
+        fragsize = 1480  # default for 1500 MTU
+
+    # Build payload; size can be user-controlled or synthesized to ensure
+    # fragmentation
+    if args.payload_bytes is not None:
+        payload_len = max(0, int(args.payload_bytes))
+        payload = b"x" * payload_len
+    elif args.mtu is not None:
+        # Ensure payload exceeds fragsize-8 (UDP header) so we actually
+        # fragment.
+        base_len = max(0, int(args.mtu) - 28)
+        min_len_to_fragment = max(0, fragsize - 8 + 1)
+        payload_len = (base_len if base_len > min_len_to_fragment else
+                       fragsize * 2)
+        payload = b"x" * payload_len
+    else:
+        # No explicit size; synthesize a payload big enough to fragment at
+        # default fragsize
+        payload_len = fragsize * 2
+        payload = b"x" * payload_len
+
+    # Construct full IP/UDP packet and fragment it explicitly
+    ip_layer = IP(src=args.src_ip, dst=args.dst_ip)
+    udp_layer = UDP(sport=args.sport, dport=args.dport)
+    full_packet = ip_layer / udp_layer / Raw(load=payload)
+    frags = fragment(full_packet, fragsize=fragsize)
+
+    total_fragments_per_send = len(frags)
+    for _ in range(max(0, args.iterations)):
+        try:
+            send(frags, iface=args.iface, return_packets=True, verbose=False)
+        except OSError as e:
+            # Errno 90: Message too long (likely iface MTU < chosen --mtu)
+            if getattr(e, "errno", None) == 90:
+                iface_mtu = read_iface_mtu(args.iface)
+                mtu_note = (f"iface_mtu={iface_mtu}" if iface_mtu is not None
+                            else "iface_mtu=unknown")
+                print("ERROR: packet exceeds interface MTU. "
+                    f"iface={args.iface} {mtu_note} chosen_mtu="
+                    f"{args.mtu if args.mtu is not None else 1500} "
+                    f"fragsize={fragsize}. Set --mtu to the interface MTU.",
+                    file=sys.stderr)
+                sys.exit(2)
+            raise
+        time.sleep(args.interval)
+
+    # Summary
+    mtu_print = args.mtu if args.mtu is not None else 1500
+    total_frags = total_fragments_per_send * max(0, args.iterations)
+    print(f"payload_bytes={payload_len} mtu={mtu_print} fragsize={fragsize} "
+        f"fragments_per_datagram={total_fragments_per_send} "
+        f"total_fragments_sent={total_frags}")
+
+
+if __name__ == "__main__":
+    main()
+    sys.exit(0)
--
2.43.0

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to