This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb63475748c Add infra policy compliance checkers (#35848)
bb63475748c is described below

commit bb63475748c5e10d0988b8b779e5ca2724b044c2
Author: Enrique Calderon <[email protected]>
AuthorDate: Mon Aug 18 13:46:57 2025 -0600

    Add infra policy compliance checkers (#35848)
    
    * Add IAM policy compliance checker and configuration files
    
    `infra/iam/generate` has been removed as it has been assimilated into 
`iam.py`
    
    * fix typo
    
    * standardize the keys yaml
    
    * Fix error on IAM not found
    
    * Add service account keys compliance checker
    
    * Improve readme by adding details, formats and Account Keys info
    
    * Modified account keys compliance check class to allow reading an empty 
keys.yaml
    
    * Change the project id to apache beam
    
    * Allow service accounts on terraform
    
    * Proposed changes to make the policy compliant
    It is adding some user permission changes and the service accounts roles.
    
    * Proposed account keys changes to make it compliant
    **Warning**: This commit modifies service account keys, clearing them and starting them as custom. Modify the generated file to include only the keys you want to manage
    
    * Solution for issues found by gemini-code-assistant
    
    * Implement SendingClient for GitHub issue notifications and email alerts
    
    * Restore original configuration files
    
    * Add license to SendingClient
    
    * Implement a print announcement to avoid issues and email all together
    
    * Added announcement functionality in AccountKeys and IAM checkers
    
    - Added SendingClient integration for creating and printing compliance 
announcements.
    - Updated main execution flow to support new announcement actions.
    - Improved logging and error handling for announcement processes.
---
 infra/enforcement/README.md        | 165 ++++++++++++
 infra/enforcement/account_keys.py  | 523 +++++++++++++++++++++++++++++++++++++
 infra/enforcement/config.yml       |  37 +++
 infra/enforcement/iam.py           | 400 ++++++++++++++++++++++++++++
 infra/enforcement/requirements.txt |  23 ++
 infra/enforcement/sending.py       | 179 +++++++++++++
 infra/iam/generate.py              | 212 ---------------
 infra/iam/users.tf                 |   2 +-
 infra/iam/users.yml                |   2 +-
 infra/keys/keys.yaml               |  30 +--
 10 files changed, 1342 insertions(+), 231 deletions(-)

diff --git a/infra/enforcement/README.md b/infra/enforcement/README.md
new file mode 100644
index 00000000000..c9b2bda8cab
--- /dev/null
+++ b/infra/enforcement/README.md
@@ -0,0 +1,165 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Infrastructure rules enforcement
+
+This module checks that the project's infrastructure complies with the declared infrastructure rules.
+
+## IAM Policies
+
+Enforcement works by validating the project's current IAM policy against the policies declared in the policy file.
+The tool monitors compliance for user permissions, service account roles, and group memberships across your GCP project.
+
+### Usage
+
+You can specify the action either through the configuration file 
(`config.yml`) or via command-line arguments:
+
+```bash
+# Check compliance and report issues (default)
+python iam.py --action check
+
+# Create GitHub issue if compliance violations are found
+python iam.py --action issue
+
+# Generate new compliance file based on current IAM policy
+python iam.py --action generate
+```
+
+### Actions
+
+- **check**: Validates IAM policies against defined policies and reports any 
differences (default behavior)
+- **issue**: Creates a GitHub issue when IAM policies differ from the defined 
ones, including detailed permission discrepancies
+- **generate**: Updates the compliance file to match the current GCP IAM 
policy, creating a new baseline from existing permissions
+
+### Features
+
+The IAM Policy enforcement tool provides the following capabilities:
+
+- **Comprehensive Policy Export**: Automatically exports all IAM bindings and 
roles from the GCP project
+- **Member Type Recognition**: Handles users, service accounts, and groups 
with proper parsing and identification
+- **Permission Comparison**: Detailed comparison between expected and actual 
permissions for each user
+- **Conditional Role Filtering**: Automatically excludes conditional roles 
(roles with conditions) from compliance checks
+- **Sorted Output**: Provides consistent, sorted output for easy comparison 
and review
+- **Detailed Reporting**: Comprehensive reporting of permission differences 
with clear before/after comparisons
+- **GitHub Integration**: Automatic issue creation with detailed compliance 
violation reports
+
+### Configuration
+
+The `config.yml` file supports the following parameters for IAM policies:
+
+- `project_id`: GCP project ID to check (default: `apache-beam-testing`)
+- `users_file`: Path to the YAML file containing expected IAM policies 
(default: `../iam/users.yml`)
+- `action`: Default action to perform (`check`, `issue`, or `generate`)
+- `logging`: Logging configuration (level and format)
+
+### IAM Policy File Format
+
+The IAM policy file should follow this YAML structure:
+
+```yaml
+- username: john.doe
+  email: [email protected]
+  permissions:
+    - role: roles/viewer
+    - role: roles/storage.objectViewer
+- username: service-account-name
+  email: [email protected]
+  permissions:
+    - role: roles/compute.instanceAdmin
+    - role: roles/iam.serviceAccountUser
+```
+
+Each user entry includes:
+- `username`: The derived username (typically the part before @ in email 
addresses)
+- `email`: The full email address of the user or service account
+- `permissions`: List of IAM roles assigned to this member
+  - `role`: The full GCP IAM role name (e.g., `roles/viewer`, `roles/editor`)
+
+### Compliance Checking Process
+
+1. **Policy Extraction**: Retrieves current IAM policy from the GCP project
+2. **Member Parsing**: Parses all IAM members and extracts usernames, emails, 
and types
+3. **Role Processing**: Processes all roles while filtering out conditional 
bindings
+4. **Comparison**: Compares current permissions with expected permissions from 
the policy file
+5. **Reporting**: Generates detailed reports of any discrepancies found
+
+Command-line arguments take precedence over configuration file settings.
+
+## Account Keys
+
+Enforcement also covers service account keys: the tool validates the keys and their Secret Manager access permissions against the defined policies.
+The action to run is selected as described below.
+
+### Usage
+
+You can specify the action either through the configuration file 
(`config.yml`) or via command-line arguments:
+
+```bash
+# Check compliance and report issues (default)
+python account_keys.py --action check
+
+# Announce compliance violations (use --action print to preview the announcement without sending it)
+python account_keys.py --action announce
+
+# Generate new compliance file based on current service account keys policy
+python account_keys.py --action generate
+```
+
+### Actions
+
+- **check**: Validates service account keys and their permissions against defined policies and reports any differences (default behavior)
+- **announce**: Creates an announcement through the `SendingClient` when service account keys policies differ from the defined ones; **print** prints the same announcement instead of sending it
+- **generate**: Updates the compliance file to match the current GCP service account keys and Secret Manager permissions
+
+### Features
+
+The Account Keys enforcement tool provides the following capabilities:
+
+- **Service Account Discovery**: Automatically discovers all active 
(non-disabled) service accounts in the project
+- **Secret Manager Integration**: Monitors secrets created by the 
beam-infra-secret-manager service
+- **Permission Validation**: Ensures that Secret Manager permissions match the 
declared authorized users
+- **Compliance Reporting**: Identifies missing service accounts, undeclared 
managed secrets, and permission mismatches
+- **Automatic Remediation**: Can automatically update the compliance file to 
match current infrastructure state
+
+### Configuration
+
+The `config.yml` file supports the following parameters for account keys:
+
+- `project_id`: GCP project ID to check
+- `service_account_keys_file`: Path to the YAML file containing expected 
service account keys policies (default: `../keys/keys.yaml`)
+- `action`: Default action to perform (`check`, `announce`, `print`, or `generate`)
+- `logging`: Logging configuration (level and format)
+
+### Service Account Keys File Format
+
+The service account keys file should follow this YAML structure:
+
+```yaml
+service_accounts:
+- account_id: example-service-account
+  display_name: [email protected]
+  authorized_users:
+  - email: [email protected]
+  - email: [email protected]
+```
+
+Each service account entry includes:
+- `account_id`: The unique identifier for the service account (without the 
full email domain)
+- `display_name`: The full service account email address or any custom display 
name
+- `authorized_users`: List of users who should have access to the service 
account's secrets
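The three checks the README describes for account keys (undeclared service accounts, undeclared managed secrets, and permission mismatches) can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code from this commit: `find_key_issues` and its arguments are hypothetical names, the inputs are already-parsed data rather than live GCP responses, and the example addresses are placeholders. The `<account_id>-key` secret naming follows `account_keys.py` below.

```python
from typing import Dict, List


def find_key_issues(declared: List[dict], live_accounts: List[str],
                    secret_users: Dict[str, List[str]]) -> List[str]:
    """Return readable compliance issues (hypothetical helper, for illustration).

    declared:      entries parsed from the keys file (account_id, authorized_users)
    live_accounts: account IDs that currently exist in the project
    secret_users:  managed secret ID -> emails that can access that secret
    """
    issues = []
    declared_ids = {account["account_id"] for account in declared}

    # 1. Every live service account must be declared in the keys file.
    for account_id in live_accounts:
        if account_id not in declared_ids:
            issues.append(f"Service account '{account_id}' is not declared.")

    # 2. Every managed secret must map back to a declared account.
    expected_secrets = {f"{account_id}-key" for account_id in declared_ids}
    for secret in secret_users:
        if secret not in expected_secrets:
            issues.append(f"Managed secret '{secret}' is not declared.")

    # 3. Declared users must match the users that can actually read the secret.
    for account in declared:
        secret = f"{account['account_id']}-key"
        if secret not in secret_users:
            continue  # no managed secret for this account, nothing to compare
        expected = sorted(user["email"] for user in account["authorized_users"])
        actual = sorted(secret_users[secret])
        if expected != actual:
            issues.append(
                f"Secret '{secret}': expected {expected}, actual {actual}.")
    return issues


# Placeholder data shaped like the keys file format shown in the README.
declared = [{
    "account_id": "example-service-account",
    "display_name": "example-service-account",
    "authorized_users": [{"email": "alice@example.com"}],
}]
live_accounts = ["example-service-account", "undeclared-account"]
secret_users = {
    "example-service-account-key": ["alice@example.com", "bob@example.com"],
}

issues = find_key_issues(declared, live_accounts, secret_users)
for issue in issues:
    print(issue)
```

Comparing sorted lists keeps the result independent of the order in which users are listed in the file or returned by the API, which mirrors the sorting done in `check_compliance`.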
diff --git a/infra/enforcement/account_keys.py 
b/infra/enforcement/account_keys.py
new file mode 100644
index 00000000000..4c3a8190d23
--- /dev/null
+++ b/infra/enforcement/account_keys.py
@@ -0,0 +1,523 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import sys
+import yaml
+import argparse
+import os
+from typing import List, Dict, TypedDict, Optional
+from google.cloud import secretmanager
+from google.cloud import iam_admin_v1
+from google.cloud.iam_admin_v1 import types
+from sending import SendingClient
+
+SECRET_MANAGER_LABEL = "beam-infra-secret-manager"
+
+class AuthorizedUser(TypedDict):
+    email: str
+
+class ServiceAccount(TypedDict):
+    account_id: str
+    display_name: str
+    authorized_users: List[AuthorizedUser]
+
+class ServiceAccountsConfig(TypedDict):
+    service_accounts: List[ServiceAccount]
+
+CONFIG_FILE = "config.yml"
+
+class AccountKeysPolicyComplianceCheck:
+    def __init__(self, project_id: str, service_account_keys_file: str, 
logger: logging.Logger, sending_client: Optional[SendingClient] = None):
+        self.project_id = project_id
+        self.service_account_keys_file = service_account_keys_file
+        self.logger = logger
+        self.sending_client = sending_client
+        self.secret_client = secretmanager.SecretManagerServiceClient()
+        self.service_account_client = iam_admin_v1.IAMClient()
+
+    def _normalize_account_email(self, account_id: str) -> str:
+        """
+        Normalizes the account identifier to a full email format.
+        
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+            
+        Returns:
+            str: The full service account email address.
+        """
+        if "@" in account_id:
+            return account_id
+        else:
+            return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"
+
+    def _denormalize_account_email(self, email: str) -> str:
+        """
+        Denormalizes the full service account email address to its unique 
identifier.
+
+        Args:
+            email (str): The full service account email address.
+
+        Returns:
+            str: The unique identifier for the service account.
+        """
+        if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"):
+            return email.split("@")[0]
+        return email
+
+    def _normalize_username(self, username: str) -> str:
+        """
+        Normalizes the username to a consistent format.
+
+        Args:
+            username (str): The username to normalize.
+
+        Returns:
+            str: The normalized username.
+        """
+        if not username.startswith("user:"):
+            return f"user:{username.strip().lower()}"
+        return username
+    
+    def _denormalize_username(self, username: str) -> str:
+        """
+        Denormalizes the username from the consistent format.
+
+        Args:
+            username (str): The normalized username.
+
+        Returns:
+            str: The denormalized username.
+        """
+        if username.startswith("user:"):
+            return username.split(":", 1)[1].strip().lower()
+        return username
+
+    def _get_all_live_service_accounts(self) -> List[str]:
+        """
+        Retrieves all service accounts that are currently active (not 
disabled) in the project.
+
+        Returns:
+            List[str]: A list of email addresses for all live service accounts.
+        """
+        request = types.ListServiceAccountsRequest()
+        request.name = f"projects/{self.project_id}"
+
+        try:
+            accounts = 
self.service_account_client.list_service_accounts(request=request)
+            self.logger.debug(f"Retrieved {len(accounts.accounts)} service 
accounts for project {self.project_id}")
+
+            if not accounts:
+                self.logger.warning(f"No service accounts found in project 
{self.project_id}.")
+                return []
+
+            return [self._normalize_account_email(account.email) for account 
in accounts.accounts if not account.disabled]
+        except Exception as e:
+            self.logger.error(f"Failed to retrieve service accounts for 
project {self.project_id}: {e}")
+            raise
+
+    def _get_all_live_managed_secrets(self) -> List[str]:
+        """
+        Retrieves the secrets in Secret Manager that were created by the beam-infra-secret-manager service, identified by their `created_by` label.
+
+        Returns:
+            List[str]: A list of secret ids 
+        """
+        try:
+            secrets = list(self.secret_client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}))
+            self.logger.debug(f"Retrieved {len(secrets)} secrets for project 
{self.project_id}")
+
+            if not secrets:
+                self.logger.warning(f"No secrets found in project 
{self.project_id}.")
+                return []
+
+            return [secret.name.split("/")[-1] for secret in secrets if 
"created_by" in secret.labels and secret.labels["created_by"] == 
SECRET_MANAGER_LABEL]
+        except Exception as e:
+            self.logger.error(f"Failed to retrieve secrets for project 
{self.project_id}: {e}")
+            raise
+
+    def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]:
+        """
+        Retrieves the members that hold the Secret Accessor role on a single secret in Secret Manager.
+
+        Args:
+            secret_id (str): The ID of the secret to check access for.
+        Returns:
+            List[str]: The IAM members (in `user:<email>` form) authorized to access the secret.
+        """
+        accessor_role = "roles/secretmanager.secretAccessor"
+        resource_name = self.secret_client.secret_path(self.project_id, 
secret_id)
+
+        try:
+            policy = self.secret_client.get_iam_policy(request={"resource": 
resource_name})
+            self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': 
{policy}")
+
+            if not policy.bindings:
+                self.logger.warning(f"No IAM bindings found for secret 
'{secret_id}'.")
+                return []
+            
+            authorized_users = []
+            for binding in policy.bindings:
+                if binding.role == accessor_role:
+                    for user in binding.members:
+                        authorized_users.append(self._normalize_username(user))
+            
+            return authorized_users
+        except Exception as e:
+            self.logger.error(f"Failed to get IAM policy for secret 
'{secret_id}': {e}")
+            raise
+
+    def _read_service_account_keys(self) -> ServiceAccountsConfig:
+        """
+        Reads the service account keys declaration from a YAML file.
+
+        Returns:
+            ServiceAccountsConfig: A mapping whose `service_accounts` key lists the declared service accounts (empty if the file is missing or empty).
+        """
+        try:
+            with open(self.service_account_keys_file, "r") as file:
+                keys = yaml.safe_load(file)
+
+                if not keys or keys.get("service_accounts") is None:
+                    return {"service_accounts": []}
+
+                return keys
+        except FileNotFoundError:
+            self.logger.info(f"Service account keys file 
{self.service_account_keys_file} not found, starting with empty configuration")
+            return {"service_accounts": []}
+        except IOError as e:
+            error_msg = f"Failed to read service account keys from 
{self.service_account_keys_file}: {e}"
+            self.logger.error(error_msg)
+            raise
+
+    def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, 
header_info: str = "") -> None:
+        """
+        Writes the service account declarations to a YAML file,
+        prefixed with the Apache license header.
+
+        Args:
+            data: A list of service account declarations (account ID, display name, and authorized users).
+            output_file: The file path where the YAML output will be written.
+            header_info: A string containing the header information to be 
included in the YAML file.
+        """
+
+        apache_license_header = """# Licensed to the Apache Software 
Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+    """
+
+        # Prepare the header with the Apache license
+        header = f"{apache_license_header}\n# {header_info}\n# Generated on 
{datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} 
UTC\n\n"
+
+        try:
+            with open(output_file, "w") as file:
+                file.write(header)
+                yaml_data = {"service_accounts": data}
+                yaml.dump(yaml_data, file, sort_keys=False, 
default_flow_style=False, indent=2)
+            self.logger.info(f"Successfully wrote Service Account Keys policy 
data to {output_file}")
+        except IOError as e:
+            self.logger.error(f"Failed to write to {output_file}: {e}")
+            
+
+    def check_compliance(self) -> List[str]:
+        """
+        Checks the compliance of service account keys with the defined 
policies.
+
+        Returns:
+            List[str]: A list of compliance issue messages.
+        """
+
+        service_account_data = self._read_service_account_keys()
+        file_service_accounts = service_account_data.get("service_accounts")
+
+        if not file_service_accounts:
+            file_service_accounts = []
+            self.logger.info(f"No service account keys found in the 
{self.service_account_keys_file}.")
+        
+        compliance_issues = []
+
+        # Check that all service accounts that exist are declared
+        for service_account in self._get_all_live_service_accounts():
+            if self._denormalize_account_email(service_account) not in 
[account["account_id"] for account in file_service_accounts]:
+                msg = f"Service account '{service_account}' is not declared in 
the service account keys file."
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        managed_secrets = self._get_all_live_managed_secrets()
+        extracted_secrets = 
[f"{self._denormalize_account_email(account['account_id'])}-key" for account in 
file_service_accounts]
+
+        # Check for managed secrets that are not declared
+        for secret in managed_secrets:
+            if secret not in extracted_secrets:
+                msg = f"Managed secret '{secret}' is not declared in the 
service account keys file."
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        # Check for each managed secret if it has the correct permissions
+        for account in file_service_accounts:
+            secret_name = 
f"{self._denormalize_account_email(account['account_id'])}-key"
+            if secret_name not in managed_secrets:
+                # Skip accounts that don't have managed secrets
+                continue
+                
+            authorized_users = [user["email"] for user in 
account["authorized_users"]]
+            actual_users = [self._denormalize_username(user) for user in 
self._get_all_secret_authorized_users(secret_name)]
+            
+            # Sort both lists for proper comparison
+            authorized_users.sort()
+            actual_users.sort()
+            
+            if authorized_users != actual_users:
+                msg = f"Managed secret '{account['account_id']}' does not have 
the correct permissions. Expected: {authorized_users}, Actual: {actual_users}"
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        return compliance_issues
+
+    def create_announcement(self, recipient: str) -> None:
+        """
+        Creates an announcement about compliance issues using the 
SendingClient.
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for creating 
announcements")
+            
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will 
be created.")
+            return  
+
+        title = "Account Keys Compliance Issue Detected"
+        body = f"Account keys for project {self.project_id} are not compliant 
with the defined policies on {self.service_account_keys_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about 
compliance issues detected in the Account Keys policy for project 
{self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need 
your attention.\n"
+        announcement += f"\nPlease check the GitHub issue for detailed 
information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.create_announcement(title, body, recipient, 
announcement)
+
+    def print_announcement(self, recipient: str) -> None:
+        """
+        Prints announcement details instead of sending them (for testing 
purposes).
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for printing 
announcements")
+            
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will 
be printed.")
+            return
+
+        title = "Account Keys Compliance Issue Detected"
+        body = f"Account keys for project {self.project_id} are not compliant 
with the defined policies on {self.service_account_keys_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about 
compliance issues detected in the Account Keys policy for project 
{self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need 
your attention.\n"
+        announcement += f"\nPlease check the GitHub issue for detailed 
information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.print_announcement(title, body, recipient, 
announcement)
+
+    def generate_compliance(self) -> None:
+        """
+        Modifies the service account keys file to match the current state of 
service accounts and secrets.
+        It will just add the non managed service accounts.
+        """
+
+        service_account_data = self._read_service_account_keys()
+        file_service_accounts = service_account_data.get("service_accounts", 
[])
+        
+        # Ensure file_service_accounts is a list
+        if file_service_accounts is None:
+            file_service_accounts = []
+
+        self.logger.info(f"Found {len(file_service_accounts)} existing service 
accounts in the keys file")
+        
+        # Check that all service accounts that exist are declared, if not, add 
them
+        for service_account in self._get_all_live_service_accounts():
+            if self._denormalize_account_email(service_account) not in 
[account["account_id"] for account in file_service_accounts]:
+                self.logger.info(f"Service account '{service_account}' is not 
declared in the service account keys file, adding it")
+                file_service_accounts.append({
+                    "account_id": 
self._denormalize_account_email(service_account),
+                    "display_name": service_account,
+                    "authorized_users": []
+                })
+
+        managed_secrets = self._get_all_live_managed_secrets()
+        extracted_secrets = 
[f"{self._denormalize_account_email(account['account_id'])}-key" for account in 
file_service_accounts]
+
+        # Check for managed secrets that are not declared, if not, add them
+        for secret in managed_secrets:
+            if secret not in extracted_secrets:
+                self.logger.info(f"Managed secret '{secret}' is not declared 
in the service account keys file, adding it")
+                file_service_accounts.append({
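+                    # str.strip("-key") removes any of the characters '-', 'k', 'e', 'y' from both ends;
+                    # removesuffix (Python 3.9+) drops only the trailing "-key".
+                    "account_id": secret.removesuffix("-key"),
+                    "display_name": self._normalize_account_email(secret.removesuffix("-key")),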
+                    "authorized_users": []
+                })
+
+        # Check for each managed secret if it has the correct permissions
+        for account in file_service_accounts:
+            secret_name = 
f"{self._denormalize_account_email(account['account_id'])}-key"
+            if secret_name not in managed_secrets:
+                continue
+
+            authorized_users = sorted([user["email"] for user in 
account["authorized_users"]])
+
+            if not authorized_users:
+                self.logger.info(f"Managed secret for account '{account['account_id']}' is new, skipping permission check")
+                continue
+
+            actual_users_normalized = 
sorted(self._get_all_secret_authorized_users(secret_name))
+            actual_users = sorted([self._denormalize_username(user) for user 
in actual_users_normalized])
+
+            if authorized_users != actual_users:
+                self.logger.info(f"Managed secret for account '{account['account_id']}' does not have the correct permissions, updating it")
+                account["authorized_users"] = [{"email": user} for user in 
actual_users]
+
+        # Remove duplicates based on account_id
+        seen_accounts = set()
+        deduplicated_accounts = []
+        for account in file_service_accounts:
+            if account["account_id"] not in seen_accounts:
+                seen_accounts.add(account["account_id"])
+                deduplicated_accounts.append(account)
+            else:
+                self.logger.info(f"Removing duplicate entry for account 
'{account['account_id']}'")
+
+        self._to_yaml_file(deduplicated_accounts, 
self.service_account_keys_file, header_info="Service Account Keys")
+
+def config_process() -> Dict[str, str]:
+    with open(CONFIG_FILE, "r") as file:
+        config = yaml.safe_load(file)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+    
+    config_res = dict()
+
+    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
+    config_res["logging_level"] = config.get("logging", {}).get("level", 
"INFO")
+    config_res["logging_format"] = config.get("logging", {}).get("format", 
"[%(asctime)s] %(levelname)s: %(message)s")
+    config_res["service_account_keys_file"] = 
config.get("service_account_keys_file", "../keys/keys.yaml")
+    config_res["action"] = config.get("action", "check")
+
+    # SendingClient configuration
+    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
+    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
+    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
+    config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
+    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
+    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
+    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
+
+    return config_res
+
+def main():
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(description="Account Keys Compliance 
Checker")
+    parser.add_argument("--action", choices=["check", "announce", "print", 
"generate"], 
+                       help="Action to perform: check compliance, create 
announcement, print announcement, or generate new compliance")
+    args = parser.parse_args()
+
+    config = config_process()
+
+    # Command line argument takes precedence over config file
+    action = args.action if args.action else config.get("action", "check")
+
+    logging.basicConfig(level=getattr(logging, 
config["logging_level"].upper(), logging.INFO),
+                        format=config["logging_format"])
+    logger = logging.getLogger("AccountKeysPolicyComplianceCheck")
+
+    # Create SendingClient if needed for announcement actions
+    sending_client = None
+    if action in ["announce", "print"]:
+        try:
+            # Provide default values for testing, especially for print action
+            github_token = config["github_token"] or "dummy-token"
+            github_repo = config["github_repo"] or "dummy/repo"
+            smtp_server = config["smtp_server"] or "dummy-server"
+            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
+            email = config["email"] or "[email protected]"
+            password = config["password"] or "dummy-password"
+            
+            sending_client = SendingClient(
+                logger=logger,
+                github_token=github_token,
+                github_repo=github_repo,
+                smtp_server=smtp_server,
+                smtp_port=smtp_port,
+                email=email,
+                password=password
+            )
+        except Exception as e:
+            logger.error(f"Failed to initialize SendingClient: {e}")
+            return 1
+
+    logger.info(f"Starting Account Keys policy compliance check with action: {action}")
+    account_keys_checker = AccountKeysPolicyComplianceCheck(config["project_id"], config["service_account_keys_file"], logger, sending_client)
+
+    try:
+        if action == "check":
+            compliance_issues = account_keys_checker.check_compliance()
+            if compliance_issues:
+                logger.warning("Account Keys policy compliance issues found:")
+                for issue in compliance_issues:
+                    logger.warning(issue)
+            else:
+                logger.info("Account Keys policy is compliant.")
+        elif action == "announce":
+            logger.info("Creating announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            account_keys_checker.create_announcement(recipient)
+        elif action == "print":
+            logger.info("Printing announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            account_keys_checker.print_announcement(recipient)
+        elif action == "generate":
+            logger.info("Generating new compliance based on current Account Keys policy...")
+            account_keys_checker.generate_compliance()
+        else:
+            logger.error(f"Unknown action: {action}")
+            return 1
+    except Exception as e:
+        logger.error(f"Error executing action '{action}': {e}")
+        return 1
+
+    return 0
+
+if __name__ == "__main__":
+    sys.exit(main())
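
The entry point above takes the action from the command line first and falls back to the value in the config file, then to `check`. A minimal sketch of that precedence rule in isolation follows; `resolve_action` and `ACTIONS` are hypothetical names used only for illustration and are not part of the commit.

```python
import argparse
from typing import Dict, List, Optional

# Same choices the checker's --action flag accepts.
ACTIONS = ["check", "announce", "print", "generate"]


def resolve_action(argv: Optional[List[str]], config: Dict[str, str]) -> str:
    """Return the CLI action if given, else the config value, else 'check'."""
    parser = argparse.ArgumentParser(description="action precedence sketch")
    parser.add_argument("--action", choices=ACTIONS)
    args = parser.parse_args(argv)
    # args.action is None when the flag is omitted, so the config value wins.
    return args.action or config.get("action", "check")


if __name__ == "__main__":
    print(resolve_action(["--action", "print"], {"action": "check"}))
    print(resolve_action([], {"action": "generate"}))
    print(resolve_action([], {}))
```

Keeping the rule in one small function makes it easy to test without touching GCP or the notification client.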
diff --git a/infra/enforcement/config.yml b/infra/enforcement/config.yml
new file mode 100644
index 00000000000..7fae0253b8f
--- /dev/null
+++ b/infra/enforcement/config.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Project ID
+project_id: apache-beam-testing
+
+# Logging
+logging:
+  level: DEBUG
+  format: "[%(asctime)s] %(levelname)s: %(message)s"
+
+# IAM
+
+# Working users file
+users_file: ../iam/users.yml
+
+# Service Account Keys
+service_account_keys_file: ../keys/keys.yaml
+
+# Action to perform when running the script
+# Options:
+#   - check: Check compliance and report issues (default)
+#   - announce: Open or update a GitHub issue and email the recipient if violations are found (use "print" for a dry run)
+#   - generate: Generate new compliance file based on current IAM policy
+action: check
diff --git a/infra/enforcement/iam.py b/infra/enforcement/iam.py
new file mode 100644
index 00000000000..92246aa7c62
--- /dev/null
+++ b/infra/enforcement/iam.py
@@ -0,0 +1,400 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import datetime
+import logging
+import os
+import sys
+import yaml
+from google.api_core import exceptions
+from google.cloud import resourcemanager_v3
+from typing import Optional, List, Dict, Tuple
+from sending import SendingClient
+
+CONFIG_FILE = "config.yml"
+
+class IAMPolicyComplianceChecker:
+    def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
+        self.project_id = project_id
+        self.users_file = users_file
+        self.client = resourcemanager_v3.ProjectsClient()
+        self.logger = logger
+        self.sending_client = sending_client
+
+    def _parse_member(self, member: str) -> tuple[str, Optional[str], str]:
+        """Parses an IAM member string to extract type, email, and a derived username.
+
+        Args:
+            member: The IAM member string
+        Returns:
+            A tuple containing:
+                - username: The derived username from the member string.
+                - email: The email address if available, otherwise None.
+                - member_type: The type of the member (e.g., user, serviceAccount, group).
+        """
+        email = None
+        username = member
+
+        # Split the member string to determine type and identifier
+        parts = member.split(':', 1)
+        member_type = parts[0] if len(parts) > 1 else "unknown"
+        identifier = parts[1] if len(parts) > 1 else member
+
+        if member_type in ["user", "serviceAccount", "group"]:
+            email = identifier
+            if '@' in identifier:
+                username = identifier.split('@')[0]
+            else:
+                username = identifier
+        else:
+            username = identifier
+            member_type = "unknown"
+            email = None
+
+        return username, email, member_type
+
+    def _export_project_iam(self) -> List[Dict]:
+        """Exports the IAM policy of the configured project as a list of member records.
+
+        Returns:
+            A list of dictionaries containing the IAM policy details.
+        """
+
+        try:
+            policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}")
+            self.logger.debug(f"Retrieved IAM policy for project {self.project_id}")
+        except exceptions.NotFound as e:
+            self.logger.error(f"Project {self.project_id} not found: {e}")
+            raise
+        except exceptions.PermissionDenied as e:
+            self.logger.error(f"Permission denied for project {self.project_id}: {e}")
+            raise
+        except Exception as e:
+            self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}")
+            raise
+
+        members_data = {}
+
+        for binding in policy.bindings:
+            role = binding.role
+
+            for member_str in binding.members:
+                if member_str not in members_data:
+                    username, email_address, member_type = self._parse_member(member_str)
+                    if member_type == "unknown":
+                        self.logger.warning(f"Skipping member {member_str} with no email address")
+                        continue  # Skip if no email address is found, probably a malformed member
+                    members_data[member_str] = {
+                        "username": username,
+                        "email": email_address,
+                        "permissions": []
+                    }
+
+                # Skip permissions that have a condition
+                if "withcond" in role:
+                    continue
+
+                permission_entry = {}
+                permission_entry["role"] = role
+
+                members_data[member_str]["permissions"].append(permission_entry)
+
+        output_list = []
+        for data in members_data.values():
+            data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
+            output_list.append({
+                "username": data["username"],
+                "email": data["email"],
+                "permissions": data["permissions"]
+            })
+
+        output_list.sort(key=lambda x: x["username"])
+        return output_list
+
+    def _read_project_iam_file(self) -> List[Dict]:
+        """Reads the IAM policy from a YAML file.
+
+        Returns:
+            A list of dictionaries containing the IAM policy details.
+        """
+        try:
+            with open(self.users_file, "r") as file:
+                iam_policy = yaml.safe_load(file)
+
+
+                self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}")
+                return iam_policy or []  # an empty file parses to None
+        except FileNotFoundError:
+            self.logger.error(f"IAM policy file not found for project {self.project_id}")
+            return []
+        except Exception as e:
+            self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}")
+            return []
+
+    def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None:
+        """
+        Writes a list of dictionaries to a YAML file.
+        Includes the Apache license header at the top of the file.
+
+        Args:
+            data: A list of dictionaries containing user permissions and details.
+            output_file: The file path where the YAML output will be written.
+            header_info: A string containing the header information to be included in the YAML file.
+        """
+
+        apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+    """
+
+        # Prepare the header with the Apache license
+        header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
+
+        try:
+            with open(output_file, "w") as file:
+                file.write(header)
+                yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
+            self.logger.info(f"Successfully wrote IAM policy data to {output_file}")
+        except IOError as e:
+            self.logger.error(f"Failed to write to {output_file}: {e}")
+            raise
+        
+    def check_compliance(self) -> List[str]:
+        """
+        Checks the compliance of the IAM policy against the defined policies.
+
+        Returns:
+            A list of strings describing any compliance issues found.
+        """
+        current_users = {user['email']: user for user in self._export_project_iam()}
+        existing_users = {user['email']: user for user in self._read_project_iam_file()}
+
+        if not existing_users:
+            error_msg = f"No IAM policy found in {self.users_file}."
+            self.logger.info(error_msg)
+            raise RuntimeError(error_msg)
+
+        differences = []        
+
+        all_emails = set(current_users.keys()) | set(existing_users.keys())
+
+        for email in sorted(list(all_emails)):
+            current_user = current_users.get(email)
+            existing_user = existing_users.get(email)
+
+            if current_user and not existing_user:
+                differences.append(f"User {email} not found in existing policy.")
+            elif not current_user and existing_user:
+                differences.append(f"User {email} found in policy file but not in GCP.")
+            elif current_user and existing_user:
+                if current_user["permissions"] != existing_user["permissions"]:
+                    msg = f"\nPermissions for user {email} differ."
+                    msg += f"\nIn GCP: {current_user['permissions']}"
+                    msg += f"\nIn {self.users_file}: {existing_user['permissions']}"
+                    self.logger.info(msg)
+                    differences.append(msg)
+
+        return differences
+
+    def create_announcement(self, recipient: str) -> None:
+        """
+        Creates an announcement about compliance issues using the SendingClient.
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for creating announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be created.")
+            return
+
+        title = "IAM Policy Non-Compliance Detected"
+        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.create_announcement(title, body, recipient, announcement)
+
+    def print_announcement(self, recipient: str) -> None:
+        """
+        Prints announcement details instead of sending them (for testing purposes).
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for printing announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be printed.")
+            return
+
+        title = "IAM Policy Non-Compliance Detected"
+        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.print_announcement(title, body, recipient, announcement)
+    
+    def generate_compliance(self) -> None:
+        """
+        Modifies the users file to match the current IAM policy.
+        If no changes are needed, no file will be written.
+        """
+        
+        try:
+            diff = self.check_compliance()
+        except RuntimeError:
+            self.logger.info("No existing IAM policy found.")
+            diff = ["No existing policy found"]
+
+        if not diff:
+            self.logger.info("No compliance issues found, no changes will be made.")
+            return
+
+        current_policy = self._export_project_iam()
+        header_info = f"IAM policy for project {self.project_id}"
+        
+        self._to_yaml_file(current_policy, self.users_file, header_info)
+        self.logger.info(f"Generated new compliance file: {self.users_file}")
+
+def config_process() -> Dict[str, str]:
+    with open(CONFIG_FILE, "r") as file:
+        config = yaml.safe_load(file)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+    
+    config_res = dict()
+
+    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
+    config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
+    config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
+    config_res["users_file"] = config.get("users_file", "../iam/users.yml")
+    config_res["action"] = config.get("action", "check")
+
+    # SendingClient configuration
+    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
+    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
+    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
+    config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
+    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
+    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
+    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
+
+    return config_res
+
+def main():
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(description="IAM Policy Compliance Checker")
+    parser.add_argument("--action", choices=["check", "announce", "print", "generate"],
+                       help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
+    args = parser.parse_args()
+
+    config = config_process()
+
+    # Command line argument takes precedence over config file
+    action = args.action if args.action else config.get("action", "check")
+
+    logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
+                        format=config["logging_format"])
+    logger = logging.getLogger("IAMPolicyComplianceChecker")
+
+    # Create SendingClient if needed for announcement actions
+    sending_client = None
+    if action in ["announce", "print"]:
+        try:
+            # Provide default values for testing, especially for print action
+            github_token = config["github_token"] or "dummy-token"
+            github_repo = config["github_repo"] or "dummy/repo"
+            smtp_server = config["smtp_server"] or "dummy-server"
+            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
+            email = config["email"] or "[email protected]"
+            password = config["password"] or "dummy-password"
+            
+            sending_client = SendingClient(
+                logger=logger,
+                github_token=github_token,
+                github_repo=github_repo,
+                smtp_server=smtp_server,
+                smtp_port=smtp_port,
+                email=email,
+                password=password
+            )
+        except Exception as e:
+            logger.error(f"Failed to initialize SendingClient: {e}")
+            return 1
+
+    logger.info(f"Starting IAM policy compliance check with action: {action}")
+    iam_checker = IAMPolicyComplianceChecker(config["project_id"], config["users_file"], logger, sending_client)
+
+    try:
+        if action == "check":
+            compliance_issues = iam_checker.check_compliance()
+            if compliance_issues:
+                logger.warning("IAM policy compliance issues found:")
+                for issue in compliance_issues:
+                    logger.warning(issue)
+            else:
+                logger.info("IAM policy is compliant.")
+        elif action == "announce":
+            logger.info("Creating announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            iam_checker.create_announcement(recipient)
+        elif action == "print":
+            logger.info("Printing announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            iam_checker.print_announcement(recipient)
+        elif action == "generate":
+            logger.info("Generating new compliance based on current IAM policy...")
+            iam_checker.generate_compliance()
+        else:
+            logger.error(f"Unknown action: {action}")
+            return 1
+    except Exception as e:
+        logger.error(f"Error executing action '{action}': {e}")
+        return 1
+
+    return 0
+
+if __name__ == "__main__":
+    
+    sys.exit(main())
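
The comparison in `check_compliance` keys both the live policy and the policy file by email and reports members that exist on only one side or whose permissions differ. The sketch below reproduces that idea as a standalone function so it can be exercised without GCP access. `diff_policies`, the message wording, and the sample records are assumptions made for illustration, not code from the commit.

```python
from typing import Dict, List


def diff_policies(current: List[Dict], expected: List[Dict]) -> List[str]:
    """Compare two lists of member records, matching them by email."""
    current_by_email = {m["email"]: m for m in current}
    expected_by_email = {m["email"]: m for m in expected}

    issues = []
    # Walk the union of emails in sorted order for a stable report.
    for email in sorted(set(current_by_email) | set(expected_by_email)):
        cur = current_by_email.get(email)
        exp = expected_by_email.get(email)
        if cur and not exp:
            issues.append(f"{email}: present in GCP, missing from file")
        elif exp and not cur:
            issues.append(f"{email}: present in file, missing from GCP")
        elif cur["permissions"] != exp["permissions"]:
            issues.append(f"{email}: permissions differ")
    return issues


# Made-up sample data in the same shape as the exported records.
LIVE = [
    {"email": "a@example.com", "permissions": [{"role": "roles/viewer"}]},
    {"email": "b@example.com", "permissions": [{"role": "roles/editor"}]},
]
FILE = [
    {"email": "a@example.com", "permissions": [{"role": "roles/owner"}]},
    {"email": "c@example.com", "permissions": [{"role": "roles/viewer"}]},
]

if __name__ == "__main__":
    for line in diff_policies(LIVE, FILE):
        print(line)
```

Because permissions are compared as lists, both sides need the same ordering; the checker sorts each member's permissions by role on export, which is what makes a plain equality test workable.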
diff --git a/infra/enforcement/requirements.txt b/infra/enforcement/requirements.txt
new file mode 100644
index 00000000000..fa7fd181f9e
--- /dev/null
+++ b/infra/enforcement/requirements.txt
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to install the dependencies for the infrastructure
+
+PyYAML==6.0.2
+google-cloud-iam==2.19.0
+google-cloud-resource-manager==1.14.1
+google-cloud-secret-manager==2.24.0
+google-crc32c==1.7.1
diff --git a/infra/enforcement/sending.py b/infra/enforcement/sending.py
new file mode 100644
index 00000000000..961674ca2f1
--- /dev/null
+++ b/infra/enforcement/sending.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests
+import logging
+import smtplib, ssl
+from typing import List, Optional
+from dataclasses import dataclass
+
+@dataclass
+class GitHubIssue:
+    """
+    Represents a GitHub issue.
+    """
+    number: int
+    title: str
+    body: str
+    state: str
+    html_url: str
+    created_at: str
+    updated_at: str
+
+class SendingClient:
+    """
+    Sends notifications about GitHub issues.
+    """
+    def __init__(self, logger: logging.Logger, github_token: str, github_repo: str,
+                 smtp_server: str, smtp_port: int, email: str, password: str):
+
+        required_keys = [github_token, github_repo, smtp_server, smtp_port, email, password]
+
+        if not all(required_keys):
+            raise ValueError("All parameters must be provided.")
+
+        self.github_repo = github_repo
+        self.headers = {
+            "Authorization": f"Bearer {github_token}",
+            "X-GitHub-Api-Version": "2022-11-28",
+            "Accept": "application/vnd.github+json"
+        }
+
+        self.smtp_server = smtp_server
+        self.smtp_port = smtp_port
+        self.email = email
+        self.password = password
+
+        self.logger = logger
+        self.github_api_url = "https://api.github.com"
+
+    def _make_github_request(self, method: str, endpoint: str, json: Optional[dict] = None) -> requests.Response:
+        """
+        Makes a request to the GitHub API.
+
+        Args:
+            method (str): The HTTP method to use (e.g., "GET", "POST", "PATCH").
+            endpoint (str): The API endpoint to call.
+            json (Optional[dict]): The JSON payload to send with the request.
+
+        Returns:
+            requests.Response: The response from the API.
+        """
+        url = f"{self.github_api_url}/{endpoint}"
+        response = requests.request(method, url, headers=self.headers, json=json)
+
+        if not response.ok:
+            self.logger.error(f"Failed GitHub API request to {endpoint}: {response.status_code} - {response.text}")
+            response.raise_for_status()
+            
+        return response
+
+    def _send_email(self, title: str, body: str, recipient: str) -> None:
+        """
+        Sends an email notification.
+
+        Args:
+            title (str): The title of the email.
+            body (str): The body content of the email.
+            recipient (str): The email address of the recipient.
+        """
+        message = f"Subject: {title}\n\n{body}"
+        context = ssl.create_default_context()
+        with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port, context=context) as server:
+            server.login(self.email, self.password)
+            server.sendmail(self.email, recipient, message)
+
+    def _get_open_issues(self, title: str) -> List[GitHubIssue]:
+        """
+        Retrieves the open GitHub issues whose title matches the given title.
+
+        Args:
+            title (str): The title of the GitHub issue.
+        """
+        endpoint = f"search/issues/?q=is:issue+repo:{self.github_repo}+in:title+{title}+is:open"
+        response = self._make_github_request("GET", endpoint)
+        issues = response.json().get('items', [])
+        return [GitHubIssue(**{k: v for k, v in issue.items() if k in GitHubIssue.__dataclass_fields__}) for issue in issues]
+
+    def create_issue(self, title: str, body: str) -> GitHubIssue:
+        """
+        Creates a GitHub issue in the specified repository.
+
+        Args:
+            title (str): The title of the GitHub issue.
+            body (str): The body content of the GitHub issue.
+        """
+        endpoint = f"repos/{self.github_repo}/issues"
+        payload = {"title": title, "body": body}
+        response = self._make_github_request("POST", endpoint, json=payload)
+        self.logger.info(f"Successfully created GitHub issue: {title}")
+        return GitHubIssue(**{k: v for k, v in response.json().items() if k in GitHubIssue.__dataclass_fields__})
+
+    def update_issue_body(self, issue_number: int, new_body: str) -> None:
+        """
+        Updates the body of a GitHub issue in the specified repository.
+
+        Args:
+            issue_number (int): The number of the GitHub issue to update.
+            new_body (str): The new body content for the GitHub issue.
+        """
+        endpoint = f"repos/{self.github_repo}/issues/{issue_number}"
+        payload = {"body": new_body}
+        self._make_github_request("PATCH", endpoint, json=payload)
+        self.logger.info(f"Successfully updated body on GitHub issue: #{issue_number}")
+
+    def create_announcement(self, title: str, body: str, recipient: str, announcement: str) -> None:
+        """
+        Sends an announcement email that links to a GitHub issue.
+
+        Creates a GitHub issue in the specified repository if it doesn't already exist.
+        If multiple open issues with this title exist, the most recently updated one is reused.
+
+        Args:
+            title (str): The title of the GitHub issue.
+            body (str): The body content of the GitHub issue.
+            recipient (str): The email address of the recipient.
+            announcement (str): The announcement message to include in the email.
+        """
+        open_issues = self._get_open_issues(title)
+        open_issues.sort(key=lambda x: x.updated_at, reverse=True)
+        if open_issues:
+            self.logger.info(f"Issue with title '{title}' already exists: #{open_issues[0].number}")
+            announcement += f"\n\nRelated GitHub Issue: {open_issues[0].html_url}"
+
+            if open_issues[0].body != body:
+                self.logger.info(f"Updating body of issue #{open_issues[0].number}")
+                self.update_issue_body(open_issues[0].number, body)
+            else:
+                self.logger.info(f"No changes detected for issue #{open_issues[0].number}")
+            self._send_email(title, announcement, recipient)
+        else:
+            new_issue = self.create_issue(title, body)
+            announcement += f"\n\nRelated GitHub Issue: {new_issue.html_url}"
+            self._send_email(title, announcement, recipient)
+
+    def print_announcement(self, title: str, body: str, recipient: str, announcement: str) -> None:
+        """
+        This method prints the data instead of sending the email or creating an issue.
+        This is used for testing.
+        """
+        self.logger.info("Printing announcement...")
+        print("Simulating email sending...")
+        print(f"Recipient: {recipient}")
+        print(f"Announcement: {announcement}")
+
+        print("\nSimulating GitHub issue creation...")
+        print(f"Title: {title}")
+        print(f"Body: {body}")
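
GitHub's REST API returns issue objects with many more keys than the seven fields `GitHubIssue` declares, and a dataclass constructor raises `TypeError` on unknown keyword arguments. A minimal sketch of narrowing a payload to the declared fields before construction follows. `issue_from_payload` and `SAMPLE_PAYLOAD` are hypothetical, and the sample payload is made up rather than a real API response.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass
class GitHubIssue:
    number: int
    title: str
    body: str
    state: str
    html_url: str
    created_at: str
    updated_at: str


def issue_from_payload(payload: Dict[str, Any]) -> GitHubIssue:
    """Build a GitHubIssue from a dict, ignoring keys the dataclass lacks."""
    allowed = {f.name for f in fields(GitHubIssue)}
    return GitHubIssue(**{k: v for k, v in payload.items() if k in allowed})


# Illustrative payload with two extra keys ("id", "labels").
SAMPLE_PAYLOAD = {
    "id": 123,
    "number": 1,
    "title": "IAM Policy Non-Compliance Detected",
    "body": "details",
    "state": "open",
    "html_url": "https://example.invalid/issues/1",
    "created_at": "2025-08-18T00:00:00Z",
    "updated_at": "2025-08-18T00:00:00Z",
    "labels": [],
}

if __name__ == "__main__":
    print(issue_from_payload(SAMPLE_PAYLOAD).number)
```

Filtering by the dataclass's own field list keeps the two definitions in sync: adding a field to `GitHubIssue` automatically starts picking it up from the payload.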
diff --git a/infra/iam/generate.py b/infra/iam/generate.py
deleted file mode 100644
index 71f6379710e..00000000000
--- a/infra/iam/generate.py
+++ /dev/null
@@ -1,212 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# THIS IS NOT SUPPOSED TO RUN AFTER THE MIGRATION.
-# This script is used to export the IAM policy of a Google Cloud project to a YAML format.
-# It retrieves the IAM policy bindings, parses the members, and formats the output in a structured
-# YAML format, excluding service accounts and groups. The output includes usernames, emails, and
-# their associated permissions, with optional conditions for roles that have conditions attached.
-# You need to have the Google Cloud SDK installed and authenticated to run this script.
-
-import argparse
-import datetime
-import yaml
-import logging
-from typing import Optional, List, Dict
-from google.cloud import resourcemanager_v3
-from google.api_core import exceptions
-
-# Configure logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-logger = logging.getLogger(__name__)
-
-def parse_member(member: str) -> tuple[str, Optional[str], str]:
-    """Parses an IAM member string to extract type, email, and a derived username.
-
-    Args:
-        member: The IAM member string
-    Returns:
-        A tuple containing:
-            - username: The derived username from the member string.
-            - email: The email address if available, otherwise None.
-            - member_type: The type of the member (e.g., user, serviceAccount, group).
-    """
-    email = None
-    username = member
-
-    # Split the member string to determine type and identifier
-    parts = member.split(':', 1)
-    member_type = parts[0] if len(parts) > 1 else "unknown"
-    identifier = parts[1] if len(parts) > 1 else member
-
-    if member_type in ["user", "serviceAccount", "group"]:
-        email = identifier
-        if '@' in identifier:
-            username = identifier.split('@')[0]
-        else:
-            username = identifier
-    else:
-        username = identifier
-        email = None
-
-    return username, email, member_type
-
-def export_project_iam(project_id: str) -> List[Dict]:
-    """Exports the IAM policy for a given project to YAML format.
-
-    Args:
-        project_id: The ID of the Google Cloud project.
-    Returns:
-        A list of dictionaries containing the IAM policy details.
-    """
-
-    try:
-        client = resourcemanager_v3.ProjectsClient()
-        policy = client.get_iam_policy(resource=f"projects/{project_id}")
-        logger.info(f"Successfully retrieved IAM policy for project {project_id}")
-    except exceptions.NotFound as e:
-        logger.error(f"Project {project_id} not found: {e}")
-        raise
-    except exceptions.PermissionDenied as e:
-        logger.error(f"Permission denied for project {project_id}: {e}")
-        raise
-    except Exception as e:
-        logger.error(f"An error occurred while retrieving IAM policy for project {project_id}: {e}")
-        raise
-
-    members_data = {}
-
-    for binding in policy.bindings:
-        role = binding.role
-
-        for member_str in binding.members:
-            if member_str not in members_data:
-                username, email_address, member_type = parse_member(member_str)
-                if member_type == "serviceAccount":
-                    continue # Skip service accounts
-                if member_type == "group":
-                    continue  # Skip groups
-                if not email_address:
-                    continue # Skip if no email address is found, probably a malformed member
-                members_data[member_str] = {
-                    "username": username,
-                    "email": email_address,
-                    "permissions": []
-                }
-
-            # Skip permissions that have a condition
-            if "withcond" in role:
-                continue
-
-            permission_entry = {}
-            permission_entry["role"] = role
-
-            members_data[member_str]["permissions"].append(permission_entry)
-
-    output_list = []
-    for data in members_data.values():
-        data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
-        output_list.append({
-            "username": data["username"],
-            "email": data["email"],
-            "permissions": data["permissions"]
-        })
-
-    output_list.sort(key=lambda x: x["username"])
-    return output_list
-
-def to_yaml_file(data: List[Dict], output_file: str, header_info: str = "") -> None:
-    """
-    Writes a list of dictionaries to a YAML file.
-    Include the apache license header on the files
-
-    Args:
-        data: A list of dictionaries containing user permissions and details.
-        output_file: The file path where the YAML output will be written.
-        header_info: A string containing the header information to be included in the YAML file.
-    """
-
-    apache_license_header = """#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-
-    # Prepare the header with the Apache license
-    header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
-
-    try:
-        with open(output_file, "w") as file:
-            file.write(header)
-            yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
-        logger.info(f"Successfully wrote IAM policy data to {output_file}")
-    except IOError as e:
-        logger.error(f"Failed to write to {output_file}: {e}")
-        raise
-
-def main():
-    """
-    Main function to run the script.
-
-    This function parses command-line arguments to either export IAM policies
-    or generate permission differences for a specified GCP project.
-    """
-    parser = argparse.ArgumentParser(
-        description="Export IAM policies or generate permission differences for a GCP project."
-    )
-    parser.add_argument(
-        "project_id",
-        help="The Google Cloud project ID."
-    )
-    parser.add_argument(
-        "output_file",
-        help="Defaults to 'users.yml' if not specified. The file where the IAM policy will be saved in YAML format.",
-        nargs='?',
-        default="users.yml"
-    )
-    parser.add_argument(
-        "--yes-i-know-what-i-am-doing",
-        action="store_true",
-        help="If set, the script will proceed"
-    )
-
-    args = parser.parse_args()
-    project_id = args.project_id
-    output_file = args.output_file
-
-    if not args.yes_i_know_what_i_am_doing:
-        logger.error("You must use the --yes-i-know-what-i-am-doing flag to proceed.")
-        return
-
-    # Export the IAM policy for the specified project
-    iam_data = export_project_iam(project_id)
-
-    # Write the exported data to the specified output file in YAML format
-    to_yaml_file(iam_data, output_file, header_info=f"Exported IAM policy for project {project_id}")
-
-if __name__ == "__main__":
-    main()
\ No newline at end of file
diff --git a/infra/iam/users.tf b/infra/iam/users.tf
index 32c26b8bcaa..30d5bfddf8f 100644
--- a/infra/iam/users.tf
+++ b/infra/iam/users.tf
@@ -46,7 +46,7 @@ resource "google_project_iam_member" "project_members" {
   }
   project = var.project_id
   role    = each.value.role
-  member  = "user:${each.value.email}"
+  member = can(regex(".*\\.gserviceaccount\\.com$", each.value.email)) ? "serviceAccount:${each.value.email}" : "user:${each.value.email}"
 
   dynamic "condition" {
     # Condition is only created if expiry_date is set
diff --git a/infra/iam/users.yml b/infra/iam/users.yml
index 06e9cea65e7..d76eb5ae267 100644
--- a/infra/iam/users.yml
+++ b/infra/iam/users.yml
@@ -544,4 +544,4 @@
 - username: zhoufek
   email: [email protected]
   permissions:
-  - role: roles/editor
+  - role: roles/editor
\ No newline at end of file
diff --git a/infra/keys/keys.yaml b/infra/keys/keys.yaml
index ca0e170cb21..4e56770b546 100644
--- a/infra/keys/keys.yaml
+++ b/infra/keys/keys.yaml
@@ -1,30 +1,26 @@
 # Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
+# contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
+# the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-# Key management service for Apache Beam
-# This service manages the creation, rotation, and deletion of secrets used in Apache Beam.
-# It uses Google Cloud Secret Manager to store and manage secrets securely.
-# This file contains the list of service accounts and the users who can access the secrets.
+    
+# Service Account Keys
+# This file contains the service account for the project, the account id
+# and the users authorized to use it
+# service_accounts:
+# - account_id: account_id
+#   display_name: account_@project_id.iam.gserviceaccount.com
+#   authorized_users:
+#     - email: "[email protected]"
+#     - email: "[email protected]"
 
 service_accounts:
-  - account_id: "test-service-account"
-    display_name: "Service account for Beam secrets rotation"
-    authorized_users:
-      - email: "[email protected]"
-      - email: "[email protected]"
-  - account_id: "test-service-account-2"
-    display_name: "Another service account for Beam secrets rotation"
-    authorized_users:
-      - email: "[email protected]"
\ No newline at end of file
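
The `users.tf` hunk above chooses the IAM member prefix from the email suffix: addresses ending in `.gserviceaccount.com` become `serviceAccount:` members, and everything else stays a `user:` member. A minimal Python sketch of the same rule follows, for illustration only. The helper name `member_for` and the sample addresses are hypothetical and not part of the commit.

```python
import re

# Same pattern as the Terraform expression in users.tf (hypothetical Python port).
_SERVICE_ACCOUNT_RE = re.compile(r".*\.gserviceaccount\.com$")


def member_for(email: str) -> str:
    """Return the IAM member string for an email address (hypothetical helper)."""
    # Service account emails end in ".gserviceaccount.com"; anything else is
    # treated as a regular user, mirroring the ternary in the Terraform change.
    prefix = "serviceAccount" if _SERVICE_ACCOUNT_RE.match(email) else "user"
    return f"{prefix}:{email}"


if __name__ == "__main__":
    # Sample addresses are made up for the example.
    print(member_for("ci-bot@my-project.iam.gserviceaccount.com"))
    print(member_for("dev@example.com"))
```

Because the rule keys only on the suffix, group addresses would still be emitted as `user:` members under this sketch, as they would with the Terraform expression.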
