This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 44be42fb7 fix(connectors): prevent state loss on source connector restart (#2743)
44be42fb7 is described below

commit 44be42fb7077f706cf3cc8de1783b856a58fe787
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 13:02:58 2026 +0100

    fix(connectors): prevent state loss on source connector restart (#2743)
    
    Source handler tasks were fire-and-forget (JoinHandle dropped),
    so the tokio runtime could cancel them mid-state-save during
    shutdown. FileStateProvider::save() uses a non-atomic
    truncate-then-write sequence; cancellation between the two
    steps left the state file empty. On restart the connector
    loaded no tracking offsets and re-polled all rows from the
    source database.
    
    Retain handler JoinHandles and await them (with timeout) after
    dropping flume senders but before shutting down Iggy clients.
    Also add sync_all() after write_all() in save() to ensure
    data reaches disk before returning.
---
 core/connectors/runtime/src/main.rs   |  8 +++++++-
 core/connectors/runtime/src/source.rs | 10 ++++++++--
 core/connectors/runtime/src/state.rs  |  5 +++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 72896d9f5..f9b5b3a72 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -220,7 +220,7 @@ async fn main() -> Result<(), RuntimeError> {
     let context = Arc::new(context);
     api::init(&config.http, context.clone()).await;
 
-    source::handle(source_wrappers, context.clone());
+    let source_handler_tasks = source::handle(source_wrappers, context.clone());
     sink::consume(sink_wrappers, context.clone());
     info!("All sources and sinks spawned.");
 
@@ -252,6 +252,12 @@ async fn main() -> Result<(), RuntimeError> {
         }
     }
 
+    // Wait for source handler tasks to drain remaining messages and persist state
+    // before shutting down the Iggy clients they depend on.
+    for handle in source_handler_tasks {
+        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
+    }
+
     for (key, sink) in sink_with_plugins {
         for id in sink.plugin_ids {
             info!("Closing sink connector with ID: {id} for plugin: {key}");
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index 889932ecb..c6db93683 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -251,7 +251,11 @@ fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
     StateStorage::File(FileStateProvider::new(path))
 }
 
-pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>) {
+pub fn handle(
+    sources: Vec<SourceConnectorWrapper>,
+    context: Arc<RuntimeContext>,
+) -> Vec<tokio::task::JoinHandle<()>> {
+    let mut handler_tasks = Vec::new();
     for source in sources {
         for plugin in source.plugins {
             let plugin_id = plugin.id;
@@ -275,7 +279,7 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
             let (sender, receiver): (Sender<ProducedMessages>, Receiver<ProducedMessages>) =
                 flume::unbounded();
             SOURCE_SENDERS.insert(plugin_id, sender);
-            tokio::spawn(async move {
+            let handler_task = tokio::spawn(async move {
                 info!("Source connector with ID: {plugin_id} started.");
                 let Some(producer) = &plugin.producer else {
                     error!("Producer not initialized for source connector with ID: {plugin_id}");
@@ -421,8 +425,10 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
                     )
                     .await;
             });
+            handler_tasks.push(handler_task);
         }
     }
+    handler_tasks
 }
 
 fn process_messages(
diff --git a/core/connectors/runtime/src/state.rs b/core/connectors/runtime/src/state.rs
index 0b2456e0d..8fe47adc4 100644
--- a/core/connectors/runtime/src/state.rs
+++ b/core/connectors/runtime/src/state.rs
@@ -102,6 +102,11 @@ impl StateProvider for FileStateProvider {
             Error::CannotWriteStateFile
         })?;
 
+        file.sync_all().await.map_err(|error| {
+            error!("Cannot sync state file: {}. {error}.", self.path);
+            Error::CannotWriteStateFile
+        })?;
+
         debug!("Saved state file: {}", self.path);
         Ok(())
     }

Reply via email to