Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
marclamberti commented on PR #51229: URL: https://github.com/apache/airflow/pull/51229#issuecomment-3215412575 Awesome contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
boring-cyborg[bot] commented on PR #51229: URL: https://github.com/apache/airflow/pull/51229#issuecomment-2994865564 Awesome work, congrats on your first merged pull request! You are invited to check our [Issue Tracker](https://github.com/apache/airflow/issues) for additional contributions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr merged PR #51229: URL: https://github.com/apache/airflow/pull/51229 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
Lee-W commented on PR #51229: URL: https://github.com/apache/airflow/pull/51229#issuecomment-2987289310 the test failure doesn't look like something related to this change. let me rebase from the main branch and see how it works -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
yoeo commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2146709805 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -98,6 +99,18 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: return dict(self._iter_kwargs(context)) +def _instanciate_task(definition: AssetDefinition | MultiAssetDefinition) -> _AssetMainOperator | XComArg: Review Comment: Good catch! I fixed the misspelling. I also removed the unnecessary `return` and it works the same as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2142127910 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -98,6 +99,18 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: return dict(self._iter_kwargs(context)) +def _instanciate_task(definition: AssetDefinition | MultiAssetDefinition) -> _AssetMainOperator | XComArg: Review Comment: Also we probably do not need to return anything from this function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2142126207 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -98,6 +99,18 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: return dict(self._iter_kwargs(context)) +def _instanciate_task(definition: AssetDefinition | MultiAssetDefinition) -> _AssetMainOperator | XComArg: Review Comment: I believe the correct spelling is _instantiate_ instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
yoeo commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2123653578 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -111,7 +112,12 @@ class AssetDefinition(Asset): def __attrs_post_init__(self) -> None: with self._source.create_dag(default_dag_id=self.name): -_AssetMainOperator.from_definition(self) +if isinstance(self._function, _TaskDecorator): +if "outlets" not in self._function.kwargs: +self._function.kwargs["outlets"] = [v for _, v in self.iter_assets()] +self._function() +else: +_AssetMainOperator.from_definition(self) Review Comment: `+1` on banning the use `outlets` when using `@asset` + `@task`, to prevent accidents. I also applied the refactor you proposed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
yoeo commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2123647454 ## airflow-core/docs/authoring-and-scheduling/assets.rst: ## @@ -229,6 +229,15 @@ The other way around also applies: def process_example_asset(example_asset): """Process inlet example_asset...""" +In addition, ``@asset`` can be used with ``@task`` to set initial arguments for the task or to use an operator other than ``PythonOperator``: Review Comment: Indeed. I added the reference to the TaskFlow API -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
yoeo commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2123645876 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -23,6 +23,7 @@ import attrs from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.bases.decorator import _TaskDecorator Review Comment: Thanks for the tip! I removed this top-level import. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2121277822 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -111,7 +112,12 @@ class AssetDefinition(Asset): def __attrs_post_init__(self) -> None: with self._source.create_dag(default_dag_id=self.name): -_AssetMainOperator.from_definition(self) +if isinstance(self._function, _TaskDecorator): +if "outlets" not in self._function.kwargs: +self._function.kwargs["outlets"] = [v for _, v in self.iter_assets()] +self._function() +else: +_AssetMainOperator.from_definition(self) Review Comment: It could be a good idea to extract this to a function for both classes to call. We should document the `outlets` behaviour; it could be surprising to the user that explicitly setting the argument _removes the asset itself_. (Or maybe we should always append the current asset as an outlet instead?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2121283208 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -111,7 +112,12 @@ class AssetDefinition(Asset): def __attrs_post_init__(self) -> None: with self._source.create_dag(default_dag_id=self.name): -_AssetMainOperator.from_definition(self) +if isinstance(self._function, _TaskDecorator): +if "outlets" not in self._function.kwargs: +self._function.kwargs["outlets"] = [v for _, v in self.iter_assets()] +self._function() +else: +_AssetMainOperator.from_definition(self) Review Comment: Or maybe we should just ban using `outlets` in this situation. It’d confuse the heck out of the scheduler. You can always just write a normal dag if you really want to set `outlets` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2121283208 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -111,7 +112,12 @@ class AssetDefinition(Asset): def __attrs_post_init__(self) -> None: with self._source.create_dag(default_dag_id=self.name): -_AssetMainOperator.from_definition(self) +if isinstance(self._function, _TaskDecorator): +if "outlets" not in self._function.kwargs: +self._function.kwargs["outlets"] = [v for _, v in self.iter_assets()] +self._function() +else: +_AssetMainOperator.from_definition(self) Review Comment: Or maybe we should just ban using `outlets` in this situation. It’d confuse the heck out of the scheduler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2121263045 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -23,6 +23,7 @@ import attrs from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.bases.decorator import _TaskDecorator Review Comment: You can use `getattr(f, "_airflow_is_task_decorator", False)` to get rid of the import. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
uranusjr commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2121258284 ## airflow-core/docs/authoring-and-scheduling/assets.rst: ## @@ -229,6 +229,15 @@ The other way around also applies: def process_example_asset(example_asset): """Process inlet example_asset...""" +In addition, ``@asset`` can be used with ``@task`` to set initial arguments for the task or to use an operator other than ``PythonOperator``: Review Comment: This should reference the taskflow doc page so users can find relevant descriptions more easily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
yoeo commented on code in PR #51229: URL: https://github.com/apache/airflow/pull/51229#discussion_r2115061856 ## task-sdk/src/airflow/sdk/definitions/asset/decorators.py: ## @@ -23,6 +23,7 @@ import attrs from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.bases.decorator import _TaskDecorator Review Comment: I can remove this import if there is a better way to check if a function is a task decorator. And I can also move it from top level to local import if there is might can cause performance issues... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Allow Asset decorator to work with any TaskFlow operator [airflow]
boring-cyborg[bot] commented on PR #51229: URL: https://github.com/apache/airflow/pull/51229#issuecomment-2921093724 Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst) Here are some useful points: - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks) will help you with that. - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/airflow-core/docs/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it. - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations. - Be patient and persistent. It might take some time to get a review or get the final approval from Committers. - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack. - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices). - Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits. Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://s.apache.org/airflow-slack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
