bentorb opened a new pull request, #68946:
URL: https://github.com/apache/airflow/pull/68946

   ### Description
   
   This PR introduces a new ```S3CopyPrefixOperator``` that enables copying all 
S3 objects under a specified prefix from a source bucket to a destination 
bucket. This operator fills a gap in the current S3 operators by providing 
prefix-based bulk copy functionality.
   
   ### What does this operator do?
   
   • Copies all objects matching a specified prefix from source to destination 
S3 bucket
   • Supports cross-bucket copying
   • Provides configurable error handling (continue on failure or stop on first 
error)
   • Integrates with OpenLineage for data lineage tracking
   • Supports Airflow templating for dynamic parameter values
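
As a rough illustration of the prefix-based behaviour listed above, the sketch below shows one way source keys could be mapped to destination keys. The helper names `remap_key` and `plan_copies` are hypothetical and not taken from the PR, and the sketch assumes the operator preserves each object's path relative to the prefix.

```python
from typing import Iterable, List, Tuple


def remap_key(key: str, source_prefix: str, dest_prefix: str) -> str:
    """Swap the leading source prefix of ``key`` for the destination prefix."""
    if not key.startswith(source_prefix):
        raise ValueError(f"{key!r} does not start with {source_prefix!r}")
    return dest_prefix + key[len(source_prefix):]


def plan_copies(
    keys: Iterable[str], source_prefix: str, dest_prefix: str
) -> List[Tuple[str, str]]:
    """Return (source_key, dest_key) pairs for every key under the prefix."""
    return [
        (key, remap_key(key, source_prefix, dest_prefix))
        for key in keys
        if key.startswith(source_prefix)  # keys outside the prefix are skipped
    ]
```

Under these assumptions, `data/2023/a.csv` with source prefix `data/2023/` and destination prefix `archive/data/2023/` maps to `archive/data/2023/a.csv`.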
   
   ### Why is this needed?
   
   Currently, Airflow's S3 operators copy one object at a time. To copy an 
entire "directory" structure or a large number of objects sharing a common 
prefix, users must implement custom solutions or chain multiple operator 
instances. 
   This operator provides a native solution for prefix-based bulk 
operations.
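
For context, a custom solution of the kind described above might look like the following sketch. It is not code from this PR. The function name `copy_prefix_with_hook` is hypothetical, and `hook` is assumed to behave like the Amazon provider's `S3Hook`, of which only `list_keys` and `copy_object` are used.

```python
def copy_prefix_with_hook(hook, source_bucket, source_prefix, dest_bucket, dest_prefix):
    """Copy every key under ``source_prefix``, one ``copy_object`` call per key.

    ``hook`` is assumed to expose ``list_keys`` and ``copy_object`` with the
    same keyword arguments as ``airflow.providers.amazon.aws.hooks.s3.S3Hook``.
    """
    keys = hook.list_keys(bucket_name=source_bucket, prefix=source_prefix) or []
    copied = []
    for key in keys:
        # Keep the part of the key after the source prefix unchanged.
        dest_key = dest_prefix + key[len(source_prefix):]
        hook.copy_object(
            source_bucket_key=key,
            dest_bucket_key=dest_key,
            source_bucket_name=source_bucket,
            dest_bucket_name=dest_bucket,
        )
        copied.append(dest_key)
    return copied
```

In a DAG this would typically be wrapped in a Python task that constructs the hook from a connection ID, which is the boilerplate a dedicated operator removes.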
   
   ### Key Features
   
   • **Error Handling**: Configurable ```continue_on_failure``` parameter for 
resilient operations
   • **Template Fields**: All dynamic parameters support Jinja templating
   • **OpenLineage Integration**: Automatic data lineage tracking for copied 
objects
   • **Standard Exception Handling**: Uses RuntimeError instead of 
AirflowException per project conventions
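
The error-handling feature can be pictured with the sketch below. It is an assumption about the semantics, not the PR's implementation: `copy_all` and `copy_one` are hypothetical names, and raising a single aggregated `RuntimeError` after all attempts when `continue_on_failure=True` is a guess at the behaviour.

```python
from typing import Callable, Iterable, List, Tuple


def copy_all(
    pairs: Iterable[Tuple[str, str]],
    copy_one: Callable[[str, str], None],
    continue_on_failure: bool = False,
) -> List[str]:
    """Copy each (source, dest) pair and return the destinations that succeeded."""
    copied: List[str] = []
    failed: List[str] = []
    for src, dst in pairs:
        try:
            copy_one(src, dst)
        except Exception as exc:
            if not continue_on_failure:
                # Stop on the first error.
                raise RuntimeError(f"Failed to copy {src} to {dst}") from exc
            failed.append(src)  # remember the failure and keep going
        else:
            copied.append(dst)
    if failed:
        # Assumed behaviour: report all failures once every copy was attempted.
        raise RuntimeError(f"Failed to copy {len(failed)} object(s): {failed}")
    return copied
```

With `continue_on_failure=True` every object is attempted before any error surfaces. With the default, the loop stops at the first failing copy.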
   
   ### Testing
   
   Includes **10 new unit tests** (11 test cases) covering:
   - Basic prefix copying, same-bucket copying, and empty-prefix handling
   - Full `s3://` URL inputs and invalid bucket/URL combinations
   - Error scenarios and `continue_on_failure` behaviour
   - OpenLineage integration (bucket+prefix and `s3://` URL variants)
   - Template field validation
   
   • **System test integration** in 
```providers/amazon/tests/system/amazon/aws/example_s3.py```
   • **All tests pass** in Breeze testing environment
   
   ### Usage Example
   
   ```python
   copy_prefix = S3CopyPrefixOperator(
       task_id='copy_data_files',
       source_bucket_name='source-bucket',
       source_bucket_prefix='data/2023/',
       dest_bucket_name='dest-bucket',
       dest_bucket_prefix='archive/data/2023/',
       continue_on_failure=True,
       aws_conn_id='aws_default'
   )
   ```
   
   ### Checklist
   
   • [x] Tests included (10 comprehensive unit tests)
   • [x] Documentation updated
   • [x] Code follows project coding standards
   • [x] All static code checks pass
   • [x] Apache license headers added
   • [x] PR is focused on single feature
   • [x] Local tests pass
   • [x] No unrelated changes included
   
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes
   
   Generated-by:  Claude Code (claude-sonnet-4-6) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

