This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_fix
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/connectors_fix by this push:
     new d13f09333 fix consumer group recovery after runtime restart
d13f09333 is described below

commit d13f093337145301683b2ab42514a95a0eeeb16d
Author: spetz <[email protected]>
AuthorDate: Thu Feb 5 18:15:41 2026 +0100

    fix consumer group recovery after runtime restart
---
 core/integration/tests/server/cg.rs                |   4 +-
 core/integration/tests/server/mod.rs               |  11 +-
 ...er_group_new_messages_after_restart_scenario.rs | 193 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/server/src/metadata/absorb.rs                 |  14 ++
 core/server/src/metadata/ops.rs                    |   1 +
 core/server/src/metadata/writer.rs                 |  12 ++
 core/server/src/shard/system/consumer_groups.rs    |  50 +++++-
 core/server/src/shard/system/consumer_offsets.rs   |  32 ++--
 core/server/src/shard/system/messages.rs           |   6 -
 10 files changed, 304 insertions(+), 20 deletions(-)

diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs
index bf12c871e..0a2b343c0 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -17,7 +17,8 @@
 
 use crate::server::{
     ScenarioFn, auto_commit_reconnection_scenario, join_scenario, 
multiple_clients_scenario,
-    offset_cleanup_scenario, run_scenario, single_client_scenario,
+    new_messages_after_restart_scenario, offset_cleanup_scenario, run_scenario,
+    single_client_scenario,
 };
 use iggy_common::TransportProtocol;
 use serial_test::parallel;
@@ -40,6 +41,7 @@ fn websocket() -> TransportProtocol {
         single_client_scenario(),
         multiple_clients_scenario(),
         auto_commit_reconnection_scenario(),
+        new_messages_after_restart_scenario(),
         offset_cleanup_scenario(),
     ]
 )]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 376746436..10340935e 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -34,7 +34,8 @@ use integration::{
 };
 use scenarios::{
     authentication_scenario, bench_scenario, 
consumer_group_auto_commit_reconnection_scenario,
-    consumer_group_join_scenario, consumer_group_offset_cleanup_scenario,
+    consumer_group_join_scenario, 
consumer_group_new_messages_after_restart_scenario,
+    consumer_group_offset_cleanup_scenario,
     consumer_group_with_multiple_clients_polling_messages_scenario,
     consumer_group_with_single_client_polling_messages_scenario,
     consumer_timestamp_polling_scenario, create_message_payload, 
message_headers_scenario,
@@ -90,6 +91,14 @@ fn auto_commit_reconnection_scenario() -> ScenarioFn {
     }
 }
 
+fn new_messages_after_restart_scenario() -> ScenarioFn {
+    |factory| {
+        Box::pin(consumer_group_new_messages_after_restart_scenario::run(
+            factory,
+        ))
+    }
+}
+
 fn offset_cleanup_scenario() -> ScenarioFn {
     |factory| Box::pin(consumer_group_offset_cleanup_scenario::run(factory))
 }
