tusharg1993 opened a new issue, #29684:
URL: https://github.com/apache/airflow/issues/29684

   ### Apache Airflow Provider(s)
   
   apache-flink
   
   ### Versions of Apache Airflow Providers
   
   1.0.0
   
   ### Apache Airflow version
   
   2.4.3
   
   ### Operating System
   
   Not sure
   
   ### Deployment
   
   Microsoft ADF Managed Airflow
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   Hello, I am trying to use the Flink Airflow operator 
(`FlinkKubernetesOperator`) to submit a Flink job. My AKS cluster already has 
the Flink Kubernetes controller installed, and submitting jobs with kubectl 
works as expected.
   
   However, doing the same through Airflow fails with the following 
error:
   
   
   ```
   [2023-02-22T05:46:11.473+0000] {flink_kubernetes.py:103} INFO - Creating 
flinkApplication with Context: None and op_context: {'conf': 
<airflow.configuration.AirflowConfigParser object at 0x7f41ac7003d0>, 'dag': 
<DAG: tutorial>, 'dag_run': <DagRun tutorial @ 2023-02-22 
05:41:08.519537+00:00: manual__2023-02-22T05:41:08.519537+00:00, state:running, 
queued_at: 2023-02-22 05:41:08.526958+00:00. externally triggered: True>, 
'data_interval_end': DateTime(2023, 2, 22, 5, 41, 8, 519537, 
tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 2, 21, 5, 41, 8, 
519537, tzinfo=Timezone('UTC')), 'ds': '2023-02-22', 'ds_nodash': '20230222', 
'execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, 
tzinfo=Timezone('UTC')), 'inlets': [], 'logical_date': DateTime(2023, 2, 22, 5, 
41, 8, 519537, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from 
'/home/airflow/.local/lib/python3.8/site-packages/airflow/macros/__init__.py'>, 
'next_ds': '2023-02-22', 'next_ds_nodash': '20230222', 
'next_execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, 
tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 
'prev_data_interval_start_success': DateTime(2023, 2, 21, 5, 32, 5, 541590, 
tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2023, 2, 
22, 5, 32, 5, 541590, tzinfo=Timezone('UTC')), 'prev_ds': '2023-02-22', 
'prev_ds_nodash': '20230222', 'prev_execution_date': DateTime(2023, 2, 22, 5, 
41, 8, 519537, tzinfo=Timezone('UTC')), 'prev_execution_date_success': None, 
'prev_start_date_success': DateTime(2023, 2, 22, 5, 32, 6, 249219, 
tzinfo=Timezone('UTC')), 'run_id': 'manual__2023-02-22T05:41:08.519537+00:00', 
'task': <Task(FlinkKubernetesOperator): sample_flink_task>, 'task_instance': 
<TaskInstance: tutorial.sample_flink_task 
manual__2023-02-22T05:41:08.519537+00:00 [running]>, 'task_instance_key_str': 
'tutorial__sample_flink_task__20230222', 'test_mode': False, 'ti': 
<TaskInstance: tutorial.sample_flink_task 
manual__2023-02-22T05:41:08.519537+00:00 [running]>, 
'tomorrow_ds': '2023-02-23', 'tomorrow_ds_nodash': '20230223', 
'triggering_dataset_events': <Proxy at 0x7f41973edd80 with factory <function 
TaskInstance.get_template_context.<locals>.get_triggering_events at 
0x7f419749caf0>>, 'ts': '2023-02-22T05:41:08.519537+00:00', 'ts_nodash': 
'20230222T054108', 'ts_nodash_with_tz': '20230222T054108.519537+0000', 'var': 
{'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2023-02-21', 
'yesterday_ds_nodash': '20230221'}
   [2023-02-22T05:46:11.474+0000] {taskinstance.py:1851} ERROR - Task failed 
with exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/flink/operators/flink_kubernetes.py",
 line 107, in execute
       self.hook.custom_object_client.list_cluster_custom_object(
   AttributeError: 'KubernetesHook' object has no attribute 
'custom_object_client'
   ```
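
The traceback shows the Flink operator reading `custom_object_client` from `KubernetesHook`, a class that ships in a separate package (`apache-airflow-providers-cncf-kubernetes`). A version mismatch between the two providers is therefore one plausible cause, though this report does not confirm it. The sketch below only lists the installed versions of the relevant distributions so they can be added to the report. The helper name `installed_versions` is made up for illustration.

```python
from importlib import metadata

# Distribution names as published on PyPI.
PROVIDER_PACKAGES = (
    "apache-airflow",
    "apache-airflow-providers-apache-flink",
    "apache-airflow-providers-cncf-kubernetes",
)


def installed_versions(packages=PROVIDER_PACKAGES):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not present in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```

Running this inside the Airflow worker environment (for example from a `PythonOperator` task) would show which versions the failing task actually sees.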
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   The error can be reproduced by configuring an AKS Kubernetes 
connection in Airflow and then using the Flink operator to submit a 
FlinkDeployment job.
   
   ```
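   from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator
   
   # FlinkDeployment manifest (YAML content, despite the variable name)
   TEST_VALID_APPLICATION_JSON = """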
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     image: flink:1.16
     flinkVersion: v1_16
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
     serviceAccount: flink
     jobManager:
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: stateless
   """
   
   t6 = FlinkKubernetesOperator(
           application_file=TEST_VALID_APPLICATION_JSON,
           in_cluster=False,
           namespace="default",
           kubernetes_conn_id="tgoyal_aks",
           task_id="sample_flink_task",
   )
   ```
   
   
   
   ### Anything else
   
   I verified that the Kubernetes connection is configured correctly: the 
following `KubernetesPodOperator` task, which uses the same connection, runs 
successfully.
   
   ```
   t5 = KubernetesPodOperator(
           kubernetes_conn_id="tgoyal_aks",
           name="hello-dry-run",
           image="debian",
           cmds=["bash", "-cx"],
           arguments=["echo", "10"],
           labels={"foo": "bar"},
           task_id="dry_run_demo",
           in_cluster=False,
           namespace="default"
   )
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

