jscheffl commented on code in PR #36492:
URL: https://github.com/apache/airflow/pull/36492#discussion_r1438942362
##########
tests/models/test_trigger.py:
##########
```diff
@@ -337,3 +341,47 @@ def test_get_sorted_triggers_different_priority_weights(session, create_task_ins
     trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, alive_triggerer_ids=[], session=session)
 
     assert trigger_ids_query == [(2,), (1,)]
+
+
+class SensitiveKwargsTrigger(BaseTrigger):
+    """
+    A trigger that has sensitive kwargs.
+    """
+
+    def __init__(self, param1: str, param2: str):
+        super().__init__()
+        self.param1 = param1
+        self.param2 = param2
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        return (
+            "tests.models.test_trigger.SensitiveKwargsTrigger",
+            {
+                "param1": self.param1,
+                "encrypted__param2": self.param2,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        yield TriggerEvent({})
+
+
+@conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
+def test_serialize_sensitive_kwargs():
+    """
+    Tests that sensitive kwargs are encrypted.
+    """
+    trigger_instance = SensitiveKwargsTrigger(param1="value1", param2="value2")
+    trigger_row: Trigger = Trigger.from_object(trigger_instance)
+
+    assert trigger_row.kwargs["param1"] == "value1"
+    assert (
+        get_fernet().decrypt(trigger_row.kwargs["encrypted__param2"].encode("utf-8")).decode("utf-8")
+        == "value2"
+    )
```

Review Comment:
In this pytest you actually re-implement 100% of the encryption logic. Could you instead test at an "abstract" level that the value stored in the DB is != the original value? The details of the encryption belong in the implementation (= black box); the pytest should just ensure that you don't miss a regression where an update accidentally drops the encryption.
Proposal:
```suggestion
    assert trigger_row.kwargs["encrypted__param2"] != "value2"
```

##########
airflow/jobs/triggerer_job_runner.py:
##########
```diff
@@ -708,3 +710,16 @@ def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
         if classpath not in self.trigger_cache:
             self.trigger_cache[classpath] = import_string(classpath)
         return self.trigger_cache[classpath]
+
+    def trigger_row_to_trigger_instance(self, trigger_row: Trigger, trigger_class: type[U]) -> U:
+        """Convert a Trigger row into a Trigger instance."""
+        from airflow.models.crypto import get_fernet
+
+        decrypted_kwargs = {}
+        fernet = get_fernet()
+        for k, v in trigger_row.kwargs.items():
+            if k.startswith("encrypted__"):
+                decrypted_kwargs[k[11:]] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
```

Review Comment:
Are you sure this is correct? Here you cut the prefix `encrypted__` off the key, but in airflow/models/trigger.py:100 (https://github.com/apache/airflow/pull/36492/files#diff-3d13622d326f0c5cb736bb4fa5b0f3eb4e3d1e86b4b85fb0fb8503a2a8e10a89R100) you don't. Shouldn't the two places mirror the same logic?
```suggestion
                decrypted_kwargs[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
```

##########
airflow/models/trigger.py:
##########
```diff
@@ -90,8 +90,17 @@ def __init__(
     @internal_api_call
     def from_object(cls, trigger: BaseTrigger) -> Trigger:
         """Alternative constructor that creates a trigger row based directly off of a Trigger object."""
+        from airflow.models.crypto import get_fernet
+
         classpath, kwargs = trigger.serialize()
-        return cls(classpath=classpath, kwargs=kwargs)
+        secure_kwargs = {}
+        fernet = get_fernet()
+        for k, v in kwargs.items():
+            if k.startswith("encrypted__"):
```

Review Comment:
The literal `"encrypted__"` is now used in several places across the code. Can you make it a constant that all parts of the code refer to?
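A minimal, self-contained sketch of how the three review points could fit together, assuming the PR's current strip-on-load behaviour: one shared constant for the prefix, a storage helper and a load helper that both go through it, and a black-box test in the spirit of the proposed suggestion. It does not use Airflow. `ENCRYPTED_KWARGS_PREFIX`, `Cipher`, `ToyCipher`, `encrypt_kwargs` and `decrypt_kwargs` are hypothetical names, and `ToyCipher` (base64 of the reversed bytes, which is not encryption) only stands in for the Fernet object returned by `get_fernet()`.

```python
import base64
from typing import Any, Protocol

# Hypothetical shared constant replacing the repeated "encrypted__" literal
# (and the magic number 11 in k[11:], which is its length).
ENCRYPTED_KWARGS_PREFIX = "encrypted__"


class Cipher(Protocol):
    """Anything with Fernet-style bytes-in/bytes-out encrypt and decrypt."""

    def encrypt(self, data: bytes) -> bytes: ...

    def decrypt(self, token: bytes) -> bytes: ...


class ToyCipher:
    """Hypothetical stand-in so the sketch runs without Airflow. NOT encryption."""

    def encrypt(self, data: bytes) -> bytes:
        return base64.urlsafe_b64encode(data[::-1])

    def decrypt(self, token: bytes) -> bytes:
        return base64.urlsafe_b64decode(token)[::-1]


def encrypt_kwargs(kwargs: dict[str, Any], cipher: Cipher) -> dict[str, Any]:
    """Storage side (cf. Trigger.from_object): encrypt marked values, keep the key."""
    stored: dict[str, Any] = {}
    for key, value in kwargs.items():
        if key.startswith(ENCRYPTED_KWARGS_PREFIX):
            stored[key] = cipher.encrypt(value.encode("utf-8")).decode("utf-8")
        else:
            stored[key] = value
    return stored


def decrypt_kwargs(stored: dict[str, Any], cipher: Cipher) -> dict[str, Any]:
    """Load side (cf. trigger_row_to_trigger_instance): decrypt marked values.

    Like the PR's k[11:], this strips the prefix so the result matches the
    trigger constructor; whether to strip is the open question in the review.
    """
    plain: dict[str, Any] = {}
    for key, value in stored.items():
        if key.startswith(ENCRYPTED_KWARGS_PREFIX):
            plain_key = key[len(ENCRYPTED_KWARGS_PREFIX):]
            plain[plain_key] = cipher.decrypt(value.encode("utf-8")).decode("utf-8")
        else:
            plain[key] = value
    return plain


def test_sensitive_kwargs_are_not_stored_in_plain_text() -> None:
    # Black-box check as proposed in the review: the cipher is not
    # re-implemented, we only assert "stored value != original value".
    stored = encrypt_kwargs({"param1": "value1", "encrypted__param2": "value2"}, ToyCipher())
    assert stored["param1"] == "value1"
    assert stored["encrypted__param2"] != "value2"
    # Round trip through the load helper rather than calling the cipher directly.
    assert decrypt_kwargs(stored, ToyCipher()) == {"param1": "value1", "param2": "value2"}


test_sensitive_kwargs_are_not_stored_in_plain_text()
```

Because the test only checks that the stored value differs from the plaintext and that the load path restores it, swapping `ToyCipher` for a real Fernet instance should not require changing the test. The stripped result matches the `param1`/`param2` signature of `SensitiveKwargsTrigger.__init__` from the test; if the review concludes that decrypted keys should keep the prefix instead, only the `plain_key` line (and the expected dict in the test) would change.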