Lee-W commented on code in PR #43873:
URL: https://github.com/apache/airflow/pull/43873#discussion_r1837821960
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -2113,44 +2114,58 @@ def _activate_referenced_assets(assets:
Collection[AssetModel], *, session: Sess
active_name_to_uri: dict[str, str] = {name: uri for name, uri in
active_assets}
active_uri_to_name: dict[str, str] = {uri: name for name, uri in
active_assets}
- def _generate_dag_warnings(offending: AssetModel, attr: str, value:
str) -> Iterator[DagWarning]:
+ def _generate_warning_message(
+ offending: AssetModel, attr: str, value: str
+ ) -> Iterator[tuple[str, str]]:
for ref in itertools.chain(offending.consuming_dags,
offending.producing_tasks):
- yield DagWarning(
- dag_id=ref.dag_id,
- error_type=DagWarningType.ASSET_CONFLICT,
- message=f"Cannot activate asset {offending}; {attr} is
already associated to {value!r}",
+ yield (
+ ref.dag_id,
+ f"Cannot activate asset {offending}; {attr} is already
associated to {value!r}",
)
- def _activate_assets_generate_warnings() -> Iterator[DagWarning]:
+ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
incoming_name_to_uri: dict[str, str] = {}
incoming_uri_to_name: dict[str, str] = {}
for asset in assets:
if (asset.name, asset.uri) in active_assets:
continue
existing_uri = active_name_to_uri.get(asset.name) or
incoming_name_to_uri.get(asset.name)
if existing_uri is not None and existing_uri != asset.uri:
- yield from _generate_dag_warnings(asset, "name",
existing_uri)
+ yield from _generate_warning_message(asset, "name",
existing_uri)
continue
existing_name = active_uri_to_name.get(asset.uri) or
incoming_uri_to_name.get(asset.uri)
if existing_name is not None and existing_name != asset.name:
- yield from _generate_dag_warnings(asset, "uri",
existing_name)
+ yield from _generate_warning_message(asset, "uri",
existing_name)
continue
incoming_name_to_uri[asset.name] = asset.uri
incoming_uri_to_name[asset.uri] = asset.name
session.add(AssetActive.for_asset(asset))
- warnings_to_have = {w.dag_id: w for w in
_activate_assets_generate_warnings()}
+ def _get_first_item(x: Sequence[Any]) -> Any:
+ return x[0]
+
+ warnings_to_have = {
+ dag_id: DagWarning(
+ dag_id=dag_id,
+ error_type=DagWarningType.ASSET_CONFLICT,
+ message="\n".join([row[1] for row in group]),
+ )
Review Comment:
Updated. Thanks!
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -2113,44 +2114,58 @@ def _activate_referenced_assets(assets:
Collection[AssetModel], *, session: Sess
active_name_to_uri: dict[str, str] = {name: uri for name, uri in
active_assets}
active_uri_to_name: dict[str, str] = {uri: name for name, uri in
active_assets}
- def _generate_dag_warnings(offending: AssetModel, attr: str, value:
str) -> Iterator[DagWarning]:
+ def _generate_warning_message(
+ offending: AssetModel, attr: str, value: str
+ ) -> Iterator[tuple[str, str]]:
for ref in itertools.chain(offending.consuming_dags,
offending.producing_tasks):
- yield DagWarning(
- dag_id=ref.dag_id,
- error_type=DagWarningType.ASSET_CONFLICT,
- message=f"Cannot activate asset {offending}; {attr} is
already associated to {value!r}",
+ yield (
+ ref.dag_id,
+ f"Cannot activate asset {offending}; {attr} is already
associated to {value!r}",
)
- def _activate_assets_generate_warnings() -> Iterator[DagWarning]:
+ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
incoming_name_to_uri: dict[str, str] = {}
incoming_uri_to_name: dict[str, str] = {}
for asset in assets:
if (asset.name, asset.uri) in active_assets:
continue
existing_uri = active_name_to_uri.get(asset.name) or
incoming_name_to_uri.get(asset.name)
if existing_uri is not None and existing_uri != asset.uri:
- yield from _generate_dag_warnings(asset, "name",
existing_uri)
+ yield from _generate_warning_message(asset, "name",
existing_uri)
continue
existing_name = active_uri_to_name.get(asset.uri) or
incoming_uri_to_name.get(asset.uri)
if existing_name is not None and existing_name != asset.name:
- yield from _generate_dag_warnings(asset, "uri",
existing_name)
+ yield from _generate_warning_message(asset, "uri",
existing_name)
continue
incoming_name_to_uri[asset.name] = asset.uri
incoming_uri_to_name[asset.uri] = asset.name
session.add(AssetActive.for_asset(asset))
- warnings_to_have = {w.dag_id: w for w in
_activate_assets_generate_warnings()}
+ def _get_first_item(x: Sequence[Any]) -> Any:
+ return x[0]
+
+ warnings_to_have = {
+ dag_id: DagWarning(
+ dag_id=dag_id,
+ error_type=DagWarningType.ASSET_CONFLICT,
+ message="\n".join([row[1] for row in group]),
+ )
+ for dag_id, group in groupby(
+ sorted(_activate_assets_generate_warnings(),
key=_get_first_item), key=_get_first_item
Review Comment:
Updated. Thanks!
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -2113,44 +2114,58 @@ def _activate_referenced_assets(assets:
Collection[AssetModel], *, session: Sess
active_name_to_uri: dict[str, str] = {name: uri for name, uri in
active_assets}
active_uri_to_name: dict[str, str] = {uri: name for name, uri in
active_assets}
- def _generate_dag_warnings(offending: AssetModel, attr: str, value:
str) -> Iterator[DagWarning]:
+ def _generate_warning_message(
+ offending: AssetModel, attr: str, value: str
+ ) -> Iterator[tuple[str, str]]:
for ref in itertools.chain(offending.consuming_dags,
offending.producing_tasks):
- yield DagWarning(
- dag_id=ref.dag_id,
- error_type=DagWarningType.ASSET_CONFLICT,
- message=f"Cannot activate asset {offending}; {attr} is
already associated to {value!r}",
+ yield (
+ ref.dag_id,
+ f"Cannot activate asset {offending}; {attr} is already
associated to {value!r}",
)
- def _activate_assets_generate_warnings() -> Iterator[DagWarning]:
+ def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
incoming_name_to_uri: dict[str, str] = {}
incoming_uri_to_name: dict[str, str] = {}
for asset in assets:
if (asset.name, asset.uri) in active_assets:
continue
existing_uri = active_name_to_uri.get(asset.name) or
incoming_name_to_uri.get(asset.name)
if existing_uri is not None and existing_uri != asset.uri:
- yield from _generate_dag_warnings(asset, "name",
existing_uri)
+ yield from _generate_warning_message(asset, "name",
existing_uri)
continue
existing_name = active_uri_to_name.get(asset.uri) or
incoming_uri_to_name.get(asset.uri)
if existing_name is not None and existing_name != asset.name:
- yield from _generate_dag_warnings(asset, "uri",
existing_name)
+ yield from _generate_warning_message(asset, "uri",
existing_name)
continue
incoming_name_to_uri[asset.name] = asset.uri
incoming_uri_to_name[asset.uri] = asset.name
session.add(AssetActive.for_asset(asset))
- warnings_to_have = {w.dag_id: w for w in
_activate_assets_generate_warnings()}
+ def _get_first_item(x: Sequence[Any]) -> Any:
+ return x[0]
Review Comment:
Updated. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]