The branch, master has been updated
       via  58f5ad63f3a python:ndr: improve type annotation and docs for pack/unpack
       via  49e7f2e981a python:samdb: remove dsdb_Dn alias
       via  7c94f0c69cc py:samdb: add get_linearized to dsdb_dn
       via  588a1566ef5 py:samdb: drop unused methods (get_binary_integer, get_bytes)
       via  45f7444d608 pytest:repl_rodc: avoid using dsdb_dn.get_bytes
       via  14c917ca941 pytest: add key_credential_link_Dn tests
       via  0cedf27dcfd python: add helpers to construct KeyCredentialLinkDn objects
       via  aa05d4b31f1 python: add KeyCredentialLinkDn BinaryDn subtype
       via  1b6395781a0 samba_kcc: log when msDS-HasInstantiatedNCs is not BinaryDn
       via  aeaa9a8f721 samba_kcc: use dsdb_dn_guess()
       via  c9e814b3a4a dbcheck: make deleted_objects check case-insensitive
       via  62b0d13896b dbcheck: use new dsdb_dn types
       via  cce07c29615 pytest:krb5_base: use BinaryDn not dsdb_dn
       via  c7789954769 s4:pytest:repl_rodc: use BinaryDn not dsdb_dn
       via  1109a9d6d70 pytest:dsdb_dn: expand and rework tests
       via  a45214e4139 pytest:dsdb_dn: fix binary test
       via  8e740bf86d3 python:samdb: replace dsdb_Dn with stricter types
       via  e68bbd0afe6 py/common: add cmp_with_nones() helper function
       via  a579efadaad py:common: normalise_int32 checks bit size
       via  8f733c12628 pytests: test normalise_int32 against out-of-range numbers
       via  dae81d0cd97 pytests: move dsdb_dn tests out of common
      from  b4ab753c0db WHATSNEW: Start release notes for Samba 4.24.0pre1.

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 58f5ad63f3a3c070e10157c305331fb0eeda57be
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 9 12:50:31 2025 +1200

    python:ndr: improve type annotation and docs for pack/unpack
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
    
    Autobuild-User(master): Douglas Bagnall <[email protected]>
    Autobuild-Date(master): Fri Aug  8 00:29:00 UTC 2025 on atb-devel-224

commit 49e7f2e981a30d99029b6a3cf1a42f879ae9b0e7
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 16:05:23 2025 +1200

    python:samdb: remove dsdb_Dn alias
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 7c94f0c69cc54f34a367e6c4a9a05b2377ed47a3
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 16:40:15 2025 +1200

    py:samdb: add get_linearized to dsdb_dn
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 588a1566ef5c5a07101a138e170f26683e4e2063
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 14:51:32 2025 +1200

    py:samdb: drop unused methods (get_binary_integer, get_bytes)
    
    get_binary_integer() is still a method on BinaryDn, but not on
    StringDn and PlainDn where it makes no sense.
    
    x.get_bytes() is merely an alias for x.binary.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 45f7444d608d61856d30d6c5696ab7409a73ab08
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 13:13:31 2025 +1200

    pytest:repl_rodc: avoid using dsdb_dn.get_bytes
    
    Nobody else uses it, so let's just use .binary.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 14c917ca94157afbf1abf0a2743abf7e5d9baf32
Author: Douglas Bagnall <[email protected]>
Date:   Fri Jul 11 13:28:33 2025 +1200

    pytest: add key_credential_link_Dn tests
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 0cedf27dcfda591a2421950c5f4ad38db41eaff6
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 15:15:04 2025 +1200

    python: add helpers to construct KeyCredentialLinkDn objects
    
    We want to ensure as best we can that the binary blob is in a
    useful format. This will be used by samba-tool.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
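
The entry framing these helpers rely on can be sketched in isolation. This is a minimal illustration, assuming the entry type numbers 1 (KeyID) and 3 (KeyMaterial) from the comment table in create_key_credential_link(); in Samba they come from samba.dcerpc.keycredlink. The key bytes are a placeholder, not a real DER key.

```python
import struct
from hashlib import sha256

# Entry type numbers as listed in the comment table in
# create_key_credential_link(); redefined here (assumption) so the
# sketch runs without the samba modules.
KEY_ID = 1
KEY_MATERIAL = 3


def kcl_entry_bytes(entry_type: int, data: bytes) -> bytes:
    """Pack one entry: 16-bit little-endian length, 8-bit type, data."""
    return struct.pack('<HB', len(data), entry_type) + data


# Placeholder bytes standing in for a DER-encoded public key.
key_bytes = b'not a real key'

key_id = kcl_entry_bytes(KEY_ID, sha256(key_bytes).digest())
material = kcl_entry_bytes(KEY_MATERIAL, key_bytes)

# A sha256 digest is 32 bytes, so the KeyID entry is 3 + 32 bytes long.
print(len(key_id), key_id[:3].hex(), material[:3].hex())
```

Each entry carries its own length, so a reader can step through the blob without knowing every entry type.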

commit aa05d4b31f15ec4365282dcb33ce45aed965b642
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jul 17 12:02:10 2025 +1200

    python: add KeyCredentialLinkDn BinaryDn subtype
    
    This works as a BinaryDn, but it also does validation to ensure the
    binary contains a KEYCREDENTIALLINK_BLOB.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 1b6395781a022ae7c3a802480bc1d32109c45cb9
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 14:28:22 2025 +1200

    samba_kcc: log when msDS-HasInstantiatedNCs is not BinaryDn
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit aeaa9a8f721affda5dac751f7a54575538e81e4e
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 12:46:14 2025 +1200

    samba_kcc: use dsdb_dn_guess()
    
    We also remove a bit of .decode()ing, which now happens automatically.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit c9e814b3a4aa9de44698ac45f9542432256910e8
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 12:45:24 2025 +1200

    dbcheck: make deleted_objects check case-insensitive
    
    While a BinaryDn.prefix is generated in upper-case, and
    dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER is upper-case, we can avoid
    having to think about that by comparing the actual bytes.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
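
A small sketch of why comparing bytes sidesteps the case question. The hex constant is assumed to match dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER, and both helper functions are hypothetical stand-ins for the old and new checks in dbcheck.

```python
# Assumed value of dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER.
DELETED_OBJECTS_HEX = "18E2EA80684F11D2B9AA00C04F79F805"


def old_style_match(prefix: str) -> bool:
    """String comparison on the prefix: sensitive to hex case."""
    return prefix == "B:32:%s:" % DELETED_OBJECTS_HEX


def new_style_match(binary: bytes) -> bool:
    """Byte comparison: bytes.fromhex() accepts either case."""
    return binary == bytes.fromhex(DELETED_OBJECTS_HEX)


lower_hex = DELETED_OBJECTS_HEX.lower()

# A lower-case prefix defeats the string comparison ...
old_result = old_style_match("B:32:%s:" % lower_hex)
# ... but decodes to exactly the same 16 bytes.
new_result = new_style_match(bytes.fromhex(lower_hex))
print(old_result, new_result)
```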

commit 62b0d13896b2ee7645c995cd1f1ed6af053f592d
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 12:42:54 2025 +1200

    dbcheck: use new dsdb_dn types
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit cce07c296152a8da8c5de3ce4532a25172380240
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 11:58:30 2025 +1200

    pytest:krb5_base: use BinaryDn not dsdb_dn
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit c77899547699b2c13f29f3c9458e147755c3d5c6
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 11:57:45 2025 +1200

    s4:pytest:repl_rodc: use BinaryDn not dsdb_dn
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 1109a9d6d70048039272be4df84cf6b89237cd4f
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 16:17:14 2025 +1200

    pytest:dsdb_dn: expand and rework tests
    
    We now test the specialised StringDn and BinaryDn types.
    
    There are new assertions about case-insensitivity in binary hex
    strings and BinaryDn prefix validation.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit a45214e41399011f19eee409db15ab2b5baacbce
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 12:35:22 2025 +1200

    pytest:dsdb_dn: fix binary test
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 8e740bf86d3941203fc0b604be74d7c389eca98e
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 19 13:25:08 2025 +1200

    python:samdb: replace dsdb_Dn with stricter types
    
    dsdb_Dn() was a catchall for DN+Binary, DN+String, and plain DNs which
    needed to be sorted in a particular way. This meant it treated none of
    them exactly right.
    
    For example, a binary dsdb_Dn would be compared on the string
    representation of the binary portion, so 'B:2:ff:CN=foo' would not
    equal 'B:2:FF:CN=foo', when it should.
    
    It meant a field that expected a binary dsdb_DN would also accept a
    plain DN or a string DN, which is never actually allowed.
    
    Also the parsing was a bit dodgy, so a string like 'B:6:ff:CN=foo'
    would be accepted, when the length of the binary portion ("ff") is
    obviously different from that given ("6").
    
    Here we solve many of the problems by making stricter subclasses but
    leaving a compatibility shim in place so that existing code continues
    to work.
    
    There is one INCOMPATIBLE change. Previously the `.binary` attribute
    of a dsdb_Dn was the hex-string, while now it is the actual binary
    data. In the case of StringDn, this means the utf-8 bytes.
    
    This affects dbcheck, which is fixed here (the .prefix assignment now
    correctly sets .binary).
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
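
The two parsing problems described above (case-sensitive comparison and unchecked length) can be illustrated with a stand-alone sketch. This is not the BinaryDn implementation: parse_binary_dn() and ParsedBinaryDn are hypothetical names, the DN part is compared as a plain string rather than as an ldb.Dn, and the length field is assumed to count hex characters, as in the 'B:2:ff:CN=foo' example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ParsedBinaryDn:
    """Hypothetical holder for the two halves of a DN+Binary value."""
    binary: bytes   # the decoded bytes, not the hex string
    dn: str         # simplification: a real implementation uses ldb.Dn


def parse_binary_dn(s: str) -> ParsedBinaryDn:
    """Parse 'B:<count>:<hex>:<dn>', checking the declared length."""
    parts = s.split(':', 3)
    if len(parts) != 4 or parts[0] != 'B':
        raise ValueError(f"'{s}' is not a DN+Binary string")
    _, count, hexstr, dn = parts
    if int(count) != len(hexstr):
        raise ValueError(f"declared length {count} does not match "
                         f"'{hexstr}' ({len(hexstr)} characters)")
    # bytes.fromhex() raises ValueError on non-hex input and is
    # indifferent to case, so 'ff' and 'FF' decode identically.
    return ParsedBinaryDn(bytes.fromhex(hexstr), dn)


a = parse_binary_dn('B:2:ff:CN=foo')
b = parse_binary_dn('B:2:FF:CN=foo')
print(a == b)

try:
    parse_binary_dn('B:6:ff:CN=foo')
except ValueError as e:
    print("rejected:", e)
```

Storing the decoded bytes rather than the hex string is what makes the equality check case-insensitive for free.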

commit e68bbd0afe602f6a70b1ba07fca47648239553df
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jul 17 11:42:54 2025 +1200

    py/common: add cmp_with_nones() helper function
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
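
One way the helper might be used is as a sort comparator. The function body below is copied from the diff to python/samba/common.py; the use with functools.cmp_to_key and the sample list are illustrative assumptions.

```python
from functools import cmp_to_key


def cmp_with_nones(x, y):
    """Like cmp(), but copes with None; Nones sort to the start."""
    if x == y:
        return 0
    if x is None:
        return -1
    if y is None:
        return 1
    return (x > y) - (x < y)


values = [3, None, 1, None, 2]

# sorted(values) alone would raise TypeError on the None/int comparison.
ordered = sorted(values, key=cmp_to_key(cmp_with_nones))
print(ordered)
```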

commit a579efadaadbd12f1debb70e4383db87cf20c86a
Author: Douglas Bagnall <[email protected]>
Date:   Wed Jul 30 11:57:02 2025 +1200

    py:common: normalise_int32 checks bit size
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

commit 8f733c1262813eac42a511faa0e1a789d8063277
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 16:30:48 2025 +1200

    pytests: test normalise_int32 against out-of-range numbers
    
    For example, we don't want to "normalise" 0x9876543210 to
    0x9776543210, or 0x200000000 to 0x100000000. That is just causing
    random damage to 64 bit values without achieving the sign switch.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>
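
The range check can be exercised on its own. The function body below is taken from the diff to python/samba/common.py (with the redundant int() call dropped); the sample values are illustrative.

```python
def normalise_int32(ivalue):
    """normalise a ldap integer to signed 32 bit"""
    ivalue = int(ivalue)
    if ivalue > 0xffffffff or ivalue < -0x80000000:
        raise ValueError(f"{ivalue} (0x{ivalue:x}) does not fit in 32 bits.")
    if ivalue >= 0x80000000:
        # unsigned values with the top bit set become negative
        return str(ivalue - 0x100000000)
    return str(ivalue)


print(normalise_int32(5))
print(normalise_int32(0xffffffff))
print(normalise_int32("2147483648"))

try:
    normalise_int32(0x200000000)
except ValueError as e:
    print("rejected:", e)
```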

commit dae81d0cd97e142a0e9a794f84558fb3899a7cc6
Author: Douglas Bagnall <[email protected]>
Date:   Thu Jun 26 16:16:21 2025 +1200

    pytests: move dsdb_dn tests out of common
    
    dsdb_Dn hasn't been in samba.common since
    85d2ff2f0003b106ca84866b7e7893723f1dd93c
    and the tests should follow.
    
    Although dsdb_Dn is currently in samba.samdb, we aren't moving the
    tests to samba.tests.samdb, because those tests need a real AD
    environment whereas these ones can run more cheaply in the "none"
    environment.
    
    Another patch will improve the remaining samba.common tests.
    
    Signed-off-by: Douglas Bagnall <[email protected]>
    Reviewed-by: Gary Lockyer <[email protected]>

-----------------------------------------------------------------------

Summary of changes:
 python/samba/common.py                   |  22 ++-
 python/samba/dbchecker.py                |  33 ++---
 python/samba/kcc/kcc_utils.py            |  29 ++--
 python/samba/kcc/ldif_import_export.py   |   6 +-
 python/samba/key_credential_link.py      | 216 ++++++++++++++++++++++++++++
 python/samba/ndr.py                      |  16 ++-
 python/samba/samdb.py                    | 234 +++++++++++++++++++++++++------
 python/samba/tests/common.py             |  54 +++----
 python/samba/tests/dsdb_dn.py            | 189 +++++++++++++++++++++++++
 python/samba/tests/krb5/kdc_base_test.py |   6 +-
 selftest/tests.py                        |   1 +
 source4/torture/drs/python/repl_rodc.py  |   8 +-
 12 files changed, 690 insertions(+), 124 deletions(-)
 create mode 100644 python/samba/key_credential_link.py
 create mode 100644 python/samba/tests/dsdb_dn.py


Changeset truncated at 500 lines:

diff --git a/python/samba/common.py b/python/samba/common.py
index eafc4175b4c..a76110bc680 100644
--- a/python/samba/common.py
+++ b/python/samba/common.py
@@ -30,6 +30,23 @@ def cmp(x, y):
     return (x > y) - (x < y)
 
 
+def cmp_with_nones(x, y):
+    """This is like cmp(), but will cope if a value is None.
+
+    We sort Nones to the start.
+    """
+    # avoids
+    # TypeError: '>' not supported between instances of 'NoneType' and 'int'
+    if x == y:
+        return 0
+    if x is None:
+        return -1
+    if y is None:
+        return 1
+
+    return (x > y) - (x < y)
+
+
 def confirm(msg, forced=False, allow_all=False):
     """confirm an action with the user
 
@@ -65,7 +82,10 @@ def confirm(msg, forced=False, allow_all=False):
 
 def normalise_int32(ivalue):
     """normalise a ldap integer to signed 32 bit"""
-    if int(ivalue) & 0x80000000 and int(ivalue) > 0:
+    ivalue = int(ivalue)
+    if ivalue > 0xffffffff or ivalue < -0x80000000:
+        raise ValueError(f"{ivalue} (0x{ivalue:x}) does not fit in 32 bits.")
+    if ivalue >= 0x80000000:
         return str(int(ivalue) - 0x100000000)
     return str(ivalue)
 
diff --git a/python/samba/dbchecker.py b/python/samba/dbchecker.py
index 53d0030e941..0bc30d008d9 100644
--- a/python/samba/dbchecker.py
+++ b/python/samba/dbchecker.py
@@ -27,7 +27,7 @@ from samba.dcerpc import misc
 from samba.dcerpc import drsuapi
 from samba.ndr import ndr_unpack, ndr_pack
 from samba.dcerpc import drsblobs
-from samba.samdb import dsdb_Dn
+from samba.samdb import BinaryDn, StringDn, dsdb_dn_by_syntax_oid, dsdb_dn_guess
 from samba.dcerpc import security
 from samba.descriptor import (
         get_wellknown_sds,
@@ -356,7 +356,7 @@ class dbcheck(object):
             listwko = []
             proposed_objectguid = None
             for o in wko:
-                dsdb_dn = dsdb_Dn(self.samdb, o.decode('utf8'), dsdb.DSDB_SYNTAX_BINARY_DN)
+                dsdb_dn = BinaryDn(self.samdb, o)
                 if self.is_deleted_objects_dn(dsdb_dn):
                     self.report("wellKnownObjects had duplicate Deleted Objects value %s" % o)
                     # We really want to put this back in the same spot
@@ -602,9 +602,10 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
                           validate=False):
             self.report("Removed duplicate value on attribute %s" % attrname)
 
-    def is_deleted_objects_dn(self, dsdb_dn):
-        """see if a dsdb_Dn is the special Deleted Objects DN"""
-        return dsdb_dn.prefix == "B:32:%s:" % dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER
+    def is_deleted_objects_dn(self, binary_dn):
+        """see if a BinaryDn is the special Deleted Objects DN"""
+        deleted_objects_guid = bytes.fromhex(dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER)
+        return binary_dn.binary == deleted_objects_guid
 
     def err_missing_objectclass(self, dn):
         """handle object without objectclass"""
@@ -811,7 +812,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
         """fix missing <SID=...> on linked attributes"""
         self.report("ERROR: missing DN SID component for %s in object %s - %s" % (attrname, dn, val))
 
-        if len(dsdb_dn.prefix) != 0:
+        if isinstance(dn, (BinaryDn, StringDn)):
             self.report("Not fixing missing DN SID on DN+BINARY or DN+STRING")
             return
 
@@ -1091,7 +1092,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
                                 controls=["show_deleted:0", "extended_dn:0", "reveal_internals:0"])
         syntax_oid = self.samdb_schema.get_syntax_oid_from_lDAPDisplayName(attrname)
         for val in res[0][attrname]:
-            dsdb_dn = dsdb_Dn(self.samdb, val.decode('utf8'), syntax_oid)
+            dsdb_dn = dsdb_dn_by_syntax_oid(self.samdb, val, syntax_oid)
             guid2 = dsdb_dn.dn.get_extended_component("GUID")
             if guid == guid2:
                 return dsdb_dn
@@ -1117,7 +1118,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
             self.duplicate_link_cache[duplicate_cache_key] = False
 
         for val in obj[forward_attr]:
-            dsdb_dn = dsdb_Dn(self.samdb, val.decode('utf8'), forward_syntax)
+            dsdb_dn = dsdb_dn_by_syntax_oid(self.samdb, val, forward_syntax)
 
             # all DNs should have a GUID component
             guid = dsdb_dn.dn.get_extended_component("GUID")
@@ -1232,7 +1233,8 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
             raise
 
         for r in res:
-            target_dn = dsdb_Dn(self.samdb, r.dn.extended_str(), forward_syntax)
+            target_dn = dsdb_dn_by_syntax_oid(self.samdb, r.dn.extended_str(),
+                                              forward_syntax)
 
             guid = target_dn.dn.get_extended_component("GUID")
             guidstr = str(misc.GUID(guid))
@@ -1350,14 +1352,14 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
 
             # We now construct the sorted dn values.
             # They're sorted by the objectGUID of the target
-            # See dsdb_Dn.__cmp__()
+            # See BaseDsdbDn.__cmp__()
             vals = [str(dn) for dn in sorted(forward_links)]
             self.err_recover_forward_links(obj, attrname, vals)
             # We should continue with the fixed values
             obj[attrname] = ldb.MessageElement(vals, 0, attrname)
 
         for val in obj[attrname]:
-            dsdb_dn = dsdb_Dn(self.samdb, val.decode('utf8'), syntax_oid)
+            dsdb_dn = dsdb_dn_by_syntax_oid(self.samdb, val, syntax_oid)
 
             # all DNs should have a GUID component
             guid = dsdb_dn.dn.get_extended_component("GUID")
@@ -1399,7 +1401,6 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
 
             if fixing_msDS_HasInstantiatedNCs:
                 dsdb_dn.prefix = "B:8:%08X:" % int(res[0]['instanceType'][0])
-                dsdb_dn.binary = "%08X" % int(res[0]['instanceType'][0])
 
                 if str(dsdb_dn) != str(val):
                     error_count += 1
@@ -1496,7 +1497,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
                     # otherwise the LDB code will correct it on the way through
                     # (Note: we still want to preserve the DSDB DN prefix in the
                     # case of binary DNs)
-                    bad_dn = dsdb_dn.prefix + dsdb_dn.dn.get_linearized()
+                    bad_dn = dsdb_dn.get_linearized()
                     self.err_dn_string_component_old(obj.dn, attrname, bad_dn,
                                                      dsdb_dn, res[0].dn)
                 continue
@@ -1508,7 +1509,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
             match_count = 0
             if reverse_link_name in res[0]:
                 for v in res[0][reverse_link_name]:
-                    v_dn = dsdb_Dn(self.samdb, v.decode('utf8'))
+                    v_dn = dsdb_dn_guess(self.samdb, v)
                     v_guid = v_dn.dn.get_extended_component("GUID")
                     v_blob = v_dn.dn.get_extended_component("RMD_FLAGS")
                     v_rmd_flags = 0
@@ -1525,7 +1526,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
                         # Forward binary multi-valued linked attribute
                         forward_count = 0
                         for w in obj[attrname]:
-                            w_guid = dsdb_Dn(self.samdb, w.decode('utf8')).dn.get_extended_component("GUID")
+                            w_guid = dsdb_dn_guess(self.samdb, w).dn.get_extended_component("GUID")
                             if w_guid == guid:
                                 forward_count += 1
 
@@ -1533,7 +1534,7 @@ newSuperior: %s""" % (str(from_dn), str(to_rdn), str(to_base)))
                             continue
             expected_count = 0
             for v in obj[attrname]:
-                v_dn = dsdb_Dn(self.samdb, v.decode('utf8'))
+                v_dn = dsdb_dn_guess(self.samdb, v)
                 v_guid = v_dn.dn.get_extended_component("GUID")
                 v_blob = v_dn.dn.get_extended_component("RMD_FLAGS")
                 v_rmd_flags = 0
diff --git a/python/samba/kcc/kcc_utils.py b/python/samba/kcc/kcc_utils.py
index 326889d8488..1cb4bb15e88 100644
--- a/python/samba/kcc/kcc_utils.py
+++ b/python/samba/kcc/kcc_utils.py
@@ -29,7 +29,7 @@ from samba.dcerpc import (
     drsuapi,
     misc,
 )
-from samba.samdb import dsdb_Dn
+from samba.samdb import dsdb_dn_guess, BinaryDn
 from samba.ndr import ndr_unpack, ndr_pack
 from collections import Counter
 
@@ -743,10 +743,9 @@ class DirectoryServiceAgent(object):
                 # listed.  For instance DCs normally have 3 hasMasterNCs
                 # listed.
                 for value in res[0][k]:
-                    # Turn dn into a dsdb_Dn so we can use
-                    # its methods to parse a binary DN
-                    dsdn = dsdb_Dn(samdb, value.decode('utf8'))
-                    flags = dsdn.get_binary_integer()
+                    # msDS-HasInstantiatedNCs is a BinaryDN, but the
+                    # others are plain DNs.
+                    dsdn = dsdb_dn_guess(samdb, value)
                     dnstr = str(dsdn.dn)
 
                     if dnstr not in tmp_table:
@@ -756,7 +755,15 @@ class DirectoryServiceAgent(object):
                         rep = tmp_table[dnstr]
 
                     if k == "msDS-HasInstantiatedNCs":
-                        rep.set_instantiated_flags(flags)
+                        # msDS-HasInstantiatedNCs should only be DN+Binary
+                        # (MS-ADTS 6.1.1.2.2.1.2.1.1 and 6.1.2.3.1)
+                        # but sometimes we see it as a plain DN.
+                        if isinstance(dsdn, BinaryDn):
+                            flags = dsdn.get_binary_integer()
+                            rep.set_instantiated_flags(flags)
+                        else:
+                            print(f"msDS-HasInstantiatedNCs is not a BinaryDn '{dsdn}'",
+                                  file=sys.stderr)
                         continue
 
                     rep.identify_by_dsa_attr(samdb, k)
@@ -991,7 +998,7 @@ class NTDSConnection(object):
                            "for (%s)" % (self.dnstr))
 
         if "transportType" in msg:
-            dsdn = dsdb_Dn(samdb, msg["transportType"][0].decode('utf8'))
+            dsdn = dsdb_dn_guess(samdb, msg["transportType"][0])
             self.load_connection_transport(samdb, str(dsdn.dn))
 
         if "schedule" in msg:
@@ -1001,7 +1008,7 @@ class NTDSConnection(object):
             self.whenCreated = ldb.string_to_time(str(msg["whenCreated"][0]))
 
         if "fromServer" in msg:
-            dsdn = dsdb_Dn(samdb, msg["fromServer"][0].decode('utf8'))
+            dsdn = dsdb_dn_guess(samdb, msg["fromServer"][0])
             self.from_dnstr = str(dsdn.dn)
             assert self.from_dnstr is not None
 
@@ -1368,7 +1375,7 @@ class Partition(NamingContext):
                 continue
 
             for value in msg[k]:
-                dsdn = dsdb_Dn(samdb, value.decode('utf8'))
+                dsdn = dsdb_dn_guess(samdb, value)
                 dnstr = str(dsdn.dn)
 
                 if k == "nCName":
@@ -1928,7 +1935,7 @@ class Transport(object):
 
         if "bridgeheadServerListBL" in msg:
             for value in msg["bridgeheadServerListBL"]:
-                dsdn = dsdb_Dn(samdb, value.decode('utf8'))
+                dsdn = dsdb_dn_guess(samdb, value)
                 dnstr = str(dsdn.dn)
                 if dnstr not in self.bridgehead_list:
                     self.bridgehead_list.append(dnstr)
@@ -2188,7 +2195,7 @@ class SiteLink(object):
 
         if "siteList" in msg:
             for value in msg["siteList"]:
-                dsdn = dsdb_Dn(samdb, value.decode('utf8'))
+                dsdn = dsdb_dn_guess(samdb, value)
                 guid = misc.GUID(dsdn.dn.get_extended_component('GUID'))
                 dnstr = str(dsdn.dn)
                 if (guid, dnstr) not in self.site_list:
diff --git a/python/samba/kcc/ldif_import_export.py b/python/samba/kcc/ldif_import_export.py
index 41f0fd75778..ad584355151 100644
--- a/python/samba/kcc/ldif_import_export.py
+++ b/python/samba/kcc/ldif_import_export.py
@@ -23,7 +23,7 @@ import os
 
 from samba import Ldb, ldb, read_and_sub_file
 from samba.auth import system_session
-from samba.samdb import SamDB, dsdb_Dn
+from samba.samdb import SamDB, dsdb_dn_guess
 
 
 class LdifError(Exception):
@@ -231,8 +231,8 @@ def samdb_to_ldif_file(samdb, dburl, lp, creds, ldif_file):
                     if k in ncattrs:
                         for value in res_msg[k]:
                             # Some of these have binary DNs so
-                            # use dsdb_Dn to split out relevant parts
-                            dsdn = dsdb_Dn(samdb, value.decode('utf8'))
+                            # use dsdb_dn_guess to split out relevant parts.
+                            dsdn = dsdb_dn_guess(samdb, value)
                             dnstr = str(dsdn.dn)
                             if dnstr not in nclist:
                                 nclist.append(dnstr)
diff --git a/python/samba/key_credential_link.py b/python/samba/key_credential_link.py
new file mode 100644
index 00000000000..2ef5e00cbff
--- /dev/null
+++ b/python/samba/key_credential_link.py
@@ -0,0 +1,216 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Catalyst.Net Ltd 2025
+#
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+#
+
+"""Functions for processing key_credential_link"""
+
+from hashlib import sha256
+import struct
+from typing import Optional, Union
+
+from cryptography.hazmat.primitives.serialization import (
+    load_der_public_key,
+    load_pem_public_key,
+    PublicFormat,
+    Encoding)
+
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
+
+from cryptography.x509 import (
+    load_pem_x509_certificate,
+    load_der_x509_certificate)
+
+
+from samba.samdb import SamDB, BinaryDn
+from samba.ndr import ndr_unpack, ndr_pack
+from ldb import Dn
+from samba.dcerpc import keycredlink
+
+
+class KeyCredentialLinkDn(BinaryDn):
+    """KeyCredentialLink attributes are stored as DN+Binary.
+
+    The binary part is a KEYCREDENTIALLINK_BLOB, which is basically an
+    array of KEYCREDENTIALLINK_ENTRY collectively describing a public
+    key.
+
+    Usually the DN refers to the object the KeyCredentialLink was
+    found on.
+    """
+    # We make .binary a @property, so that BinaryDn's .parse() and
+    # .prefix just work without knowing that assigning to .binary is
+    # doing validation checks.
+    blob = None
+
+    @property
+    def binary(self) -> bytes:
+        """The binary is stored as a keycredlink.KEYCREDENTIALLINK_BLOB"""
+        if self.blob is None:
+            return None
+        return ndr_pack(self.blob)
+
+    @binary.setter
+    def binary(self, value:bytes):
+        try:
+            self.blob = ndr_unpack(keycredlink.KEYCREDENTIALLINK_BLOB,
+                                   value)
+        except Exception as e:
+            raise ValueError("Could not parse value as KEYCREDENTIALLINK_BLOB "
+                             f" (internal error: {e})")
+
+
+def get_public_key(data:bytes, encoding:str):
+    """decode a key in PEM or DER format.
+
+    If it turns out to be a certificate or something, we try to get
+    the public key from that.
+
+    So far only RSA keys are supported.
+    """
+    if encoding is None:
+        if data[:11] == b'-----BEGIN ':
+            encoding = 'PEM'
+        else:
+            encoding = 'DER'
+
+    encoding = encoding.upper()
+
+    # The cryptography module also supports ssh keys, PKCS1, and other
+    # formats, as well as non-RSA keys and extracting public keys from
+    # private. It might not be wise to tolerate all of this, but we
+    # can do it by adding to key_fns and cert_fns here.
+    if encoding == 'PEM':
+        key_fns = [load_pem_public_key]
+        cert_fns = [load_pem_x509_certificate]
+    elif encoding == 'DER':
+        key_fns = [load_der_public_key]
+        cert_fns = [load_der_x509_certificate]
+    else:
+        raise ValueError(f"Public key encoding '{encoding}' not supported "
+                         "(try 'PEM' or 'DER')")
+
+    key = None
+    for fn in key_fns:
+        try:
+            key = fn(data)
+            break
+        except ValueError:
+            continue
+
+    if key is None:
+        for fn in cert_fns:
+            try:
+                cert = fn(data)
+                key = cert.public_key()
+                break
+            except ValueError:
+                continue
+
+    if key is None:
+        raise ValueError("could not decode public key")
+
+    if not isinstance(key, RSAPublicKey):
+        raise ValueError("Currently only RSA Public Keys are supported "
+                         f"(not '{key}')")
+
+    return key
+
+
+def kcl_entry_bytes(entry_type:int, data:bytes) -> bytes:
+    """helper to pack key credential link entries"""
+    return struct.pack('<HB', len(data), entry_type) + data
+
+
+def create_key_credential_link(samdb: SamDB,
+                               target: Union[str, Dn],
+                               data: bytes,
+                               encoding: Optional[str] = None,
+                               force: bool = False):
+    """Convert a public key in a common format into a binary DN"""
+    if not force:
+        res = samdb.search(base=target)
+        if len(res) == 0:
+            raise ValueError(f"link target {target} does not exist")
+
+    key = get_public_key(data, encoding)
+
+    if key.key_size != 2048:
+        # According to [MS-ADTS] 2.2.20.5.1, KEY_USAGE_NGC means a
+        # 2048 bit public key.
+        if not force:
+            raise ValueError(f"2048 bit RSA key expected, not {key.key_size}")
+
+    key_bytes = key.public_bytes(Encoding.DER,
+                                 PublicFormat.SubjectPublicKeyInfo)
+
+    # that's the key.
+    # but there's more.
+    kcl_header = bytes.fromhex("00 02 00 00")  # Always version 2
+
+    # Entries are added in the enum order, as follows.
+    #
+    # Here '**' means MUST exist, '*' means SHOULD, and '-' means
+    # SHOULD which we ignore. We ignore all the un-SHOULDed values
+    # ([MS-ADTS] 2.2.20.6). For KeyUsage, only use KEY_USAGE_NGC.
+    #
+    # ** 1 KeyID            hash of the key material
+    #  * 2 KeyHash          hash of following entries (i.e. 3, 4, 9)
+    # ** 3 KeyMaterial      the key
+    # ** 4 KeyUsage         KEY_USAGE_NGC, KEY_USAGE_FIDO, or KEY_USAGE_FEK
+    #    5 KeySource        KEY_SOURCE_AD.
+    #    6 DeviceId         16 byte device ID (GUID, I guess) or zeros
+    #    7 CustomKeyInformation  CUSTOM_KEY_INFORMATION struct
+    #  - 8 KeyApproximateLastLogonTimeStamp  nttime
+    #  * 9 KeyCreationTime   nttime
+
+    # sha256 of the actual key
+    kcl_key_id = kcl_entry_bytes(keycredlink.KeyID,
+                                 sha256(key_bytes).digest())
+
+    # the actual key
+    kcl_material = kcl_entry_bytes(keycredlink.KeyMaterial,
+                                   key_bytes)
+
+    # always KEY_USAGE_NGC
+    kcl_key_usage = kcl_entry_bytes(keycredlink.KeyUsage,
+                                    keycredlink.KEY_USAGE_NGC.to_bytes())
+
+    # nttime for now
+    kcl_creation = kcl_entry_bytes(keycredlink.KeyCreationTime,
+                                   struct.pack('<Q', samdb.get_nttime()))
+
+    # always KEY_SOURCE_AD
+    #kcl_key_source = kcl_entry_bytes(keycredlink.KeySource,
+    #                                 KEY_SOURCE_AD.to_bytes())
+
+    # the KeyHash field is a sha256 of all the values after the
+    # KeyHash field.
+
+    kcl_key_hash = kcl_entry_bytes(keycredlink.KeyHash,
+                                   sha256(kcl_material +
+                                          kcl_key_usage +
+                                          kcl_creation).digest())
+


-- 
Samba Shared Repository
