This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ae4e3a6d fix(server): do not terminate the server when state resource is missing (#1669)
ae4e3a6d is described below

commit ae4e3a6d5344d6502c53841f30bc51f8867cf9c9
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Sat Mar 29 16:46:41 2025 +0100

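    fix(server): do not terminate the server when state resource is missing (#1669)
    
    When a resource such as a stream, topic, or partition has been removed
    from disk (e.g. accidentally) but is still recorded in the server state
    log, the server should not fail and terminate. Instead, it logs a warning
    and either recreates the missing resource (when
    `recovery.recreate_missing_state` is enabled) or skips it.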
---
 sdk/src/error.rs                        |  6 ---
 server/src/streaming/streams/storage.rs | 55 ++++++++++++-------------
 server/src/streaming/systems/streams.rs | 44 ++++++++++----------
 server/src/streaming/topics/storage.rs  | 73 +++++++++++++++------------------
 4 files changed, 84 insertions(+), 94 deletions(-)

diff --git a/sdk/src/error.rs b/sdk/src/error.rs
index b769683d..60bd86f4 100644
--- a/sdk/src/error.rs
+++ b/sdk/src/error.rs
@@ -202,12 +202,6 @@ pub enum IggyError {
     InvalidStreamId = 1014,
     #[error("Cannot read streams")]
     CannotReadStreams = 1015,
-    #[error("Missing streams")]
-    MissingStreams = 1016,
-    #[error("Missing topics for stream with ID: {0}")]
-    MissingTopics(u32) = 1017,
-    #[error("Missing partitions for topic with ID: {0} for stream with ID: 
{1}.")]
-    MissingPartitions(u32, u32) = 1018,
     #[error("Max topic size cannot be lower than segment size. Max topic size: 
{0} < segment size: {1}.")]
     InvalidTopicSize(MaxTopicSize, IggyByteSize) = 1019,
     #[error("Cannot create topics directory for stream with ID: {0}, Path: 
{1}")]
diff --git a/server/src/streaming/streams/storage.rs 
b/server/src/streaming/streams/storage.rs
index 76fbb6ec..5e7bc535 100644
--- a/server/src/streaming/streams/storage.rs
+++ b/server/src/streaming/streams/storage.rs
@@ -108,37 +108,36 @@ impl StreamStorage for FileStreamStorage {
                 stream.stream_id
             );
         } else {
-            error!("Topics with IDs: '{missing_ids:?}' for stream with ID: 
'{}' were not found on disk.", stream.stream_id);
-            if !stream.config.recovery.recreate_missing_state {
-                warn!("Recreating missing state in recovery config is 
disabled, missing topics will not be created for stream with ID: '{}'.", 
stream.stream_id);
-                return Err(IggyError::MissingTopics(stream.stream_id));
-            }
-
-            info!(
+            warn!("Topics with IDs: '{missing_ids:?}' for stream with ID: '{}' 
were not found on disk.", stream.stream_id);
+            if stream.config.recovery.recreate_missing_state {
+                info!(
                 "Recreating missing state in recovery config is enabled, 
missing topics will be created for stream with ID: '{}'.",
                 stream.stream_id
             );
-            for topic_id in missing_ids {
-                let topic_state = state.topics.get(&topic_id).unwrap();
-                let topic = Topic::empty(
-                    stream.stream_id,
-                    topic_id,
-                    &topic_state.name,
-                    stream.size_bytes.clone(),
-                    stream.messages_count.clone(),
-                    stream.segments_count.clone(),
-                    stream.config.clone(),
-                    stream.storage.clone(),
-                )
-                .await;
-                topic.persist().await.with_error_context(|error| {
-                    format!("{COMPONENT} (error: {error}) - failed to persist 
topic: {topic}")
-                })?;
-                unloaded_topics.push(topic);
-                info!(
-                    "Created missing topic with ID: '{}', name: {}, for stream 
with ID: '{}'.",
-                    topic_id, &topic_state.name, stream.stream_id
-                );
+                for topic_id in missing_ids {
+                    let topic_state = state.topics.get(&topic_id).unwrap();
+                    let topic = Topic::empty(
+                        stream.stream_id,
+                        topic_id,
+                        &topic_state.name,
+                        stream.size_bytes.clone(),
+                        stream.messages_count.clone(),
+                        stream.segments_count.clone(),
+                        stream.config.clone(),
+                        stream.storage.clone(),
+                    )
+                    .await;
+                    topic.persist().await.with_error_context(|error| {
+                        format!("{COMPONENT} (error: {error}) - failed to 
persist topic: {topic}")
+                    })?;
+                    unloaded_topics.push(topic);
+                    info!(
+                        "Created missing topic with ID: '{}', name: {}, for 
stream with ID: '{}'.",
+                        topic_id, &topic_state.name, stream.stream_id
+                    );
+                }
+            } else {
+                warn!("Recreating missing state in recovery config is 
disabled, missing topics will not be created for stream with ID: '{}'.", 
stream.stream_id);
             }
         }
 
diff --git a/server/src/streaming/systems/streams.rs 
b/server/src/streaming/systems/streams.rs
index 4800ea6c..f919deed 100644
--- a/server/src/streaming/systems/streams.rs
+++ b/server/src/streaming/systems/streams.rs
@@ -85,39 +85,41 @@ impl System {
             .iter()
             .map(|stream| stream.stream_id)
             .collect::<AHashSet<u32>>();
-        let missing_ids = state_stream_ids
+        let mut missing_ids = state_stream_ids
             .difference(&unloaded_stream_ids)
             .copied()
             .collect::<AHashSet<u32>>();
         if missing_ids.is_empty() {
             info!("All streams found on disk were found in state.");
         } else {
-            error!("Streams with IDs: '{missing_ids:?}' were not found on 
disk.");
-            if !self.config.recovery.recreate_missing_state {
+            warn!("Streams with IDs: '{missing_ids:?}' were not found on 
disk.");
+            if self.config.recovery.recreate_missing_state {
+                info!("Recreating missing state in recovery config is enabled, 
missing streams will be created.");
+                for stream_id in missing_ids.iter() {
+                    let stream_id = *stream_id;
+                    let stream_state = streams.iter().find(|s| s.id == 
stream_id).unwrap();
+                    let stream = Stream::create(
+                        stream_id,
+                        &stream_state.name,
+                        self.config.clone(),
+                        self.storage.clone(),
+                    );
+                    stream.persist().await?;
+                    unloaded_streams.push(stream);
+                    info!(
+                        "Missing stream with ID: '{stream_id}', name: {} was 
recreated.",
+                        stream_state.name
+                    );
+                }
+                missing_ids.clear();
+            } else {
                 warn!("Recreating missing state in recovery config is 
disabled, missing streams will not be created.");
-                return Err(IggyError::MissingStreams);
-            }
-
-            info!("Recreating missing state in recovery config is enabled, 
missing streams will be created.");
-            for stream_id in missing_ids {
-                let stream_state = streams.iter().find(|s| s.id == 
stream_id).unwrap();
-                let stream = Stream::create(
-                    stream_id,
-                    &stream_state.name,
-                    self.config.clone(),
-                    self.storage.clone(),
-                );
-                stream.persist().await?;
-                unloaded_streams.push(stream);
-                info!(
-                    "Missing stream with ID: '{stream_id}', name: {} was 
recreated.",
-                    stream_state.name
-                );
             }
         }
 
         let mut streams_states = streams
             .into_iter()
+            .filter(|s| !missing_ids.contains(&s.id))
             .map(|s| (s.id, s))
             .collect::<AHashMap<_, _>>();
         let loaded_streams = RefCell::new(Vec::new());
diff --git a/server/src/streaming/topics/storage.rs 
b/server/src/streaming/topics/storage.rs
index 52595b9b..02fc64c0 100644
--- a/server/src/streaming/topics/storage.rs
+++ b/server/src/streaming/topics/storage.rs
@@ -129,53 +129,48 @@ impl TopicStorage for FileTopicStorage {
                 topic.topic_id, topic.stream_id
             );
         } else {
-            error!(
+            warn!(
                 "Partitions with IDs: '{missing_ids:?}' for topic with ID: 
'{topic_id}' for stream with ID: '{stream_id}' were not found on disk.",
                 topic_id = topic.topic_id, stream_id = topic.stream_id
             );
-            if !topic.config.recovery.recreate_missing_state {
-                warn!(
-                    "Recreating missing state in recovery config is disabled, 
missing partitions will not be created for topic with ID: '{}' for stream with 
ID: '{}'.", topic.topic_id, topic.stream_id);
-                return Err(IggyError::MissingPartitions(
-                    topic.topic_id,
-                    topic.stream_id,
-                ));
-            }
-
-            info!(
-                "Recreating missing state in recovery config is enabled, 
missing partitions will be created for topic with ID: '{}' for stream with ID: 
'{}'.",
-                topic.topic_id, topic.stream_id
-            );
+            if topic.config.recovery.recreate_missing_state {
+                info!(
+                    "Recreating missing state in recovery config is enabled, 
missing partitions will be created for topic with ID: '{}' for stream with ID: 
'{}'.",
+                    topic.topic_id, topic.stream_id
+                );
 
-            for partition_id in missing_ids {
-                let partition_state = 
state.partitions.get(&partition_id).unwrap();
-                let mut partition = Partition::create(
-                    topic.stream_id,
-                    topic.topic_id,
-                    partition_id,
-                    true,
-                    topic.config.clone(),
-                    topic.storage.clone(),
-                    message_expiry,
-                    topic.messages_count_of_parent_stream.clone(),
-                    topic.messages_count.clone(),
-                    topic.size_of_parent_stream.clone(),
-                    topic.size_bytes.clone(),
-                    topic.segments_count_of_parent_stream.clone(),
-                    partition_state.created_at,
-                )
-                .await;
-                partition.persist().await.with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to persist 
partition: {partition}"
+                for partition_id in missing_ids {
+                    let partition_state = 
state.partitions.get(&partition_id).unwrap();
+                    let mut partition = Partition::create(
+                        topic.stream_id,
+                        topic.topic_id,
+                        partition_id,
+                        true,
+                        topic.config.clone(),
+                        topic.storage.clone(),
+                        message_expiry,
+                        topic.messages_count_of_parent_stream.clone(),
+                        topic.messages_count.clone(),
+                        topic.size_of_parent_stream.clone(),
+                        topic.size_bytes.clone(),
+                        topic.segments_count_of_parent_stream.clone(),
+                        partition_state.created_at,
                     )
-                })?;
-                partition.segments.clear();
-                unloaded_partitions.push(partition);
-                info!(
+                    .await;
+                    partition.persist().await.with_error_context(|error| {
+                        format!(
+                            "{COMPONENT} (error: {error}) - failed to persist 
partition: {partition}"
+                        )
+                    })?;
+                    partition.segments.clear();
+                    unloaded_partitions.push(partition);
+                    info!(
                     "Created missing partition with ID: '{partition_id}', for 
topic with ID: '{}' for stream with ID: '{}'.",
                     topic.topic_id, topic.stream_id
                 );
+                }
+            } else {
+                warn!("Recreating missing state in recovery config is 
disabled, missing partitions will not be created for topic with ID: '{}' for 
stream with ID: '{}'.", topic.topic_id, topic.stream_id);
             }
         }
 

Reply via email to