romanzdk opened a new issue, #35195:
URL: https://github.com/apache/airflow/issues/35195

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   I have a PythonOperator task that uses Paramiko to SSH into a machine, run a Docker container there, and stream the logs back to Airflow. The job usually runs for about 30-45 minutes. Most of the time it works fine, but sometimes, after the job has been running for roughly 10 minutes, the task gets marked as "up_for_retry" even though logs are still coming in. A new task try is then created, which causes problems because two consecutive task tries end up running at the same time.
   
   ### What you think should happen instead
   
   The task should not be marked as "up_for_retry" while it is still running.
   
   ### How to reproduce
   
   Use a PythonOperator with the following function:
   
   ```python
   import logging
   from io import StringIO

   import paramiko


   def _run_command(ti, id, command, env_vars=None):
       public_ip = '1.1.1.1'
       keypair_private_key = 'some-key'

       key = paramiko.RSAKey.from_private_key(StringIO(keypair_private_key))

       ssh_client = paramiko.SSHClient()
       ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
       ssh_client.connect(hostname=public_ip, username='ubuntu', pkey=key)

       ssh_transport = ssh_client.get_transport()
       ssh_channel = ssh_transport.open_session()
       ssh_channel.get_pty()
       ssh_f = ssh_channel.makefile()

       # Prefix the command with any environment variable assignments.
       command_prefix = ''
       if env_vars:
           for name, value in env_vars.items():
               command_prefix += f'{name}="{value}" '
       logging.info(f'{command=}')
       ssh_channel.exec_command(command_prefix + command)

       # Stream the remote output into the task log in real time.
       while True:
           if ssh_channel.exit_status_ready():
               return_code = ssh_channel.recv_exit_status()
               logging.info(f'{return_code=}')
               if return_code > 0:
                   output = ssh_f.read().decode('utf-8')
                   logging.info(output)
                   ssh_transport.close()
                   ssh_client.close()
                   raise Exception('Execution error')
               break
           # recv() blocks until the next chunk of output arrives.
           if content := ssh_channel.recv(1024).decode('utf-8'):
               logging.info(content)

       # Drain any remaining output and close the connection.
       output = ssh_f.read().decode('utf-8')
       logging.info(output)
       ssh_transport.close()
       ssh_client.close()
   ```
   
   The command should be something long-running, e.g. a Docker container that keeps emitting log output for 30-45 minutes.
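   
   For reference, a minimal sketch of how this function could be wired into a DAG (the DAG id, task id, `docker run` command, and retry settings below are illustrative placeholders, not taken from my actual DAG):
   
   ```python
   # Hypothetical DAG wiring for _run_command; all names and the command
   # are placeholders for illustration only.
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.python import PythonOperator

   with DAG(
       dag_id='paramiko_streaming_example',  # placeholder DAG id
       start_date=datetime(2023, 10, 1),
       schedule=None,
       catchup=False,
   ):
       PythonOperator(
           task_id='run_remote_container',   # placeholder task id
           python_callable=_run_command,     # `ti` is injected from the context
           op_kwargs={
               'id': 'example-run',
               # Any command that runs for 30-45 minutes and keeps printing
               # output reproduces the setup, e.g. a docker container that
               # streams its logs.
               'command': 'docker run --rm some-image',
               'env_vars': {'SOME_VAR': 'value'},
           },
           retries=2,
           retry_delay=timedelta(minutes=5),
       )
   ```
   
   With `retries` set, the premature "up_for_retry" state means a second try starts while the first SSH session is still streaming output.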
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.7.1
   apache-airflow-providers-cncf-kubernetes==7.7.0
   apache-airflow-providers-common-sql==1.7.2
   apache-airflow-providers-ftp==3.5.2
   apache-airflow-providers-http==4.5.2
   apache-airflow-providers-imap==3.3.2
   apache-airflow-providers-postgres==5.6.1
   apache-airflow-providers-slack==8.1.0
   apache-airflow-providers-sqlite==3.4.3
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   * On-premise kubernetes cluster v1.26.5
   * LocalExecutor (3 replicas)
   * PostgreSQL 14
   * Python 3.10.12
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

