ignaciojcano opened a new issue, #49801:
URL: https://github.com/apache/airflow/issues/49801

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google ==15.1.0
   apache-airflow-providers-sftp ==5.2.1
   
   
   
   ### Apache Airflow version
   
   2.10.2
   
   ### Operating System
   
   google cloud composer
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   Composer Version 2.11.5
   Airflow Version 2.10.2
   
   ### What happened
   
   We want to use the `SFTPToGCSOperator` to stream files from an SFTP server 
to a GCS bucket. Streaming for a single file was added in 
https://github.com/apache/airflow/pull/48107 so we tried to use it.
   
   The task was configured as shown below, but execution failed with the error 
`Failed to execute job **** for task ***** (expected str, bytes or os.PathLike 
object, not BlobWriter; 33609)`
   
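   ```python
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

   # Single-file transfer with streaming enabled; this is the setup that fails.
   fetch_task = SFTPToGCSOperator(
       task_id="task_id",
       sftp_conn_id="sftp_conn",
       gcp_conn_id="gcp_conn",
       source_path="/file.txt",
       destination_bucket="test_bucket",
       destination_path="path/to/file.txt",
       use_stream=True,
   )
   ```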
   
   Looking at the code that sets up the stream and at the SFTP hook, the hook 
expects `write_stream` to be an instance of `BytesIO`. It is instead a 
`BlobWriter`, so the hook ends up calling `sftp.get` (which expects a local 
path) instead of `sftp.getfo`.
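
To illustrate the dispatch described above, here is a minimal sketch. It is not the hook's actual code: `choose_sftp_call` and `FakeBlobWriter` are hypothetical names, and `FakeBlobWriter` is only a stand-in for a file-like writer that is not a `BytesIO`.

```python
import io


class FakeBlobWriter(io.BufferedIOBase):
    """Hypothetical stand-in for a blob writer: file-like, but not a BytesIO."""

    def __init__(self):
        self.chunks = []

    def writable(self):
        return True

    def write(self, data):
        self.chunks.append(bytes(data))
        return len(data)


def choose_sftp_call(local_target):
    """Mimic the type check described in the issue (simplified, assumed)."""
    if isinstance(local_target, io.BytesIO):
        return "getfo"  # file-object API: writes into an open stream
    return "get"  # path API: expects str, bytes or os.PathLike


print(choose_sftp_call(io.BytesIO()))      # getfo
print(choose_sftp_call(FakeBlobWriter()))  # get
print(choose_sftp_call("/tmp/file.txt"))   # get
```

Under these assumptions, any writable stream other than `BytesIO` falls through to the path-based call, which matches the `expected str, bytes or os.PathLike object` error.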
   
   SFTP Hook retrieve_file: 
https://github.com/apache/airflow/blob/providers-sftp/5.1.2/providers/sftp/src/airflow/providers/sftp/hooks/sftp.py#L277
   
   Storage "Blob": 
https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/blob.py#L4098
   
   ### What you think should happen instead
   
   We should pass a `BytesIO` instance that represents the bucket object.
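
As one possible alternative to requiring `BytesIO` specifically, the check could be duck-typed so that any writable file-like object is routed to the file-object call. This is only a sketch of the idea, not a proposed patch: `is_path_like` and `choose_sftp_call` are hypothetical helpers, and `io.BufferedWriter` is used purely as an example of a non-`BytesIO` stream.

```python
import io
import os


def is_path_like(target):
    """True for the types a path-based download call accepts."""
    return isinstance(target, (str, bytes, os.PathLike))


def choose_sftp_call(local_target):
    """Pick the call by capability rather than by concrete class (assumed design)."""
    if is_path_like(local_target):
        return "get"
    if callable(getattr(local_target, "write", None)):
        return "getfo"  # any object with a write() method is treated as a stream
    raise TypeError(f"unsupported download target: {type(local_target).__name__}")


# A buffered writer is file-like but not a BytesIO; it still selects "getfo".
stream = io.BufferedWriter(io.BytesIO())
print(choose_sftp_call(stream))           # getfo
print(choose_sftp_call("/tmp/file.txt"))  # get
```

With a check like this, the operator could keep passing the blob writer directly and the transfer would stay streamed instead of being buffered in memory.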
   
   ### How to reproduce
   
   This is a pretty plain configuration of the operator. I cannot provide a 
bucket or an SFTP server to test with, though.
   
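   ```python
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

   # Minimal reproduction: one file, streaming enabled.
   fetch_task = SFTPToGCSOperator(
       task_id="task_id",
       sftp_conn_id="sftp_conn",
       gcp_conn_id="gcp_conn",
       source_path="/file.txt",
       destination_bucket="test_bucket",
       destination_path="path/to/file.txt",
       use_stream=True,
   )
   ```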
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

