This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch impl_polling_messages
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/impl_polling_messages by this push:
     new 89d668a7 polishing
89d668a7 is described below

commit 89d668a740ef1f8a19193ec860e56d88c8afea5d
Author: numminex <[email protected]>
AuthorDate: Tue Sep 9 20:24:54 2025 +0200

    polishing
---
 core/server/src/bootstrap.rs             |   4 +-
 core/server/src/shard/mod.rs             | 316 +++++++++++++++++++++++++++++--
 core/server/src/shard/system/messages.rs |  45 +++--
 core/server/src/slab/helpers.rs          |   6 +-
 core/server/src/slab/streams.rs          | 110 +++++------
 5 files changed, 381 insertions(+), 100 deletions(-)

diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 4678b146..dd88f721 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -350,9 +350,9 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
     let mut proactor = compio::driver::ProactorBuilder::new();
 
     proactor
-        .capacity(4096)
+        .capacity(1024)
         .coop_taskrun(true)
-        .taskrun_flag(false); // TODO: Try enabling this.
+        .taskrun_flag(true); // TODO: Try enabling this.
 
     // FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
     // This causes a freeze on macOS with compio fs operations
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8263942a..8794f665 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -32,7 +32,8 @@ use error_set::ErrContext;
 use futures::future::try_join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::{
-    EncryptorKind, Identifier, IggyError, Permissions, UserId, UserStatus,
+    EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions, 
PollingKind, UserId,
+    UserStatus,
     defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME},
     locking::IggyRwLockFn,
 };
