Colin Watson has proposed merging ~cjwatson/launchpad:distribution-sharing-policies into launchpad:master with ~cjwatson/launchpad:distribution-information-type as a prerequisite.
Commit message: Add distribution sharing policies Requested reviews: Launchpad code reviewers (launchpad-reviewers) For more details, see: https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/415623 This is heavily based on the equivalent code for projects. -- Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:distribution-sharing-policies into launchpad:master.
diff --git a/lib/lp/code/model/branch.py b/lib/lp/code/model/branch.py index 98952be..8ec4819 100644 --- a/lib/lp/code/model/branch.py +++ b/lib/lp/code/model/branch.py @@ -253,10 +253,10 @@ class Branch(SQLBase, WebhookTargetMixin, BzrIdentityMixin): (abstract_artifact, policy) for policy in getUtility(IAccessPolicySource).findByTeam([self.owner])} else: - # We haven't yet quite worked out how distribution privacy - # works, so only work for products for now. if self.product is not None: pillars = [self.product] + elif self.distribution is not None: + pillars = [self.distribution] reconcile_access_for_artifacts( [self], self.information_type, pillars, wanted_links) diff --git a/lib/lp/code/model/branchnamespace.py b/lib/lp/code/model/branchnamespace.py index 326d1d4..b007d99 100644 --- a/lib/lp/code/model/branchnamespace.py +++ b/lib/lp/code/model/branchnamespace.py @@ -26,7 +26,6 @@ from lp.app.enums import ( FREE_INFORMATION_TYPES, InformationType, NON_EMBARGOED_INFORMATION_TYPES, - PUBLIC_INFORMATION_TYPES, ) from lp.app.interfaces.services import IService from lp.code.enums import ( @@ -420,11 +419,32 @@ class PackageBranchNamespace(_BaseBranchNamespace): def getAllowedInformationTypes(self, who=None): """See `IBranchNamespace`.""" - return PUBLIC_INFORMATION_TYPES + # The distribution uses the new simplified branch_sharing_policy + # rules, so check them. + + # Some policies require that the branch owner or current user have + # full access to an information type. If it's required and the user + # doesn't hold it, no information types are legal. + distribution = self.sourcepackage.distribution + required_grant = BRANCH_POLICY_REQUIRED_GRANTS[ + distribution.branch_sharing_policy] + if (required_grant is not None + and not getUtility(IService, 'sharing').checkPillarAccess( + [distribution], required_grant, self.owner) + and (who is None + or not getUtility(IService, 'sharing').checkPillarAccess( + [distribution], required_grant, who))): + return [] + + return BRANCH_POLICY_ALLOWED_TYPES[distribution.branch_sharing_policy] def getDefaultInformationType(self, who=None): """See `IBranchNamespace`.""" - return InformationType.PUBLIC + default_type = BRANCH_POLICY_DEFAULT_TYPES[ + self.sourcepackage.distribution.branch_sharing_policy] + if default_type not in self.getAllowedInformationTypes(who): + return None + return default_type class BranchNamespaceSet: diff --git a/lib/lp/code/model/gitnamespace.py b/lib/lp/code/model/gitnamespace.py index 7ba9a13..e7b2c92 100644 --- a/lib/lp/code/model/gitnamespace.py +++ b/lib/lp/code/model/gitnamespace.py @@ -510,11 +510,29 @@ class PackageGitNamespace(_BaseGitNamespace): def getAllowedInformationTypes(self, who=None): """See `IGitNamespace`.""" - return PUBLIC_INFORMATION_TYPES + # Some policies require that the repository owner or current user + # have full access to an information type. If it's required and the + # user doesn't hold it, no information types are legal. + distribution = self.distro_source_package.distribution + required_grant = BRANCH_POLICY_REQUIRED_GRANTS[ + distribution.branch_sharing_policy] + if (required_grant is not None + and not getUtility(IService, 'sharing').checkPillarAccess( + [distribution], required_grant, self.owner) + and (who is None + or not getUtility(IService, 'sharing').checkPillarAccess( + [distribution], required_grant, who))): + return [] + + return BRANCH_POLICY_ALLOWED_TYPES[distribution.branch_sharing_policy] def getDefaultInformationType(self, who=None): """See `IGitNamespace`.""" - return InformationType.PUBLIC + default_type = BRANCH_POLICY_DEFAULT_TYPES[ + self.distro_source_package.distribution.branch_sharing_policy] + if default_type not in self.getAllowedInformationTypes(who): + return None + return default_type def areRepositoriesMergeable(self, this, other): """See `IGitNamespacePolicy`.""" diff --git a/lib/lp/code/model/gitrepository.py b/lib/lp/code/model/gitrepository.py index 1677d9e..0dd205b 100644 --- a/lib/lp/code/model/gitrepository.py +++ b/lib/lp/code/model/gitrepository.py @@ -659,10 +659,10 @@ class GitRepository(StormBase, WebhookTargetMixin, AccessTokenTargetMixin, (abstract_artifact, policy) for policy in getUtility(IAccessPolicySource).findByTeam([self.owner])} else: - # We haven't yet quite worked out how distribution privacy - # works, so only work for projects for now. if self.project is not None: pillars = [self.project] + elif self.distribution is not None: + pillars = [self.distribution] reconcile_access_for_artifacts( [self], self.information_type, pillars, wanted_links) diff --git a/lib/lp/code/model/tests/test_branch.py b/lib/lp/code/model/tests/test_branch.py index 4419b12..9692104 100644 --- a/lib/lp/code/model/tests/test_branch.py +++ b/lib/lp/code/model/tests/test_branch.py @@ -17,6 +17,10 @@ from pytz import UTC import six from storm.exceptions import LostObjectError from storm.locals import Store +from testscenarios import ( + load_tests_apply_scenarios, + WithScenarios, + ) from testtools import ExpectedException from testtools.matchers import ( Not, @@ -2531,13 +2535,17 @@ class TestBranchPrivacy(TestCaseWithFactory): [(branch.product, InformationType.USERDATA)]), get_policies_for_artifact(branch)) - def test__reconcileAccess_for_distro_branch(self): - # Branch privacy isn't yet supported for distributions, so no - # AccessPolicyArtifact is created for a distro branch. + def test__reconcileAccess_for_package_branch(self): + # _reconcileAccess uses a distribution policy for a package branch. branch = self.factory.makePackageBranch( information_type=InformationType.USERDATA) + [artifact] = getUtility(IAccessArtifactSource).ensure([branch]) + getUtility(IAccessPolicyArtifactSource).deleteByArtifact([artifact]) removeSecurityProxy(branch)._reconcileAccess() - self.assertEqual([], get_policies_for_artifact(branch)) + self.assertContentEqual( + getUtility(IAccessPolicySource).find( + [(branch.distribution, InformationType.USERDATA)]), + get_policies_for_artifact(branch)) def test__reconcileAccess_for_personal_branch(self): # _reconcileAccess uses a person policy for a personal branch. @@ -2701,15 +2709,26 @@ class TestBranchSetPrivate(TestCaseWithFactory): InformationType.PRIVATESECURITY, branch.information_type) -class BranchModerateTestCase(TestCaseWithFactory): - """Test that product owners and commercial admins can moderate branches.""" +class BranchModerateTestCase(WithScenarios, TestCaseWithFactory): + """Test that pillar owners and commercial admins can moderate branches.""" layer = DatabaseFunctionalLayer + scenarios = [ + ("project", {"branch_factory_name": "makeProductBranch"}), + ("distribution", {"branch_factory_name": "makePackageBranch"}), + ] + + def _makeBranch(self, **kwargs): + return getattr(self.factory, self.branch_factory_name)(**kwargs) + + def _getPillar(self, branch): + return branch.product or branch.distribution def test_moderate_permission(self): # Test the ModerateBranch security checker. - branch = self.factory.makeProductBranch() - with person_logged_in(branch.product.owner): + branch = self._makeBranch() + pillar = self._getPillar(branch) + with person_logged_in(pillar.owner): self.assertTrue( check_permission('launchpad.Moderate', branch)) with celebrity_logged_in('commercial_admin'): @@ -2718,25 +2737,27 @@ class BranchModerateTestCase(TestCaseWithFactory): def test_methods_smoketest(self): # Users with launchpad.Moderate can call transitionToInformationType. - branch = self.factory.makeProductBranch() - with person_logged_in(branch.product.owner): - branch.product.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC) + branch = self._makeBranch() + pillar = self._getPillar(branch) + with person_logged_in(pillar.owner): + pillar.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC) branch.transitionToInformationType( - InformationType.PRIVATESECURITY, branch.product.owner) + InformationType.PRIVATESECURITY, pillar.owner) self.assertEqual( InformationType.PRIVATESECURITY, branch.information_type) def test_attribute_smoketest(self): # Users with launchpad.Moderate can set attrs. - branch = self.factory.makeProductBranch() - with person_logged_in(branch.product.owner): + branch = self._makeBranch() + pillar = self._getPillar(branch) + with person_logged_in(pillar.owner): branch.name = 'not-secret' branch.description = 'redacted' - branch.reviewer = branch.product.owner + branch.reviewer = pillar.owner branch.lifecycle_status = BranchLifecycleStatus.EXPERIMENTAL self.assertEqual('not-secret', branch.name) self.assertEqual('redacted', branch.description) - self.assertEqual(branch.product.owner, branch.reviewer) + self.assertEqual(pillar.owner, branch.reviewer) self.assertEqual( BranchLifecycleStatus.EXPERIMENTAL, branch.lifecycle_status) @@ -3574,3 +3595,6 @@ class TestWebservice(TestCaseWithFactory): with admin_logged_in(): self.assertEqual( 1, len(list(getUtility(IBranchScanJobSource).iterReady()))) + + +load_tests = load_tests_apply_scenarios diff --git a/lib/lp/code/model/tests/test_branchnamespace.py b/lib/lp/code/model/tests/test_branchnamespace.py index aff13b1..7a4ff61 100644 --- a/lib/lp/code/model/tests/test_branchnamespace.py +++ b/lib/lp/code/model/tests/test_branchnamespace.py @@ -10,7 +10,6 @@ from lp.app.enums import ( FREE_INFORMATION_TYPES, InformationType, NON_EMBARGOED_INFORMATION_TYPES, - PUBLIC_INFORMATION_TYPES, ) from lp.app.interfaces.services import IService from lp.app.validators import LaunchpadValidationError @@ -594,6 +593,186 @@ class TestPackageBranchNamespace(TestCaseWithFactory, NamespaceMixin): self.assertEqual(IBranchTarget(package), namespace.target) +class TestPackageBranchNamespacePrivacyWithInformationType( + TestCaseWithFactory): + """Tests for the privacy aspects of `PackageBranchNamespace`. + + This tests the behaviour for a package in a distribution using the new + branch_sharing_policy rules. + """ + + layer = DatabaseFunctionalLayer + + def makePackageBranchNamespace(self, sharing_policy, person=None): + if person is None: + person = self.factory.makePerson() + package = self.factory.makeSourcePackage() + self.factory.makeCommercialSubscription(pillar=package.distribution) + with person_logged_in(package.distribution.owner): + package.distribution.setBranchSharingPolicy(sharing_policy) + namespace = PackageBranchNamespace(person, package) + return namespace + + def test_public_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PUBLIC) + self.assertContentEqual( + FREE_INFORMATION_TYPES, namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, namespace.getDefaultInformationType()) + + def test_forbidden_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.FORBIDDEN) + self.assertContentEqual([], namespace.getAllowedInformationTypes()) + self.assertEqual(None, namespace.getDefaultInformationType()) + + def test_public_or_proprietary_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PUBLIC_OR_PROPRIETARY) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, namespace.getDefaultInformationType()) + + def test_proprietary_or_public_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + self.assertContentEqual([], namespace.getAllowedInformationTypes()) + self.assertIs(None, namespace.getDefaultInformationType()) + + def test_proprietary_or_public_owner_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + distribution = namespace.sourcepackage.distribution + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType()) + + def test_proprietary_or_public_caller_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + distribution = namespace.sourcepackage.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType(grantee)) + + def test_proprietary_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY) + self.assertContentEqual([], namespace.getAllowedInformationTypes()) + self.assertIs(None, namespace.getDefaultInformationType()) + + def test_proprietary_branch_owner_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY) + distribution = namespace.sourcepackage.distribution + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY], + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType()) + + def test_proprietary_caller_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY) + distribution = namespace.sourcepackage.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY], + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType(grantee)) + + def test_embargoed_or_proprietary_anyone(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + self.assertContentEqual([], namespace.getAllowedInformationTypes()) + self.assertIs(None, namespace.getDefaultInformationType()) + + def test_embargoed_or_proprietary_owner_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + distribution = namespace.sourcepackage.distribution + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY, InformationType.EMBARGOED], + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.EMBARGOED, + namespace.getDefaultInformationType()) + + def test_embargoed_or_proprietary_caller_grantee(self): + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + distribution = namespace.sourcepackage.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY, InformationType.EMBARGOED], + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.EMBARGOED, + namespace.getDefaultInformationType(grantee)) + + def test_grantee_has_no_artifact_grant(self): + # The owner of a new branch in a distribution whose default + # information type is non-public does not have an artifact grant + # specifically for the new branch, because their existing policy + # grant is sufficient. + person = self.factory.makePerson() + team = self.factory.makeTeam(members=[person]) + namespace = self.makePackageBranchNamespace( + BranchSharingPolicy.PROPRIETARY, person=person) + distribution = namespace.sourcepackage.distribution + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, team, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + branch = namespace.createBranch( + BranchType.HOSTED, self.factory.getUniqueString(), person) + [policy] = getUtility(IAccessPolicySource).find( + [(distribution, InformationType.PROPRIETARY)]) + apgfs = getUtility(IAccessPolicyGrantFlatSource) + self.assertContentEqual( + [(distribution.owner, {policy: SharingPermission.ALL}, []), + (team, {policy: SharingPermission.ALL}, [])], + apgfs.findGranteePermissionsByPolicy([policy])) + self.assertTrue(removeSecurityProxy(branch).visibleByUser(person)) + + class TestNamespaceSet(TestCaseWithFactory): """Tests for `get_namespace`.""" @@ -1041,21 +1220,6 @@ class TestPersonalBranchNamespaceAllowedInformationTypes(TestCaseWithFactory): namespace.getAllowedInformationTypes()) -class TestPackageBranchNamespaceAllowedInformationTypes(TestCaseWithFactory): - """Tests for PackageBranchNamespace.getAllowedInformationTypes.""" - - layer = DatabaseFunctionalLayer - - def test_anyone(self): - # Source package branches are always public. - source_package = self.factory.makeSourcePackage() - person = self.factory.makePerson() - namespace = PackageBranchNamespace(person, source_package) - self.assertContentEqual( - PUBLIC_INFORMATION_TYPES, - namespace.getAllowedInformationTypes()) - - class BaseValidateNewBranchMixin: layer = DatabaseFunctionalLayer diff --git a/lib/lp/code/model/tests/test_gitnamespace.py b/lib/lp/code/model/tests/test_gitnamespace.py index ac469e9..1ca8b5f 100644 --- a/lib/lp/code/model/tests/test_gitnamespace.py +++ b/lib/lp/code/model/tests/test_gitnamespace.py @@ -885,6 +885,183 @@ class TestPackageGitNamespace(TestCaseWithFactory, NamespaceMixin): repositories[0].namespace.collection.getRepositories()) +class TestPackageGitNamespacePrivacyWithInformationType(TestCaseWithFactory): + """Tests for the privacy aspects of `PackageGitNamespace`. + + This tests the behaviour for a package in a distribution using the new + branch_sharing_policy rules. + """ + + layer = DatabaseFunctionalLayer + + def makePackageGitNamespace(self, sharing_policy, person=None): + if person is None: + person = self.factory.makePerson() + dsp = self.factory.makeDistributionSourcePackage() + self.factory.makeCommercialSubscription(pillar=dsp.distribution) + with person_logged_in(dsp.distribution.owner): + dsp.distribution.setBranchSharingPolicy(sharing_policy) + namespace = PackageGitNamespace(person, dsp) + return namespace + + def test_public_anyone(self): + namespace = self.makePackageGitNamespace(BranchSharingPolicy.PUBLIC) + self.assertContentEqual( + FREE_INFORMATION_TYPES, namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, namespace.getDefaultInformationType()) + + def test_forbidden_anyone(self): + namespace = self.makePackageGitNamespace(BranchSharingPolicy.FORBIDDEN) + self.assertEqual([], namespace.getAllowedInformationTypes()) + self.assertIsNone(namespace.getDefaultInformationType()) + + def test_public_or_proprietary_anyone(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PUBLIC_OR_PROPRIETARY) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, namespace.getDefaultInformationType()) + + def test_proprietary_or_public_anyone(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + self.assertEqual([], namespace.getAllowedInformationTypes()) + self.assertIsNone(namespace.getDefaultInformationType()) + + def test_proprietary_or_public_owner_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + distribution = namespace.distro_source_package.distribution + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType()) + + def test_proprietary_or_public_caller_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC) + distribution = namespace.distro_source_package.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + NON_EMBARGOED_INFORMATION_TYPES, + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType(grantee)) + + def test_proprietary_anyone(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY) + self.assertEqual([], namespace.getAllowedInformationTypes()) + self.assertIsNone(namespace.getDefaultInformationType()) + + def test_proprietary_repository_owner_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY) + distribution = namespace.distro_source_package.distribution + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY], + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType()) + + def test_proprietary_caller_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY) + distribution = namespace.distro_source_package.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY], + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.PROPRIETARY, + namespace.getDefaultInformationType(grantee)) + + def test_embargoed_or_proprietary_anyone(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + self.assertEqual([], namespace.getAllowedInformationTypes()) + self.assertIsNone(namespace.getDefaultInformationType()) + + def test_embargoed_or_proprietary_owner_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + distribution = namespace.distro_source_package.distribution + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, namespace.owner, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY, InformationType.EMBARGOED], + namespace.getAllowedInformationTypes()) + self.assertEqual( + InformationType.EMBARGOED, + namespace.getDefaultInformationType()) + + def test_embargoed_or_proprietary_caller_grantee(self): + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY) + distribution = namespace.distro_source_package.distribution + grantee = self.factory.makePerson() + with person_logged_in(distribution.owner): + getUtility(IService, "sharing").sharePillarInformation( + distribution, grantee, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + self.assertContentEqual( + [InformationType.PROPRIETARY, InformationType.EMBARGOED], + namespace.getAllowedInformationTypes(grantee)) + self.assertEqual( + InformationType.EMBARGOED, + namespace.getDefaultInformationType(grantee)) + + def test_grantee_has_no_artifact_grant(self): + # The owner of a new repository in a distribution whose default + # information type is non-public does not have an artifact grant + # specifically for the new repository, because their existing policy + # grant is sufficient. + person = self.factory.makePerson() + team = self.factory.makeTeam(members=[person]) + namespace = self.makePackageGitNamespace( + BranchSharingPolicy.PROPRIETARY, person=person) + distribution = namespace.distro_source_package.distribution + with person_logged_in(distribution.owner): + getUtility(IService, 'sharing').sharePillarInformation( + distribution, team, distribution.owner, + {InformationType.PROPRIETARY: SharingPermission.ALL}) + repository = namespace.createRepository( + GitRepositoryType.HOSTED, person, self.factory.getUniqueUnicode()) + [policy] = getUtility(IAccessPolicySource).find( + [(distribution, InformationType.PROPRIETARY)]) + apgfs = getUtility(IAccessPolicyGrantFlatSource) + self.assertContentEqual( + [(distribution.owner, {policy: SharingPermission.ALL}, []), + (team, {policy: SharingPermission.ALL}, [])], + apgfs.findGranteePermissionsByPolicy([policy])) + self.assertTrue(removeSecurityProxy(repository).visibleByUser(person)) + + class BaseCanCreateRepositoriesMixin: """Common tests for all namespaces.""" @@ -1040,20 +1217,6 @@ class TestPersonalGitNamespaceAllowedInformationTypes(TestCaseWithFactory): namespace.getAllowedInformationTypes()) -class TestPackageGitNamespaceAllowedInformationTypes(TestCaseWithFactory): - """Tests for PackageGitNamespace.getAllowedInformationTypes.""" - - layer = DatabaseFunctionalLayer - - def test_anyone(self): - # Package repositories are always public. - dsp = self.factory.makeDistributionSourcePackage() - person = self.factory.makePerson() - namespace = PackageGitNamespace(person, dsp) - self.assertContentEqual( - PUBLIC_INFORMATION_TYPES, namespace.getAllowedInformationTypes()) - - class TestOCIProjectGitNamespaceAllowedInformationTypes(TestCaseWithFactory): """Tests for OCIProjectGitNamespace.getAllowedInformationTypes.""" diff --git a/lib/lp/code/model/tests/test_gitrepository.py b/lib/lp/code/model/tests/test_gitrepository.py index 3ad7c8e..90bf08e 100644 --- a/lib/lp/code/model/tests/test_gitrepository.py +++ b/lib/lp/code/model/tests/test_gitrepository.py @@ -20,6 +20,10 @@ import pytz import six from storm.exceptions import LostObjectError from storm.store import Store +from testscenarios import ( + load_tests_apply_scenarios, + WithScenarios, + ) from testtools.matchers import ( AnyMatch, ContainsDict, @@ -141,6 +145,10 @@ from lp.registry.interfaces.accesspolicy import ( IAccessPolicyArtifactSource, IAccessPolicySource, ) +from lp.registry.interfaces.distributionsourcepackage import ( + IDistributionSourcePackage, + ) +from lp.registry.interfaces.ociproject import IOCIProject from lp.registry.interfaces.person import IPerson from lp.registry.interfaces.persondistributionsourcepackage import ( IPersonDistributionSourcePackageFactory, @@ -1746,13 +1754,18 @@ class TestGitRepositoryPrivacy(TestCaseWithFactory): get_policies_for_artifact(repository)) def test__reconcileAccess_for_package_repository(self): - # Git repository privacy isn't yet supported for distributions, so - # no AccessPolicyArtifact is created for a package repository. + # _reconcileAccess uses a distribution policy for a package + # repository. repository = self.factory.makeGitRepository( target=self.factory.makeDistributionSourcePackage(), information_type=InformationType.USERDATA) + [artifact] = getUtility(IAccessArtifactSource).ensure([repository]) + getUtility(IAccessPolicyArtifactSource).deleteByArtifact([artifact]) removeSecurityProxy(repository)._reconcileAccess() - self.assertEqual([], get_policies_for_artifact(repository)) + self.assertContentEqual( + getUtility(IAccessPolicySource).find( + [(repository.target.distribution, InformationType.USERDATA)]), + get_policies_for_artifact(repository)) def test__reconcileAccess_for_oci_project_repository(self): # Git repository privacy isn't yet supported for OCI projects, so no @@ -2356,17 +2369,36 @@ class TestGitRepositoryGetAllowedInformationTypes(TestCaseWithFactory): repository.getAllowedInformationTypes(admin)) -class TestGitRepositoryModerate(TestCaseWithFactory): +class TestGitRepositoryModerate(WithScenarios, TestCaseWithFactory): """Test that project owners and commercial admins can moderate Git repositories.""" layer = DatabaseFunctionalLayer + scenarios = [ + ("project", {"target_factory_name": "makeProduct"}), + ("distribution", + {"target_factory_name": "makeDistributionSourcePackage"}), + ("OCI project", {"target_factory_name": "makeOCIProject"}), + ] + + def _makeGitRepository(self, **kwargs): + target = getattr(self.factory, self.target_factory_name)() + return self.factory.makeGitRepository(target=target, **kwargs) + + def _getPillar(self, repository): + target = repository.target + if IDistributionSourcePackage.providedBy(target): + return target.distribution + elif IOCIProject.providedBy(target): + return target.pillar + else: + return target def test_moderate_permission(self): # Test the ModerateGitRepository security checker. - project = self.factory.makeProduct() - repository = self.factory.makeGitRepository(target=project) - with person_logged_in(project.owner): + repository = self._makeGitRepository() + pillar = self._getPillar(repository) + with person_logged_in(pillar.owner): self.assertTrue(check_permission("launchpad.Moderate", repository)) with celebrity_logged_in("commercial_admin"): self.assertTrue(check_permission("launchpad.Moderate", repository)) @@ -2376,24 +2408,26 @@ class TestGitRepositoryModerate(TestCaseWithFactory): def test_methods_smoketest(self): # Users with launchpad.Moderate can call transitionToInformationType. - project = self.factory.makeProduct() - repository = self.factory.makeGitRepository(target=project) - with person_logged_in(project.owner): - project.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC) + if self.target_factory_name == "makeOCIProject": + self.skipTest("Not implemented for OCI projects yet.") + repository = self._makeGitRepository() + pillar = self._getPillar(repository) + with person_logged_in(pillar.owner): + pillar.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC) repository.transitionToInformationType( - InformationType.PRIVATESECURITY, project.owner) + InformationType.PRIVATESECURITY, pillar.owner) self.assertEqual( InformationType.PRIVATESECURITY, repository.information_type) def test_attribute_smoketest(self): # Users with launchpad.Moderate can set attributes. - project = self.factory.makeProduct() - repository = self.factory.makeGitRepository(target=project) - with person_logged_in(project.owner): + repository = self._makeGitRepository() + pillar = self._getPillar(repository) + with person_logged_in(pillar.owner): repository.description = "something" - repository.reviewer = project.owner + repository.reviewer = pillar.owner self.assertEqual("something", repository.description) - self.assertEqual(project.owner, repository.reviewer) + self.assertEqual(pillar.owner, repository.reviewer) class TestGitRepositoryIsPersonTrustedReviewer(TestCaseWithFactory): @@ -5454,3 +5488,6 @@ class TestGitRepositoryMacaroonIssuer(MacaroonTestMixin, TestCaseWithFactory): ["Caveat check for '%s' failed." % find_caveats_by_name(macaroon2, "lp.expires")[0].caveat_id], issuer, macaroon2, repository, user=repository.owner) + + +load_tests = load_tests_apply_scenarios diff --git a/lib/lp/registry/interfaces/distribution.py b/lib/lp/registry/interfaces/distribution.py index 7e943c1..2e3a406 100644 --- a/lib/lp/registry/interfaces/distribution.py +++ b/lib/lp/registry/interfaces/distribution.py @@ -29,6 +29,7 @@ from lazr.restful.declarations import ( exported, exported_as_webservice_collection, exported_as_webservice_entry, + mutator_for, operation_for_version, operation_parameters, operation_returns_collection_of, @@ -780,6 +781,41 @@ class IDistributionView( class IDistributionEditRestricted(IOfficialBugTagTargetRestricted): """IDistribution properties requiring launchpad.Edit permission.""" + @mutator_for(IDistributionView['bug_sharing_policy']) + @operation_parameters(bug_sharing_policy=copy_field( + IDistributionView['bug_sharing_policy'])) + @export_write_operation() + @operation_for_version("devel") + def setBugSharingPolicy(bug_sharing_policy): + """Mutator for bug_sharing_policy. + + Checks authorization and entitlement. + """ + + @mutator_for(IDistributionView['branch_sharing_policy']) + @operation_parameters( + branch_sharing_policy=copy_field( + IDistributionView['branch_sharing_policy'])) + @export_write_operation() + @operation_for_version("devel") + def setBranchSharingPolicy(branch_sharing_policy): + """Mutator for branch_sharing_policy. + + Checks authorization and entitlement. + """ + + @mutator_for(IDistributionView['specification_sharing_policy']) + @operation_parameters( + specification_sharing_policy=copy_field( + IDistributionView['specification_sharing_policy'])) + @export_write_operation() + @operation_for_version("devel") + def setSpecificationSharingPolicy(specification_sharing_policy): + """Mutator for specification_sharing_policy. + + Checks authorization and entitlement. + """ + def checkInformationType(value): """Check whether the information type change should be permitted. diff --git a/lib/lp/registry/model/distribution.py b/lib/lp/registry/model/distribution.py index 94558ca..82f062d 100644 --- a/lib/lp/registry/model/distribution.py +++ b/lib/lp/registry/model/distribution.py @@ -83,11 +83,17 @@ from lp.blueprints.enums import SpecificationFilter from lp.blueprints.model.specification import ( HasSpecificationsMixin, Specification, + SPECIFICATION_POLICY_ALLOWED_TYPES, + SPECIFICATION_POLICY_DEFAULT_TYPES, ) from lp.blueprints.model.specificationsearch import search_specifications from lp.blueprints.model.sprint import HasSprintsMixin from lp.bugs.interfaces.bugsummary import IBugSummaryDimension from lp.bugs.interfaces.bugsupervisor import IHasBugSupervisor +from lp.bugs.interfaces.bugtarget import ( + BUG_POLICY_ALLOWED_TYPES, + BUG_POLICY_DEFAULT_TYPES, + ) from lp.bugs.interfaces.bugtaskfilter import OrderedBugTask from lp.bugs.model.bugtarget import ( BugTargetBase, @@ -101,6 +107,7 @@ from lp.code.interfaces.seriessourcepackagebranch import ( IFindOfficialBranchLinks, ) from lp.code.model.branch import Branch +from lp.code.model.branchnamespace import BRANCH_POLICY_ALLOWED_TYPES from lp.oci.interfaces.ociregistrycredentials import ( IOCIRegistryCredentialsSet, ) @@ -119,6 +126,7 @@ from lp.registry.errors import ( ProprietaryPillar, ) from lp.registry.interfaces.accesspolicy import ( + IAccessPolicyArtifactSource, IAccessPolicyGrantSource, IAccessPolicySource, ) @@ -240,6 +248,24 @@ from lp.translations.model.potemplate import POTemplate from lp.translations.model.translationpolicy import TranslationPolicyMixin +bug_policy_default = { + InformationType.PUBLIC: BugSharingPolicy.PUBLIC, + InformationType.PROPRIETARY: BugSharingPolicy.PROPRIETARY, + } + + +branch_policy_default = { + InformationType.PUBLIC: BranchSharingPolicy.PUBLIC, + InformationType.PROPRIETARY: BranchSharingPolicy.PROPRIETARY, + } + + +specification_policy_default = { + InformationType.PUBLIC: SpecificationSharingPolicy.PUBLIC, + InformationType.PROPRIETARY: SpecificationSharingPolicy.PROPRIETARY, + } + + @implementer( IBugSummaryDimension, IDistribution, IHasBugSupervisor, IHasBuildRecords, IHasIcon, IHasLogo, IHasMugshot, @@ -462,6 +488,10 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements, if (old_info_type == InformationType.PUBLIC and value != InformationType.PUBLIC): self._ensure_complimentary_subscription() + self.setBranchSharingPolicy(branch_policy_default[value]) + self.setBugSharingPolicy(bug_policy_default[value]) + self.setSpecificationSharingPolicy( + specification_policy_default[value]) self._ensurePolicies([value]) @property @@ -469,23 +499,51 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements, """See `IPillar`.""" return "Distribution" - @property - def branch_sharing_policy(self): - """See `IHasSharingPolicies.""" - # Sharing policy for distributions is always PUBLIC. - return BranchSharingPolicy.PUBLIC - - @property - def bug_sharing_policy(self): - """See `IHasSharingPolicies.""" - # Sharing policy for distributions is always PUBLIC. - return BugSharingPolicy.PUBLIC - - @property - def specification_sharing_policy(self): - """See `IHasSharingPolicies.""" - # Sharing policy for distributions is always PUBLIC. - return SpecificationSharingPolicy.PUBLIC + bug_sharing_policy = DBEnum( + enum=BugSharingPolicy, allow_none=True, + default=BugSharingPolicy.PUBLIC) + branch_sharing_policy = DBEnum( + enum=BranchSharingPolicy, allow_none=True, + default=BranchSharingPolicy.PUBLIC) + specification_sharing_policy = DBEnum( + enum=SpecificationSharingPolicy, allow_none=True, + default=SpecificationSharingPolicy.PUBLIC) + + def _prepare_to_set_sharing_policy(self, var, enum, kind, allowed_types): + if (var not in {enum.PUBLIC, enum.FORBIDDEN} and + not self.has_current_commercial_subscription): + raise CommercialSubscribersOnly( + "A current commercial subscription is required to use " + "proprietary %s." % kind) + if self.information_type != InformationType.PUBLIC: + if InformationType.PUBLIC in allowed_types[var]: + raise ProprietaryPillar( + "The distribution is %s." % self.information_type.title) + self._ensurePolicies(allowed_types[var]) + + def setBranchSharingPolicy(self, branch_sharing_policy): + """See `IDistributionEditRestricted`.""" + self._prepare_to_set_sharing_policy( + branch_sharing_policy, BranchSharingPolicy, 'branches', + BRANCH_POLICY_ALLOWED_TYPES) + self.branch_sharing_policy = branch_sharing_policy + self._pruneUnusedPolicies() + + def setBugSharingPolicy(self, bug_sharing_policy): + """See `IDistributionEditRestricted`.""" + self._prepare_to_set_sharing_policy( + bug_sharing_policy, BugSharingPolicy, 'bugs', + BUG_POLICY_ALLOWED_TYPES) + self.bug_sharing_policy = bug_sharing_policy + self._pruneUnusedPolicies() + + def setSpecificationSharingPolicy(self, specification_sharing_policy): + """See `IDistributionEditRestricted`.""" + self._prepare_to_set_sharing_policy( + specification_sharing_policy, SpecificationSharingPolicy, + 'specifications', SPECIFICATION_POLICY_ALLOWED_TYPES) + self.specification_sharing_policy = specification_sharing_policy + self._pruneUnusedPolicies() # Cache of AccessPolicy.ids that convey launchpad.LimitedView. # Unlike artifacts' cached access_policies, an AccessArtifactGrant @@ -530,6 +588,33 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements, else: self.access_policies = None + def _pruneUnusedPolicies(self): + allowed_bug_types = set( + BUG_POLICY_ALLOWED_TYPES.get( + self.bug_sharing_policy, FREE_INFORMATION_TYPES)) + allowed_branch_types = set( + BRANCH_POLICY_ALLOWED_TYPES.get( + self.branch_sharing_policy, FREE_INFORMATION_TYPES)) + allowed_spec_types = set( + SPECIFICATION_POLICY_ALLOWED_TYPES.get( + self.specification_sharing_policy, [InformationType.PUBLIC])) + allowed_types = ( + allowed_bug_types | allowed_branch_types | allowed_spec_types) + allowed_types.add(self.information_type) + # Fetch all APs, and after filtering out ones that are forbidden + # by the bug, branch, and specification policies, the APs that have no + # APAs are unused and can be deleted. + ap_source = getUtility(IAccessPolicySource) + access_policies = set(ap_source.findByPillar([self])) + apa_source = getUtility(IAccessPolicyArtifactSource) + unused_aps = [ + ap for ap in access_policies + if ap.type not in allowed_types + and apa_source.findByPolicy([ap]).is_empty()] + getUtility(IAccessPolicyGrantSource).revokeByPolicy(unused_aps) + ap_source.delete([(ap.pillar, ap.type) for ap in unused_aps]) + self._cacheAccessPolicies() + @cachedproperty def commercial_subscription(self): return IStore(CommercialSubscription).find( @@ -1157,11 +1242,13 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements, def getAllowedSpecificationInformationTypes(self): """See `ISpecificationTarget`.""" - return (InformationType.PUBLIC,) + return SPECIFICATION_POLICY_ALLOWED_TYPES[ + self.specification_sharing_policy] def getDefaultSpecificationInformationType(self): """See `ISpecificationTarget`.""" - return InformationType.PUBLIC + return SPECIFICATION_POLICY_DEFAULT_TYPES[ + self.specification_sharing_policy] def searchQuestions(self, search_text=None, status=QUESTION_STATUS_DEFAULT_SEARCH, @@ -1639,11 +1726,11 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements, def getAllowedBugInformationTypes(self): """See `IDistribution.`""" - return FREE_INFORMATION_TYPES + return BUG_POLICY_ALLOWED_TYPES[self.bug_sharing_policy] def getDefaultBugInformationType(self): """See `IDistribution.`""" - return InformationType.PUBLIC + return BUG_POLICY_DEFAULT_TYPES[self.bug_sharing_policy] def userCanEdit(self, user): """See `IDistribution`.""" @@ -1922,12 +2009,11 @@ class DistributionSet: IStore(distro).add(distro) if information_type != InformationType.PUBLIC: distro._ensure_complimentary_subscription() - # XXX cjwatson 2022-02-10: Replace this with sharing policies once - # those are defined here. - distro._ensurePolicies( - [information_type] - if information_type == InformationType.PROPRIETARY - else FREE_INFORMATION_TYPES) + distro.setBugSharingPolicy(bug_policy_default[information_type]) + distro.setBranchSharingPolicy( + branch_policy_default[information_type]) + distro.setSpecificationSharingPolicy( + specification_policy_default[information_type]) if information_type == InformationType.PUBLIC: getUtility(IArchiveSet).new( distribution=distro, owner=owner, diff --git a/lib/lp/registry/services/sharingservice.py b/lib/lp/registry/services/sharingservice.py index 9480512..9713061 100644 --- a/lib/lp/registry/services/sharingservice.py +++ b/lib/lp/registry/services/sharingservice.py @@ -549,15 +549,9 @@ class SharingService: def getBranchSharingPolicies(self, pillar): """See `ISharingService`.""" - # Only Products have branch sharing policies. Distributions just - # default to Public. - # If the branch sharing policy is EMBARGOED_OR_PROPRIETARY, then we - # do not allow any other policies. allowed_policies = [BranchSharingPolicy.PUBLIC] - # Commercial projects also allow proprietary branches. - if (IProduct.providedBy(pillar) - and pillar.has_current_commercial_subscription): - + # Commercial pillars also allow proprietary branches. + if pillar.has_current_commercial_subscription: if pillar.private: allowed_policies = [ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY, @@ -579,13 +573,9 @@ class SharingService: def getBugSharingPolicies(self, pillar): """See `ISharingService`.""" - # Only Products have bug sharing policies. Distributions just - # default to Public. allowed_policies = [BugSharingPolicy.PUBLIC] - # Commercial projects also allow proprietary bugs. - if (IProduct.providedBy(pillar) - and pillar.has_current_commercial_subscription): - + # Commercial pillars also allow proprietary bugs. + if pillar.has_current_commercial_subscription: if pillar.private: allowed_policies = [ BugSharingPolicy.EMBARGOED_OR_PROPRIETARY, @@ -607,13 +597,8 @@ class SharingService: def getSpecificationSharingPolicies(self, pillar): """See `ISharingService`.""" - # Only Products have specification sharing policies. Distributions just - # default to Public. allowed_policies = [SpecificationSharingPolicy.PUBLIC] - # Commercial projects also allow proprietary specifications. - if (IProduct.providedBy(pillar) - and pillar.has_current_commercial_subscription): - + if pillar.has_current_commercial_subscription: if pillar.private: allowed_policies = [ SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY, @@ -910,10 +895,6 @@ class SharingService: if (not branch_sharing_policy and not bug_sharing_policy and not specification_sharing_policy): return None - # Only Products have sharing policies. - if not IProduct.providedBy(pillar): - raise ValueError( - "Sharing policies are only supported for products.") if branch_sharing_policy: pillar.setBranchSharingPolicy(branch_sharing_policy) if bug_sharing_policy: diff --git a/lib/lp/registry/services/tests/test_sharingservice.py b/lib/lp/registry/services/tests/test_sharingservice.py index 8ce3be1..8c557f8 100644 --- a/lib/lp/registry/services/tests/test_sharingservice.py +++ b/lib/lp/registry/services/tests/test_sharingservice.py @@ -91,10 +91,6 @@ class PillarScenariosMixin(WithScenarios): self.skipTest("Only relevant for Product.") def _makePillar(self, **kwargs): - if ("bug_sharing_policy" in kwargs or - "branch_sharing_policy" in kwargs or - "specification_sharing_policy" in kwargs): - self._skipUnlessProduct() return getattr(self.factory, self.pillar_factory_name)(**kwargs) def _makeBranch(self, pillar, **kwargs): @@ -246,14 +242,12 @@ class TestSharingService( pillar, [BranchSharingPolicy.PUBLIC]) def test_getBranchSharingPolicies_expired_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar, expired=True) self._assert_getBranchSharingPolicies( pillar, [BranchSharingPolicy.PUBLIC]) def test_getBranchSharingPolicies_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar) self._assert_getBranchSharingPolicies( @@ -266,7 +260,6 @@ class TestSharingService( def test_getBranchSharingPolicies_non_public(self): # When the pillar is non-public the policy options are limited to # only proprietary or embargoed/proprietary. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar( information_type=InformationType.PROPRIETARY, @@ -280,7 +273,6 @@ class TestSharingService( def test_getBranchSharingPolicies_disallowed_policy(self): # getBranchSharingPolicies includes a pillar's current policy even if # it is nominally not allowed. - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar, expired=True) with person_logged_in(pillar.owner): @@ -313,14 +305,12 @@ class TestSharingService( pillar, [SpecificationSharingPolicy.PUBLIC]) def test_getSpecificationSharingPolicies_expired_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar, expired=True) self._assert_getSpecificationSharingPolicies( pillar, [SpecificationSharingPolicy.PUBLIC]) def test_getSpecificationSharingPolicies_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar) self._assert_getSpecificationSharingPolicies( @@ -333,7 +323,6 @@ class TestSharingService( def test_getSpecificationSharingPolicies_non_public(self): # When the pillar is non-public the policy options are limited to # only proprietary or embargoed/proprietary. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar( information_type=InformationType.PROPRIETARY, @@ -367,13 +356,11 @@ class TestSharingService( self._assert_getBugSharingPolicies(pillar, [BugSharingPolicy.PUBLIC]) def test_getBugSharingPolicies_expired_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar, expired=True) self._assert_getBugSharingPolicies(pillar, [BugSharingPolicy.PUBLIC]) def test_getBugSharingPolicies_commercial(self): - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar) self._assert_getBugSharingPolicies( @@ -386,7 +373,6 @@ class TestSharingService( def test_getBugSharingPolicies_non_public(self): # When the pillar is non-public the policy options are limited to # only proprietary or embargoed/proprietary. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar( information_type=InformationType.PROPRIETARY, @@ -400,7 +386,6 @@ class TestSharingService( def test_getBugSharingPolicies_disallowed_policy(self): # getBugSharingPolicies includes a pillar's current policy even if it # is nominally not allowed. - self._skipUnlessProduct() pillar = self._makePillar() self.factory.makeCommercialSubscription(pillar, expired=True) with person_logged_in(pillar.owner): @@ -1294,7 +1279,6 @@ class TestSharingService( def test_ensureAccessGrantsBranches(self): # Access grants can be created for branches. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) login_person(owner) @@ -1305,7 +1289,6 @@ class TestSharingService( def test_ensureAccessGrantsGitRepositories(self): # Access grants can be created for Git repositories. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) login_person(owner) @@ -1375,7 +1358,6 @@ class TestSharingService( def test_updatePillarBugSharingPolicy(self): # updatePillarSharingPolicies works for bugs. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) self.factory.makeCommercialSubscription(pillar) @@ -1388,7 +1370,6 @@ class TestSharingService( def test_updatePillarBranchSharingPolicy(self): # updatePillarSharingPolicies works for branches. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) self.factory.makeCommercialSubscription(pillar) @@ -1401,7 +1382,6 @@ class TestSharingService( def test_updatePillarSpecificationSharingPolicy(self): # updatePillarSharingPolicies works for specifications. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) self.factory.makeCommercialSubscription(pillar) @@ -1730,7 +1710,6 @@ class TestSharingService( def test_getPeopleWithAccessBranches(self): # Test the getPeopleWithoutAccess method with branches. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) branch = self._makeBranch( @@ -1741,7 +1720,6 @@ class TestSharingService( def test_getPeopleWithAccessGitRepositories(self): # Test the getPeopleWithoutAccess method with Git repositories. - self._skipUnlessProduct() owner = self.factory.makePerson() pillar = self._makePillar(owner=owner) gitrepository = self._makeGitRepository( diff --git a/lib/lp/registry/tests/test_distribution.py b/lib/lp/registry/tests/test_distribution.py index 49f55df..481ff56 100644 --- a/lib/lp/registry/tests/test_distribution.py +++ b/lib/lp/registry/tests/test_distribution.py @@ -31,6 +31,12 @@ from lp.app.errors import ( ServiceUsageForbidden, ) from lp.app.interfaces.launchpad import ILaunchpadCelebrities +from lp.app.interfaces.services import IService +from lp.blueprints.model.specification import ( + SPECIFICATION_POLICY_ALLOWED_TYPES, + ) +from lp.bugs.interfaces.bugtarget import BUG_POLICY_ALLOWED_TYPES +from lp.code.model.branchnamespace import BRANCH_POLICY_ALLOWED_TYPES from lp.oci.tests.helpers import OCIConfigHelperMixin from lp.registry.enums import ( BranchSharingPolicy, @@ -38,6 +44,7 @@ from lp.registry.enums import ( DistributionDefaultTraversalPolicy, EXCLUSIVE_TEAM_POLICY, INCLUSIVE_TEAM_POLICY, + SpecificationSharingPolicy, TeamMembershipPolicy, ) from lp.registry.errors import ( @@ -69,6 +76,7 @@ from lp.soyuz.interfaces.distributionsourcepackagerelease import ( IDistributionSourcePackageRelease, ) from lp.testing import ( + admin_logged_in, api_url, celebrity_logged_in, login_person, @@ -398,13 +406,52 @@ class TestDistribution(TestCaseWithFactory): grantees = {grant.grantee for grant in grants} self.assertEqual(expected_grantess, grantees) + def test_open_creation_sharing_policies(self): + # Creating a new open (non-proprietary) distribution sets the bug + # and branch sharing policies to public, and creates policies if + # required. + owner = self.factory.makePerson() + with person_logged_in(owner): + distribution = self.factory.makeDistribution(owner=owner) + self.assertEqual( + BugSharingPolicy.PUBLIC, distribution.bug_sharing_policy) + self.assertEqual( + BranchSharingPolicy.PUBLIC, distribution.branch_sharing_policy) + self.assertEqual( + SpecificationSharingPolicy.PUBLIC, + distribution.specification_sharing_policy) + aps = getUtility(IAccessPolicySource).findByPillar([distribution]) + expected = [ + InformationType.USERDATA, InformationType.PRIVATESECURITY] + self.assertContentEqual(expected, [policy.type for policy in aps]) + + def test_proprietary_creation_sharing_policies(self): + # Creating a new proprietary distribution sets the bug, branch, and + # specification sharing policies to proprietary. + owner = self.factory.makePerson() + with person_logged_in(owner): + distribution = self.factory.makeDistribution( + owner=owner, information_type=InformationType.PROPRIETARY) + self.assertEqual( + BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy) + self.assertEqual( + BranchSharingPolicy.PROPRIETARY, + distribution.branch_sharing_policy) + self.assertEqual( + SpecificationSharingPolicy.PROPRIETARY, + distribution.specification_sharing_policy) + aps = getUtility(IAccessPolicySource).findByPillar([distribution]) + expected = [InformationType.PROPRIETARY] + self.assertContentEqual(expected, [policy.type for policy in aps]) + def test_change_info_type_proprietary_check_artifacts(self): # Cannot change distribution information_type if any artifacts are # public. - # XXX cjwatson 2022-02-11: Make this use - # artifact.transitionToInformationType once sharing policies are in - # place. - distribution = self.factory.makeDistribution() + distribution = self.factory.makeDistribution( + specification_sharing_policy=( + SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY), + bug_sharing_policy=BugSharingPolicy.PUBLIC_OR_PROPRIETARY, + branch_sharing_policy=BranchSharingPolicy.PUBLIC_OR_PROPRIETARY) self.useContext(person_logged_in(distribution.owner)) spec = self.factory.makeSpecification(distribution=distribution) for info_type in PRIVATE_DISTRIBUTION_TYPES: @@ -412,32 +459,34 @@ class TestDistribution(TestCaseWithFactory): CannotChangeInformationType, "Some blueprints are public."): distribution.information_type = info_type - removeSecurityProxy(spec).information_type = ( - InformationType.PROPRIETARY) + spec.transitionToInformationType( + InformationType.PROPRIETARY, distribution.owner) dsp = self.factory.makeDistributionSourcePackage( distribution=distribution) bug = self.factory.makeBug(target=dsp) for bug_info_type in FREE_INFORMATION_TYPES: - removeSecurityProxy(bug).information_type = bug_info_type + bug.transitionToInformationType(bug_info_type, distribution.owner) for info_type in PRIVATE_DISTRIBUTION_TYPES: with ExpectedException( CannotChangeInformationType, "Some bugs are neither proprietary nor embargoed."): distribution.information_type = info_type - removeSecurityProxy(bug).information_type = InformationType.PROPRIETARY + bug.transitionToInformationType( + InformationType.PROPRIETARY, distribution.owner) distroseries = self.factory.makeDistroSeries(distribution=distribution) sp = self.factory.makeSourcePackage(distroseries=distroseries) branch = self.factory.makeBranch(sourcepackage=sp) for branch_info_type in FREE_INFORMATION_TYPES: - removeSecurityProxy(branch).information_type = branch_info_type + branch.transitionToInformationType( + branch_info_type, distribution.owner) for info_type in PRIVATE_DISTRIBUTION_TYPES: with ExpectedException( CannotChangeInformationType, "Some branches are neither proprietary nor " "embargoed."): distribution.information_type = info_type - removeSecurityProxy(branch).information_type = ( - InformationType.PROPRIETARY) + branch.transitionToInformationType( + InformationType.PROPRIETARY, distribution.owner) for info_type in PRIVATE_DISTRIBUTION_TYPES: distribution.information_type = info_type @@ -457,6 +506,40 @@ class TestDistribution(TestCaseWithFactory): else: distribution.information_type = info_type + def test_change_info_type_proprietary_sets_policies(self): + # Changing information type from public to proprietary sets the + # appropriate policies. + distribution = self.factory.makeDistribution() + with person_logged_in(distribution.owner): + distribution.information_type = InformationType.PROPRIETARY + self.assertEqual( + BranchSharingPolicy.PROPRIETARY, + distribution.branch_sharing_policy) + self.assertEqual( + BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy) + self.assertEqual( + SpecificationSharingPolicy.PROPRIETARY, + distribution.specification_sharing_policy) + + def test_proprietary_to_public_leaves_policies(self): + # Changing information type from public leaves sharing policies + # unchanged. + owner = self.factory.makePerson() + distribution = self.factory.makeDistribution( + information_type=InformationType.PROPRIETARY, owner=owner) + with person_logged_in(owner): + distribution.information_type = InformationType.PUBLIC + # Setting information type to the current type should be a no-op. + distribution.information_type = InformationType.PUBLIC + self.assertEqual( + BranchSharingPolicy.PROPRIETARY, + distribution.branch_sharing_policy) + self.assertEqual( + BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy) + self.assertEqual( + SpecificationSharingPolicy.PROPRIETARY, + distribution.specification_sharing_policy) + def test_cacheAccessPolicies(self): # Distribution.access_policies is a list caching AccessPolicy.ids # for which an AccessPolicyGrant or AccessArtifactGrant gives a @@ -483,6 +566,18 @@ class TestDistribution(TestCaseWithFactory): naked_distribution.information_type = InformationType.PUBLIC self.assertIsNone(naked_distribution.access_policies) + # Proprietary distributions can have both Proprietary and Embargoed + # artifacts, and someone who can see either needs LimitedView on the + # pillar they're on. So both policies are permissible if they + # exist. + naked_distribution.information_type = InformationType.PROPRIETARY + naked_distribution.setBugSharingPolicy( + BugSharingPolicy.EMBARGOED_OR_PROPRIETARY) + [emb_policy] = aps.find([(distribution, InformationType.EMBARGOED)]) + self.assertContentEqual( + [prop_policy.id, emb_policy.id], + naked_distribution.access_policies) + def test_checkInformationType_bug_supervisor(self): # Bug supervisors of proprietary distributions must not have # inclusive membership policies. @@ -744,6 +839,378 @@ class TestDistribution(TestCaseWithFactory): distribution.information_type = InformationType.PROPRIETARY +class TestDistributionBugInformationTypes(TestCaseWithFactory): + + layer = DatabaseFunctionalLayer + + def makeDistributionWithPolicy(self, bug_sharing_policy): + distribution = self.factory.makeDistribution() + self.factory.makeCommercialSubscription(pillar=distribution) + with person_logged_in(distribution.owner): + distribution.setBugSharingPolicy(bug_sharing_policy) + return distribution + + def test_no_policy(self): + # New distributions can only use the non-proprietary information + # types. + distribution = self.factory.makeDistribution() + self.assertContentEqual( + FREE_INFORMATION_TYPES, + distribution.getAllowedBugInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, + distribution.getDefaultBugInformationType()) + + def test_sharing_policy_public_or_proprietary(self): + # bug_sharing_policy can enable Proprietary. + distribution = self.makeDistributionWithPolicy( + BugSharingPolicy.PUBLIC_OR_PROPRIETARY) + self.assertContentEqual( + FREE_INFORMATION_TYPES + (InformationType.PROPRIETARY,), + distribution.getAllowedBugInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, + distribution.getDefaultBugInformationType()) + + def test_sharing_policy_proprietary_or_public(self): + # bug_sharing_policy can enable and default to Proprietary. + distribution = self.makeDistributionWithPolicy( + BugSharingPolicy.PROPRIETARY_OR_PUBLIC) + self.assertContentEqual( + FREE_INFORMATION_TYPES + (InformationType.PROPRIETARY,), + distribution.getAllowedBugInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + distribution.getDefaultBugInformationType()) + + def test_sharing_policy_proprietary(self): + # bug_sharing_policy can enable only Proprietary. + distribution = self.makeDistributionWithPolicy( + BugSharingPolicy.PROPRIETARY) + self.assertContentEqual( + [InformationType.PROPRIETARY], + distribution.getAllowedBugInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + distribution.getDefaultBugInformationType()) + + +class TestDistributionSpecificationPolicyAndInformationTypes( + TestCaseWithFactory): + + layer = DatabaseFunctionalLayer + + def makeDistributionWithPolicy(self, specification_sharing_policy): + distribution = self.factory.makeDistribution() + self.factory.makeCommercialSubscription(pillar=distribution) + with person_logged_in(distribution.owner): + distribution.setSpecificationSharingPolicy( + specification_sharing_policy) + return distribution + + def test_no_policy(self): + # Distributions that have not specified a policy can use the PUBLIC + # information type. + distribution = self.factory.makeDistribution() + self.assertContentEqual( + [InformationType.PUBLIC], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, + distribution.getDefaultSpecificationInformationType()) + + def test_sharing_policy_public(self): + # Distributions with a purely public policy should use PUBLIC + # information type. + distribution = self.makeDistributionWithPolicy( + SpecificationSharingPolicy.PUBLIC) + self.assertContentEqual( + [InformationType.PUBLIC], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, + distribution.getDefaultSpecificationInformationType()) + + def test_sharing_policy_public_or_proprietary(self): + # specification_sharing_policy can enable Proprietary. + distribution = self.makeDistributionWithPolicy( + SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY) + self.assertContentEqual( + [InformationType.PUBLIC, InformationType.PROPRIETARY], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.PUBLIC, + distribution.getDefaultSpecificationInformationType()) + + def test_sharing_policy_proprietary_or_public(self): + # specification_sharing_policy can enable and default to Proprietary. + distribution = self.makeDistributionWithPolicy( + SpecificationSharingPolicy.PROPRIETARY_OR_PUBLIC) + self.assertContentEqual( + [InformationType.PUBLIC, InformationType.PROPRIETARY], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + distribution.getDefaultSpecificationInformationType()) + + def test_sharing_policy_proprietary(self): + # specification_sharing_policy can enable only Proprietary. + distribution = self.makeDistributionWithPolicy( + SpecificationSharingPolicy.PROPRIETARY) + self.assertContentEqual( + [InformationType.PROPRIETARY], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.PROPRIETARY, + distribution.getDefaultSpecificationInformationType()) + + def test_sharing_policy_embargoed_or_proprietary(self): + # specification_sharing_policy can be embargoed and then proprietary. + distribution = self.makeDistributionWithPolicy( + SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY) + self.assertContentEqual( + [InformationType.PROPRIETARY, InformationType.EMBARGOED], + distribution.getAllowedSpecificationInformationTypes()) + self.assertEqual( + InformationType.EMBARGOED, + distribution.getDefaultSpecificationInformationType()) + + +class BaseSharingPolicyTests: + """Common tests for distribution sharing policies.""" + + layer = DatabaseFunctionalLayer + + def setSharingPolicy(self, policy, user): + raise NotImplementedError + + def getSharingPolicy(self): + raise NotImplementedError + + def setUp(self): + super().setUp() + self.distribution = self.factory.makeDistribution() + self.commercial_admin = self.factory.makeCommercialAdmin() + + def test_owner_can_set_policy(self): + # Distribution maintainers can set sharing policies. + self.setSharingPolicy(self.public_policy, self.distribution.owner) + self.assertEqual(self.public_policy, self.getSharingPolicy()) + + def test_commercial_admin_can_set_policy(self): + # Commercial admins can set sharing policies for commercial + # distributions. + self.factory.makeCommercialSubscription(pillar=self.distribution) + self.setSharingPolicy(self.public_policy, self.commercial_admin) + self.assertEqual(self.public_policy, self.getSharingPolicy()) + + def test_random_cannot_set_policy(self): + # An unrelated user can't set sharing policies. + person = self.factory.makePerson() + self.assertRaises( + Unauthorized, self.setSharingPolicy, self.public_policy, person) + + def test_anonymous_cannot_set_policy(self): + # An anonymous user can't set sharing policies. + self.assertRaises( + Unauthorized, self.setSharingPolicy, self.public_policy, None) + + def test_proprietary_forbidden_without_commercial_sub(self): + # No policy that allows Proprietary can be configured without a + # commercial subscription. + self.setSharingPolicy(self.public_policy, self.distribution.owner) + self.assertEqual(self.public_policy, self.getSharingPolicy()) + for policy in self.commercial_policies: + self.assertRaises( + CommercialSubscribersOnly, + self.setSharingPolicy, policy, self.distribution.owner) + + def test_proprietary_allowed_with_commercial_sub(self): + # All policies are valid when there's a current commercial + # subscription. + self.factory.makeCommercialSubscription(pillar=self.distribution) + for policy in self.enum.items: + self.setSharingPolicy(policy, self.commercial_admin) + self.assertEqual(policy, self.getSharingPolicy()) + + def test_setting_proprietary_creates_access_policy(self): + # Setting a policy that allows Proprietary creates a + # corresponding access policy and shares it with the the + # maintainer. + self.factory.makeCommercialSubscription(pillar=self.distribution) + self.assertEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA], + [policy.type for policy in + getUtility(IAccessPolicySource).findByPillar( + [self.distribution])]) + self.setSharingPolicy( + self.commercial_policies[0], self.commercial_admin) + self.assertEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA, + InformationType.PROPRIETARY], + [policy.type for policy in + getUtility(IAccessPolicySource).findByPillar( + [self.distribution])]) + self.assertTrue( + getUtility(IService, 'sharing').checkPillarAccess( + [self.distribution], InformationType.PROPRIETARY, + self.distribution.owner)) + + def test_unused_policies_are_pruned(self): + # When a sharing policy is changed, the allowed information types may + # become more restricted. If this case, any existing access polices + # for the now defunct information type(s) should be removed so long as + # there are no corresponding policy artifacts. + + # We create a distribution with and ensure there's an APA. + ap_source = getUtility(IAccessPolicySource) + distribution = self.factory.makeDistribution() + [ap] = ap_source.find( + [(distribution, InformationType.PRIVATESECURITY)]) + self.factory.makeAccessPolicyArtifact(policy=ap) + + def getAccessPolicyTypes(pillar): + return [ + ap.type + for ap in ap_source.findByPillar([pillar])] + + # Now change the sharing policies to PROPRIETARY + self.factory.makeCommercialSubscription(pillar=distribution) + with person_logged_in(distribution.owner): + distribution.setBugSharingPolicy(BugSharingPolicy.PROPRIETARY) + # Just bug sharing policy has been changed so all previous policy + # types are still valid. + self.assertContentEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA, + InformationType.PROPRIETARY], + getAccessPolicyTypes(distribution)) + + distribution.setBranchSharingPolicy( + BranchSharingPolicy.PROPRIETARY) + # Proprietary is permitted by the sharing policy, and there's a + # Private Security artifact. But Private isn't in use or allowed + # by a sharing policy, so it's now gone. + self.assertContentEqual( + [InformationType.PRIVATESECURITY, InformationType.PROPRIETARY], + getAccessPolicyTypes(distribution)) + + def test_proprietary_distributions_forbid_public_policies(self): + # A proprietary distribution forbids any sharing policy that would + # permit public artifacts. + owner = self.distribution.owner + with admin_logged_in(): + self.distribution.information_type = InformationType.PROPRIETARY + policies_permitting_public = [self.public_policy] + policies_permitting_public.extend( + policy for policy in self.commercial_policies if + InformationType.PUBLIC in self.allowed_types[policy]) + for policy in policies_permitting_public: + with ExpectedException( + ProprietaryPillar, "The distribution is Proprietary."): + self.setSharingPolicy(policy, owner) + + +class TestDistributionBugSharingPolicy( + BaseSharingPolicyTests, TestCaseWithFactory): + """Test Distribution.bug_sharing_policy.""" + + layer = DatabaseFunctionalLayer + + enum = BugSharingPolicy + public_policy = BugSharingPolicy.PUBLIC + commercial_policies = ( + BugSharingPolicy.PUBLIC_OR_PROPRIETARY, + BugSharingPolicy.PROPRIETARY_OR_PUBLIC, + BugSharingPolicy.PROPRIETARY, + ) + allowed_types = BUG_POLICY_ALLOWED_TYPES + + def setSharingPolicy(self, policy, user): + with person_logged_in(user): + result = self.distribution.setBugSharingPolicy(policy) + return result + + def getSharingPolicy(self): + return self.distribution.bug_sharing_policy + + +class TestDistributionBranchSharingPolicy( + BaseSharingPolicyTests, TestCaseWithFactory): + """Test Distribution.branch_sharing_policy.""" + + layer = DatabaseFunctionalLayer + + enum = BranchSharingPolicy + public_policy = BranchSharingPolicy.PUBLIC + commercial_policies = ( + BranchSharingPolicy.PUBLIC_OR_PROPRIETARY, + BranchSharingPolicy.PROPRIETARY_OR_PUBLIC, + BranchSharingPolicy.PROPRIETARY, + BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY, + ) + allowed_types = BRANCH_POLICY_ALLOWED_TYPES + + def setSharingPolicy(self, policy, user): + with person_logged_in(user): + result = self.distribution.setBranchSharingPolicy(policy) + return result + + def getSharingPolicy(self): + return self.distribution.branch_sharing_policy + + def test_setting_embargoed_creates_access_policy(self): + # Setting a policy that allows Embargoed creates a corresponding + # access policy and shares it with the maintainer. + self.factory.makeCommercialSubscription(pillar=self.distribution) + self.assertEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA], + [policy.type for policy in + getUtility(IAccessPolicySource).findByPillar( + [self.distribution])]) + self.setSharingPolicy( + self.enum.EMBARGOED_OR_PROPRIETARY, + self.commercial_admin) + self.assertEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA, + InformationType.PROPRIETARY, InformationType.EMBARGOED], + [policy.type for policy in + getUtility(IAccessPolicySource).findByPillar( + [self.distribution])]) + self.assertTrue( + getUtility(IService, 'sharing').checkPillarAccess( + [self.distribution], InformationType.PROPRIETARY, + self.distribution.owner)) + self.assertTrue( + getUtility(IService, 'sharing').checkPillarAccess( + [self.distribution], InformationType.EMBARGOED, + self.distribution.owner)) + + +class TestDistributionSpecificationSharingPolicy( + BaseSharingPolicyTests, TestCaseWithFactory): + """Test Distribution.specification_sharing_policy.""" + + layer = DatabaseFunctionalLayer + + enum = SpecificationSharingPolicy + public_policy = SpecificationSharingPolicy.PUBLIC + commercial_policies = ( + SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY, + SpecificationSharingPolicy.PROPRIETARY_OR_PUBLIC, + SpecificationSharingPolicy.PROPRIETARY, + SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY, + ) + allowed_types = SPECIFICATION_POLICY_ALLOWED_TYPES + + def setSharingPolicy(self, policy, user): + with person_logged_in(user): + result = self.distribution.setSpecificationSharingPolicy(policy) + return result + + def getSharingPolicy(self): + return self.distribution.specification_sharing_policy + + class TestDistributionCurrentSourceReleases( CurrentSourceReleasesMixin, TestCase): """Test for Distribution.getCurrentSourceReleases(). diff --git a/lib/lp/scripts/garbo.py b/lib/lp/scripts/garbo.py index f1c75ad..24c4d61 100644 --- a/lib/lp/scripts/garbo.py +++ b/lib/lp/scripts/garbo.py @@ -79,6 +79,7 @@ from lp.code.model.revision import ( from lp.code.model.revisionstatus import RevisionStatusArtifact from lp.oci.model.ocirecipebuild import OCIFile from lp.registry.interfaces.person import IPersonSet +from lp.registry.model.distribution import Distribution from lp.registry.model.person import Person from lp.registry.model.product import Product from lp.registry.model.sourcepackagename import SourcePackageName @@ -1426,7 +1427,7 @@ class UnusedPOTMsgSetPruner(TunableLoop): transaction.commit() -class UnusedAccessPolicyPruner(TunableLoop): +class UnusedProductAccessPolicyPruner(TunableLoop): """Deletes unused AccessPolicy and AccessPolicyGrants for products.""" maximum_chunk_size = 5000 @@ -1451,6 +1452,32 @@ class UnusedAccessPolicyPruner(TunableLoop): transaction.commit() +class UnusedDistributionAccessPolicyPruner(TunableLoop): + """Deletes unused AccessPolicy and AccessPolicyGrants for distributions.""" + + maximum_chunk_size = 5000 + + def __init__(self, log, abort_time=None): + super().__init__(log, abort_time) + self.start_at = 1 + self.store = IMasterStore(Distribution) + + def findDistributions(self): + return self.store.find( + Distribution, + Distribution.id >= self.start_at).order_by(Distribution.id) + + def isDone(self): + return self.findDistributions().is_empty() + + def __call__(self, chunk_size): + distributions = list(self.findDistributions()[:chunk_size]) + for distribution in distributions: + distribution._pruneUnusedPolicies() + self.start_at = distributions[-1].id + 1 + transaction.commit() + + class ProductVCSPopulator(TunableLoop): """Populates product.vcs from product.inferred_vcs if not set.""" @@ -2087,8 +2114,9 @@ class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector): SuggestiveTemplatesCacheUpdater, TeamMembershipPruner, UnlinkedAccountPruner, - UnusedAccessPolicyPruner, + UnusedDistributionAccessPolicyPruner, UnusedPOTMsgSetPruner, + UnusedProductAccessPolicyPruner, WebhookJobPruner, ] experimental_tunable_loops = [ diff --git a/lib/lp/scripts/tests/test_garbo.py b/lib/lp/scripts/tests/test_garbo.py index 7f5844a..3df3b58 100644 --- a/lib/lp/scripts/tests/test_garbo.py +++ b/lib/lp/scripts/tests/test_garbo.py @@ -1355,9 +1355,10 @@ class TestGarbo(FakeAdapterMixin, TestCaseWithFactory): ap.type for ap in getUtility(IAccessPolicySource).findByPillar([pillar])] - def test_UnusedAccessPolicyPruner(self): - # UnusedAccessPolicyPruner removes access policies that aren't - # in use by artifacts or allowed by the project sharing policy. + def test_UnusedProductAccessPolicyPruner(self): + # UnusedProductAccessPolicyPruner removes access policies that + # aren't in use by artifacts or allowed by the project sharing + # policy. switch_dbuser('testadmin') product = self.factory.makeProduct() self.factory.makeCommercialSubscription(pillar=product) @@ -1385,6 +1386,39 @@ class TestGarbo(FakeAdapterMixin, TestCaseWithFactory): [InformationType.PRIVATESECURITY, InformationType.PROPRIETARY], self.getAccessPolicyTypes(product)) + def test_UnusedDistributionAccessPolicyPruner(self): + # UnusedDistributionAccessPolicyPruner removes access policies that + # aren't in use by artifacts or allowed by the distribution sharing + # policy. + switch_dbuser('testadmin') + distribution = self.factory.makeProduct() + self.factory.makeCommercialSubscription(pillar=distribution) + self.factory.makeAccessPolicy( + distribution, InformationType.PROPRIETARY) + naked_distribution = removeSecurityProxy(distribution) + naked_distribution.bug_sharing_policy = BugSharingPolicy.PROPRIETARY + naked_distribution.branch_sharing_policy = ( + BranchSharingPolicy.PROPRIETARY) + [ap] = getUtility(IAccessPolicySource).find( + [(distribution, InformationType.PRIVATESECURITY)]) + self.factory.makeAccessPolicyArtifact(policy=ap) + + # Private and Private Security were created with the distribution. + # Proprietary was created when the branch sharing policy was set. + self.assertContentEqual( + [InformationType.PRIVATESECURITY, InformationType.USERDATA, + InformationType.PROPRIETARY], + self.getAccessPolicyTypes(distribution)) + + self.runDaily() + + # Proprietary is permitted by the sharing policy, and there's a + # Private Security artifact. But Private isn't in use or allowed + # by a sharing policy, so garbo deleted it. + self.assertContentEqual( + [InformationType.PRIVATESECURITY, InformationType.PROPRIETARY], + self.getAccessPolicyTypes(distribution)) + def test_ProductVCSPopulator(self): switch_dbuser('testadmin') product = self.factory.makeProduct() diff --git a/lib/lp/security.py b/lib/lp/security.py index 95d7a45..fbada1b 100644 --- a/lib/lp/security.py +++ b/lib/lp/security.py @@ -2402,14 +2402,15 @@ class EditBranch(AuthorizationBase): class ModerateBranch(EditBranch): - """The owners, product owners, and admins can moderate branches.""" + """The owners, pillar owners, and admins can moderate branches.""" permission = 'launchpad.Moderate' def checkAuthenticated(self, user): if super().checkAuthenticated(user): return True branch = self.obj - if branch.product is not None and user.inTeam(branch.product.owner): + pillar = branch.product or branch.distribution + if pillar is not None and user.inTeam(pillar.owner): return True return user.in_commercial_admin @@ -2477,15 +2478,22 @@ class EditGitRepository(AuthorizationBase): class ModerateGitRepository(EditGitRepository): - """The owners, project owners, and admins can moderate Git repositories.""" + """The owners, pillar owners, and admins can moderate Git repositories.""" permission = 'launchpad.Moderate' def checkAuthenticated(self, user): if super().checkAuthenticated(user): return True target = self.obj.target - if (target is not None and IProduct.providedBy(target) and - user.inTeam(target.owner)): + if IProduct.providedBy(target): + pillar = target + elif IDistributionSourcePackage.providedBy(target): + pillar = target.distribution + elif IOCIProject.providedBy(target): + pillar = target.pillar + else: + raise AssertionError("Unknown target: %r" % target) + if pillar is not None and user.inTeam(pillar.owner): return True return user.in_commercial_admin diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py index d4b9f20..f6a7917 100644 --- a/lib/lp/testing/factory.py +++ b/lib/lp/testing/factory.py @@ -2709,7 +2709,10 @@ class BareLaunchpadObjectFactory(ObjectFactory): publish_root_dir=None, publish_base_url=None, publish_copy_base_url=None, no_pubconf=False, icon=None, summary=None, vcs=None, - oci_project_admin=None, information_type=None): + oci_project_admin=None, bug_sharing_policy=None, + branch_sharing_policy=None, + specification_sharing_policy=None, + information_type=None): """Make a new distribution.""" if name is None: name = self.getUniqueString(prefix="distribution") @@ -2740,6 +2743,26 @@ class BareLaunchpadObjectFactory(ObjectFactory): naked_distro.bug_supervisor = bug_supervisor if oci_project_admin is not None: naked_distro.oci_project_admin = oci_project_admin + # makeProduct defaults licenses to [License.OTHER_PROPRIETARY] if + # any non-public sharing policy is set, which ensures a + # complimentary commercial subscription. However, Distribution + # doesn't have a licenses field, so deal with the commercial + # subscription directly here instead. + if ((bug_sharing_policy is not None and + bug_sharing_policy != BugSharingPolicy.PUBLIC) or + (branch_sharing_policy is not None and + branch_sharing_policy != BranchSharingPolicy.PUBLIC) or + (specification_sharing_policy is not None and + specification_sharing_policy != + SpecificationSharingPolicy.PUBLIC)): + naked_distro._ensure_complimentary_subscription() + if branch_sharing_policy: + naked_distro.setBranchSharingPolicy(branch_sharing_policy) + if bug_sharing_policy: + naked_distro.setBugSharingPolicy(bug_sharing_policy) + if specification_sharing_policy: + naked_distro.setSpecificationSharingPolicy( + specification_sharing_policy) if not no_pubconf: self.makePublisherConfig( distro, publish_root_dir, publish_base_url,
_______________________________________________ Mailing list: https://launchpad.net/~launchpad-reviewers Post to : launchpad-reviewers@lists.launchpad.net Unsubscribe : https://launchpad.net/~launchpad-reviewers More help : https://help.launchpad.net/ListHelp