This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ae4e3a6d fix(server): do not terminate the server when state resource is missing (#1669)
ae4e3a6d is described below
commit ae4e3a6d5344d6502c53841f30bc51f8867cf9c9
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Sat Mar 29 16:46:41 2025 +0100
fix(server): do not terminate the server when state resource is missing (#1669)
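When a resource such as a stream, topic, or partition is removed from
disk (e.g. accidentally) but not from the server state log, the server
should not panic. Instead of returning an error that aborts startup, it
now logs a warning; the missing resource is recreated only when
`recovery.recreate_missing_state` is enabled.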
---
sdk/src/error.rs | 6 ---
server/src/streaming/streams/storage.rs | 55 ++++++++++++-------------
server/src/streaming/systems/streams.rs | 44 ++++++++++----------
server/src/streaming/topics/storage.rs | 73 +++++++++++++++------------------
4 files changed, 84 insertions(+), 94 deletions(-)
diff --git a/sdk/src/error.rs b/sdk/src/error.rs
index b769683d..60bd86f4 100644
--- a/sdk/src/error.rs
+++ b/sdk/src/error.rs
@@ -202,12 +202,6 @@ pub enum IggyError {
InvalidStreamId = 1014,
#[error("Cannot read streams")]
CannotReadStreams = 1015,
- #[error("Missing streams")]
- MissingStreams = 1016,
- #[error("Missing topics for stream with ID: {0}")]
- MissingTopics(u32) = 1017,
- #[error("Missing partitions for topic with ID: {0} for stream with ID:
{1}.")]
- MissingPartitions(u32, u32) = 1018,
#[error("Max topic size cannot be lower than segment size. Max topic size:
{0} < segment size: {1}.")]
InvalidTopicSize(MaxTopicSize, IggyByteSize) = 1019,
#[error("Cannot create topics directory for stream with ID: {0}, Path:
{1}")]
diff --git a/server/src/streaming/streams/storage.rs
b/server/src/streaming/streams/storage.rs
index 76fbb6ec..5e7bc535 100644
--- a/server/src/streaming/streams/storage.rs
+++ b/server/src/streaming/streams/storage.rs
@@ -108,37 +108,36 @@ impl StreamStorage for FileStreamStorage {
stream.stream_id
);
} else {
- error!("Topics with IDs: '{missing_ids:?}' for stream with ID:
'{}' were not found on disk.", stream.stream_id);
- if !stream.config.recovery.recreate_missing_state {
- warn!("Recreating missing state in recovery config is
disabled, missing topics will not be created for stream with ID: '{}'.",
stream.stream_id);
- return Err(IggyError::MissingTopics(stream.stream_id));
- }
-
- info!(
+ warn!("Topics with IDs: '{missing_ids:?}' for stream with ID: '{}'
were not found on disk.", stream.stream_id);
+ if stream.config.recovery.recreate_missing_state {
+ info!(
"Recreating missing state in recovery config is enabled,
missing topics will be created for stream with ID: '{}'.",
stream.stream_id
);
- for topic_id in missing_ids {
- let topic_state = state.topics.get(&topic_id).unwrap();
- let topic = Topic::empty(
- stream.stream_id,
- topic_id,
- &topic_state.name,
- stream.size_bytes.clone(),
- stream.messages_count.clone(),
- stream.segments_count.clone(),
- stream.config.clone(),
- stream.storage.clone(),
- )
- .await;
- topic.persist().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist
topic: {topic}")
- })?;
- unloaded_topics.push(topic);
- info!(
- "Created missing topic with ID: '{}', name: {}, for stream
with ID: '{}'.",
- topic_id, &topic_state.name, stream.stream_id
- );
+ for topic_id in missing_ids {
+ let topic_state = state.topics.get(&topic_id).unwrap();
+ let topic = Topic::empty(
+ stream.stream_id,
+ topic_id,
+ &topic_state.name,
+ stream.size_bytes.clone(),
+ stream.messages_count.clone(),
+ stream.segments_count.clone(),
+ stream.config.clone(),
+ stream.storage.clone(),
+ )
+ .await;
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to
persist topic: {topic}")
+ })?;
+ unloaded_topics.push(topic);
+ info!(
+ "Created missing topic with ID: '{}', name: {}, for
stream with ID: '{}'.",
+ topic_id, &topic_state.name, stream.stream_id
+ );
+ }
+ } else {
+ warn!("Recreating missing state in recovery config is
disabled, missing topics will not be created for stream with ID: '{}'.",
stream.stream_id);
}
}
diff --git a/server/src/streaming/systems/streams.rs
b/server/src/streaming/systems/streams.rs
index 4800ea6c..f919deed 100644
--- a/server/src/streaming/systems/streams.rs
+++ b/server/src/streaming/systems/streams.rs
@@ -85,39 +85,41 @@ impl System {
.iter()
.map(|stream| stream.stream_id)
.collect::<AHashSet<u32>>();
- let missing_ids = state_stream_ids
+ let mut missing_ids = state_stream_ids
.difference(&unloaded_stream_ids)
.copied()
.collect::<AHashSet<u32>>();
if missing_ids.is_empty() {
info!("All streams found on disk were found in state.");
} else {
- error!("Streams with IDs: '{missing_ids:?}' were not found on
disk.");
- if !self.config.recovery.recreate_missing_state {
+ warn!("Streams with IDs: '{missing_ids:?}' were not found on
disk.");
+ if self.config.recovery.recreate_missing_state {
+ info!("Recreating missing state in recovery config is enabled,
missing streams will be created.");
+ for stream_id in missing_ids.iter() {
+ let stream_id = *stream_id;
+ let stream_state = streams.iter().find(|s| s.id ==
stream_id).unwrap();
+ let stream = Stream::create(
+ stream_id,
+ &stream_state.name,
+ self.config.clone(),
+ self.storage.clone(),
+ );
+ stream.persist().await?;
+ unloaded_streams.push(stream);
+ info!(
+ "Missing stream with ID: '{stream_id}', name: {} was
recreated.",
+ stream_state.name
+ );
+ }
+ missing_ids.clear();
+ } else {
warn!("Recreating missing state in recovery config is
disabled, missing streams will not be created.");
- return Err(IggyError::MissingStreams);
- }
-
- info!("Recreating missing state in recovery config is enabled,
missing streams will be created.");
- for stream_id in missing_ids {
- let stream_state = streams.iter().find(|s| s.id ==
stream_id).unwrap();
- let stream = Stream::create(
- stream_id,
- &stream_state.name,
- self.config.clone(),
- self.storage.clone(),
- );
- stream.persist().await?;
- unloaded_streams.push(stream);
- info!(
- "Missing stream with ID: '{stream_id}', name: {} was
recreated.",
- stream_state.name
- );
}
}
let mut streams_states = streams
.into_iter()
+ .filter(|s| !missing_ids.contains(&s.id))
.map(|s| (s.id, s))
.collect::<AHashMap<_, _>>();
let loaded_streams = RefCell::new(Vec::new());
diff --git a/server/src/streaming/topics/storage.rs
b/server/src/streaming/topics/storage.rs
index 52595b9b..02fc64c0 100644
--- a/server/src/streaming/topics/storage.rs
+++ b/server/src/streaming/topics/storage.rs
@@ -129,53 +129,48 @@ impl TopicStorage for FileTopicStorage {
topic.topic_id, topic.stream_id
);
} else {
- error!(
+ warn!(
"Partitions with IDs: '{missing_ids:?}' for topic with ID:
'{topic_id}' for stream with ID: '{stream_id}' were not found on disk.",
topic_id = topic.topic_id, stream_id = topic.stream_id
);
- if !topic.config.recovery.recreate_missing_state {
- warn!(
- "Recreating missing state in recovery config is disabled,
missing partitions will not be created for topic with ID: '{}' for stream with
ID: '{}'.", topic.topic_id, topic.stream_id);
- return Err(IggyError::MissingPartitions(
- topic.topic_id,
- topic.stream_id,
- ));
- }
-
- info!(
- "Recreating missing state in recovery config is enabled,
missing partitions will be created for topic with ID: '{}' for stream with ID:
'{}'.",
- topic.topic_id, topic.stream_id
- );
+ if topic.config.recovery.recreate_missing_state {
+ info!(
+ "Recreating missing state in recovery config is enabled,
missing partitions will be created for topic with ID: '{}' for stream with ID:
'{}'.",
+ topic.topic_id, topic.stream_id
+ );
- for partition_id in missing_ids {
- let partition_state =
state.partitions.get(&partition_id).unwrap();
- let mut partition = Partition::create(
- topic.stream_id,
- topic.topic_id,
- partition_id,
- true,
- topic.config.clone(),
- topic.storage.clone(),
- message_expiry,
- topic.messages_count_of_parent_stream.clone(),
- topic.messages_count.clone(),
- topic.size_of_parent_stream.clone(),
- topic.size_bytes.clone(),
- topic.segments_count_of_parent_stream.clone(),
- partition_state.created_at,
- )
- .await;
- partition.persist().await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to persist
partition: {partition}"
+ for partition_id in missing_ids {
+ let partition_state =
state.partitions.get(&partition_id).unwrap();
+ let mut partition = Partition::create(
+ topic.stream_id,
+ topic.topic_id,
+ partition_id,
+ true,
+ topic.config.clone(),
+ topic.storage.clone(),
+ message_expiry,
+ topic.messages_count_of_parent_stream.clone(),
+ topic.messages_count.clone(),
+ topic.size_of_parent_stream.clone(),
+ topic.size_bytes.clone(),
+ topic.segments_count_of_parent_stream.clone(),
+ partition_state.created_at,
)
- })?;
- partition.segments.clear();
- unloaded_partitions.push(partition);
- info!(
+ .await;
+ partition.persist().await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to persist
partition: {partition}"
+ )
+ })?;
+ partition.segments.clear();
+ unloaded_partitions.push(partition);
+ info!(
"Created missing partition with ID: '{partition_id}', for
topic with ID: '{}' for stream with ID: '{}'.",
topic.topic_id, topic.stream_id
);
+ }
+ } else {
+ warn!("Recreating missing state in recovery config is
disabled, missing partitions will not be created for topic with ID: '{}' for
stream with ID: '{}'.", topic.topic_id, topic.stream_id);
}
}