@@ -50,10 +51,11 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracing::{error, info, instrument, warn};
+use tracing::{error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
 
 use crate::{
+    binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::server::ServerConfig,
     http::http_server,
     io::fs_utils,
@@ -77,7 +79,9 @@ use crate::{
     streaming::{
         clients::client_manager::ClientManager,
         diagnostics::metrics::Metrics,
+        partitions,
         personal_access_tokens::personal_access_token::PersonalAccessToken,
+        polling_consumer::PollingConsumer,
         session::Session,
         storage::SystemStorage,
         streams, topics,
@@ -345,24 +349,312 @@ impl IggyShard {
     }
 
     async fn handle_request(&self, request: ShardRequest) -> 
Result<ShardResponse, IggyError> {
-        todo!();
-        /*
-        let stream = 
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
-        let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+        let stream_id = request.stream_id;
+        let topic_id = request.topic_id;
         let partition_id = request.partition_id;
         match request.payload {
             ShardRequestPayload::SendMessages { batch } => {
-                topic.append_messages(partition_id, batch).await?;
+                let mut batch = self.maybe_encrypt_messages(batch)?;
+                let messages_count = batch.count();
+
+                let current_offset = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    partitions::helpers::calculate_current_offset(),
+                );
+
+                self.streams2
+                    .with_partition_by_id_async(
+                        &stream_id,
+                        &topic_id,
+                        partition_id,
+                        
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
+                    )
+                    .await;
+
+                let (journal_messages_count, journal_size) =
+                    self.streams2.with_partition_by_id_mut(
+                        &stream_id,
+                        &topic_id,
+                        partition_id,
+                        partitions::helpers::append_to_journal(self.id, 
current_offset, batch),
+                    )?;
+
+                let unsaved_messages_count_exceeded = journal_messages_count
+                    >= self.config.system.partition.messages_required_to_save;
+                let unsaved_messages_size_exceeded = journal_size
+                    >= self
+                        .config
+                        .system
+                        .partition
+                        .size_of_messages_required_to_save
+                        .as_bytes_u64() as u32;
+
+                let is_full = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    partitions::helpers::is_segment_full(),
+                );
+
+                // Try committing the journal
+                if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
+                    self.streams2
+                        .persist_messages(
+                            self.id,
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            unsaved_messages_count_exceeded,
+                            unsaved_messages_size_exceeded,
+                            journal_messages_count,
+                            journal_size,
+                            &self.config.system,
+                        )
+                        .await?;
+
+                    if is_full {
+                        self.streams2
+                            .handle_full_segment(
+                                self.id,
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                &self.config.system,
+                            )
+                            .await?;
+                        self.metrics.increment_messages(messages_count as u64);
+                    }
+                }
                 Ok(ShardResponse::SendMessages)
             }
             ShardRequestPayload::PollMessages { args, consumer } => {
-                let (metadata, batch) = topic
-                    .get_messages(consumer, partition_id, args.strategy, 
args.count)
-                    .await?;
-                Ok(ShardResponse::PollMessages((metadata, batch)))
+                let current_offset = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+                );
+                let metadata = IggyPollMetadata::new(partition_id as u32, 
current_offset);
+                let count = args.count;
+                let strategy = args.strategy;
+                let value = strategy.value;
+                let batches = match strategy.kind {
+                    PollingKind::Offset => {
+                        let offset = value;
+                        // We have to remember to keep the invariant from the if that is on line 496.
+                        // Alternatively a better design would be to get rid of that if and move the validations here.
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Timestamp => {
+                        let timestamp = IggyTimestamp::from(value);
+                        let timestamp_ts = timestamp.as_micros();
+                        trace!(
+                            "Getting {count} messages by timestamp: {} for 
partition: {}...",
+                            timestamp_ts, partition_id
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_timestamp(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                timestamp_ts,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::First => {
+                        let first_offset = self.streams2.with_partition_by_id(
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            |(_, _, _, _, _, _, log)| {
+                                log.segments()
+                                    .first()
+                                    .map(|segment| segment.start_offset)
+                                    .unwrap_or(0)
+                            },
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                first_offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Last => {
+                        let (start_offset, actual_count) = 
self.streams2.with_partition_by_id(
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            |(_, _, _, offset, _, _, _)| {
+                                let current_offset = 
offset.load(Ordering::Relaxed);
+                                let mut requested_count = 0;
+                                if requested_count > current_offset + 1 {
+                                    requested_count = current_offset + 1
+                                }
+                                let start_offset = 1 + current_offset - 
requested_count;
+                                (start_offset, requested_count as u32)
+                            },
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                start_offset,
+                                actual_count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Next => {
+                        let (consumer_offset, consumer_id) = match consumer {
+                            PollingConsumer::Consumer(consumer_id, _) => (
+                                self.streams2
+                                    .with_partition_by_id(
+                                        &stream_id,
+                                        &topic_id,
+                                        partition_id,
+                                        
partitions::helpers::get_consumer_offset(consumer_id),
+                                    )
+                                    .map(|c_offset| c_offset.stored_offset),
+                                consumer_id,
+                            ),
+                            PollingConsumer::ConsumerGroup(cg_id, _) => (
+                                self.streams2
+                                    .with_partition_by_id(
+                                        &stream_id,
+                                        &topic_id,
+                                        partition_id,
+                                        
partitions::helpers::get_consumer_group_member_offset(
+                                            cg_id,
+                                        ),
+                                    )
+                                    .map(|cg_offset| cg_offset.stored_offset),
+                                cg_id,
+                            ),
+                        };
+
+                        let Some(consumer_offset) = consumer_offset else {
+                            return 
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+                        };
+                        let offset = consumer_offset + 1;
+                        trace!(
+                            "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
+                            consumer_id, partition_id, offset
+                        );
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                }?;
+
+                let numeric_stream_id = self.streams2.with_stream_by_id(
+                    &stream_id,
+                    crate::streaming::streams::helpers::get_stream_id(),
+                );
+                let numeric_topic_id = self.streams2.with_topic_by_id(
+                    &stream_id,
+                    &topic_id,
+                    crate::streaming::topics::helpers::get_topic_id(),
+                );
+
+                if args.auto_commit && !batches.is_empty() {
+                    let offset = batches
+                        .last_offset()
+                        .expect("Batch set should have at least one batch");
+                    trace!(
+                        "Last offset: {} will be automatically stored for {}, 
stream: {}, topic: {}, partition: {}",
+                        offset, consumer, numeric_stream_id, numeric_topic_id, 
partition_id
+                    );
+                    match consumer {
+                        PollingConsumer::Consumer(consumer_id, _) => {
+                            self.streams2.with_partition_by_id(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                partitions::helpers::store_consumer_offset(
+                                    consumer_id,
+                                    numeric_stream_id,
+                                    numeric_topic_id,
+                                    partition_id,
+                                    offset,
+                                    &self.config.system,
+                                ),
+                            );
+                            self.streams2
+                                .with_partition_by_id_async(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    
partitions::helpers::persist_consumer_offset_to_disk(
+                                        self.id,
+                                        consumer_id,
+                                    ),
+                                )
+                                .await?;
+                        }
+                        PollingConsumer::ConsumerGroup(cg_id, _) => {
+                            self.streams2.with_partition_by_id(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                
partitions::helpers::store_consumer_group_member_offset(
+                                    cg_id,
+                                    numeric_stream_id,
+                                    numeric_topic_id,
+                                    partition_id,
+                                    offset,
+                                    &self.config.system,
+                                ),
+                            );
+                            self.streams2.with_partition_by_id_async(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+                                        self.id,
+                                        cg_id,
+                                    ),
+                                )
+                                .await?;
+                        }
+                    }
+                }
+                Ok(ShardResponse::PollMessages((metadata, batches)))
             }
         }
-        */
     }
 
     async fn handle_event(&self, event: ShardEvent) -> Result<(), IggyError> {
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 0acea3ee..a3b3e2c6 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -26,14 +26,12 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
+use crate::shard_trace;
 use crate::streaming::polling_consumer::PollingConsumer;
-use crate::streaming::segments::{
-    IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
-};
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
 use crate::streaming::utils::{PooledBuffer, hash};
 use crate::streaming::{partitions, streams, topics};
-use crate::shard_trace;
 use error_set::ErrContext;
 
 use iggy_common::{
@@ -186,26 +184,30 @@ impl IggyShard {
                     // Try committing the journal
                     if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded
                     {
-                        self.streams2.persist_messages(
-                            self.id,
-                            stream_id,
-                            topic_id,
-                            partition_id,
-                            unsaved_messages_count_exceeded,
-                            unsaved_messages_size_exceeded,
-                            journal_messages_count,
-                            journal_size,
-                            &self.config.system,
-                        ).await?;
-
-                        if is_full {
-                            self.streams2.handle_full_segment(
+                        self.streams2
+                            .persist_messages(
                                 self.id,
                                 stream_id,
                                 topic_id,
                                 partition_id,
+                                unsaved_messages_count_exceeded,
+                                unsaved_messages_size_exceeded,
+                                journal_messages_count,
+                                journal_size,
                                 &self.config.system,
-                            ).await?;
+                            )
+                            .await?;
+
+                        if is_full {
+                            self.streams2
+                                .handle_full_segment(
+                                    self.id,
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    &self.config.system,
+                                )
+                                .await?;
                             self.metrics.increment_messages(messages_count as 
u64);
                         }
                     }
@@ -270,7 +272,10 @@ impl IggyShard {
         let has_partition = self
             .streams2
             .with_topic_by_id(stream_id, topic_id, |(root, ..)| {
-                root.partitions().exists(partition_id)
+                let partitions = root.partitions();
+                tracing::warn!("partitions: {:?}, looking for part_id: {}", 
partitions, partition_id);
+                partitions.exists(partition_id)
+                //root.partitions().exists(partition_id)
             });
         if !has_partition {
             return Err(IggyError::NoPartitions(
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 3f72b20a..31a5a8b7 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -27,11 +27,11 @@ where
     async |(root, ..)| f(root.topics()).await
 }
 
-pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRefMut>) -> O
+pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
 where
-    F: for<'a> FnOnce(&'a mut Topics) -> O,
+    F: for<'a> FnOnce(&'a Topics) -> O,
 {
-    |(mut root, ..)| f(root.topics_mut())
+    |(root, ..)| f(root.topics())
 }
 
 pub fn partitions<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRef>) -> O
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index de085238..36dadb79 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,18 +1,30 @@
 use crate::{
-    configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig}, slab::{
-        consumer_groups::ConsumerGroups, helpers, partitions::{self, 
Partitions}, topics::Topics, traits_ext::{
+    configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
+    slab::{
+        Keyed,
+        consumer_groups::ConsumerGroups,
+        helpers,
+        partitions::{self, Partitions},
+        topics::Topics,
+        traits_ext::{
             ComponentsById, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell,
             InsertCell, InteriorMutability, IntoComponents,
-        }, Keyed
-    }, streaming::{
+        },
+    },
+    streaming::{
         partitions::partition2::{PartitionRef, PartitionRefMut},
-        segments::{storage::create_segment_storage, IggyMessagesBatchSet, 
Segment2},
+        segments::{IggyMessagesBatchSet, Segment2, 
storage::create_segment_storage},
         stats::stats::StreamStats,
-        streams::{self, stream2::{self, StreamRef, StreamRefMut}},
+        streams::{
+            self,
+            stream2::{self, StreamRef, StreamRefMut},
+        },
         topics::{
-            self, consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut}, 
topic2::{TopicRef, TopicRefMut}
+            self,
+            consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
+            topic2::{TopicRef, TopicRefMut},
         },
-    }
+    },
 };
 use ahash::AHashMap;
 use iggy_common::{Identifier, IggyError};
@@ -208,9 +220,9 @@ impl Streams {
     pub fn with_topics_mut<T>(
         &self,
         stream_id: &Identifier,
-        f: impl FnOnce(&mut Topics) -> T,
+        f: impl FnOnce(&Topics) -> T,
     ) -> T {
-        self.with_stream_by_id_mut(stream_id, helpers::topics_mut(f))
+        self.with_stream_by_id(stream_id, helpers::topics_mut(f))
     }
 
     pub fn with_topic_by_id<T>(
@@ -446,44 +458,26 @@ impl Streams {
         partition_id: partitions::ContainerId,
         config: &crate::configs::system::SystemConfig,
     ) -> Result<(), IggyError> {
-        let numeric_stream_id = self
-            .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
-        let numeric_topic_id = self.with_topic_by_id(
-            stream_id,
-            topic_id,
-            topics::helpers::get_topic_id(),
-        );
+        let numeric_stream_id =
+            self.with_stream_by_id(stream_id, 
streams::helpers::get_stream_id());
+        let numeric_topic_id =
+            self.with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
 
-        if config.segment.cache_indexes
-            == CacheIndexesConfig::OpenSegment
-            || config.segment.cache_indexes
-                == CacheIndexesConfig::None
+        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None
         {
-            self.with_partition_by_id_mut(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(.., log)| {
-                    log.clear_active_indexes();
-                },
-            );
+            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, 
|(.., log)| {
+                log.clear_active_indexes();
+            });
         }
 
-        self.with_partition_by_id_mut(
-            stream_id,
-            topic_id,
-            partition_id,
-            |(.., log)| {
-                log.active_segment_mut().sealed = true;
-            },
-        );
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., 
log)| {
+            log.active_segment_mut().sealed = true;
+        });
         let (log_writer, index_writer) =
-            self.with_partition_by_id_mut(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(.., log)| log.active_storage_mut().shutdown(),
-            );
+            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, 
|(.., log)| {
+                log.active_storage_mut().shutdown()
+            });
 
         compio::runtime::spawn(async move {
             let _ = log_writer.fsync().await;
@@ -496,18 +490,13 @@ impl Streams {
         .detach();
 
         let (start_offset, size, end_offset) =
-            self.with_partition_by_id(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(.., log)| {
-                    (
-                        log.active_segment().start_offset,
-                        log.active_segment().size,
-                        log.active_segment().end_offset,
-                    )
-                },
-            );
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                (
+                    log.active_segment().start_offset,
+                    log.active_segment().size,
+                    log.active_segment().end_offset,
+                )
+            });
 
         crate::shard_info!(
             shard_id,
@@ -538,14 +527,9 @@ impl Streams {
             end_offset + 1,
         )
         .await?;
-        self.with_partition_by_id_mut(
-            stream_id,
-            topic_id,
-            partition_id,
-            |(.., log)| {
-                log.add_persisted_segment(segment, storage);
-            },
-        );
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., 
log)| {
+            log.add_persisted_segment(segment, storage);
+        });
 
         Ok(())
     }

Reply via email to