diff --git a/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
new file mode 100644
index 000000000..3b219a3bc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/consumer_group_new_messages_after_restart_scenario.rs
@@ -0,0 +1,193 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{
+    CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
+};
+use futures::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep, timeout};
+
+const INITIAL_MESSAGES_COUNT: u32 = 10;
+const NEW_MESSAGES_COUNT: u32 = 5;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let client = create_client(client_factory).await;
+    login_root(&client).await;
+    init_system(&client).await;
+    execute_scenario(client_factory, &client).await;
+}
+
+async fn init_system(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+}
+
+async fn execute_scenario(client_factory: &dyn ClientFactory, client: &IggyClient) {
+    // 1. Produce initial messages
+    produce_messages(client, 1, INITIAL_MESSAGES_COUNT).await;
+
+    // 2. Create a separate client to simulate the runtime
+    let runtime_client = create_client(client_factory).await;
+    login_root(&runtime_client).await;
+
+    // 3. Create consumer and consume all initial messages
+    let mut consumer = create_consumer(&runtime_client).await;
+    let consumed_messages = consume_messages(&mut consumer, INITIAL_MESSAGES_COUNT).await;
+    assert_eq!(
+        consumed_messages.len(),
+        INITIAL_MESSAGES_COUNT as usize,
+        "Should consume all initial messages"
+    );
+
+    for (index, message) in consumed_messages.iter().enumerate() {
+        let expected_payload = format!("test_message_{}", index + 1);
+        let actual_payload = String::from_utf8_lossy(&message.payload);
+        assert_eq!(
+            actual_payload, expected_payload,
+            "Message content mismatch at index {index}"
+        );
+    }
+
+    // 4. Wait for auto-commit to process
+    sleep(Duration::from_secs(2)).await;
+
+    // 5. Disconnect the consumer and client (simulating runtime restart)
+    drop(consumer);
+    runtime_client.disconnect().await.unwrap();
+    drop(runtime_client);
+    sleep(Duration::from_millis(500)).await;
+
+    // 6. Send new messages after consumer disconnected
+    produce_messages(
+        client,
+        INITIAL_MESSAGES_COUNT + 1,
+        INITIAL_MESSAGES_COUNT + NEW_MESSAGES_COUNT,
+    )
+    .await;
+
+    // 7. Create a new client (simulating runtime restart)
+    let new_runtime_client = create_client(client_factory).await;
+    login_root(&new_runtime_client).await;
+
+    // 8. Reconnect consumer and consume new messages
+    let mut consumer = create_consumer(&new_runtime_client).await;
+    let new_messages = consume_messages(&mut consumer, NEW_MESSAGES_COUNT).await;
+    assert_eq!(
+        new_messages.len(),
+        NEW_MESSAGES_COUNT as usize,
+        "Should receive all new messages sent after restart"
+    );
+
+    for (index, message) in new_messages.iter().enumerate() {
+        let expected_payload =
+            format!("test_message_{}", INITIAL_MESSAGES_COUNT + 1 + index as u32);
+        let actual_payload = String::from_utf8_lossy(&message.payload);
+        assert_eq!(
+            actual_payload, expected_payload,
+            "New message content mismatch at index {index}"
+        );
+    }
+
+    drop(consumer);
+    drop(new_runtime_client);
+}
+
+async fn produce_messages(client: &IggyClient, start_id: u32, end_id: u32) {
+    let mut messages = Vec::new();
+    for message_id in start_id..=end_id {
+        let payload = format!("test_message_{message_id}");
+        let message = IggyMessage::from_str(&payload).unwrap();
+        messages.push(message);
+    }
+
+    client
+        .send_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Partitioning::partition_id(PARTITION_ID),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+}
+
+async fn create_consumer(client: &IggyClient) -> IggyConsumer {
+    let mut consumer = client
+        .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .batch_length(10)
+        .poll_interval(IggyDuration::from_str("100ms").unwrap())
+        .polling_strategy(PollingStrategy::next())
+        .auto_join_consumer_group()
+        .create_consumer_group_if_not_exists()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+        .build();
+
+    consumer.init().await.unwrap();
+    consumer
+}
+
+async fn consume_messages(consumer: &mut IggyConsumer, expected_count: u32) -> Vec<IggyMessage> {
+    let mut messages = Vec::new();
+    let mut count = 0;
+
+    let result = timeout(Duration::from_secs(30), async {
+        while count < expected_count {
+            if let Some(message_result) = consumer.next().await {
+                match message_result {
+                    Ok(polled_message) => {
+                        messages.push(polled_message.message);
+                        count += 1;
+                    }
+                    Err(error) => panic!("Error while consuming messages: {error}"),
+                }
+            }
+        }
+    })
+    .await;
+
+    if result.is_err() {
+        panic!("Timeout waiting for messages. Expected {expected_count}, received {count}");
+    }
+
+    messages
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 90d45676f..c376d8648 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -21,6 +21,7 @@ pub mod bench_scenario;
 pub mod concurrent_scenario;
 pub mod consumer_group_auto_commit_reconnection_scenario;
 pub mod consumer_group_join_scenario;
+pub mod consumer_group_new_messages_after_restart_scenario;
 pub mod consumer_group_offset_cleanup_scenario;
 pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
 pub mod consumer_group_with_single_client_polling_messages_scenario;
diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs
index 36159dfdc..b541b724c 100644
--- a/core/server/src/metadata/absorb.rs
+++ b/core/server/src/metadata/absorb.rs
@@ -284,11 +284,25 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
             group_id,
             client_id,
             member_id,
+            valid_client_ids,
         } => {
             if let Some(stream) = metadata.streams.get_mut(*stream_id)
                 && let Some(topic) = stream.topics.get_mut(*topic_id)
                 && let Some(group) = topic.consumer_groups.get_mut(*group_id)
             {
+                if let Some(valid_ids) = valid_client_ids {
+                    let stale_members: Vec<usize> = group
+                        .members
+                        .iter()
+                        .filter(|(_, m)| !valid_ids.contains(&m.client_id))
+                        .map(|(slot_id, _)| slot_id)
+                        .collect();
+
+                    for slot_id in stale_members {
+                        group.members.remove(slot_id);
+                    }
+                }
+
                 let next_id = group
                     .members
                     .iter()
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
index f489b7309..91e6a0654 100644
--- a/core/server/src/metadata/ops.rs
+++ b/core/server/src/metadata/ops.rs
@@ -114,6 +114,7 @@ pub enum MetadataOp {
         group_id: ConsumerGroupId,
         client_id: u32,
         member_id: Arc<AtomicUsize>,
+        valid_client_ids: Option<Vec<u32>>,
     },
     LeaveConsumerGroup {
         stream_id: StreamId,
diff --git a/core/server/src/metadata/writer.rs b/core/server/src/metadata/writer.rs
index ca97842b1..30f6dad23 100644
--- a/core/server/src/metadata/writer.rs
+++ b/core/server/src/metadata/writer.rs
@@ -263,6 +263,17 @@ impl MetadataWriter {
         topic_id: TopicId,
         group_id: ConsumerGroupId,
         client_id: u32,
+    ) -> Option<usize> {
+        self.join_consumer_group_with_cleanup(stream_id, topic_id, group_id, client_id, None)
+    }
+
+    pub fn join_consumer_group_with_cleanup(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        client_id: u32,
+        valid_client_ids: Option<Vec<u32>>,
     ) -> Option<usize> {
         let member_id = Arc::new(AtomicUsize::new(usize::MAX));
         self.append(MetadataOp::JoinConsumerGroup {
@@ -271,6 +282,7 @@ impl MetadataWriter {
             group_id,
             client_id,
             member_id: member_id.clone(),
+            valid_client_ids,
         });
         self.publish();
         let id = member_id.load(Ordering::Acquire);
diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index 967eb541a..a2d971049 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -103,8 +103,54 @@ impl IggyShard {
         let (stream, topic, group) =
             self.resolve_consumer_group_id(stream_id, topic_id, group_id)?;
 
-        self.writer()
-            .join_consumer_group(stream, topic, group, client_id);
+        let valid_client_ids: Vec<u32> = self
+            .client_manager
+            .get_clients()
+            .iter()
+            .map(|c| c.session.client_id)
+            .collect();
+
+        self.writer().join_consumer_group_with_cleanup(
+            stream,
+            topic,
+            group,
+            client_id,
+            Some(valid_client_ids),
+        );
+
+        if let Some(cg) = self.metadata.get_consumer_group(stream, topic, group)
+            && let Some((_, member)) = cg.members.iter().find(|(_, m)| m.client_id == client_id)
+            && member.partitions.is_empty()
+            && !cg.partitions.is_empty()
+        {
+            let current_valid_ids: Vec<u32> = self
+                .client_manager
+                .get_clients()
+                .iter()
+                .map(|c| c.session.client_id)
+                .collect();
+
+            let potentially_stale: Vec<u32> = cg
+                .members
+                .iter()
+                .filter(|(_, m)| {
+                    !m.partitions.is_empty() && !current_valid_ids.contains(&m.client_id)
+                })
+                .map(|(_, m)| m.client_id)
+                .collect();
+
+            if !potentially_stale.is_empty() {
+                tracing::info!(
+                    "join_consumer_group: new member {client_id} has no partitions, found stale members: {potentially_stale:?}, forcing leave"
+                );
+
+                for stale_client_id in potentially_stale {
+                    let _ =
+                        self.writer()
+                            .leave_consumer_group(stream, topic, group, stale_client_id);
+                }
+            }
+        }
 
         self.client_manager
             .join_consumer_group(client_id, stream, topic, group)
diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index 961011bf8..6e3129515 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -69,16 +69,28 @@ impl IggyShard {
     ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
         let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
 
-        let Some((polling_consumer, partition_id)) = 
self.resolve_consumer_with_partition_id(
-            stream_id,
-            topic_id,
-            &consumer,
-            client_id,
-            partition_id,
-            false,
-        )?
-        else {
-            return Err(IggyError::NotResolvedConsumer(consumer.id));
+        let (polling_consumer, partition_id) = match consumer.kind {
+            ConsumerKind::Consumer => {
+                let Some((polling_consumer, partition_id)) = self
+                    .resolve_consumer_with_partition_id(
+                        stream_id,
+                        topic_id,
+                        &consumer,
+                        client_id,
+                        partition_id,
+                        false,
+                    )?
+                else {
+                    return 
Err(IggyError::NotResolvedConsumer(consumer.id.clone()));
+                };
+                (polling_consumer, partition_id)
+            }
+            ConsumerKind::ConsumerGroup => {
+                let (_, _, cg_id) =
+                    self.resolve_consumer_group_id(stream_id, topic_id, &consumer.id)?;
+                let partition_id = partition_id.unwrap_or(0) as usize;
+                (PollingConsumer::consumer_group(cg_id, 0), partition_id)
+            }
         };
         self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 008495743..616c8c7c8 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -414,12 +414,6 @@ impl IggyShard {
                     (item.offset.load(Ordering::Relaxed), item.path.clone())
                 }
                 PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
-                    tracing::trace!(
-                        "Auto-committing offset {} for consumer group {} on partition {:?}",
-                        offset,
-                        consumer_group_id.0,
-                        namespace
-                    );
                     let hdl = partition.consumer_group_offsets.pin();
                     let item = hdl.get_or_insert(
                         consumer_group_id,

Reply via email to