This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 44be42fb7 fix(connectors): prevent state loss on source connector restart (#2743)
44be42fb7 is described below
commit 44be42fb7077f706cf3cc8de1783b856a58fe787
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 13:02:58 2026 +0100
fix(connectors): prevent state loss on source connector restart (#2743)
Source handler tasks were fire-and-forget (JoinHandle dropped),
so the tokio runtime could cancel them mid-state-save during
shutdown. FileStateProvider::save() uses a non-atomic
truncate-then-write sequence; cancellation between the two steps
left the state file empty. On restart, the connector loaded no
tracking offsets and re-polled all rows from the source database.
Retain the handler JoinHandles and await them (with a timeout) after
dropping the flume senders but before shutting down the Iggy clients.
Also add sync_all() after write_all() in save() to ensure the data
reaches disk before save() returns.
---
core/connectors/runtime/src/main.rs | 8 +++++++-
core/connectors/runtime/src/source.rs | 10 ++++++++--
core/connectors/runtime/src/state.rs | 5 +++++
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/core/connectors/runtime/src/main.rs
b/core/connectors/runtime/src/main.rs
index 72896d9f5..f9b5b3a72 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -220,7 +220,7 @@ async fn main() -> Result<(), RuntimeError> {
let context = Arc::new(context);
api::init(&config.http, context.clone()).await;
- source::handle(source_wrappers, context.clone());
+ let source_handler_tasks = source::handle(source_wrappers,
context.clone());
sink::consume(sink_wrappers, context.clone());
info!("All sources and sinks spawned.");
@@ -252,6 +252,12 @@ async fn main() -> Result<(), RuntimeError> {
}
}
+ // Wait for source handler tasks to drain remaining messages and persist
state
+ // before shutting down the Iggy clients they depend on.
+ for handle in source_handler_tasks {
+ let _ = tokio::time::timeout(std::time::Duration::from_secs(5),
handle).await;
+ }
+
for (key, sink) in sink_with_plugins {
for id in sink.plugin_ids {
info!("Closing sink connector with ID: {id} for plugin: {key}");
diff --git a/core/connectors/runtime/src/source.rs
b/core/connectors/runtime/src/source.rs
index 889932ecb..c6db93683 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -251,7 +251,11 @@ fn get_state_storage(state_path: &str, key: &str) ->
StateStorage {
StateStorage::File(FileStateProvider::new(path))
}
-pub fn handle(sources: Vec<SourceConnectorWrapper>, context:
Arc<RuntimeContext>) {
+pub fn handle(
+ sources: Vec<SourceConnectorWrapper>,
+ context: Arc<RuntimeContext>,
+) -> Vec<tokio::task::JoinHandle<()>> {
+ let mut handler_tasks = Vec::new();
for source in sources {
for plugin in source.plugins {
let plugin_id = plugin.id;
@@ -275,7 +279,7 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>,
context: Arc<RuntimeContext>
let (sender, receiver): (Sender<ProducedMessages>,
Receiver<ProducedMessages>) =
flume::unbounded();
SOURCE_SENDERS.insert(plugin_id, sender);
- tokio::spawn(async move {
+ let handler_task = tokio::spawn(async move {
info!("Source connector with ID: {plugin_id} started.");
let Some(producer) = &plugin.producer else {
error!("Producer not initialized for source connector with
ID: {plugin_id}");
@@ -421,8 +425,10 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>,
context: Arc<RuntimeContext>
)
.await;
});
+ handler_tasks.push(handler_task);
}
}
+ handler_tasks
}
fn process_messages(
diff --git a/core/connectors/runtime/src/state.rs
b/core/connectors/runtime/src/state.rs
index 0b2456e0d..8fe47adc4 100644
--- a/core/connectors/runtime/src/state.rs
+++ b/core/connectors/runtime/src/state.rs
@@ -102,6 +102,11 @@ impl StateProvider for FileStateProvider {
Error::CannotWriteStateFile
})?;
+ file.sync_all().await.map_err(|error| {
+ error!("Cannot sync state file: {}. {error}.", self.path);
+ Error::CannotWriteStateFile
+ })?;
+
debug!("Saved state file: {}", self.path);
Ok(())
}