[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes") not work
takersk commented on issue #29561: URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436439675 @tirkarthi thank you I solved it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes") not work
takersk commented on issue #29561: URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436408171 @tirkarthi There is no value returned by task, exec_trino_query_pod task will run the actual ==> return KubernetesPodOperator( name=task_id.replace("_", "-"), ... do_xcom_push=False, ) There is no problem with KubernetesExecutor , only CeleryKubernetesExecutor has issues. Will this make a difference? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes") not work
takersk commented on issue #29561: URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436378898 @tirkarthi cc: @hussein-awala Thank you for answer. I tested it after changing task_id, but another error is occurring. Can I check it? dags code modiftyed ``` @task(queue="kubernetes") def example_task_kubernetes(_dag): query_str = "select * from kudu.dw.dim_gift_brand limit 10" exec_trino_query_string_task = exec_trino_query_pod( dag=_dag, task_id="task_kubernetes", trino_options=TrinoStringOptions( query_string=query_str, ) ) return exec_trino_query_string_task def example_task_celery(_dag): query_str = "select * from kudu.dw.dim_gift_brand limit 10" exec_trino_query_string_task = exec_trino_query_pod( dag=_dag, task_id="task_celery", trino_options=TrinoStringOptions( query_string=query_str, ) ) return exec_trino_query_string_task ``` error https://user-images.githubusercontent.com/3029712/220021608-92fd205d-f6ec-434d-b8c5-8ceecad2edf3.png";> ``` AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-20T05:53:47.226076+00:00 [2023-02-20, 14:57:38 KST] {python.py:173} INFO - Done. Returned value was: [2023-02-20, 14:57:38 KST] {xcom.py:585} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config. [2023-02-20, 14:57:38 KST] {taskinstance.py:1902} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2398, in xcom_push XCom.set( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 192, in set value = cls.serialize_value( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 583, in serialize_value return json.dumps(value).encode('UTF-8') File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps return _default_encoder.encode(obj) File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode chunks = self.iterencode(o, _one_shot=True) File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode return _iterencode(o, 0) File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default raise TypeError(f'Object of type {o.__class__.__name__} ' TypeError: Object of type KubernetesPodOperator is not JSON serializable [2023-02-20, 14:57:38 KST] {taskinstance.py:1407} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task_kubernetes, execution_date=20230220T055347, start_date=20230220T055738, end_date=20230220T055738 [2023-02-20, 14:57:38 KST] {standard_task_runner.py:92} ERROR - Failed to execute job 1776 for task example_task_kubernetes (Object of type KubernetesPodOperator is not JSON serializable; 14) [2023-02-20, 14:57:38 KST] {local_task_job.py:156} INFO - Task exited with return code 1 [2023-02-20, 14:57:38 KST] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes") not work
takersk commented on issue #29561: URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436153289 @hussein-awala yes!! ``` import os from datetime import datetime, timedelta from airflow import DAG from common.enum.enum import DagEnum, DatahubEnum from common.operator.custom_dummy_operator import CustomDummyOperator from common.options.trino_options import TrinoStringOptions from common.utils.date import KST from common.utils.date import ( execution_date_fmt, ) from common.utils.pod import ( exec_trino_query_pod, ) from common.utils.watchtower import WatchTower from airflow.decorators import task @task(queue="kubernetes") def example_task_kubernetes(_dag): query_str = "select * from kudu.dw.dim_gift_brand limit 10" exec_trino_query_string_task = exec_trino_query_pod( dag=_dag, task_id="example_task_kubernetes", trino_options=TrinoStringOptions( query_string=query_str, ) ) return exec_trino_query_string_task def example_task_celery(_dag): query_str = "select * from kudu.dw.dim_gift_brand limit 10" exec_trino_query_string_task = exec_trino_query_pod( dag=_dag, task_id="example_task_celery", trino_options=TrinoStringOptions( query_string=query_str, ) ) return exec_trino_query_string_task default_args = { "owner": DagEnum.DE_OWNER.value, "depends_on_past": False, "start_date": datetime(2022, 10, 6, tzinfo=KST), "email_on_failure": False, "email_on_retry": False, "execution_timeout": timedelta(hours=int(DagEnum.EXECUTION_TIMEOUT_HOUR_MAX.value)), "retries": int(DagEnum.RETRIES_MID.value), # "on_failure_callback": WatchTower().fail, "retry_delay": timedelta(minutes=int(DagEnum.RETRY_DELAY_MIN.value)), } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, schedule_interval="*/10 * * * *", user_defined_macros={ "execution_date_fmt": execution_date_fmt, }, concurrency=int(DagEnum.CONCURRENCY_MID.value), max_active_runs=int(DagEnum.MAX_ACTIVE_RUNS_MIN.value), catchup=False, tags=[DatahubEnum.TAG_TRINO.value, DatahubEnum.TAG_SPARK.value, DatahubEnum.TAG_DISTCP.value], ) as dag: # dummy task start_task = CustomDummyOperator(task_id="start", dag=dag) end_task = CustomDummyOperator(task_id="end", dag=dag) # tasks # start_task >> example_task_kubernetes(dag) >> end_task start_task >> example_task_celery(dag) >> end_task ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org