cetingokhan commented on code in PR #66612:
URL: https://github.com/apache/airflow/pull/66612#discussion_r3334702871
##########
providers/informatica/src/airflow/providers/informatica/plugins/listener.py:
##########
@@ -30,107 +33,214 @@
_informatica_listener: InformaticaListener | None = None
+class InformaticaLineageResolutionError(RuntimeError):
+ """Raised when an EDC object cannot be resolved for a lineage URI."""
+
+
+def _resolve_uri_to_object_id(hook: InformaticaLineageExtractor, uri: str) ->
str:
+ """
+ Resolve an EDC lineage URI to an Informatica catalog object ID.
+
+ Manual lineage entries are treated as concrete object identifiers/URIs.
+ They are validated directly via ``get_object`` instead of being reparsed
+ and looked up again with ``find_object_id``.
+ """
+ log = logging.getLogger(__name__)
+ try:
+ obj = hook.get_object(uri)
+ except InformaticaEDCError as exc:
+ raise InformaticaLineageResolutionError(
+ f"Failed to resolve EDC object for URI {uri!r}: {exc}"
+ ) from exc
+
+ object_id = obj.get("id") if isinstance(obj, dict) else None
+ if not object_id:
+ raise InformaticaLineageResolutionError(
+ f"Could not resolve EDC object for URI {uri!r}. Ensure the object
exists in the Informatica catalog."
+ )
+ log.debug("Resolved URI %r to EDC object_id=%s", uri, object_id)
+ return object_id
+
+
class InformaticaListener:
"""Informatica listener sends events on task instance state changes to
Informatica EDC for lineage tracking."""
def __init__(self):
- self._executor = None
self.log = logging.getLogger(__name__)
self.hook = InformaticaLineageExtractor(edc_hook=InformaticaEDCHook())
- # self.extractor_manager = ExtractorManager()
+ # Cache: _cache_key(ti) -> (valid_inlets, valid_outlets)
+ # Populated by on_task_instance_running (pre-validation), consumed by
+ # on_task_instance_success and cleared by on_task_instance_failed.
+ self._resolved_cache: dict[tuple, tuple[list[tuple[str, str]],
list[tuple[str, str]]]] = {}
+
+ @staticmethod
+ def _cache_key(task_instance: TaskInstance) -> tuple:
+ dag_id = getattr(task_instance, "dag_id", None)
+ if dag_id is None:
+ task = getattr(task_instance, "task", None)
+ dag_id = getattr(task, "dag_id", None)
+ return (
+ dag_id,
+ getattr(task_instance, "run_id", None),
+ task_instance.task_id,
+ getattr(task_instance, "map_index", -1),
+ getattr(task_instance, "try_number", None),
+ )
@hookimpl
def on_task_instance_success(
self, previous_state: TaskInstanceState, task_instance: TaskInstance,
*args, **kwargs
):
- self._handle_lineage(task_instance, state="success")
+ key = self._cache_key(task_instance)
+ cached = self._resolved_cache.pop(key, None)
+ if cached is None:
+ # Running hook was skipped (e.g. operator disabled) - nothing to
do.
+ return
+ valid_inlets, valid_outlets = cached
+ self._create_lineage_links(valid_inlets, valid_outlets,
task_instance.task_id)
@hookimpl
def on_task_instance_failed(
self, previous_state: TaskInstanceState, task_instance: TaskInstance,
*args, **kwargs
):
- self._handle_lineage(task_instance, state="failed")
+ # Clean up cache entry so stale entries do not accumulate.
+ self._resolved_cache.pop(self._cache_key(task_instance), None)
@hookimpl
def on_task_instance_running(
self, previous_state: TaskInstanceState, task_instance: TaskInstance,
*args, **kwargs
):
- self._handle_lineage(task_instance, state="running")
-
- def _handle_lineage(self, task_instance: TaskInstance, state: str):
"""
- Handle lineage resolution for inlets and outlets.
+ Validate and pre-resolve all inlet/outlet URIs before the task
executes.
- For each inlet and outlet, resolve Informatica EDC object IDs using
getObject.
- If valid, collect and create lineage links between all valid inlets
and outlets.
+ Raises :class:`InformaticaLineageResolutionError` if any URI or table
cannot
+ be resolved in the Informatica catalog. This causes Airflow to fail
the task
+ immediately - before the operator ``execute()`` is called.
Review Comment:
I added a `pre_execute` method for fail-fast validation ;)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]