davidzollo opened a new issue, #10357:
URL: https://github.com/apache/seatunnel/issues/10357
## Background
Azure Cosmos DB is Microsoft's globally distributed, multi-model NoSQL
database service designed for mission-critical applications. It offers an
availability SLA of up to 99.999% (for multi-region accounts), single-digit
millisecond latency, and support for multiple API models
(SQL, MongoDB, Cassandra, Gremlin, Table).
Currently, SeaTunnel lacks native support for Azure Cosmos DB as a data
source, limiting its ability to integrate with Azure cloud-native applications
and globally distributed systems.
## Motivation
- **Azure Cloud Leadership**: Cosmos DB is Microsoft Azure's flagship NoSQL
database service
- **Multi-Model Support**: A single service supporting document,
key-value, graph, and column-family data models
- **Global Distribution**: Native multi-region replication with automatic
failover
- **Mission-Critical Workloads**: Used extensively in finance, gaming, IoT,
and retail industries
- **No JDBC Support**: Requires native SDK for optimal performance and
feature access
## Proposed Solution
Implement a dedicated Azure Cosmos DB Source connector supporting multiple
API modes:
### Core Features
1. **Multi-API Support**
   - **SQL API** (Core/SQL): Primary API with SQL-like queries (Priority 1)
   - **MongoDB API**: MongoDB wire protocol compatibility (Priority 2)
   - **Cassandra API**: CQL query support (Priority 3)
   - **Gremlin API**: Graph traversal queries (Future)
   - **Table API**: Azure Table Storage compatibility (Future)
2. **Data Extraction Modes**
   - **Full Snapshot**: Complete container/collection scan
   - **Incremental**: Extract based on `_ts` (timestamp) field
   - **Change Feed**: Real-time CDC using Cosmos DB Change Feed
   - **Query-Based**: Custom SQL/MongoDB/CQL queries
3. **Performance Optimization**
   - Partition key-aware parallelism
   - Configurable request units (RU) throttling
   - Cross-partition query optimization
   - Connection pooling and retry policies
4. **Authentication**
   - Primary/secondary keys
   - Azure AD authentication (Service Principal, Managed Identity)
   - Resource tokens for granular access control
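As a rough illustration of partition-aware parallelism, the sketch below spreads a list of partition ranges across a fixed number of parallel readers. `SplitAssigner` and its method are hypothetical names, not part of SeaTunnel or the Cosmos DB SDK, and the ranges are treated as opaque strings; how they would be discovered from the container is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: distributes partition ranges across parallel readers. */
final class SplitAssigner {
    private SplitAssigner() {}

    /**
     * Assigns range i to reader (i % parallelism), so every range is owned by
     * exactly one reader and reader loads differ by at most one range.
     */
    static List<List<String>> assign(List<String> ranges, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<String>> perReader = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perReader.add(new ArrayList<>());
        }
        for (int i = 0; i < ranges.size(); i++) {
            perReader.get(i % parallelism).add(ranges.get(i));
        }
        return perReader;
    }
}
```

Round-robin is only one possible policy; a real enumerator might weigh ranges by size or reassign them when a reader fails.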
### Configuration Example (SQL API)
```hocon
source {
  CosmosDB {
    # Connection
    endpoint = "https://myaccount.documents.azure.com:443/"
    auth_type = "master_key" # or "aad", "resource_token"
    master_key = "your-primary-key"
    # Database and container
    database_name = "myDatabase"
    container_name = "myContainer"
    # Extraction mode
    extraction_mode = "incremental" # or "full", "change_feed", "query"
    # Query configuration (_ts is numeric epoch seconds, so the parameter is a number)
    query = "SELECT * FROM c WHERE c._ts > @lastTimestamp"
    parameters = [
      { name = "@lastTimestamp", value = 1640995200 }
    ]
    # Incremental configuration
    incremental_field = "_ts"
    start_timestamp = 1640995200
    # Performance tuning
    max_degree_of_parallelism = 4
    max_buffered_item_count = 1000
    preferred_regions = ["East US", "West US"]
    # Request unit management
    max_ru_per_second = 1000
    enable_adaptive_throttling = true
    # Schema options
    flatten_nested_fields = false
    include_metadata_fields = true # _rid, _self, _etag, _ts
  }
}
```
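One detail the `query` and `start_timestamp` settings above leave open is how the timestamp advances between runs. Because `_ts` has one-second resolution, a strict `>` comparison can skip items written later in the same second as the last item read. The sketch below is one possible approach, not a settled design: query with `>=` and drop versions that were already emitted at the boundary second. `TsWatermark` is a hypothetical class, and it assumes `id` plus `_etag` identifies one version of an item.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical watermark tracker for `_ts`-based incremental reads. */
final class TsWatermark {
    private long committed;                                    // lower bound for the next query
    private final Set<String> seenAtCommitted = new HashSet<>(); // versions already emitted at `committed`
    private long pendingMax;                                   // highest _ts seen in the current poll
    private final Set<String> seenAtPendingMax = new HashSet<>();

    TsWatermark(long startTimestamp) {
        this.committed = startTimestamp;
        this.pendingMax = startTimestamp;
    }

    /** Value to bind in a query such as: WHERE c._ts >= @lastTimestamp. */
    long queryLowerBound() {
        return committed;
    }

    /** Returns true if this item version should be emitted; results need not be sorted. */
    boolean accept(String id, String etag, long ts) {
        String version = id + "|" + etag;
        if (ts < committed || (ts == committed && seenAtCommitted.contains(version))) {
            return false;
        }
        if (ts > pendingMax) {
            pendingMax = ts;
            seenAtPendingMax.clear();
        }
        if (ts == pendingMax) {
            seenAtPendingMax.add(version);
        }
        return true;
    }

    /** Call once a poll has been fully processed to advance the watermark. */
    void commit() {
        if (pendingMax > committed) {
            committed = pendingMax;
            seenAtCommitted.clear();
        }
        seenAtCommitted.addAll(seenAtPendingMax);
        seenAtPendingMax.clear();
    }
}
```

In a checkpointed source, `committed` and the set of versions seen at that second would both have to be part of the saved state for a restart to resume without duplicates.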
### Change Feed CDC Example
```hocon
source {
  CosmosDB {
    endpoint = "https://myaccount.documents.azure.com:443/"
    master_key = "your-primary-key"
    database_name = "myDatabase"
    container_name = "myContainer"
    extraction_mode = "change_feed"
    # Change feed configuration
    # "latest_version" (formerly called incremental) or "all_versions_and_deletes"
    change_feed_mode = "latest_version"
    start_from_beginning = false
    lease_container_name = "leases" # For distributed processing
    # Checkpoint configuration
    checkpoint_interval_ms = 5000
    max_items_per_batch = 100
  }
}
```
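To make the role of `checkpoint_interval_ms` more concrete, here is a minimal sketch of the state a reader might snapshot at each checkpoint, assuming a pull-style read in which the reader keeps one continuation token per partition range itself rather than relying only on the lease container. `ChangeFeedCheckpoint` and its methods are hypothetical names, and both ranges and tokens are treated as opaque strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical per-reader change feed state: one continuation token per range. */
final class ChangeFeedCheckpoint {
    private final Map<String, String> tokenByRange = new HashMap<>();

    /** Records the latest continuation token after a batch has been emitted. */
    void record(String range, String continuationToken) {
        tokenByRange.put(Objects.requireNonNull(range), Objects.requireNonNull(continuationToken));
    }

    /** Immutable copy to hand to the checkpoint mechanism. */
    Map<String, String> snapshot() {
        return Map.copyOf(tokenByRange);
    }

    /** Rebuilds the state from a previously saved snapshot. */
    static ChangeFeedCheckpoint restore(Map<String, String> saved) {
        ChangeFeedCheckpoint state = new ChangeFeedCheckpoint();
        state.tokenByRange.putAll(saved);
        return state;
    }

    /** Empty when the range has no saved position yet (fall back to start_from_beginning). */
    Optional<String> resumeToken(String range) {
        return Optional.ofNullable(tokenByRange.get(range));
    }
}
```

Taking an immutable copy in `snapshot()` keeps a checkpoint consistent even if the reader keeps recording new tokens while the snapshot is being persisted.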
### MongoDB API Example
```hocon
source {
  CosmosDB {
    api_type = "mongodb"
    connection_string = "mongodb://myaccount:<password>@<host>:10255/?ssl=true"
    database_name = "myDatabase"
    collection_name = "myCollection"
    # MongoDB query (assuming the filter is parsed as Extended JSON;
    # ISODate(...) is mongo shell syntax, not JSON)
    filter = "{ \"status\": \"active\", \"created_at\": { \"$gte\": { \"$date\": \"2024-01-01T00:00:00Z\" } } }"
    projection = "{ \"_id\": 1, \"name\": 1, \"email\": 1 }"
    extraction_mode = "incremental"
    incremental_field = "updated_at"
  }
}
```
## Expected Benefits
1. **Azure Ecosystem Integration**: Enable seamless data pipelines for
Azure-native applications
2. **Global Scale**: Support for globally distributed applications with
multi-region read/write
3. **Real-Time Analytics**: Change Feed enables near-real-time data
synchronization to data warehouses
4. **Multi-Model Flexibility**: Single connector supporting multiple data
models and query languages
5. **Cost Optimization**: Efficient RU consumption through optimized queries
and parallelism
## Technical Considerations
### Dependencies
- **SQL API**: `azure-cosmos` Java SDK (v4.x)
- **MongoDB API**: MongoDB Java Driver with Cosmos DB connection string
- **Cassandra API**: DataStax Cassandra driver
- **Authentication**: `azure-identity` for Azure AD authentication
### Performance & Cost
- **RU Management**: Implement request unit throttling to avoid overspending
- **Partition Awareness**: Use partition key for parallel processing
- **Query Optimization**: Minimize cross-partition queries
- **Connection Pooling**: Reuse connections across tasks
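One way the RU throttling could work is a token bucket that is debited after each request, since the RU charge of a request is only known from its response. The sketch below assumes a single reader and takes the clock as a parameter so it stays deterministic; `RuBudget` is a hypothetical class, and the budget would correspond to something like `max_ru_per_second`.

```java
/** Hypothetical client-side RU budget based on a token bucket. */
final class RuBudget {
    private final double maxRuPerSecond;
    private double available;      // may go negative: RU "debt" to be paid off by waiting
    private long lastRefillNanos;

    RuBudget(double maxRuPerSecond, long nowNanos) {
        if (maxRuPerSecond <= 0) {
            throw new IllegalArgumentException("maxRuPerSecond must be positive");
        }
        this.maxRuPerSecond = maxRuPerSecond;
        this.available = maxRuPerSecond; // start with one second of budget
        this.lastRefillNanos = nowNanos;
    }

    /**
     * Records the RU charge reported for a completed request and returns how
     * many milliseconds the caller should wait before sending the next one.
     */
    long chargeAndGetDelayMillis(double requestCharge, long nowNanos) {
        refill(nowNanos);
        available -= requestCharge;
        if (available >= 0) {
            return 0L;
        }
        return (long) Math.ceil(-available * 1000.0 / maxRuPerSecond);
    }

    private void refill(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        available = Math.min(maxRuPerSecond, available + elapsedSeconds * maxRuPerSecond);
        lastRefillNanos = nowNanos;
    }
}
```

A caller would pass `System.nanoTime()` as the clock and sleep for the returned delay. With several parallel readers, the budget would have to be split between them or coordinated, which this sketch does not attempt.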
### Error Handling
- **Throttling (429)**: Exponential backoff with configurable retry policy
- **Transient Failures**: Automatic retry for network errors
- **Partial Failures**: Dead letter queue for failed records
- **Session Consistency**: Handle session token management
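The throttling bullet could be realized roughly as below: a capped exponential delay that never undercuts the retry-after hint a 429 response carries. This is a sketch under assumptions, not the connector's retry policy; `ThrottleBackoff` and its parameters are hypothetical, and the Java SDK already performs its own retries on throttled requests, so a policy like this would sit on top of that.

```java
import java.util.OptionalLong;

/** Hypothetical backoff calculator for throttled (HTTP 429) requests. */
final class ThrottleBackoff {
    private ThrottleBackoff() {}

    /**
     * Returns the delay before retry number `attempt` (0-based):
     * baseMillis * 2^attempt, capped at maxMillis, but never shorter than
     * the server's retry-after hint when one is present.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis,
                            OptionalLong retryAfterMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseMillis;
        // Double step by step and stop at the cap, which also avoids overflow
        // for any realistic maxMillis.
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        delay = Math.min(delay, maxMillis);
        if (retryAfterMillis.isPresent()) {
            delay = Math.max(delay, retryAfterMillis.getAsLong());
        }
        return delay;
    }
}
```

Random jitter is commonly added on top of such a delay so that parallel readers do not retry in lockstep; it is omitted here to keep the calculation deterministic.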
### Testing
- **Cosmos DB Emulator**: Local testing without Azure costs
- **Integration Tests**: Use Azure Cosmos DB test accounts
- **Performance Tests**: Validate RU consumption and throughput
## Implementation Phases
### Phase 1: SQL API Support (MVP)
- Master key authentication
- Full snapshot extraction
- Basic query support
- Partition-aware parallel reads
### Phase 2: Incremental & Change Feed
- Timestamp-based incremental extraction
- Change Feed CDC with lease management
- Checkpoint and state management
### Phase 3: Multi-API Support
- MongoDB API integration
- Cassandra API integration
- Azure AD authentication
### Phase 4: Advanced Features
- Cross-region failover
- Analytical store integration (HTAP)
- Vector search support (for AI workloads)
## References
- [Azure Cosmos DB Java
SDK](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/sdk-java-v4)
- [Change Feed in Azure Cosmos
DB](https://learn.microsoft.com/en-us/azure/cosmos-db/change-feed)
- [Partitioning in Azure Cosmos
DB](https://learn.microsoft.com/en-us/azure/cosmos-db/partitioning-overview)
- [Request Units in Azure Cosmos
DB](https://learn.microsoft.com/en-us/azure/cosmos-db/request-units)
## Community Impact
This connector will:
- Position SeaTunnel as a key player in Azure data integration scenarios
- Enable enterprises to build modern data platforms on Azure
- Support hybrid and multi-cloud architectures (Azure + AWS/GCP)
- Attract IoT, gaming, and financial services companies using Cosmos DB
---
**Priority**: High
**Estimated Effort**: Medium-High
**Target Release**: 2.3.15 or 3.0.0