[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes")  not work

2023-02-19 Thread via GitHub


takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436439675

   @tirkarthi 
   Thank you, I solved it.



[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes")  not work

2023-02-19 Thread via GitHub


takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436408171

   @tirkarthi 
   The task does not return a value; the exec_trino_query_pod task runs the actual pod:
   
   ```
   return KubernetesPodOperator(
       name=task_id.replace("_", "-"),
       ...
       do_xcom_push=False,
   )
   ```
   
   
   There is no problem with KubernetesExecutor; only CeleryKubernetesExecutor has issues. Could that make a difference?
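   
   For context on the snippet above: do_xcom_push=False on the inner KubernetesPodOperator only disables that operator's own XCom push. The @task-decorated wrapper is a separate task, and TaskFlow still pushes the wrapper's return value. A minimal sketch, assuming stock Airflow 2.x, where the decorator forwards BaseOperator arguments such as do_xcom_push:
   
   ```
   from airflow.decorators import task
   
   
   # Sketch: disable the *wrapper's* XCom push on the decorator itself; the
   # inner operator's do_xcom_push=False does not reach this level.
   @task(queue="kubernetes", do_xcom_push=False)
   def wrapper_task():
       # With do_xcom_push=False the return value is never serialized to XCom,
       # so returning a non-JSON-serializable object would not fail the task.
       return None
   ```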



[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes")  not work

2023-02-19 Thread via GitHub


takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436378898

   @tirkarthi cc: @hussein-awala 
   
   Thank you for the answer. I tested it after changing the task_id, but now another error occurs. Could you check it?
   
   Modified DAG code:
   ```
   @task(queue="kubernetes")
   def example_task_kubernetes(_dag):
       query_str = "select * from kudu.dw.dim_gift_brand limit 10"
       exec_trino_query_string_task = exec_trino_query_pod(
           dag=_dag,
           task_id="task_kubernetes",
           trino_options=TrinoStringOptions(
               query_string=query_str,
           )
       )
   
       return exec_trino_query_string_task
   
   
   def example_task_celery(_dag):
       query_str = "select * from kudu.dw.dim_gift_brand limit 10"
       exec_trino_query_string_task = exec_trino_query_pod(
           dag=_dag,
           task_id="task_celery",
           trino_options=TrinoStringOptions(
               query_string=query_str,
           )
       )
   
       return exec_trino_query_string_task
   ```
   
   Error screenshot:
   https://user-images.githubusercontent.com/3029712/220021608-92fd205d-f6ec-434d-b8c5-8ceecad2edf3.png
   
   ```
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-20T05:53:47.226076+00:00
   [2023-02-20, 14:57:38 KST] {python.py:173} INFO - Done. Returned value was: 
   [2023-02-20, 14:57:38 KST] {xcom.py:585} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
   [2023-02-20, 14:57:38 KST] {taskinstance.py:1902} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2398, in xcom_push
       XCom.set(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 192, in set
       value = cls.serialize_value(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 583, in serialize_value
       return json.dumps(value).encode('UTF-8')
     File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type KubernetesPodOperator is not JSON serializable
   [2023-02-20, 14:57:38 KST] {taskinstance.py:1407} INFO - Marking task as FAILED. dag_id=example_dag, task_id=example_task_kubernetes, execution_date=20230220T055347, start_date=20230220T055738, end_date=20230220T055738
   [2023-02-20, 14:57:38 KST] {standard_task_runner.py:92} ERROR - Failed to execute job 1776 for task example_task_kubernetes (Object of type KubernetesPodOperator is not JSON serializable; 14)
   [2023-02-20, 14:57:38 KST] {local_task_job.py:156} INFO - Task exited with return code 1
   [2023-02-20, 14:57:38 KST] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
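   
   For context on the traceback: the @task-decorated function returns the KubernetesPodOperator instance itself, and TaskFlow then pushes that return value to XCom via json.dumps, which raises the TypeError. A minimal sketch of the failure mode, assuming Airflow 2.3+ and using EmptyOperator as a stand-in for the pod operator:
   
   ```
   from airflow.decorators import task
   from airflow.operators.empty import EmptyOperator  # stand-in for KubernetesPodOperator
   
   
   @task(queue="kubernetes")
   def build_operator():
       # The operator is constructed at *execution* time on the worker, so it
       # is never scheduled as a task; it is only this function's return value.
       op = EmptyOperator(task_id="inner")
       # TaskFlow pushes the return value to XCom with json.dumps(value);
       # operator instances are not JSON-serializable, hence the TypeError.
       return op
   ```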
   



[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes")  not work

2023-02-19 Thread via GitHub


takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436153289

   @hussein-awala 
   
   yes!!
   
   ```
   import os
   from datetime import datetime, timedelta
   
   from airflow import DAG
   
   from common.enum.enum import DagEnum, DatahubEnum
   from common.operator.custom_dummy_operator import CustomDummyOperator
   from common.options.trino_options import TrinoStringOptions
   from common.utils.date import KST
   from common.utils.date import (
       execution_date_fmt,
   )
   from common.utils.pod import (
       exec_trino_query_pod,
   )
   from common.utils.watchtower import WatchTower
   from airflow.decorators import task
   
   
   @task(queue="kubernetes")
   def example_task_kubernetes(_dag):
       query_str = "select * from kudu.dw.dim_gift_brand limit 10"
       exec_trino_query_string_task = exec_trino_query_pod(
           dag=_dag,
           task_id="example_task_kubernetes",
           trino_options=TrinoStringOptions(
               query_string=query_str,
           )
       )
   
       return exec_trino_query_string_task
   
   
   def example_task_celery(_dag):
       query_str = "select * from kudu.dw.dim_gift_brand limit 10"
       exec_trino_query_string_task = exec_trino_query_pod(
           dag=_dag,
           task_id="example_task_celery",
           trino_options=TrinoStringOptions(
               query_string=query_str,
           )
       )
   
       return exec_trino_query_string_task
   
   
   default_args = {
       "owner": DagEnum.DE_OWNER.value,
       "depends_on_past": False,
       "start_date": datetime(2022, 10, 6, tzinfo=KST),
       "email_on_failure": False,
       "email_on_retry": False,
       "execution_timeout": timedelta(hours=int(DagEnum.EXECUTION_TIMEOUT_HOUR_MAX.value)),
       "retries": int(DagEnum.RETRIES_MID.value),
       # "on_failure_callback": WatchTower().fail,
       "retry_delay": timedelta(minutes=int(DagEnum.RETRY_DELAY_MIN.value)),
   }
   
   with DAG(
       dag_id=os.path.basename(__file__).replace(".py", ""),
       default_args=default_args,
       schedule_interval="*/10 * * * *",
       user_defined_macros={
           "execution_date_fmt": execution_date_fmt,
       },
       concurrency=int(DagEnum.CONCURRENCY_MID.value),
       max_active_runs=int(DagEnum.MAX_ACTIVE_RUNS_MIN.value),
       catchup=False,
       tags=[DatahubEnum.TAG_TRINO.value, DatahubEnum.TAG_SPARK.value, DatahubEnum.TAG_DISTCP.value],
   ) as dag:
       # dummy task
       start_task = CustomDummyOperator(task_id="start", dag=dag)
       end_task = CustomDummyOperator(task_id="end", dag=dag)
   
       # tasks
       # start_task >> example_task_kubernetes(dag) >> end_task
       start_task >> example_task_celery(dag) >> end_task
   ```
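   
   Worth noting about the last two wiring lines: example_task_celery(dag) is a plain function, so the operator it returns is created at DAG parse time and registered as a real task, while the @task-decorated example_task_kubernetes(dag) defers its body to execution time, and its return value (an operator) hits the XCom serialization error. A hedged sketch of routing the pod task to the kubernetes queue without the decorator, assuming exec_trino_query_pod returns a KubernetesPodOperator:
   
   ```
   # Sketch only: build the operator at parse time and route it to the
   # CeleryKubernetesExecutor's kubernetes queue via the standard BaseOperator
   # `queue` attribute, instead of wrapping the factory in @task.
   pod_task = exec_trino_query_pod(
       dag=dag,
       task_id="example_task_kubernetes",
       trino_options=TrinoStringOptions(
           query_string="select * from kudu.dw.dim_gift_brand limit 10",
       ),
   )
   pod_task.queue = "kubernetes"  # default kubernetes_queue name for CeleryKubernetesExecutor
   
   start_task >> pod_task >> end_task
   ```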

