kaori-seasons opened a new pull request, #1872:
URL: https://github.com/apache/iggy/pull/1872

   ## Iggy Elasticsearch Connector
   An Elasticsearch connector for Apache Iggy providing bidirectional data 
synchronization capabilities.
   
   ## Features
   - Elasticsearch Sink: Consumes messages from Iggy and writes them to 
Elasticsearch
   - Elasticsearch Source: Reads data from Elasticsearch and sends it to Iggy
   - Batch Processing: Supports batch read/write operations for performance 
optimization
   - Incremental Sync: Timestamp-based incremental data synchronization
   - Error Handling: Comprehensive retry mechanisms and error handling
   - State Management: Supports checkpoint recovery and state persistence
   ## Quick Start
   ### Installation
   cargo add iggy-elasticsearch-connector
   Sink Usage Example
   ```
   use iggy_elasticsearch_connector::{ElasticsearchSink, 
ElasticsearchSinkConfig};  
   use iggy::prelude::*;  
   use std::time::Duration;  
     
   #[tokio::main]  
   async fn main() -> Result<(), Box<dyn std::error::Error>> {  
       // Create Iggy client  
       let iggy_client = 
IggyClient::from_connection_string("iggy://iggy:iggy@localhost:8090")?;  
         
       // Configure Elasticsearch Sink  
       let sink_config = ElasticsearchSinkConfig {  
           elasticsearch_url: "http://localhost:9200".to_string(),  
           index_pattern: "iggy-messages-{date}".to_string(),  
           batch_size: 1000,  
           flush_interval: Duration::from_secs(5),  
           retry_attempts: 3,  
           retry_interval: Duration::from_secs(1),  
           mapping_config: Some(r#"{  
               "properties": {  
                   "@timestamp": {"type": "date"},  
                   "message": {"type": "text"},  
                   "level": {"type": "keyword"}  
               }  
           }"#.to_string()),  
           document_id_field: None,  
           timestamp_field: Some("@timestamp".to_string()),  
           enable_dlq: false,  
           dlq_topic: None,  
       };  
         
       // Create and start sink  
       let mut sink = ElasticsearchSink::new(  
           iggy_client,  
           "logs",  
           "application-logs",   
           "elasticsearch-sink-group",  
           sink_config,  
       ).await?;  
         
       sink.start().await?;  
       Ok(())  
   }
   ```
   Source Usage Example
   ```
   use iggy_elasticsearch_connector::{ElasticsearchSource, 
ElasticsearchSourceConfig};  
   use iggy::prelude::*;  
   use std::time::Duration;  
     
   #[tokio::main]  
   async fn main() -> Result<(), Box<dyn std::error::Error>> {  
       // Create Iggy client  
       let iggy_client = 
IggyClient::from_connection_string("iggy://iggy:iggy@localhost:8090")?;  
         
       // Configure Elasticsearch Source  
       let source_config = ElasticsearchSourceConfig {  
           elasticsearch_url: "http://localhost:9200".to_string(),  
           query: r#"{  
               "bool": {  
                   "must": [  
                       {"range": {"@timestamp": {"gte": "now-1h"}}}  
                   ]  
               }  
           }"#.to_string(),  
           index_pattern: "logs-*".to_string(),  
           poll_interval: Duration::from_secs(30),  
           batch_size: 500,  
           timestamp_field: Some("@timestamp".to_string()),  
           scroll_timeout: Duration::from_secs(60),  
           max_docs_per_poll: Some(5000),  
           state_file_path: 
Some("./elasticsearch_source_state.json".to_string()),  
           sort_field: Some("@timestamp".to_string()),  
       };  
         
       // Create and start source  
       let mut source = ElasticsearchSource::new(  
           iggy_client,  
           "external-logs",  
           "imported-logs",  
           source_config,  
       ).await?;  
         
       source.start().await?;  
       Ok(())  
   }
   ```
   Configuration
   Core Configuration Structures
   ```
   // Sink Configuration  
   pub struct ElasticsearchSinkConfig {  
       pub elasticsearch_url: String,  
       pub index_pattern: String,  
       pub batch_size: usize,  
       pub flush_interval: Duration,  
       pub retry_attempts: u32,  
       pub retry_interval: Duration,  
       pub mapping_config: Option<String>,  
       pub document_id_field: Option<String>,  
       pub timestamp_field: Option<String>,  
       pub enable_dlq: bool,  
       pub dlq_topic: Option<String>,  
   }  
     
   // Source Configuration  
   pub struct ElasticsearchSourceConfig {  
       pub elasticsearch_url: String,  
       pub query: String,  
       pub index_pattern: String,  
       pub poll_interval: Duration,  
       pub batch_size: usize,  
       pub timestamp_field: Option<String>,  
       pub scroll_timeout: Duration,  
       pub max_docs_per_poll: Option<usize>,  
       pub state_file_path: Option<String>,  
       pub sort_field: Option<String>,  
   }
   ```
   Deployment
   Docker Compose
   
   ```
   version: '3.8'  
     
   services:  
     elasticsearch:  
       image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0  
       environment:  
         - discovery.type=single-node  
         - xpack.security.enabled=false  
       ports:  
         - "9200:9200"  
     
     iggy:  
       image: apache/iggy:latest  
       ports:  
         - "8090:8090"  
         - "3000:3000"  
     
     elasticsearch-sink:  
       build: .  
       depends_on:  
         - elasticsearch  
         - iggy  
       environment:  
         - RUST_LOG=info  
         - IGGY_URL=iggy://iggy:iggy@iggy:8090  
         - ELASTICSEARCH_URL=http://elasticsearch:9200  
     
     elasticsearch-source:  
       build: .  
       depends_on:  
         - elasticsearch  
         - iggy  
       environment:  
         - RUST_LOG=info  
         - IGGY_URL=iggy://iggy:iggy@iggy:8090  
         - ELASTICSEARCH_URL=http://elasticsearch:9200
   ```
   Quick deployment:
   ```
   docker-compose up -d
   ```
   ## Commit Message Rules
   
   - **Description**: Provide a concise description of the changes.
   - **Style**: Use an imperative style in the subject line (e.g., "Fix bug" 
rather than "Fixed bug" or "Fixes bug").
   - **Brevity**: Keep the subject line under 80 characters.
   - **Rationale**: Explain the 'why' and 'what' of your changes in the summary.
   - **Details**: Use the body to elaborate on the 'how' of your changes.
   - **Context**: Include 'before' and 'after' scenarios if applicable.
   - **References**: Link any relevant issues or PRs in the message body.
   
   **Remember:** Your contribution is essential to the success of `iggy`. 
Please ensure that your PR conforms to these guidelines for a swift and smooth 
integration process.
   
   Thank you!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to