The branch, master has been updated
       via  b5be313d92f s4:selftest: samba.tests.krb5.netlogon don't need explicit FAST_SUPPORT
       via  b0d6c639e4a python:tests/krb5: let netlogon.py test referral ticket for SEC_CHAN_DNS_DOMAIN
       via  1510aad09ba python:tests/krb5: allow get_service_ticket to accept a trust referral ticket without kvno
       via  3d41954195a python:tests/krb5: allow tickets without a kvno
       via  4e6724e1228 python:tests/krb5: let netlogon.py export changed passwords to keytab
       via  ca795ef4720 python:tests/krb5: add domain trust tests to netlogon.py
       via  9e58d057a09 python:tests/krb5: add a create_trust() helper function to test trusted domains
       via  9520aea8b0f python:tests/krb5: allow exporting a keytab file of the accounts used by the tests
       via  122af1c77e0 python:tests/krb5: add KerberosCredentials.[g|s]et_trust_{incoming,outgoing,account}_creds
       via  6c96f9499f5 pycredentials: add [g|s]et_old_nt_hash()
       via  978a66f2bd5 s4:rpc_server/netlogon: fix error codes for netr_NetrLogonSendToSam() with SEC_CHAN_RODC
       via  ed728597d2a s4:rpc_server/netlogon: an RODC is not allowed to call netr_ServerPasswordGet()
       via  3dd918f3186 python:tests/krb5: let netlogon.py run the tests also as rodc
       via  6c21a74c44e python:tests/krb5: allow netlogon.py tests to work against a KDC with claims enabled
       via  65812d642da python:tests/krb5: allow get_mock_rodc_krbtgt_creds(preserve=False) to create a tmp rodc
       via  a2864b3c4c7 python:tests/krb5: fix etypes_to_test values in RawKerberosTest
       via  92292aaa0a7 s4:rpc_server/netlogon: fill netlogon_creds_CredentialState->tdo_guid
       via  4349ee4586d schannel.idl: add tdo_guid to netlogon_creds_CredentialState
      from  56947612452 s4:dsdb: fix logic of dsdb_trust_routing_by_name()

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit b5be313d92f772b0d57681b2032fb359bf930c63
Author: Stefan Metzmacher <[email protected]>
Date:   Thu Dec 19 13:34:18 2024 +0100

    s4:selftest: samba.tests.krb5.netlogon don't need explicit FAST_SUPPORT
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>
    
    Autobuild-User(master): Stefan Metzmacher <[email protected]>
    Autobuild-Date(master): Wed Jan  8 10:16:50 UTC 2025 on atb-devel-224

commit b0d6c639e4a332eb2e91be0297d08e1e384da354
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 16 18:11:31 2024 +0100

    python:tests/krb5: let netlogon.py test referral ticket for SEC_CHAN_DNS_DOMAIN
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 1510aad09ba5ee80a62e72aa89b49f48403e3cdd
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 16 15:12:21 2024 +0100

    python:tests/krb5: allow get_service_ticket to accept a trust referral ticket without kvno
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 3d41954195a839dbc097967c04b0c15045787a32
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 16 15:51:34 2024 +0100

    python:tests/krb5: allow tickets without a kvno
    
    This is needed for trust referrals.
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 4e6724e12282f85c68e67edc0d20839533e123bd
Author: Stefan Metzmacher <[email protected]>
Date:   Wed Dec 18 14:59:06 2024 +0100

    python:tests/krb5: let netlogon.py export changed passwords to keytab
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit ca795ef4720411fb21607a5bd67da2d0beb3f54e
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 16 15:18:54 2024 +0100

    python:tests/krb5: add domain trust tests to netlogon.py
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 9e58d057a099e89d312a3a23820242cd82587f31
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 2 08:48:32 2024 +0100

    python:tests/krb5: add a create_trust() helper function to test trusted domains
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 9520aea8b0f2cc56a165650fa470ae5e650ad9a9
Author: Stefan Metzmacher <[email protected]>
Date:   Wed Dec 18 11:44:27 2024 +0100

    python:tests/krb5: allow exporting a keytab file of the accounts used by the tests
    
    EXPORT_KEYTAB_FILE=/dev/shm/export.keytab
    EXPORT_KEYTAB_APPEND=0 or 1
    EXPORT_EXISTING_CREDS_TO_KEYTAB=0 or 1
    EXPORT_GIVEN_CREDS_TO_KEYTAB=0 or 1
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 122af1c77e0817896f726cecab662fbd0179387e
Author: Stefan Metzmacher <[email protected]>
Date:   Mon Dec 16 13:47:34 2024 +0100

    python:tests/krb5: add KerberosCredentials.[g|s]et_trust_{incoming,outgoing,account}_creds
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 6c96f9499f514e5b9fb93339407526e28b719d08
Author: Stefan Metzmacher <[email protected]>
Date:   Sat Dec 14 17:51:44 2024 +0100

    pycredentials: add [g|s]et_old_nt_hash()
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 978a66f2bd5c7c7a3925a109fc10706fcd869c88
Author: Stefan Metzmacher <[email protected]>
Date:   Fri Dec 13 16:11:34 2024 +0100

    s4:rpc_server/netlogon: fix error codes for netr_NetrLogonSendToSam() with SEC_CHAN_RODC
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit ed728597d2abfacf9d6b3253db2c741880fe26d2
Author: Stefan Metzmacher <[email protected]>
Date:   Fri Dec 13 16:01:27 2024 +0100

    s4:rpc_server/netlogon: an RODC is not allowed to call netr_ServerPasswordGet()
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 3dd918f3186699b2908d6fc8cda83b49d6b9c39c
Author: Stefan Metzmacher <[email protected]>
Date:   Fri Dec 13 15:46:02 2024 +0100

    python:tests/krb5: let netlogon.py run the tests also as rodc
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 6c21a74c44e573f66df8bde416e86619f27e6906
Author: Stefan Metzmacher <[email protected]>
Date:   Fri Dec 13 15:44:43 2024 +0100

    python:tests/krb5: allow netlogon.py tests to work against a KDC with claims enabled
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 65812d642dac2bc268faad219fa493ada89986bb
Author: Stefan Metzmacher <[email protected]>
Date:   Fri Dec 13 15:42:37 2024 +0100

    python:tests/krb5: allow get_mock_rodc_krbtgt_creds(preserve=False) to create a tmp rodc
    
    This also exposes credentials for the machine account for netlogon
    testing.
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit a2864b3c4c7eec7b59367ce10438eb3aca28a50a
Author: Stefan Metzmacher <[email protected]>
Date:   Wed Dec 18 14:57:06 2024 +0100

    python:tests/krb5: fix etypes_to_test values in RawKerberosTest
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 92292aaa0a7499fc3f807eaae70213a976ca31fd
Author: Stefan Metzmacher <[email protected]>
Date:   Thu Dec 12 16:26:23 2024 +0100

    s4:rpc_server/netlogon: fill netlogon_creds_CredentialState->tdo_guid
    
    This will help us to lookup the tdo object using a <GUID=TDO-GUID>
    search base.
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

commit 4349ee4586d8a78ba9996e4b3273e72e46070f3b
Author: Stefan Metzmacher <[email protected]>
Date:   Thu Dec 12 16:09:50 2024 +0100

    schannel.idl: add tdo_guid to netlogon_creds_CredentialState
    
    Signed-off-by: Stefan Metzmacher <[email protected]>
    Reviewed-by: Jennifer Sutton <[email protected]>

-----------------------------------------------------------------------
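
The EXPORT_KEYTAB_* variables listed in commit 9520aea8b0f could be read along the following lines. This is only a minimal sketch: the function name read_keytab_export_settings, the returned dictionary keys, and the defaults (no export, all flags off when unset) are assumptions made for illustration, since the actual parsing code is not part of the truncated changeset shown here.

```python
import os


def read_keytab_export_settings(env=None):
    """Collect the keytab export settings from an environment mapping.

    Hypothetical helper; the names of the returned keys are illustrative.
    """
    if env is None:
        env = os.environ

    def flag(name):
        # Assumption: "1" enables the option, anything else (or unset)
        # leaves it disabled.
        return env.get(name, "0") == "1"

    return {
        # None means that no keytab export was requested.
        "keytab_file": env.get("EXPORT_KEYTAB_FILE"),
        "append": flag("EXPORT_KEYTAB_APPEND"),
        "export_existing_creds": flag("EXPORT_EXISTING_CREDS_TO_KEYTAB"),
        "export_given_creds": flag("EXPORT_GIVEN_CREDS_TO_KEYTAB"),
    }
```

Passing a plain dictionary instead of os.environ makes the helper easy to exercise without touching the process environment.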

Summary of changes:
 auth/credentials/pycredentials.c              |  57 +++-
 librpc/idl/schannel.idl                       |   5 +
 python/samba/tests/krb5/kdc_base_test.py      | 414 ++++++++++++++++++++++++--
 python/samba/tests/krb5/netlogon.py           | 260 ++++++++++++++--
 python/samba/tests/krb5/raw_testcase.py       | 221 +++++++++++++-
 selftest/knownfail_mit_kdc.d/as-req           |  46 +--
 source4/rpc_server/netlogon/dcerpc_netlogon.c |  19 +-
 source4/selftest/tests.py                     |   2 -
 8 files changed, 942 insertions(+), 82 deletions(-)


Changeset truncated at 500 lines:

diff --git a/auth/credentials/pycredentials.c b/auth/credentials/pycredentials.c
index fa6040e35c0..1ccd242cc47 100644
--- a/auth/credentials/pycredentials.c
+++ b/auth/credentials/pycredentials.c
@@ -581,6 +581,49 @@ static PyObject *py_creds_set_nt_hash(PyObject *self, PyObject *args)
        return PyBool_FromLong(cli_credentials_set_nt_hash(creds, pwd, obt));
 }
 
+static PyObject *py_creds_get_old_nt_hash(PyObject *self, PyObject *unused)
+{
+       PyObject *ret;
+       struct samr_Password *ntpw = NULL;
+       struct cli_credentials *creds = PyCredentials_AsCliCredentials(self);
+       if (creds == NULL) {
+               PyErr_Format(PyExc_TypeError, "Credentials expected");
+               return NULL;
+       }
+       ntpw = cli_credentials_get_old_nt_hash(creds, creds);
+       if (ntpw == NULL) {
+               Py_RETURN_NONE;
+       }
+
+       ret = PyBytes_FromStringAndSize(discard_const_p(char, ntpw->hash), 16);
+       TALLOC_FREE(ntpw);
+       return ret;
+}
+
+static PyObject *py_creds_set_old_nt_hash(PyObject *self, PyObject *args)
+{
+       PyObject *py_cp = Py_None;
+       const struct samr_Password *pwd = NULL;
+       struct cli_credentials *creds = PyCredentials_AsCliCredentials(self);
+       if (creds == NULL) {
+               PyErr_Format(PyExc_TypeError, "Credentials expected");
+               return NULL;
+       }
+
+       if (!PyArg_ParseTuple(args, "O", &py_cp)) {
+               return NULL;
+       }
+
+       if (!py_check_dcerpc_type(py_cp, "samba.dcerpc.samr", "Password")) {
+               /* py_check_dcerpc_type sets TypeError */
+               return NULL;
+       }
+
+       pwd = pytalloc_get_ptr(py_cp);
+
+       return PyBool_FromLong(cli_credentials_set_old_nt_hash(creds, pwd));
+}
+
 static PyObject *py_creds_get_kerberos_state(PyObject *self, PyObject *unused)
 {
        int state;
@@ -1668,9 +1711,21 @@ static PyMethodDef py_creds_methods[] = {
                .ml_name  = "set_nt_hash",
                .ml_meth  = py_creds_set_nt_hash,
                .ml_flags = METH_VARARGS,
-               .ml_doc = "S.set_net_sh(samr_Password[, credentials.SPECIFIED]) -> bool\n"
+               .ml_doc = "S.set_nt_hash(samr_Password[, credentials.SPECIFIED]) -> bool\n"
                        "Change NT hash.",
        },
+       {
+               .ml_name  = "get_old_nt_hash",
+               .ml_meth  = py_creds_get_old_nt_hash,
+               .ml_flags = METH_NOARGS,
+       },
+       {
+               .ml_name  = "set_old_nt_hash",
+               .ml_meth  = py_creds_set_old_nt_hash,
+               .ml_flags = METH_VARARGS,
+               .ml_doc = "S.set_old_nt_hash(samr_Password) -> bool\n"
+                       "Change old NT hash.",
+       },
        {
                .ml_name  = "get_kerberos_state",
                .ml_meth  = py_creds_get_kerberos_state,
diff --git a/librpc/idl/schannel.idl b/librpc/idl/schannel.idl
index 50b5bba39a0..1b259df81d3 100644
--- a/librpc/idl/schannel.idl
+++ b/librpc/idl/schannel.idl
@@ -58,6 +58,11 @@ interface schannel
                NTTIME auth_time;
                [value(0xFFFFFFFF)] uint32 magic3;
                dom_sid28 client_sid;
+               /*
+                * objectGUID of the trustedDomain object
+                * for SEC_CHAN_DOMAIN and SEC_CHAN_DNS_DOMAIN
+                */
+               GUID tdo_guid;
                /*
                 * NETLOGON_NEG_SUPPORTS_KERBEROS_AUTH is not
                 * enough to implement the logic
diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py
index 093b6712e8e..17877aa8863 100644
--- a/python/samba/tests/krb5/kdc_base_test.py
+++ b/python/samba/tests/krb5/kdc_base_test.py
@@ -43,6 +43,8 @@ from samba import (
     generate_random_password,
     net,
     ntstatus,
+    current_unix_time,
+    unix2nttime,
 )
 from samba.auth import system_session
 from samba.credentials import (
@@ -52,6 +54,7 @@ from samba.credentials import (
     Credentials,
 )
 from samba.crypto import des_crypt_blob_16, md4_hash_blob
+from samba.lsa_utils import OpenPolicyFallback, CreateTrustedDomainFallback
 from samba.dcerpc import (
     claims,
     dcerpc,
@@ -66,7 +69,13 @@ from samba.dcerpc import (
     samr,
     security,
 )
-from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA
+from samba.dcerpc.misc import (
+    SEC_CHAN_NULL,
+    SEC_CHAN_BDC,
+    SEC_CHAN_DNS_DOMAIN,
+    SEC_CHAN_DOMAIN,
+    SEC_CHAN_WKSTA,
+)
 from samba.domain.models import AuthenticationPolicy, AuthenticationSilo
 from samba.drs_utils import drs_Replicate, drsuapi_connect
 from samba.dsdb import (
@@ -88,7 +97,8 @@ from samba.dsdb import (
     UF_SERVER_TRUST_ACCOUNT,
     UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION,
     UF_WORKSTATION_TRUST_ACCOUNT,
-    UF_SMARTCARD_REQUIRED
+    UF_SMARTCARD_REQUIRED,
+    UF_INTERDOMAIN_TRUST_ACCOUNT,
 )
 from samba.join import DCJoinContext
 from samba.ndr import ndr_pack, ndr_unpack
@@ -160,6 +170,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         RODC = object()
         MANAGED_SERVICE = object()
         GROUP_MANAGED_SERVICE = object()
+        TRUST = object()
 
     @classmethod
     def setUpClass(cls):
@@ -170,6 +181,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         cls._rodc_ldb = None
 
         cls._drsuapi_connection = None
+        cls._lsarpc_connection = None
 
         cls._functional_level = None
 
@@ -182,6 +194,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         # A list containing DNs of accounts created as part of testing.
         cls.accounts = []
 
+        # A list of tdo_handles of trusts created as part of testing.
+        cls.trusts = []
+
         cls.account_cache = {}
         cls.policy_cache = {}
         cls.tkt_cache = {}
@@ -327,6 +342,15 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             for dn in reversed(self.test_accounts):
                 delete_force(self._ldb, dn)
 
+        if self._test_rodc_ctx is not None:
+            self._test_rodc_ctx.cleanup_old_join(force=True)
+
+        # Clean up any trusts created for single tests.
+        if self._lsarpc_connection is not None:
+            lsa_conn, _, _, _, _ = self._lsarpc_connection
+            for tdo_handle in reversed(self.test_trusts):
+                lsa_conn.DeleteObject(tdo_handle)
+
         super().tearDown()
 
     @classmethod
@@ -344,6 +368,14 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             for dn in reversed(cls.accounts):
                 delete_force(cls._ldb, dn)
 
+        # Clean up any trusts created by create_trust. This is
+        # done in tearDownClass() rather than tearDown(), so that
+        # trust accounts need only be created once for permutation tests.
+        if cls._lsarpc_connection is not None:
+            lsa_conn, _, _, _, _ = cls._lsarpc_connection
+            for tdo_handle in reversed(cls.trusts):
+                lsa_conn.DeleteObject(tdo_handle)
+
         if cls._rodc_ctx is not None:
             cls._rodc_ctx.cleanup_old_join(force=True)
 
@@ -357,6 +389,11 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         # A list containing DNs of accounts that should be removed when the
         # current test finishes.
         self.test_accounts = []
+        self._test_rodc_ctx = None
+
+        # A list containing tdo_handles of trusts that should be removed when the
+        # current test finishes.
+        self.test_trusts = []
 
     def get_lp(self) -> LoadParm:
         if self._lp is None:
@@ -403,6 +440,62 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return self._drsuapi_connection
 
+    def get_lsarpc_connection(self):
+        def get_lsa_info(conn, policy_access):
+            in_version = 1
+            in_revision_info1 = lsa.revision_info1()
+            in_revision_info1.revision = 1
+            in_revision_info1.supported_features = (
+                lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
+            )
+
+            out_version, out_revision_info1, policy = OpenPolicyFallback(
+                conn,
+                b''.decode('utf-8'),
+                in_version,
+                in_revision_info1,
+                access_mask=policy_access
+            )
+
+            info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS)
+
+            return (policy, out_version, out_revision_info1, info)
+
+        def lsarpc_connect(server, lp, creds, ip=None):
+            binding_options = ""
+            if lp.log_level() >= 9:
+                binding_options += ",print"
+
+            # Allow forcing the IP
+            if ip is not None:
+                binding_options += f",target_hostname={server}"
+                binding_string = f"ncacn_np:{ip}[{binding_options}]"
+            else:
+                binding_string = "ncacn_np:%s[%s]" % (server, binding_options)
+
+            try:
+                conn = lsa.lsarpc(binding_string, lp, creds)
+                policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+                policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
+                policy_access |= lsa.LSA_POLICY_CREATE_SECRET
+                (policy, out_version, out_revision_info1, info) = \
+                    get_lsa_info(conn, policy_access)
+            except Exception as e:
+                raise RuntimeError("LSARPC connection to %s failed: %s" % (server, e))
+
+            return (conn, policy, out_version, out_revision_info1, info)
+
+        if self._lsarpc_connection is None:
+            admin_creds = self.get_admin_creds()
+            samdb = self.get_samdb()
+            dns_hostname = samdb.host_dns_name()
+            type(self)._lsarpc_connection = lsarpc_connect(dns_hostname,
+                                                           self.get_lp(),
+                                                           admin_creds,
+                                                           ip=self.dc_host)
+
+        return self._lsarpc_connection
+
     def get_server_dn(self, samdb):
         server = samdb.get_serverName()
 
@@ -413,8 +506,13 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return dn
 
-    def get_mock_rodc_ctx(self):
-        if self._rodc_ctx is None:
+    def get_mock_rodc_ctx(self, preserve=True):
+        if preserve:
+            rodc_ctx = self._rodc_ctx
+        else:
+            rodc_ctx = self._test_rodc_ctx
+
+        if rodc_ctx is None:
             admin_creds = self.get_admin_creds()
             lp = self.get_lp()
 
@@ -430,9 +528,16 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                                      domain=None)
             self.create_rodc(rodc_ctx)
 
-            type(self)._rodc_ctx = rodc_ctx
+            if preserve:
+                # Mark this rodc for deletion in tearDownClass() after all the
+                # tests in this class finish.
+                type(self)._rodc_ctx = rodc_ctx
+            else:
+                # Mark this rodc for deletion in tearDown() after the current
+                # test finishes.
+                self._test_rodc_ctx = rodc_ctx
 
-        return self._rodc_ctx
+        return rodc_ctx
 
     def get_domain_functional_level(self, ldb=None):
         if self._functional_level is None:
@@ -789,10 +894,212 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
             # Save the claim DN so it can be deleted in tearDownClass()
             self.accounts.append(str(claim_dn))
 
+    def create_trust(self, trust_info,
+                     trust_enc_types=None,
+                     trust_incoming_password=None,
+                     trust_outgoing_password=None,
+                     expect_error=None,
+                     preserve=True):
+        """Create an trust account for testing.
+           The handle of the created trust is added to cls.trusts,
+           which is used by tearDownClass to clean up the created trusts.
+           With preserve=False the handle is added to self.test_trusts,
+           which is used by tearDown to clean up the created trusts.
+        """
+
+        if trust_incoming_password is None:
+            trust_incoming_password = generate_random_password(120, 120)
+        trust_incoming_secret = list(trust_incoming_password.encode('utf-16-le'))
+        if trust_outgoing_password is None:
+            trust_outgoing_password = generate_random_password(120, 120)
+        trust_outgoing_secret = list(trust_outgoing_password.encode('utf-16-le'))
+
+        def generate_AuthInOutBlob(secret, update_time):
+            if secret is None:
+                blob = drsblobs.trustAuthInOutBlob()
+                blob.count = 0
+
+                return blob
+
+            clear = drsblobs.AuthInfoClear()
+            clear.size = len(secret)
+            clear.password = secret
+
+            info = drsblobs.AuthenticationInformation()
+            info.LastUpdateTime = unix2nttime(update_time)
+            info.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
+            info.AuthInfo = clear
+
+            array = drsblobs.AuthenticationInformationArray()
+            array.count = 1
+            array.array = [info]
+
+            blob = drsblobs.trustAuthInOutBlob()
+            blob.count = 1
+            blob.current = array
+
+            return blob
+
+        update_time = current_unix_time()
+        trust_incoming_blob = generate_AuthInOutBlob(trust_incoming_secret,
+                                                     update_time)
+        trust_outgoing_blob = generate_AuthInOutBlob(trust_outgoing_secret,
+                                                     update_time)
+
+        lsa_conn, lsa_policy, lsa_version, lsa_revision_info1, local_info = \
+                self.get_lsarpc_connection()
+
+        try:
+            tdo_handle = CreateTrustedDomainFallback(lsa_conn,
+                                                     lsa_policy,
+                                                     trust_info,
+                                                     lsa.LSA_TRUSTED_DOMAIN_ALL_ACCESS |
+                                                     security.SEC_STD_DELETE,
+                                                     lsa_version,
+                                                     lsa_revision_info1,
+                                                     trust_incoming_blob,
+                                                     trust_outgoing_blob)
+        except NTSTATUSError as err:
+            status, _ = err.args
+            self.assertIsNotNone(expect_error,
+                                 f'unexpectedly failed with {status:08X}')
+            self.assertEqual(expect_error, status, 'got wrong status code')
+            return (None, None, None, None)
+        self.assertIsNone(expect_error, 'expected error')
+        if preserve:
+            # Mark this trust for deletion in tearDownClass() after all the
+            # tests in this class finish.
+            self.trusts.append(tdo_handle)
+        else:
+            # Mark this trust for deletion in tearDown() after the current
+            # test finishes.
+            self.test_trusts.append(tdo_handle)
+        if trust_enc_types:
+            lsa_conn.SetInformationTrustedDomain(tdo_handle,
+                                                 lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES,
+                                                 trust_enc_types)
+
+        samdb = self.get_samdb()
+
+        incoming_account_name = trust_info.netbios_name.string
+        incoming_account_name += '$'
+        incoming_nbt_domain = local_info.name.string
+        incoming_dns_domain = local_info.dns_domain.string
+
+        outgoing_account_name = local_info.name.string
+        outgoing_account_name += '$'
+        outgoing_nbt_domain = trust_info.netbios_name.string
+        outgoing_dns_domain = trust_info.domain_name.string
+
+        tdo_search_filter = "(&(objectClass=trustedDomain)(name=%s))" % (
+                            outgoing_dns_domain)
+        tdo_res = samdb.search(scope=ldb.SCOPE_SUBTREE,
+                               expression=tdo_search_filter,
+                               attrs=['msDS-TrustForestTrustInfo'])
+        self.assertEqual(len(tdo_res), 1)
+        tdo_dn = tdo_res[0].dn
+
+        acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % (
+                             incoming_account_name)
+        acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,
+                                expression=acct_search_filter,
+                                attrs=['msDS-KeyVersionNumber',
+                                       'objectSid',
+                                       'objectGUID'])
+        self.assertEqual(len(acct_res), 1)
+        acct_dn = acct_res[0].dn
+        acct_kvno = int(acct_res[0]['msDS-KeyVersionNumber'][0])
+        acct_sid = acct_res[0].get('objectSid', idx=0)
+        acct_sid = samdb.schema_format_value('objectSID', acct_sid)
+        acct_sid = acct_sid.decode('utf-8')
+        acct_guid = acct_res[0].get('objectGUID', idx=0)
+        acct_guid = samdb.schema_format_value('objectGUID', acct_guid)
+        acct_guid = acct_guid.decode('utf-8')
+
+        trust_incoming_salt = "%skrbtgt%s" % (
+                incoming_dns_domain.upper(),
+                outgoing_dns_domain.upper())
+        trust_outgoing_salt = "%skrbtgt%s" % (
+                outgoing_dns_domain.upper(),
+                incoming_dns_domain.upper())
+        trust_account_salt = "%skrbtgt%s" % (
+                incoming_dns_domain.upper(),
+                outgoing_nbt_domain.upper())
+
+        if trust_info.trust_type != lsa.LSA_TRUST_TYPE_DOWNLEVEL:
+            secure_channel_type = SEC_CHAN_DNS_DOMAIN
+        else:
+            secure_channel_type = SEC_CHAN_DOMAIN
+
+        incoming_creds = KerberosCredentials()
+        incoming_creds.guess(self.get_lp())
+        incoming_creds.set_realm(incoming_dns_domain.upper())
+        incoming_creds.set_domain(incoming_nbt_domain.upper())
+        incoming_creds.set_forced_salt(trust_incoming_salt.encode('utf-8'))
+        incoming_creds.set_password(trust_incoming_password)
+        incoming_creds.set_username(incoming_account_name)
+        incoming_creds.set_workstation('')
+        incoming_creds.set_secure_channel_type(secure_channel_type)
+        incoming_creds.set_dn(tdo_dn)
+        incoming_creds.set_type(self.AccountType.TRUST)
+        incoming_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
+        self.creds_set_enctypes(incoming_creds)
+
+        outgoing_creds = KerberosCredentials()
+        outgoing_creds.guess(self.get_lp())
+        outgoing_creds.set_realm(outgoing_dns_domain.upper())
+        outgoing_creds.set_domain(outgoing_nbt_domain.upper())
+        outgoing_creds.set_forced_salt(trust_outgoing_salt.encode('utf-8'))
+        outgoing_creds.set_password(trust_outgoing_password)
+        outgoing_creds.set_username(outgoing_account_name)
+        outgoing_creds.set_workstation('')
+        outgoing_creds.set_secure_channel_type(secure_channel_type)
+        outgoing_creds.set_dn(tdo_dn)
+        outgoing_creds.set_type(self.AccountType.TRUST)
+        outgoing_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
+        self.creds_set_enctypes(outgoing_creds)
+
+        account_creds = KerberosCredentials()
+        account_creds.guess(self.get_lp())
+        account_creds.set_realm(incoming_dns_domain.upper())
+        account_creds.set_domain(incoming_nbt_domain.upper())
+        account_creds.set_forced_salt(trust_account_salt.encode('utf-8'))
+        account_creds.set_password(trust_incoming_password)
+        account_creds.set_username(incoming_account_name)
+        account_creds.set_workstation('TEST-TRUST-DC')
+        account_creds.set_secure_channel_type(secure_channel_type)
+        account_creds.set_dn(acct_dn)
+        account_creds.set_type(self.AccountType.TRUST)
+        account_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
+        account_creds.set_kvno(acct_kvno)
+        account_creds.set_sid(str(acct_sid))
+        account_creds.set_guid(acct_guid)
+        if trust_enc_types is not None:
+            self.creds_set_enctypes(account_creds,
+                                    extra_bits=trust_enc_types.enc_types)
+        else:
+            self.creds_set_enctypes(account_creds)
+
+        incoming_creds.set_trust_outgoing_creds(outgoing_creds)
+        incoming_creds.set_trust_account_creds(account_creds)
+
+        outgoing_creds.set_trust_incoming_creds(incoming_creds)
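
The three forced salts that create_trust() builds above follow one pattern: an upper-cased realm, the literal string "krbtgt", and an upper-cased peer name. The standalone sketch below restates that string construction outside the test class so the three variants can be compared side by side. The function name trust_salts and the example domain names are hypothetical; only the concatenation itself is taken from the diff.

```python
def trust_salts(local_dns, remote_dns, remote_nbt):
    """Return (incoming, outgoing, account) salts as built in create_trust().

    local_dns  : DNS name of the local domain (local_info.dns_domain)
    remote_dns : DNS name of the trusted domain (trust_info.domain_name)
    remote_nbt : NetBIOS name of the trusted domain (trust_info.netbios_name)
    """
    # Incoming: local realm, then the remote DNS domain.
    incoming = "%skrbtgt%s" % (local_dns.upper(), remote_dns.upper())
    # Outgoing: the same two names with their roles swapped.
    outgoing = "%skrbtgt%s" % (remote_dns.upper(), local_dns.upper())
    # Trust account: local realm, then the remote NetBIOS name.
    account = "%skrbtgt%s" % (local_dns.upper(), remote_nbt.upper())
    return incoming, outgoing, account


incoming, outgoing, account = trust_salts(
    "local.example.com", "trusted.example.net", "TRUSTED")
print(incoming)
print(outgoing)
print(account)
```

In the test helper these strings are encoded as UTF-8 and handed to set_forced_salt() on the respective KerberosCredentials objects.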


-- 
Samba Shared Repository
