tusharg1993 opened a new issue, #29684: URL: https://github.com/apache/airflow/issues/29684
### Apache Airflow Provider(s)

apache-flink

### Versions of Apache Airflow Providers

1.0.0

### Apache Airflow version

2.4.3

### Operating System

Not sure

### Deployment

Microsoft ADF Managed Airflow

### Deployment details

_No response_

### What happened

Hello, I am trying to use the Flink Airflow operator to submit a Flink job. My AKS cluster already has the Flink Kubernetes controller installed, and it works perfectly via kubectl. However, trying to do the same through Airflow results in the following error:

```
[2023-02-22T05:46:11.473+0000] {flink_kubernetes.py:103} INFO - Creating flinkApplication with Context: None and op_context: {'conf': <airflow.configuration.AirflowConfigParser object at 0x7f41ac7003d0>, 'dag': <DAG: tutorial>, 'dag_run': <DagRun tutorial @ 2023-02-22 05:41:08.519537+00:00: manual__2023-02-22T05:41:08.519537+00:00, state:running, queued_at: 2023-02-22 05:41:08.526958+00:00. externally triggered: True>, 'data_interval_end': DateTime(2023, 2, 22, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2023, 2, 21, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'ds': '2023-02-22', 'ds_nodash': '20230222', 'execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'inlets': [], 'logical_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.8/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2023-02-22', 'next_ds_nodash': '20230222', 'next_execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 'prev_data_interval_start_success': DateTime(2023, 2, 21, 5, 32, 5, 541590, tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2023, 2, 22, 5, 32, 5, 541590, tzinfo=Timezone('UTC')), 'prev_ds': '2023-02-22', 'prev_ds_nodash': '20230222', 'prev_execution_date': DateTime(2023, 2, 22, 5, 41, 8, 519537, tzinfo=Timezone('UTC')), 'prev_execution_date_success': None, 'prev_start_date_success': DateTime(2023, 2, 22, 5, 32, 6, 249219, tzinfo=Timezone('UTC')), 'run_id': 'manual__2023-02-22T05:41:08.519537+00:00', 'task': <Task(FlinkKubernetesOperator): sample_flink_task>, 'task_instance': <TaskInstance: tutorial.sample_flink_task manual__2023-02-22T05:41:08.519537+00:00 [running]>, 'task_instance_key_str': 'tutorial__sample_flink_task__20230222', 'test_mode': False, 'ti': <TaskInstance: tutorial.sample_flink_task manual__2023-02-22T05:41:08.519537+00:00 [running]>, 'tomorrow_ds': '2023-02-23', 'tomorrow_ds_nodash': '20230223', 'triggering_dataset_events': <Proxy at 0x7f41973edd80 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x7f419749caf0>>, 'ts': '2023-02-22T05:41:08.519537+00:00', 'ts_nodash': '20230222T054108', 'ts_nodash_with_tz': '20230222T054108.519537+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2023-02-21', 'yesterday_ds_nodash': '20230221'}
[2023-02-22T05:46:11.474+0000] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/flink/operators/flink_kubernetes.py", line 107, in execute
    self.hook.custom_object_client.list_cluster_custom_object(
AttributeError: 'KubernetesHook' object has no attribute 'custom_object_client'
```

### What you think should happen instead

_No response_

### How to reproduce

The error can be reproduced by configuring an AKS Kubernetes connection in Airflow and then using the Flink operator to submit a FlinkDeployment job.
```
TEST_VALID_APPLICATION_JSON = """
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
"""

t6 = FlinkKubernetesOperator(
    application_file=TEST_VALID_APPLICATION_JSON,
    in_cluster=False,
    namespace="default",
    kubernetes_conn_id="tgoyal_aks",
    task_id="sample_flink_task",
)
```

### Anything else

I validated that the Kubernetes connection is configured correctly by successfully running the following operator:

```
t5 = KubernetesPodOperator(
    kubernetes_conn_id="tgoyal_aks",
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    in_cluster=False,
    namespace="default",
)
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
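As an aside, the traceback suggests a provider version mismatch rather than a connection problem: `custom_object_client` is exposed as a property on newer `KubernetesHook` releases, so a deployment image shipping an older `cncf.kubernetes` provider would raise exactly this `AttributeError` even though the connection itself works. A minimal toy sketch of that failure mode (the class names are hypothetical stand-ins, not the real Airflow classes):

```python
# Toy illustration of the suspected version mismatch.
# These classes are hypothetical stand-ins, not the real KubernetesHook.

class OldKubernetesHookStandIn:
    """Mimics a hook from an older provider release: no custom_object_client."""


class NewKubernetesHookStandIn:
    """Mimics a newer release that exposes the attribute as a property."""

    @property
    def custom_object_client(self):
        return "would be a kubernetes.client.CustomObjectsApi here"


def submit_custom_object(hook):
    """Mimics the operator's execute(): it touches hook.custom_object_client."""
    try:
        return hook.custom_object_client
    except AttributeError as err:
        return f"fails like the traceback above: {err}"


print(submit_custom_object(NewKubernetesHookStandIn()))  # succeeds
print(submit_custom_object(OldKubernetesHookStandIn()))  # AttributeError path
```

If this is the cause, upgrading the `apache-airflow-providers-cncf-kubernetes` package in the managed environment's requirements would be the thing to try first.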