This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 52dbed84 feat(io_uring): implement polling messages using new 
architecture (#2162)
52dbed84 is described below

commit 52dbed84fb91903255d8b5097c34a652eb031c27
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Sep 10 14:33:17 2025 +0200

    feat(io_uring): implement polling messages using new architecture (#2162)
---
 core/bench/src/actors/consumer/client/low_level.rs |  12 +-
 core/sdk/src/clients/binary_consumer_group.rs      |   4 +-
 core/sdk/src/clients/client.rs                     |   6 +-
 core/server/src/bootstrap.rs                       |   4 +-
 core/server/src/http/consumer_groups.rs            |   6 +-
 core/server/src/shard/mod.rs                       | 334 +++++++++-
 core/server/src/shard/namespace.rs                 |  15 +
 core/server/src/shard/system/messages.rs           | 732 +++++++++++----------
 core/server/src/shard/system/partitions.rs         |  18 +-
 core/server/src/shard/system/utils.rs              |   2 +-
 core/server/src/shard/transmission/message.rs      |  15 +-
 core/server/src/slab/helpers.rs                    |   6 +-
 core/server/src/slab/partitions.rs                 |   4 +
 core/server/src/slab/streams.rs                    | 231 ++++++-
 core/server/src/streaming/cluster/mod.rs           |   4 +-
 core/server/src/streaming/partitions/helpers.rs    | 599 ++++++++++++++++-
 core/server/src/streaming/partitions/journal.rs    |  22 +
 core/server/src/streaming/partitions/log.rs        |  55 +-
 core/server/src/streaming/partitions/messages.rs   |   7 -
 .../src/streaming/segments/indexes/index_reader.rs |   8 +-
 .../src/streaming/segments/indexes/index_writer.rs |   8 +-
 .../src/streaming/segments/indexes/indexes_mut.rs  |   2 +-
 .../streaming/segments/messages/messages_reader.rs |  11 +-
 .../streaming/segments/messages/messages_writer.rs |  16 +-
 core/server/src/streaming/segments/storage.rs      |  26 +-
 25 files changed, 1670 insertions(+), 477 deletions(-)

diff --git a/core/bench/src/actors/consumer/client/low_level.rs 
b/core/bench/src/actors/consumer/client/low_level.rs
index 139dae24..088ad42f 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -89,7 +89,7 @@ impl ConsumerClient for LowLevelConsumerClient {
         if polled.messages.is_empty() {
             return Ok(None);
         }
-        let message_count = u32::try_from(polled.messages.len()).unwrap();
+        let messages_count = polled.messages.len() as u64;
         let latency = if self.config.origin_timestamp_latency_calculation {
             let now = IggyTimestamp::now().as_micros();
             Duration::from_micros(now - 
polled.messages[0].header.origin_timestamp)
@@ -100,10 +100,16 @@ impl ConsumerClient for LowLevelConsumerClient {
         let user_bytes = batch_user_size_bytes(&polled);
         let total_bytes = batch_total_size_bytes(&polled);
 
-        self.offset += polled.messages.len() as u64;
+        self.offset += messages_count;
+        match self.polling_strategy.kind {
+            PollingKind::Offset => {
+                self.polling_strategy.value += messages_count;
+            }
+            _ => {}
+        }
 
         Ok(Some(BatchMetrics {
-            messages: message_count,
+            messages: messages_count as u32,
             user_data_bytes: user_bytes,
             total_bytes,
             latency,
diff --git a/core/sdk/src/clients/binary_consumer_group.rs 
b/core/sdk/src/clients/binary_consumer_group.rs
index b0ef8c67..b415e42f 100644
--- a/core/sdk/src/clients/binary_consumer_group.rs
+++ b/core/sdk/src/clients/binary_consumer_group.rs
@@ -20,7 +20,9 @@ use crate::prelude::IggyClient;
 use async_dropper::AsyncDrop;
 use async_trait::async_trait;
 use iggy_binary_protocol::{ConsumerGroupClient, UserClient};
-use iggy_common::{locking::IggyRwLockFn, ConsumerGroup, ConsumerGroupDetails, 
Identifier, IggyError};
+use iggy_common::{
+    ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError, 
locking::IggyRwLockFn,
+};
 
 #[async_trait]
 impl ConsumerGroupClient for IggyClient {
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 6f31dd56..962ff1bc 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -17,8 +17,8 @@
  */
 
 use crate::clients::client_builder::IggyClientBuilder;
-use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
 use iggy_common::Consumer;
+use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
 
 use crate::client_wrappers::client_wrapper::ClientWrapper;
 use crate::http::http_client::HttpClient;
@@ -31,9 +31,7 @@ use crate::tcp::tcp_client::TcpClient;
 use async_broadcast::Receiver;
 use async_trait::async_trait;
 use iggy_binary_protocol::{Client, SystemClient};
-use iggy_common::{
-    ConnectionStringUtils, DiagnosticEvent, Partitioner, TransportProtocol,
-};
+use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner, 
TransportProtocol};
 use std::fmt::Debug;
 use std::sync::Arc;
 use tokio::spawn;
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 4678b146..8e3797b2 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -352,7 +352,7 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> 
Runtime {
     proactor
         .capacity(4096)
         .coop_taskrun(true)
-        .taskrun_flag(false); // TODO: Try enabling this.
+        .taskrun_flag(true); // TODO: Try enabling this.
 
     // FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
     // This causes a freeze on macOS with compio fs operations
@@ -362,7 +362,7 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> 
Runtime {
 
     compio::runtime::RuntimeBuilder::new()
         .with_proactor(proactor.to_owned())
-        .event_interval(69)
+        .event_interval(128)
         .thread_affinity(cpu_set)
         .build()
         .unwrap()
diff --git a/core/server/src/http/consumer_groups.rs 
b/core/server/src/http/consumer_groups.rs
index 6c5a49af..56f4ecf4 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -262,9 +262,9 @@ async fn delete_consumer_group(
 
     // Delete using the new API
     let consumer_group = state.shard.shard().delete_consumer_group2(
-        &session, 
-        &identifier_stream_id, 
-        &identifier_topic_id, 
+        &session,
+        &identifier_stream_id,
+        &identifier_topic_id,
         &identifier_group_id
     )
     .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed 
to delete consumer group with ID: {group_id} for topic with ID: {topic_id} in 
stream with ID: {stream_id}"))?;
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 2a5ec7eb..8794f665 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -32,7 +32,8 @@ use error_set::ErrContext;
 use futures::future::try_join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::{
-    EncryptorKind, Identifier, IggyError, Permissions, UserId, UserStatus,
+    EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions, 
PollingKind, UserId,
+    UserStatus,
     defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME},
     locking::IggyRwLockFn,
 };
@@ -50,10 +51,11 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracing::{error, info, instrument, warn};
+use tracing::{error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
 
 use crate::{
+    binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::server::ServerConfig,
     http::http_server,
     io::fs_utils,
@@ -77,9 +79,12 @@ use crate::{
     streaming::{
         clients::client_manager::ClientManager,
         diagnostics::metrics::Metrics,
+        partitions,
         personal_access_tokens::personal_access_token::PersonalAccessToken,
+        polling_consumer::PollingConsumer,
         session::Session,
         storage::SystemStorage,
+        streams, topics,
         users::{permissioner::Permissioner, user::User},
         utils::ptr::EternalPtr,
     },
@@ -124,7 +129,7 @@ impl Shard {
 // TODO: Maybe pad to cache line size?
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub struct ShardInfo {
-    id: u16,
+    pub id: u16,
 }
 
 impl ShardInfo {
@@ -344,24 +349,312 @@ impl IggyShard {
     }
 
     async fn handle_request(&self, request: ShardRequest) -> 
Result<ShardResponse, IggyError> {
-        todo!();
-        /*
-        let stream = 
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
-        let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+        let stream_id = request.stream_id;
+        let topic_id = request.topic_id;
         let partition_id = request.partition_id;
         match request.payload {
             ShardRequestPayload::SendMessages { batch } => {
-                topic.append_messages(partition_id, batch).await?;
+                let mut batch = self.maybe_encrypt_messages(batch)?;
+                let messages_count = batch.count();
+
+                let current_offset = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    partitions::helpers::calculate_current_offset(),
+                );
+
+                self.streams2
+                    .with_partition_by_id_async(
+                        &stream_id,
+                        &topic_id,
+                        partition_id,
+                        
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
+                    )
+                    .await;
+
+                let (journal_messages_count, journal_size) =
+                    self.streams2.with_partition_by_id_mut(
+                        &stream_id,
+                        &topic_id,
+                        partition_id,
+                        partitions::helpers::append_to_journal(self.id, 
current_offset, batch),
+                    )?;
+
+                let unsaved_messages_count_exceeded = journal_messages_count
+                    >= self.config.system.partition.messages_required_to_save;
+                let unsaved_messages_size_exceeded = journal_size
+                    >= self
+                        .config
+                        .system
+                        .partition
+                        .size_of_messages_required_to_save
+                        .as_bytes_u64() as u32;
+
+                let is_full = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    partitions::helpers::is_segment_full(),
+                );
+
+                // Try committing the journal
+                if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
+                    self.streams2
+                        .persist_messages(
+                            self.id,
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            unsaved_messages_count_exceeded,
+                            unsaved_messages_size_exceeded,
+                            journal_messages_count,
+                            journal_size,
+                            &self.config.system,
+                        )
+                        .await?;
+
+                    if is_full {
+                        self.streams2
+                            .handle_full_segment(
+                                self.id,
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                &self.config.system,
+                            )
+                            .await?;
+                        self.metrics.increment_messages(messages_count as u64);
+                    }
+                }
                 Ok(ShardResponse::SendMessages)
             }
             ShardRequestPayload::PollMessages { args, consumer } => {
-                let (metadata, batch) = topic
-                    .get_messages(consumer, partition_id, args.strategy, 
args.count)
-                    .await?;
-                Ok(ShardResponse::PollMessages((metadata, batch)))
+                let current_offset = self.streams2.with_partition_by_id(
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+                );
+                let metadata = IggyPollMetadata::new(partition_id as u32, 
current_offset);
+                let count = args.count;
+                let strategy = args.strategy;
+                let value = strategy.value;
+                let batches = match strategy.kind {
+                    PollingKind::Offset => {
+                        let offset = value;
+                        // We have to remember to keep the invariant from the 
if that is on line 496.
+                        // Alternatively a better design would be to get rid 
of that if and move the validations here.
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Timestamp => {
+                        let timestamp = IggyTimestamp::from(value);
+                        let timestamp_ts = timestamp.as_micros();
+                        trace!(
+                            "Getting {count} messages by timestamp: {} for 
partition: {}...",
+                            timestamp_ts, partition_id
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_timestamp(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                timestamp_ts,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::First => {
+                        let first_offset = self.streams2.with_partition_by_id(
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            |(_, _, _, _, _, _, log)| {
+                                log.segments()
+                                    .first()
+                                    .map(|segment| segment.start_offset)
+                                    .unwrap_or(0)
+                            },
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                first_offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Last => {
+                        let (start_offset, actual_count) = 
self.streams2.with_partition_by_id(
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            |(_, _, _, offset, _, _, _)| {
+                                let current_offset = 
offset.load(Ordering::Relaxed);
+                                let mut requested_count = 0;
+                                if requested_count > current_offset + 1 {
+                                    requested_count = current_offset + 1
+                                }
+                                let start_offset = 1 + current_offset - 
requested_count;
+                                (start_offset, requested_count as u32)
+                            },
+                        );
+
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                start_offset,
+                                actual_count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                    PollingKind::Next => {
+                        let (consumer_offset, consumer_id) = match consumer {
+                            PollingConsumer::Consumer(consumer_id, _) => (
+                                self.streams2
+                                    .with_partition_by_id(
+                                        &stream_id,
+                                        &topic_id,
+                                        partition_id,
+                                        
partitions::helpers::get_consumer_offset(consumer_id),
+                                    )
+                                    .map(|c_offset| c_offset.stored_offset),
+                                consumer_id,
+                            ),
+                            PollingConsumer::ConsumerGroup(cg_id, _) => (
+                                self.streams2
+                                    .with_partition_by_id(
+                                        &stream_id,
+                                        &topic_id,
+                                        partition_id,
+                                        
partitions::helpers::get_consumer_group_member_offset(
+                                            cg_id,
+                                        ),
+                                    )
+                                    .map(|cg_offset| cg_offset.stored_offset),
+                                cg_id,
+                            ),
+                        };
+
+                        let Some(consumer_offset) = consumer_offset else {
+                            return 
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+                        };
+                        let offset = consumer_offset + 1;
+                        trace!(
+                            "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
+                            consumer_id, partition_id, offset
+                        );
+                        let batches = self
+                            .streams2
+                            .get_messages_by_offset(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                offset,
+                                count,
+                            )
+                            .await?;
+                        Ok(batches)
+                    }
+                }?;
+
+                let numeric_stream_id = self.streams2.with_stream_by_id(
+                    &stream_id,
+                    crate::streaming::streams::helpers::get_stream_id(),
+                );
+                let numeric_topic_id = self.streams2.with_topic_by_id(
+                    &stream_id,
+                    &topic_id,
+                    crate::streaming::topics::helpers::get_topic_id(),
+                );
+
+                if args.auto_commit && !batches.is_empty() {
+                    let offset = batches
+                        .last_offset()
+                        .expect("Batch set should have at least one batch");
+                    trace!(
+                        "Last offset: {} will be automatically stored for {}, 
stream: {}, topic: {}, partition: {}",
+                        offset, consumer, numeric_stream_id, numeric_topic_id, 
partition_id
+                    );
+                    match consumer {
+                        PollingConsumer::Consumer(consumer_id, _) => {
+                            self.streams2.with_partition_by_id(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                partitions::helpers::store_consumer_offset(
+                                    consumer_id,
+                                    numeric_stream_id,
+                                    numeric_topic_id,
+                                    partition_id,
+                                    offset,
+                                    &self.config.system,
+                                ),
+                            );
+                            self.streams2
+                                .with_partition_by_id_async(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    
partitions::helpers::persist_consumer_offset_to_disk(
+                                        self.id,
+                                        consumer_id,
+                                    ),
+                                )
+                                .await?;
+                        }
+                        PollingConsumer::ConsumerGroup(cg_id, _) => {
+                            self.streams2.with_partition_by_id(
+                                &stream_id,
+                                &topic_id,
+                                partition_id,
+                                
partitions::helpers::store_consumer_group_member_offset(
+                                    cg_id,
+                                    numeric_stream_id,
+                                    numeric_topic_id,
+                                    partition_id,
+                                    offset,
+                                    &self.config.system,
+                                ),
+                            );
+                            self.streams2.with_partition_by_id_async(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+                                        self.id,
+                                        cg_id,
+                                    ),
+                                )
+                                .await?;
+                        }
+                    }
+                }
+                Ok(ShardResponse::PollMessages((metadata, batches)))
             }
         }
-        */
     }
 
     async fn handle_event(&self, event: ShardEvent) -> Result<(), IggyError> {
@@ -563,13 +856,12 @@ impl IggyShard {
                     &topic_id,
                     &polling_consumer,
                     partition_id,
-                );
+                )?;
                 Ok(())
             }
         }
     }
 
-    /*
     pub async fn send_request_to_shard_or_recoil(
         &self,
         namespace: &IggyNamespace,
@@ -593,13 +885,12 @@ impl IggyShard {
             Ok(ShardSendRequestResult::Response(response))
         } else {
             Err(IggyError::ShardNotFound(
-                namespace.stream_id,
-                namespace.topic_id,
-                namespace.partition_id,
+                namespace.stream_id(),
+                namespace.topic_id(),
+                namespace.partition_id(),
             ))
         }
     }
-    */
 
     pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) -> 
Vec<ShardResponse> {
         let mut responses = 
Vec::with_capacity(self.get_available_shards_count() as usize);
@@ -669,7 +960,10 @@ impl IggyShard {
     }
 
     pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> 
ShardInfo {
-        self.shards_table.remove(namespace).map(|(_, shard_info)| 
shard_info).expect("remove_shard_table_record: namespace not found")
+        self.shards_table
+            .remove(namespace)
+            .map(|(_, shard_info)| shard_info)
+            .expect("remove_shard_table_record: namespace not found")
     }
 
     pub fn remove_shard_table_records(
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index 127be415..f3a8f2c3 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -63,6 +63,21 @@ impl IggyNamespace {
         self.0
     }
 
+    #[inline]
+    pub fn stream_id(&self) -> usize {
+        ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize
+    }
+
+    #[inline]
+    pub fn topic_id(&self) -> usize {
+        ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize
+    }
+
+    #[inline]
+    pub fn partition_id(&self) -> usize {
+        ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize
+    }
+
     #[inline]
     pub fn new(stream: usize, topic: usize, partition: usize) -> Self {
         let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 0c898b26..1a57f4a1 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -20,27 +20,23 @@ use std::sync::atomic::Ordering;
 
 use super::COMPONENT;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
-use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
-use crate::streaming::partitions::journal::Journal;
-use crate::streaming::partitions::partition2::PartitionRoot;
-use crate::streaming::segments::storage::create_segment_storage;
-use crate::streaming::segments::{
-    IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2,
-};
+use crate::shard_trace;
+use crate::streaming::polling_consumer::PollingConsumer;
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
 use crate::streaming::utils::{PooledBuffer, hash};
-use crate::streaming::{streams, topics};
-use crate::{shard_info, shard_trace};
+use crate::streaming::{partitions, streams, topics};
 use error_set::ErrContext;
 
 use iggy_common::{
-    BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier, 
IggyError, Partitioning,
-    PartitioningKind, PollingStrategy,
+    BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier, 
IggyError, IggyTimestamp,
+    Partitioning, PartitioningKind, PollingKind, PollingStrategy,
 };
 use tracing::{error, trace};
 
@@ -98,10 +94,6 @@ impl IggyShard {
             return Ok(());
         }
 
-        // Encrypt messages if encryptor is enabled in configuration.
-        let mut batch = self.maybe_encrypt_messages(batch)?;
-        let messages_count = batch.count();
-
         let partition_id =
             self.streams2
                 .with_topic_by_id(
@@ -128,308 +120,113 @@ impl IggyShard {
                     },
                 )?;
 
-        // Deduplicate messages and adjust their offsets.
-        let current_offset = self
-            .streams2
-            .with_partition_by_id_async(
-                stream_id,
-                topic_id,
-                partition_id,
-                async |(root, _, deduplicator, offset, _, _, log)| {
-                    let segment = log.active_segment();
-                    let current_offset = if !root.should_increment_offset() {
-                        0
-                    } else {
-                        offset.load(Ordering::Relaxed) + 1
-                    };
-                    batch
-                        .prepare_for_persistence(
-                            segment.start_offset,
-                            current_offset,
-                            segment.size,
-                            deduplicator.as_ref(),
-                        )
-                        .await;
-                    current_offset
-                },
-            )
-            .await;
-
-        // Append to the journal.
-        let (journal_messages_count, journal_size) = 
self.streams2.with_partition_by_id_mut(
-            stream_id,
-            topic_id,
-            partition_id,
-            |(root, stats, _, offset, .., log)| {
-                let segment = log.active_segment_mut();
-
-                if segment.end_offset == 0 {
-                    segment.start_timestamp = batch.first_timestamp().unwrap();
-                }
-
-                let batch_messages_size = batch.size();
-                let batch_messages_count = batch.count();
-
-                segment.end_timestamp = batch.last_timestamp().unwrap();
-                segment.end_offset = batch.last_offset().unwrap();
-                segment.size += batch_messages_size;
-
-                let (journal_messages_count, journal_size) =
-                    log.journal_mut().append(self.id, batch)?;
-
-                stats.increment_messages_count(batch_messages_count as u64);
-                stats.increment_size_bytes(batch_messages_size as u64);
-
-                let last_offset = if batch_messages_count == 0 {
-                    current_offset
-                } else {
-                    current_offset + batch_messages_count as u64 - 1
-                };
-
-                if root.should_increment_offset() {
-                    offset.store(last_offset, Ordering::Relaxed);
-                } else {
-                    root.set_should_increment_offset(true);
-                    offset.store(last_offset, Ordering::Relaxed);
-                }
-
-                Ok((journal_messages_count, journal_size))
-            },
-        )?;
-
-        let unsaved_messages_count_exceeded =
-            journal_messages_count >= 
self.config.system.partition.messages_required_to_save;
-        let unsaved_messages_size_exceeded = journal_size
-            >= self
-                .config
-                .system
-                .partition
-                .size_of_messages_required_to_save
-                .as_bytes_u64() as u32;
-
-        let is_full =
-            self.streams2
-                .with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
-                    log.active_segment().is_full()
-                });
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
+        let payload = ShardRequestPayload::SendMessages { batch };
+        let request = ShardRequest::new(stream_id.clone(), topic_id.clone(), 
partition_id, payload);
+        let message = ShardMessage::Request(request);
+        match self
+            .send_request_to_shard_or_recoil(&namespace, message)
+            .await?
+        {
+            ShardSendRequestResult::Recoil(message) => {
+                if let ShardMessage::Request(ShardRequest {
+                    partition_id,
+                    payload,
+                    ..
+                }) = message
+                    && let ShardRequestPayload::SendMessages { batch } = 
payload
+                {
+                    // Encrypt messages if encryptor is enabled in 
configuration.
+                    let mut batch = self.maybe_encrypt_messages(batch)?;
+                    let messages_count = batch.count();
 
-        // Try committing the journal
-        if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
-            let batches = self.streams2.with_partition_by_id_mut(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(.., log)| {
-                    let batches = log.journal_mut().commit();
-                    log.ensure_indexes();
-                    batches.append_indexes_to(log.indexes_mut().unwrap());
-                    batches
-                },
-            );
+                    let current_offset = self.streams2.with_partition_by_id(
+                        stream_id,
+                        topic_id,
+                        partition_id,
+                        partitions::helpers::calculate_current_offset(),
+                    );
 
-            let (saved, batch_count) = self.streams2
-                .with_partition_by_id_async(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    async |(.., log)| {
-                        let reason = if unsaved_messages_count_exceeded {
-                            format!(
-                                "unsaved messages count exceeded: {}, max from 
config: {}",
-                                journal_messages_count,
-                                
self.config.system.partition.messages_required_to_save,
-                            )
-                        } else if unsaved_messages_size_exceeded {
-                            format!(
-                                "unsaved messages size exceeded: {}, max from 
config: {}",
-                                journal_size,
-                                
self.config.system.partition.size_of_messages_required_to_save,
-                            )
-                        } else {
-                            format!(
-                                "segment is full, current size: {}, max from 
config: {}",
-                                log.active_segment().size,
-                                self.config.system.segment.size,
-                            )
-                        };
-                        shard_trace!(
-                            self.id,
-                            "Persisting messages on disk for stream ID: {}, 
topic ID: {}, partition ID: {} because {}...",
+                    self.streams2
+                        .with_partition_by_id_async(
                             stream_id,
                             topic_id,
                             partition_id,
-                            reason
-                        );
-
-                        let batch_count = batches.count();
-                        let batch_size = batches.size();
-
-                        let storage = log.active_storage();
-                        let saved = storage
-                            .messages_writer
-                            .as_ref()
-                            .expect("Messages writer not initialized")
-                            .save_batch_set(batches)
-                            .await
-                            .with_error_context(|error| {
-                                let segment = log.active_segment();
-                                format!(
-                                    "Failed to save batch of {batch_count} 
messages \
-                                    ({batch_size} bytes) to {segment}. 
{error}",
-                                )
-                            })?;
-
-                        let unsaved_indexes_slice = 
log.indexes().unwrap().unsaved_slice();
-                        let len = unsaved_indexes_slice.len();
-                        storage
-                            .index_writer
-                            .as_ref()
-                            .expect("Index writer not initialized")
-                            .save_indexes(unsaved_indexes_slice)
-                            .await
-                            .with_error_context(|error| {
-                                let segment = log.active_segment();
-                                format!(
-                                    "Failed to save index of {len} indexes to 
{segment}. {error}",
-                                )
-                            })?;
-
-                        shard_trace!(
-                            self.id,
-                            "Persisted {} messages on disk for stream ID: {}, 
topic ID: {}, for partition with ID: {}, total bytes written: {}.",
-                            batch_count, stream_id, topic_id, partition_id, 
saved
-                        );
-
-                        Ok((saved, batch_count))
-                    },
-                )
-                .await?;
-
-            self.streams2.with_partition_by_id_mut(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(_, stats, .., log)| {
-                    log.active_segment_mut().size += saved.as_bytes_u32();
-                    log.indexes_mut().unwrap().mark_saved();
-                    if self.config.system.segment.cache_indexes == 
CacheIndexesConfig::None {
-                        log.indexes_mut().unwrap().clear();
-                    }
-                    stats.increment_size_bytes(saved.as_bytes_u64());
-                    stats.increment_messages_count(batch_count as u64);
-                },
-            );
-
-            // Handle possibly full segment after saving.
-            let is_segment_full = self.streams2.with_partition_by_id(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(.., log)| log.active_segment().is_full(),
-            );
-
-            if is_segment_full {
-                let (start_offset, size, end_offset) = 
self.streams2.with_partition_by_id(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    |(.., log)| {
-                        (
-                            log.active_segment().start_offset,
-                            log.active_segment().size,
-                            log.active_segment().end_offset,
+                            
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
                         )
-                    },
-                );
-
-                let numeric_stream_id = self
-                    .streams2
-                    .with_stream_by_id(stream_id, 
streams::helpers::get_stream_id());
-                let numeric_topic_id = self.streams2.with_topic_by_id(
-                    stream_id,
-                    topic_id,
-                    topics::helpers::get_topic_id(),
-                );
+                        .await;
 
-                if self.config.system.segment.cache_indexes == 
CacheIndexesConfig::OpenSegment
-                    || self.config.system.segment.cache_indexes == 
CacheIndexesConfig::None
-                {
-                    self.streams2.with_partition_by_id_mut(
+                    let (journal_messages_count, journal_size) =
+                        self.streams2.with_partition_by_id_mut(
+                            stream_id,
+                            topic_id,
+                            partition_id,
+                            partitions::helpers::append_to_journal(self.id, 
current_offset, batch),
+                        )?;
+
+                    let unsaved_messages_count_exceeded = 
journal_messages_count
+                        >= 
self.config.system.partition.messages_required_to_save;
+                    let unsaved_messages_size_exceeded = journal_size
+                        >= self
+                            .config
+                            .system
+                            .partition
+                            .size_of_messages_required_to_save
+                            .as_bytes_u64() as u32;
+
+                    let is_full = self.streams2.with_partition_by_id(
                         stream_id,
                         topic_id,
                         partition_id,
-                        |(.., log)| {
-                            log.clear_indexes();
-                        },
+                        partitions::helpers::is_segment_full(),
                     );
-                }
 
-                self.streams2.with_partition_by_id_mut(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    |(.., log)| {
-                        log.active_segment_mut().sealed = true;
-                    },
-                );
-                let (log_writer, index_writer) = 
self.streams2.with_partition_by_id_mut(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    |(.., log)| log.active_storage_mut().shutdown(),
-                );
-
-                compio::runtime::spawn(async move {
-                    let _ = log_writer.fsync().await;
-                })
-                .detach();
-                compio::runtime::spawn(async move {
-                    let _ = index_writer.fsync().await;
-                    drop(index_writer)
-                })
-                .detach();
-
-                shard_info!(
-                    self.id,
-                    "Closed segment for stream: {}, topic: {} with start 
offset: {}, end offset: {}, size: {} for partition with ID: {}.",
-                    stream_id,
-                    topic_id,
-                    start_offset,
-                    end_offset,
-                    size,
-                    partition_id
-                );
-
-                let messages_size = 0;
-                let indexes_size = 0;
-                let segment = Segment2::new(
-                    end_offset + 1,
-                    self.config.system.segment.size,
-                    self.config.system.segment.message_expiry,
-                );
-
-                let storage = create_segment_storage(
-                    &self.config.system,
-                    numeric_stream_id,
-                    numeric_topic_id,
-                    partition_id,
-                    messages_size,
-                    indexes_size,
-                    end_offset + 1,
-                )
-                .await?;
-                self.streams2.with_partition_by_id_mut(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    |(.., log)| {
-                        log.add_persisted_segment(segment, storage);
-                    },
-                );
+                    // Try committing the journal
+                    if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded
+                    {
+                        self.streams2
+                            .persist_messages(
+                                self.id,
+                                stream_id,
+                                topic_id,
+                                partition_id,
+                                unsaved_messages_count_exceeded,
+                                unsaved_messages_size_exceeded,
+                                journal_messages_count,
+                                journal_size,
+                                &self.config.system,
+                            )
+                            .await?;
+
+                        if is_full {
+                            self.streams2
+                                .handle_full_segment(
+                                    self.id,
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    &self.config.system,
+                                )
+                                .await?;
+                        }
+                    }
+                    // Count every successfully appended message, not only those
+                    // that happened to fill a segment (matches the pre-refactor
+                    // unconditional increment).
+                    self.metrics.increment_messages(messages_count as u64);
+                    Ok(())
+                } else {
+                    unreachable!(
+                        "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
+                    );
+                }
             }
-        }
+            ShardSendRequestResult::Response(response) => match response {
+                ShardResponse::SendMessages => Ok(()),
+                ShardResponse::ErrorResponse(err) => Err(err),
+                _ => unreachable!(
+                    "Expected a SendMessages response inside of SendMessages 
handler, impossible state"
+                ),
+            },
+        }?;
 
-        self.metrics.increment_messages(messages_count as u64);
         Ok(())
     }
 
@@ -443,25 +240,16 @@ impl IggyShard {
         maybe_partition_id: Option<u32>,
         args: PollingArgs,
     ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
-        todo!();
-        /*
-        let stream = self.get_stream(stream_id).with_error_context(|error| {
-            format!(
-                "{COMPONENT} (error: {error}) - stream not found for stream 
ID: {}",
-                stream_id
-            )
-        })?;
-        let stream_id = stream.stream_id;
-        let numeric_topic_id = stream.get_topic(topic_id).map(|topic| 
topic.topic_id).with_error_context(|error| {
-            format!(
-                "{COMPONENT} (error: {error}) - topic not found for stream ID: 
{}, topic_id: {}",
-                stream_id, topic_id
-            )
-        })?;
+        let numeric_stream_id = self
+            .streams2
+            .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+        let numeric_topic_id =
+            self.streams2
+                .with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
 
         self.permissioner
             .borrow()
-            .poll_messages(user_id, stream_id, numeric_topic_id)
+            .poll_messages(user_id, numeric_stream_id as u32, numeric_topic_id 
as u32)
             .with_error_context(|error| format!(
                 "{COMPONENT} (error: {error}) - permission denied to poll 
messages for user {} on stream ID: {}, topic ID: {}",
                 user_id,
@@ -469,49 +257,282 @@ impl IggyShard {
                 numeric_topic_id
             ))?;
 
-        // There might be no partition assigned, if it's the consumer group 
member without any partitions.
-        //return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
-
-        let (metadata, batch) = stream.poll_messages(topic_id, client_id, 
consumer, maybe_partition_id, args.auto_commit, async |topic, consumer, 
partition_id|  {
-            let namespace = IggyNamespace::new(stream.stream_id, 
topic.topic_id, partition_id);
-            let payload = ShardRequestPayload::PollMessages {
-                consumer,
-                args,
-            };
-            let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
-            let message = ShardMessage::Request(request);
-
-            match self
-                    .send_request_to_shard_or_recoil(&namespace, message)
-                    .await?
+        // Resolve partition ID
+        let Some((consumer, partition_id)) = 
self.resolve_consumer_with_partition_id(
+            stream_id,
+            topic_id,
+            &consumer,
+            client_id,
+            maybe_partition_id,
+            true,
+        ) else {
+            return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
+        };
+
+        let has_partition = self
+            .streams2
+            .with_topic_by_id(stream_id, topic_id, |(root, ..)| {
+                root.partitions().exists(partition_id)
+            });
+        if !has_partition {
+            return Err(IggyError::NoPartitions(
+                numeric_topic_id as u32,
+                numeric_stream_id as u32,
+            ));
+        }
+
+        let current_offset = self.streams2.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+        );
+        if args.strategy.kind == PollingKind::Offset && args.strategy.value > 
current_offset
+            || args.count == 0
+        {
+            return Ok((
+                IggyPollMetadata::new(partition_id as u32, current_offset),
+                IggyMessagesBatchSet::empty(),
+            ));
+        }
+
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
+        let payload = ShardRequestPayload::PollMessages { consumer, args };
+        let request = ShardRequest::new(stream_id.clone(), topic_id.clone(), 
partition_id, payload);
+        let message = ShardMessage::Request(request);
+        let (metadata, batch) = match self
+            .send_request_to_shard_or_recoil(&namespace, message)
+            .await?
+        {
+            ShardSendRequestResult::Recoil(message) => {
+                if let ShardMessage::Request(ShardRequest {
+                    partition_id,
+                    payload,
+                    ..
+                }) = message
+                    && let ShardRequestPayload::PollMessages { consumer, args 
} = payload
                 {
-                    ShardSendRequestResult::Recoil(message) => {
-                        if let ShardMessage::Request( ShardRequest { 
partition_id, payload, .. } ) = message
-                            && let ShardRequestPayload::PollMessages { 
consumer, args } = payload
-                        {
-                            topic.get_messages(consumer, partition_id, 
args.strategy, args.count).await.with_error_context(|error| {
-                                format!("{COMPONENT}: Failed to get messages 
for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, 
error: {error})")
-                            })
-                        } else {
-                            unreachable!(
-                                "Expected a PollMessages request inside of 
PollMessages handler, impossible state"
+                    let metadata = IggyPollMetadata::new(partition_id as u32, 
current_offset);
+                    let count = args.count;
+                    let strategy = args.strategy;
+                    let value = strategy.value;
+                    let batches = match strategy.kind {
+                        PollingKind::Offset => {
+                            let offset = value;
+                            // We have to keep the invariant established by the
+                            // earlier `strategy.value > current_offset` guard in
+                            // this function. Alternatively a better design would
+                            // be to drop that guard and move the validation here.
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    offset,
+                                    count,
+                                )
+                                .await?;
+                            Ok(batches)
+                        }
+                        PollingKind::Timestamp => {
+                            let timestamp = IggyTimestamp::from(value);
+                            let timestamp_ts = timestamp.as_micros();
+                            trace!(
+                                "Getting {count} messages by timestamp: {} for 
partition: {}...",
+                                timestamp_ts, partition_id
                             );
+
+                            let batches = self
+                                .streams2
+                                .get_messages_by_timestamp(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    timestamp_ts,
+                                    count,
+                                )
+                                .await?;
+                            Ok(batches)
                         }
-                    }
-                    ShardSendRequestResult::Response(response) => {
-                        match response {
-                            ShardResponse::PollMessages(result) =>  { 
Ok(result) }
-                            ShardResponse::ErrorResponse(err) => {
-                                Err(err)
-                            }
-                            _ => unreachable!(
-                                "Expected a SendMessages response inside of 
SendMessages handler, impossible state"
-                            ),
+                        PollingKind::First => {
+                            let first_offset = 
self.streams2.with_partition_by_id(
+                                stream_id,
+                                topic_id,
+                                partition_id,
+                                |(_, _, _, _, _, _, log)| {
+                                    log.segments()
+                                        .first()
+                                        .map(|segment| segment.start_offset)
+                                        .unwrap_or(0)
+                                },
+                            );
+
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    first_offset,
+                                    count,
+                                )
+                                .await?;
+                            Ok(batches)
                         }
+                        PollingKind::Last => {
+                            let (start_offset, actual_count) = 
self.streams2.with_partition_by_id(
+                                stream_id,
+                                topic_id,
+                                partition_id,
+                                |(_, _, _, offset, _, _, _)| {
+                                    let current_offset = 
offset.load(Ordering::Relaxed);
+                                    let mut requested_count = count as u64;
+                                    if requested_count > current_offset + 1 {
+                                        requested_count = current_offset + 1
+                                    }
+                                    let start_offset = 1 + current_offset - 
requested_count;
+                                    (start_offset, requested_count as u32)
+                                },
+                            );
 
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    start_offset,
+                                    actual_count,
+                                )
+                                .await?;
+                            Ok(batches)
+                        }
+                        PollingKind::Next => {
+                            let (consumer_offset, consumer_id) = match 
consumer {
+                                PollingConsumer::Consumer(consumer_id, _) => (
+                                    self.streams2
+                                        .with_partition_by_id(
+                                            stream_id,
+                                            topic_id,
+                                            partition_id,
+                                            
partitions::helpers::get_consumer_offset(consumer_id),
+                                        )
+                                        .map(|c_offset| 
c_offset.stored_offset),
+                                    consumer_id,
+                                ),
+                                PollingConsumer::ConsumerGroup(cg_id, _) => (
+                                    self.streams2
+                                        .with_partition_by_id(
+                                            stream_id,
+                                            topic_id,
+                                            partition_id,
+                                            
partitions::helpers::get_consumer_group_member_offset(
+                                                cg_id,
+                                            ),
+                                        )
+                                        .map(|cg_offset| 
cg_offset.stored_offset),
+                                    cg_id,
+                                ),
+                            };
+
+                            let Some(consumer_offset) = consumer_offset else {
+                                return 
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+                            };
+                            let offset = consumer_offset + 1;
+                            trace!(
+                                "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
+                                consumer_id, partition_id, offset
+                            );
+                            let batches = self
+                                .streams2
+                                .get_messages_by_offset(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    offset,
+                                    count,
+                                )
+                                .await?;
+                            Ok(batches)
+                        }
+                    }?;
+
+                    if args.auto_commit && !batches.is_empty() {
+                        let offset = batches
+                            .last_offset()
+                            .expect("Batch set should have at least one 
batch");
+                        trace!(
+                            "Last offset: {} will be automatically stored for 
{}, stream: {}, topic: {}, partition: {}",
+                            offset, consumer, numeric_stream_id, 
numeric_topic_id, partition_id
+                        );
+                        match consumer {
+                            PollingConsumer::Consumer(consumer_id, _) => {
+                                self.streams2.with_partition_by_id(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    partitions::helpers::store_consumer_offset(
+                                        consumer_id,
+                                        numeric_stream_id,
+                                        numeric_topic_id,
+                                        partition_id,
+                                        offset,
+                                        &self.config.system,
+                                    ),
+                                );
+                                self.streams2
+                                    .with_partition_by_id_async(
+                                        stream_id,
+                                        topic_id,
+                                        partition_id,
+                                        
partitions::helpers::persist_consumer_offset_to_disk(
+                                            self.id,
+                                            consumer_id,
+                                        ),
+                                    )
+                                    .await?;
+                            }
+                            PollingConsumer::ConsumerGroup(cg_id, _) => {
+                                self.streams2.with_partition_by_id(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    
partitions::helpers::store_consumer_group_member_offset(
+                                        cg_id,
+                                        numeric_stream_id,
+                                        numeric_topic_id,
+                                        partition_id,
+                                        offset,
+                                        &self.config.system,
+                                    ),
+                                );
+                                self.streams2.with_partition_by_id_async(
+                                    stream_id,
+                                    topic_id,
+                                    partition_id,
+                                    
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+                                        self.id,
+                                        cg_id,
+                                    ),
+                                )
+                                .await?;
+                            }
+                        }
                     }
+                    Ok((metadata, batches))
+                } else {
+                    unreachable!(
+                        "Expected a PollMessages request inside of 
PollMessages handler, impossible state"
+                    );
                 }
-        }).await?;
+            }
+            ShardSendRequestResult::Response(response) => match response {
+                ShardResponse::PollMessages(result) => Ok(result),
+                ShardResponse::ErrorResponse(err) => Err(err),
+                _ => unreachable!(
+                    "Expected a PollMessages response inside of PollMessages handler, impossible state"
+                ),
+            },
+        }?;
 
         let batch = if let Some(_encryptor) = &self.encryptor {
             //TODO: Bring back decryptor
@@ -522,7 +543,6 @@ impl IggyShard {
         };
 
         Ok((metadata, batch))
-        */
     }
 
     pub async fn flush_unsaved_buffer(
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 892110ab..7b1eed84 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -21,11 +21,8 @@ use crate::shard::IggyShard;
 use crate::shard::ShardInfo;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard_info;
-use crate::slab::traits_ext::EntityComponentSystem;
-use crate::slab::traits_ext::EntityComponentSystemMutCell;
 use crate::slab::traits_ext::EntityMarker;
 use crate::slab::traits_ext::IntoComponents;
-use crate::slab::traits_ext::IntoComponentsById;
 use crate::streaming::partitions;
 use crate::streaming::partitions::helpers::create_message_deduplicator;
 use crate::streaming::partitions::journal::MemoryMessageJournal;
@@ -113,7 +110,6 @@ impl IggyShard {
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        // TODO: Figure out how to do this operation in a batch.
         for partition_id in partitions.iter().map(|p| p.id()) {
             // TODO: Create shard table records.
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
@@ -259,6 +255,12 @@ impl IggyShard {
         topic_id: &Identifier,
         partitions: Vec<partition2::Partition>,
     ) -> Result<(), IggyError> {
+        let numeric_stream_id = self
+            .streams2
+            .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+        let numeric_topic_id =
+            self.streams2
+                .with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
         for partition in partitions {
             let actual_id = partition.id();
             let id = self.insert_partition_mem(stream_id, topic_id, partition);
@@ -266,7 +268,13 @@ impl IggyShard {
                 id, actual_id,
                 "create_partitions_bypass_auth: partition mismatch ID, wrong 
creation order ?!"
             );
-            self.init_log(stream_id, topic_id, id).await?;
+            let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
id);
+            let shard_info = self
+                .find_shard_table_record(&ns)
+                .expect("create_partitions_bypass_auth: missing shard table 
record");
+            if self.id == shard_info.id {
+                self.init_log(stream_id, topic_id, id).await?;
+            }
         }
         Ok(())
     }
diff --git a/core/server/src/shard/system/utils.rs 
b/core/server/src/shard/system/utils.rs
index 1aa2857e..0c9b637b 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -61,7 +61,7 @@ impl IggyShard {
     ) -> Option<(PollingConsumer, usize)> {
         match consumer.kind {
             ConsumerKind::Consumer => {
-                let partition_id = partition_id.unwrap_or(1);
+                let partition_id = partition_id.unwrap_or(0);
                 Some((
                     PollingConsumer::consumer(&consumer.id, partition_id as 
usize),
                     partition_id as usize,
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index 73d2a4e8..a4e4bb3a 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,6 +1,6 @@
 use std::{rc::Rc, sync::Arc};
 
-use iggy_common::PollingStrategy;
+use iggy_common::{Identifier, PollingStrategy};
 
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -24,6 +24,7 @@ use crate::{
         system::messages::PollingArgs,
         transmission::{event::ShardEvent, frame::ShardResponse},
     },
+    slab::partitions,
     streaming::{polling_consumer::PollingConsumer, 
segments::IggyMessagesBatchMut},
 };
 
@@ -41,17 +42,17 @@ pub enum ShardMessage {
 
 #[derive(Debug)]
 pub struct ShardRequest {
-    pub stream_id: u32,
-    pub topic_id: u32,
-    pub partition_id: u32,
+    pub stream_id: Identifier,
+    pub topic_id: Identifier,
+    pub partition_id: usize,
     pub payload: ShardRequestPayload,
 }
 
 impl ShardRequest {
     pub fn new(
-        stream_id: u32,
-        topic_id: u32,
-        partition_id: u32,
+        stream_id: Identifier,
+        topic_id: Identifier,
+        partition_id: partitions::ContainerId,
         payload: ShardRequestPayload,
     ) -> Self {
         Self {
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 3f72b20a..31a5a8b7 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -27,11 +27,11 @@ where
     async |(root, ..)| f(root.topics()).await
 }
 
-pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRefMut>) -> O
+pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
 where
-    F: for<'a> FnOnce(&'a mut Topics) -> O,
+    F: for<'a> FnOnce(&'a Topics) -> O,
 {
-    |(mut root, ..)| f(root.topics_mut())
+    |(root, ..)| f(root.topics())
 }
 
 pub fn partitions<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRef>) -> O
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index d8ee0adf..7c7f15b7 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -196,6 +196,10 @@ impl Partitions {
         self.with_components_by_id(id, |components| f(components))
     }
 
+    /// Returns true when a partition slot with the given container id is present.
+    pub fn exists(&self, id: ContainerId) -> bool {
+        self.root.contains(id)
+    }
+
     pub fn with_partition_by_id_mut<T>(
         &mut self,
         id: ContainerId,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 1cd2989e..0c84f7a3 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,4 +1,5 @@
 use crate::{
+    configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
     slab::{
         Keyed,
         consumer_groups::ConsumerGroups,
@@ -6,30 +7,33 @@ use crate::{
         partitions::{self, Partitions},
         topics::Topics,
         traits_ext::{
-            ComponentsById, ComponentsByIdMapping, ComponentsMapping, 
DeleteCell,
-            EntityComponentSystem, EntityComponentSystemMutCell, InsertCell, 
InteriorMutability,
-            IntoComponents,
+            ComponentsById, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell,
+            InsertCell, InteriorMutability, IntoComponents,
         },
     },
     streaming::{
-        partitions::{
-            journal::MemoryMessageJournal,
-            log::{Log, SegmentedLog},
-            partition2::{PartitionRef, PartitionRefMut},
-        },
+        partitions::partition2::{PartitionRef, PartitionRefMut},
+        segments::{IggyMessagesBatchSet, Segment2, 
storage::create_segment_storage},
         stats::stats::StreamStats,
-        streams::stream2::{self, StreamRef, StreamRefMut},
+        streams::{
+            self,
+            stream2::{self, StreamRef, StreamRefMut},
+        },
         topics::{
+            self,
             consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
-            topic2::{self, TopicRef, TopicRefMut},
+            topic2::{TopicRef, TopicRefMut},
         },
     },
 };
 use ahash::AHashMap;
-use iggy_common::Identifier;
+use iggy_common::{Identifier, IggyError};
 use slab::Slab;
 use std::{cell::RefCell, sync::Arc};
 
+// Import streaming partitions helpers for the persist_messages method
+use crate::streaming::partitions as streaming_partitions;
+
 const CAPACITY: usize = 1024;
 pub type ContainerId = usize;
 
@@ -94,7 +98,7 @@ impl DeleteCell for Streams {
     type Idx = ContainerId;
     type Item = stream2::Stream;
 
-    fn delete(&self, id: Self::Idx) -> Self::Item {
+    fn delete(&self, _id: Self::Idx) -> Self::Item {
         todo!()
     }
 }
@@ -213,12 +217,8 @@ impl Streams {
         self.with_stream_by_id_async(stream_id, helpers::topics_async(f))
     }
 
-    pub fn with_topics_mut<T>(
-        &self,
-        stream_id: &Identifier,
-        f: impl FnOnce(&mut Topics) -> T,
-    ) -> T {
-        self.with_stream_by_id_mut(stream_id, helpers::topics_mut(f))
+    pub fn with_topics_mut<T>(&self, stream_id: &Identifier, f: impl 
FnOnce(&Topics) -> T) -> T {
+        self.with_stream_by_id(stream_id, helpers::topics_mut(f))
     }
 
     pub fn with_topic_by_id<T>(
@@ -395,4 +395,199 @@ impl Streams {
     pub fn len(&self) -> usize {
         self.root.borrow().len()
     }
+
+    /// Polls up to `count` messages starting at `offset` from one partition.
+    /// First resolves the segment index range covering `offset`, then reads
+    /// the messages (journal and/or disk) via the partition helpers.
+    pub async fn get_messages_by_offset(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        offset: u64,
+        count: u32,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        use crate::streaming::partitions::helpers;
+        // Resolve which segments may hold `offset` (range is open-ended to the
+        // newest segment).
+        let range = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            helpers::get_segment_range_by_offset(offset),
+        );
+
+        self.with_partition_by_id_async(
+            stream_id,
+            topic_id,
+            partition_id,
+            helpers::get_messages_by_offset_range(offset, count, range),
+        )
+        .await
+    }
+
+    /// Polls up to `count` messages whose timestamps are at or after
+    /// `timestamp`; mirrors `get_messages_by_offset` but keyed by time.
+    pub async fn get_messages_by_timestamp(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        timestamp: u64,
+        count: u32,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        use crate::streaming::partitions::helpers;
+        // Resolve the first segment whose end timestamp reaches `timestamp`.
+        let range = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            helpers::get_segment_range_by_timestamp(timestamp),
+        );
+
+        self.with_partition_by_id_async(
+            stream_id,
+            topic_id,
+            partition_id,
+            helpers::get_messages_by_timestamp_range(timestamp, count, range),
+        )
+        .await
+    }
+
+    /// Seals the active segment of a partition and rolls over to a fresh one:
+    /// clears cached indexes (if config says so), marks the segment sealed,
+    /// shuts down its writers, fsyncs them in detached background tasks, and
+    /// appends a new persisted segment starting at `end_offset + 1`.
+    pub async fn handle_full_segment(
+        &self,
+        shard_id: u16,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        config: &crate::configs::system::SystemConfig,
+    ) -> Result<(), IggyError> {
+        let numeric_stream_id =
+            self.with_stream_by_id(stream_id, 
streams::helpers::get_stream_id());
+        let numeric_topic_id =
+            self.with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
+
+        // Drop in-memory indexes of the closed segment when the config keeps
+        // indexes only for the open segment (or none at all).
+        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None
+        {
+            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, 
|(.., log)| {
+                log.clear_active_indexes();
+            });
+        }
+
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., 
log)| {
+            log.active_segment_mut().sealed = true;
+        });
+        let (log_writer, index_writer) =
+            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, 
|(.., log)| {
+                log.active_storage_mut().shutdown()
+            });
+
+        // Fire-and-forget fsyncs: errors are ignored (`let _ =`) and the
+        // tasks are detached, so rollover does not wait for durability.
+        compio::runtime::spawn(async move {
+            let _ = log_writer.fsync().await;
+        })
+        .detach();
+        compio::runtime::spawn(async move {
+            let _ = index_writer.fsync().await;
+            drop(index_writer)
+        })
+        .detach();
+
+        let (start_offset, size, end_offset) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                (
+                    log.active_segment().start_offset,
+                    log.active_segment().size,
+                    log.active_segment().end_offset,
+                )
+            });
+
+        crate::shard_info!(
+            shard_id,
+            "Closed segment for stream: {}, topic: {} with start offset: {}, 
end offset: {}, size: {} for partition with ID: {}.",
+            stream_id,
+            topic_id,
+            start_offset,
+            end_offset,
+            size,
+            partition_id
+        );
+
+        // New segment starts empty, hence zero sizes for its storage.
+        let messages_size = 0;
+        let indexes_size = 0;
+        let segment = Segment2::new(
+            end_offset + 1,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        let storage = create_segment_storage(
+            &config,
+            numeric_stream_id,
+            numeric_topic_id,
+            partition_id,
+            messages_size,
+            indexes_size,
+            end_offset + 1,
+        )
+        .await?;
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., 
log)| {
+            log.add_persisted_segment(segment, storage);
+        });
+
+        Ok(())
+    }
+
+    /// Flushes the partition's journal to disk: commits the journal into
+    /// batches, builds a human-readable persist reason for logging, writes
+    /// the batches via `persist_batch`, then updates indexes and stats.
+    pub async fn persist_messages(
+        &self,
+        shard_id: u16,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        unsaved_messages_count_exceeded: bool,
+        unsaved_messages_size_exceeded: bool,
+        journal_messages_count: u32,
+        journal_size: u32,
+        config: &SystemConfig,
+    ) -> Result<(), IggyError> {
+        let batches = self.with_partition_by_id_mut(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::commit_journal(),
+        );
+
+        let reason = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::persist_reason(
+                unsaved_messages_count_exceeded,
+                unsaved_messages_size_exceeded,
+                journal_messages_count,
+                journal_size,
+                config,
+            ),
+        );
+        let (saved, batch_count) = self
+            .with_partition_by_id_async(
+                stream_id,
+                topic_id,
+                partition_id,
+                streaming_partitions::helpers::persist_batch(
+                    shard_id,
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    batches,
+                    reason,
+                ),
+            )
+            .await?;
+
+        self.with_partition_by_id_mut(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::update_index_and_increment_stats(
+                saved,
+                batch_count,
+                config,
+            ),
+        );
+
+        Ok(())
+    }
 }
diff --git a/core/server/src/streaming/cluster/mod.rs 
b/core/server/src/streaming/cluster/mod.rs
index 61b376c7..036ebe1c 100644
--- a/core/server/src/streaming/cluster/mod.rs
+++ b/core/server/src/streaming/cluster/mod.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
- /*
+/*
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::System;
 use iggy_common::IggyError;
@@ -68,4 +68,4 @@ impl System {
         })
     }
 }
-    */
\ No newline at end of file
+    */
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index 213f7706..bb37b568 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -1,9 +1,14 @@
-use std::sync::atomic::Ordering;
-
-use iggy_common::{ConsumerOffsetInfo, IggyError};
+use error_set::ErrContext;
+use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::{
+    ops::{AsyncFnOnce, Index},
+    sync::atomic::Ordering,
+};
+use sysinfo::Component;
 
 use crate::{
-    configs::system::SystemConfig,
+    configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
+    shard_trace,
     slab::{
         partitions::{self, Partitions},
         traits_ext::{
@@ -14,9 +19,11 @@ use crate::{
         deduplication::message_deduplicator::MessageDeduplicator,
         partitions::{
             consumer_offset::ConsumerOffset,
-            partition2::{self, PartitionRef},
+            journal::{Journal, MemoryMessageJournal},
+            partition2::{self, PartitionRef, PartitionRefMut},
             storage2,
         },
+        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, 
storage::Storage},
     },
 };
 
@@ -212,7 +219,7 @@ pub fn delete_consumer_group_member_offset_from_disk(
 }
 
 pub fn purge_segments_mem() -> impl FnOnce(&mut Partitions) {
-    |partitions| {
+    |_partitions| {
         // TODO:
         //partitions.segments_mut()
     }
@@ -235,3 +242,583 @@ pub fn create_message_deduplicator(config: &SystemConfig) 
-> Option<MessageDedup
 
     Some(MessageDeduplicator::new(max_entries, expiry))
 }
+
+/// Reads up to `count` messages in [`offset`, `end_offset`] from a single
+/// segment, combining the on-disk portion with the in-memory journal
+/// (accumulator) portion when the requested range straddles both.
+pub async fn get_messages_by_offset(
+    storage: &Storage,
+    journal: &MemoryMessageJournal,
+    index: &Option<IggyIndexesMut>,
+    offset: u64,
+    end_offset: u64,
+    count: u32,
+    segment_start_offset: u64,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::default());
+    }
+
+    // Case 0: Accumulator is empty, so all messages have to be on disk
+    if journal.is_empty() {
+        return load_messages_from_disk_by_offset(
+            storage,
+            index,
+            offset,
+            count,
+            segment_start_offset,
+        )
+        .await;
+    }
+
+    let journal_first_offset = journal.inner().base_offset;
+    let journal_last_offset = journal.inner().current_offset;
+
+    // Case 1: All messages are in accumulator buffer
+    if offset >= journal_first_offset && end_offset <= journal_last_offset {
+        return Ok(journal.get(|batches| batches.get_by_offset(offset, count)));
+    }
+
+    // Case 2: All messages are on disk
+    if end_offset < journal_first_offset {
+        return load_messages_from_disk_by_offset(
+            storage,
+            index,
+            offset,
+            count,
+            segment_start_offset,
+        )
+        .await;
+    }
+
+    // Case 3: Messages span disk and accumulator buffer boundary
+    // Calculate how many messages we need from disk
+    let disk_count = if offset < journal_first_offset {
+        ((journal_first_offset - offset) as u32).min(count)
+    } else {
+        0
+    };
+
+    let mut combined_batch_set = IggyMessagesBatchSet::empty();
+
+    // Load messages from disk if needed
+    if disk_count > 0 {
+        let disk_messages = load_messages_from_disk_by_offset(storage, index, 
offset, disk_count, segment_start_offset)
+            .await
+            .with_error_context(|error| {
+                format!("Failed to load messages from disk, start offset: 
{offset}, count: {disk_count}, error: {error}")
+            })?;
+
+        if !disk_messages.is_empty() {
+            combined_batch_set.add_batch_set(disk_messages);
+        }
+    }
+
+    // Calculate how many more messages we need from the accumulator
+    // NOTE(review): assumes the disk read returns at most `disk_count`
+    // (<= `count`) messages; otherwise this u32 subtraction underflows —
+    // confirm against the reader's contract.
+    let remaining_count = count - combined_batch_set.count();
+
+    if remaining_count > 0 {
+        let accumulator_start_offset = std::cmp::max(offset, 
journal_first_offset);
+
+        let accumulator_messages =
+            journal.get(|batches| 
batches.get_by_offset(accumulator_start_offset, remaining_count));
+
+        if !accumulator_messages.is_empty() {
+            combined_batch_set.add_batch_set(accumulator_messages);
+        }
+    }
+
+    Ok(combined_batch_set)
+}
+
+/// Loads up to `count` messages from segment storage starting at the
+/// absolute `start_offset`, using cached indexes when available and
+/// falling back to the on-disk index reader otherwise. Validates
+/// checksums/offsets of everything read.
+async fn load_messages_from_disk_by_offset(
+    storage: &Storage,
+    index: &Option<IggyIndexesMut>,
+    start_offset: u64,
+    count: u32,
+    segment_start_offset: u64,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    // Convert start_offset to relative offset within the segment
+    // (assumes start_offset >= segment_start_offset; callers clamp to the
+    // segment's start before calling — underflows otherwise).
+    let relative_start_offset = (start_offset - segment_start_offset) as u32;
+
+    // Load indexes first
+    let indexes_to_read = if let Some(indexes) = index {
+        if !indexes.is_empty() {
+            indexes.slice_by_offset(relative_start_offset, count)
+        } else {
+            storage
+                .index_reader
+                .as_ref()
+                .expect("Index reader not initialized")
+                .load_from_disk_by_offset(relative_start_offset, count)
+                .await?
+        }
+    } else {
+        storage
+            .index_reader
+            .as_ref()
+            .expect("Index reader not initialized")
+            .load_from_disk_by_offset(relative_start_offset, count)
+            .await?
+    };
+
+    if indexes_to_read.is_none() {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    let indexes_to_read = indexes_to_read.unwrap();
+    // NOTE(review): `first` and `last` are never used below — remove them
+    // or prefix with `_` to silence unused-variable warnings.
+    let first = indexes_to_read.get(0).unwrap();
+    let last = indexes_to_read.last().unwrap();
+
+    let batch = storage
+        .messages_reader
+        .as_ref()
+        .expect("Messages reader not initialized")
+        .load_messages_from_disk(indexes_to_read)
+        .await
+        .with_error_context(|error| format!("Failed to load messages from 
disk: {error}"))?;
+
+    batch
+        .validate_checksums_and_offsets(start_offset)
+        .with_error_context(|error| {
+            format!("Failed to validate messages read from disk! error: 
{error}")
+        })?;
+
+    Ok(IggyMessagesBatchSet::from(batch))
+}
+
+/// Builds an async closure that walks the segments in `range` (indices into
+/// the partition's segment list) and gathers up to `count` messages starting
+/// at `offset`, advancing `current_offset` across segment boundaries.
+pub fn get_messages_by_offset_range(
+    offset: u64,
+    count: u32,
+    range: std::ops::Range<usize>,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> 
Result<IggyMessagesBatchSet, IggyError> {
+    async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
+        let segments = log.segments().iter();
+        let storages = log.storages().iter();
+        let journal = log.journal();
+        let indexes = log.indexes().iter();
+
+        let mut remaining_count = count;
+        let mut batches = IggyMessagesBatchSet::empty();
+        let mut current_offset = offset;
+
+        // Segments, storages and indexes are parallel collections; zip them
+        // and restrict iteration to the requested segment index range.
+        for (segment, storage, index) in segments
+            .zip(storages)
+            .zip(indexes)
+            .map(|((a, b), c)| (a, b, c))
+            .skip(range.start)
+            .take(range.end - range.start)
+        {
+            let start_offset = if current_offset < segment.start_offset {
+                segment.start_offset
+            } else {
+                current_offset
+            };
+
+            // NOTE(review): if `current_offset` already lies past this
+            // segment (start_offset > segment.end_offset), the clamp below
+            // makes end_offset < start_offset and the u64 subtraction in the
+            // count calculation underflows — confirm segments before the
+            // target are excluded by `range`.
+            let mut end_offset = start_offset + (remaining_count - 1) as u64;
+            if end_offset > segment.end_offset {
+                end_offset = segment.end_offset;
+            }
+
+            // Calculate the actual count to request from this segment
+            let count: u32 = ((end_offset - start_offset + 1) as 
u32).min(remaining_count);
+
+            let messages = get_messages_by_offset(
+                storage,
+                journal,
+                index,
+                start_offset,
+                end_offset,
+                count,
+                segment.start_offset,
+            )
+            .await?;
+
+            let messages_count = messages.count();
+            if messages_count == 0 {
+                current_offset = segment.end_offset + 1;
+                continue;
+            }
+
+            remaining_count = remaining_count.saturating_sub(messages_count);
+
+            if let Some(last_offset) = messages.last_offset() {
+                current_offset = last_offset + 1;
+            } else if messages_count > 0 {
+                current_offset += messages_count as u64;
+            }
+
+            batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
+        }
+        Ok(batches)
+    }
+}
+
+/// Returns the index range of segments that may contain `offset`
+/// (from the located start segment through the newest segment).
+///
+/// NOTE(review): `.filter(..).next()` yields the FIRST segment with
+/// start_offset <= offset — for ascending start offsets that is almost
+/// always index 0, not the segment containing `offset`. If the intent is
+/// the containing segment, the LAST matching index is needed (and clippy
+/// would prefer `.find(..)` over `.filter(..).next()`) — confirm.
+pub fn get_segment_range_by_offset(
+    offset: u64,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
+    move |(.., log)| {
+        let start = log
+            .segments()
+            .iter()
+            .enumerate()
+            .filter(|(_, segment)| segment.start_offset <= offset)
+            .map(|(index, _)| index)
+            .next()
+            .expect("get_segment_range_by_offset: start segment not found");
+        let end = log.segments().len();
+        start..end
+    }
+}
+
+/// Returns the index range of segments that may contain messages at or
+/// after `timestamp`: the first segment whose end_timestamp reaches the
+/// target, through the newest segment. Panics if no segment qualifies.
+pub fn get_segment_range_by_timestamp(
+    timestamp: u64,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
+    move |(.., log)| -> std::ops::Range<usize> {
+        let start = log
+            .segments()
+            .iter()
+            .enumerate()
+            .find(|(_, segment)| segment.end_timestamp >= timestamp)
+            .map(|(index, _)| index)
+            .expect("get_segment_range_by_timestamp: start segment not found");
+        let end = log.segments().len();
+        start..end
+    }
+}
+
+/// Loads up to `count` messages from segment storage starting at the first
+/// index entry with timestamp >= `timestamp`; uses cached indexes when
+/// present, otherwise reads the index file from disk.
+pub async fn load_messages_from_disk_by_timestamp(
+    storage: &Storage,
+    index: &Option<IggyIndexesMut>,
+    timestamp: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    let indexes_to_read = if let Some(indexes) = index {
+        if !indexes.is_empty() {
+            indexes.slice_by_timestamp(timestamp, count)
+        } else {
+            storage
+                .index_reader
+                .as_ref()
+                .expect("Index reader not initialized")
+                .load_from_disk_by_timestamp(timestamp, count)
+                .await?
+        }
+    } else {
+        storage
+            .index_reader
+            .as_ref()
+            .expect("Index reader not initialized")
+            .load_from_disk_by_timestamp(timestamp, count)
+            .await?
+    };
+
+    if indexes_to_read.is_none() {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    let indexes_to_read = indexes_to_read.unwrap();
+
+    let batch = storage
+        .messages_reader
+        .as_ref()
+        .expect("Messages reader not initialized")
+        .load_messages_from_disk(indexes_to_read)
+        .await
+        .with_error_context(|error| {
+            format!("Failed to load messages from disk by timestamp: {error}")
+        })?;
+
+    Ok(IggyMessagesBatchSet::from(batch))
+}
+
+/// Reads up to `count` messages at or after `timestamp` from one segment,
+/// combining disk and journal (accumulator) contents when the requested
+/// time range straddles both.
+pub async fn get_messages_by_timestamp(
+    storage: &Storage,
+    journal: &MemoryMessageJournal,
+    index: &Option<IggyIndexesMut>,
+    timestamp: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::default());
+    }
+
+    // Case 0: Accumulator is empty, so all messages have to be on disk
+    if journal.is_empty() {
+        return load_messages_from_disk_by_timestamp(storage, index, timestamp, 
count).await;
+    }
+
+    let journal_first_timestamp = journal.inner().first_timestamp;
+    let journal_last_timestamp = journal.inner().end_timestamp;
+
+    // Case 1: All messages are in accumulator buffer
+    // (timestamps beyond the journal's tail yield nothing at all).
+    if timestamp > journal_last_timestamp {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    if timestamp >= journal_first_timestamp {
+        return Ok(journal.get(|batches| batches.get_by_timestamp(timestamp, 
count)));
+    }
+
+    // Case 2: All messages are on disk (timestamp is before journal's first 
timestamp)
+    let disk_messages =
+        load_messages_from_disk_by_timestamp(storage, index, timestamp, 
count).await?;
+
+    if disk_messages.count() >= count {
+        return Ok(disk_messages);
+    }
+
+    // Case 3: Messages span disk and accumulator buffer boundary
+    let remaining_count = count - disk_messages.count();
+    let journal_messages =
+        journal.get(|batches| batches.get_by_timestamp(timestamp, 
remaining_count));
+
+    let mut combined_batch_set = disk_messages;
+    if !journal_messages.is_empty() {
+        combined_batch_set.add_batch_set(journal_messages);
+    }
+    return Ok(combined_batch_set);
+}
+
+/// Builds an async closure that walks the segments in `range` and gathers
+/// up to `count` messages at or after `timestamp`, skipping segments that
+/// end before the target time.
+pub fn get_messages_by_timestamp_range(
+    timestamp: u64,
+    count: u32,
+    range: std::ops::Range<usize>,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> 
Result<IggyMessagesBatchSet, IggyError> {
+    async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
+        let segments = log.segments().iter();
+        let storages = log.storages().iter();
+        let journal = log.journal();
+        let indexes = log.indexes().iter();
+
+        let mut remaining_count = count;
+        let mut batches = IggyMessagesBatchSet::empty();
+
+        // Segments, storages and indexes are parallel collections.
+        for (segment, storage, index) in segments
+            .zip(storages)
+            .zip(indexes)
+            .map(|((a, b), c)| (a, b, c))
+            .skip(range.start)
+            .take(range.end - range.start)
+        {
+            if remaining_count == 0 {
+                break;
+            }
+
+            // Skip segments that end before our timestamp
+            if segment.end_timestamp < timestamp {
+                continue;
+            }
+
+            let messages =
+                get_messages_by_timestamp(storage, journal, index, timestamp, 
remaining_count)
+                    .await?;
+
+            let messages_count = messages.count();
+            if messages_count == 0 {
+                continue;
+            }
+
+            remaining_count = remaining_count.saturating_sub(messages_count);
+            batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
+        }
+
+        Ok(batches)
+    }
+}
+
+/// Returns the next offset to assign: 0 for a partition that has never
+/// stored a message (should_increment_offset is false), otherwise the
+/// stored offset + 1.
+pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) 
-> u64 {
+    |(root, _, _, offset, ..)| {
+        if !root.should_increment_offset() {
+            0
+        } else {
+            offset.load(Ordering::Relaxed) + 1
+        }
+    }
+}
+
+/// Prepares `batch` for persistence against the active segment (offset
+/// assignment starting at `current_offset`, plus dedup via the partition's
+/// optional message deduplicator).
+pub fn deduplicate_messages(
+    current_offset: u64,
+    batch: &mut IggyMessagesBatchMut,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) {
+    async move |(.., deduplicator, _, _, _, log)| {
+        let segment = log.active_segment();
+        batch
+            .prepare_for_persistence(
+                segment.start_offset,
+                current_offset,
+                segment.size,
+                deduplicator.as_ref(),
+            )
+            .await;
+    }
+}
+
+/// Appends `batch` to the partition's in-memory journal, updating the active
+/// segment's timestamp/offset bounds, the partition stats, and the stored
+/// current offset. Returns the journal's (message count, size) after append.
+pub fn append_to_journal(
+    shard_id: u16,
+    current_offset: u64,
+    batch: IggyMessagesBatchMut,
+) -> impl FnOnce(ComponentsById<PartitionRefMut>) -> Result<(u32, u32), 
IggyError> {
+    move |(root, stats, _, offset, .., log)| {
+        let segment = log.active_segment_mut();
+
+        // end_offset == 0 is used as "segment has no messages yet" — take the
+        // batch's first timestamp as the segment start timestamp.
+        if segment.end_offset == 0 {
+            segment.start_timestamp = batch.first_timestamp().unwrap();
+        }
+
+        let batch_messages_size = batch.size();
+        let batch_messages_count = batch.count();
+
+        segment.end_timestamp = batch.last_timestamp().unwrap();
+        segment.end_offset = batch.last_offset().unwrap();
+
+        let (journal_messages_count, journal_size) = 
log.journal_mut().append(shard_id, batch)?;
+
+        stats.increment_messages_count(batch_messages_count as u64);
+        stats.increment_size_bytes(batch_messages_size as u64);
+
+        let last_offset = if batch_messages_count == 0 {
+            current_offset
+        } else {
+            current_offset + batch_messages_count as u64 - 1
+        };
+
+        // Both branches store the offset; the else-branch additionally flips
+        // the first-message flag so subsequent appends increment normally.
+        if root.should_increment_offset() {
+            offset.store(last_offset, Ordering::Relaxed);
+        } else {
+            root.set_should_increment_offset(true);
+            offset.store(last_offset, Ordering::Relaxed);
+        }
+
+        Ok((journal_messages_count, journal_size))
+    }
+}
+
+/// Commits the journal into a batch set and appends the batches' index
+/// entries to the active (lazily created) in-memory indexes.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> 
IggyMessagesBatchSet {
+    |(.., log)| {
+        let batches = log.journal_mut().commit();
+        log.ensure_indexes();
+        batches.append_indexes_to(log.active_indexes_mut().unwrap());
+        batches
+    }
+}
+
+pub fn is_segment_full() -> impl FnOnce(ComponentsById<PartitionRef>) -> bool {
+    |(.., log)| log.active_segment().is_full()
+}
+
+pub fn persist_reason(
+    unsaved_messages_count_exceeded: bool,
+    unsaved_messages_size_exceeded: bool,
+    journal_messages_count: u32,
+    journal_size: u32,
+    config: &SystemConfig,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> String {
+    move |(.., log)| {
+        if unsaved_messages_count_exceeded {
+            format!(
+                "unsaved messages count exceeded: {}, max from config: {}",
+                journal_messages_count, 
config.partition.messages_required_to_save,
+            )
+        } else if unsaved_messages_size_exceeded {
+            format!(
+                "unsaved messages size exceeded: {}, max from config: {}",
+                journal_size, 
config.partition.size_of_messages_required_to_save,
+            )
+        } else {
+            format!(
+                "segment is full, current size: {}, max from config: {}",
+                log.active_segment().size,
+                &config.segment.size,
+            )
+        }
+    }
+}
+
+pub fn persist_batch(
+    shard_id: u16,
+    stream_id: &Identifier,
+    topic_id: &Identifier,
+    partition_id: usize,
+    batches: IggyMessagesBatchSet,
+    reason: String,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize, 
u32), IggyError> {
+    async move |(.., log)| {
+        shard_trace!(
+            shard_id,
+            "Persisting messages on disk for stream ID: {}, topic ID: {}, 
partition ID: {} because {}...",
+            stream_id,
+            topic_id,
+            partition_id,
+            reason
+        );
+
+        let batch_count = batches.count();
+        let batch_size = batches.size();
+
+        let storage = log.active_storage();
+        let saved = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer not initialized")
+            .save_batch_set(batches)
+            .await
+            .with_error_context(|error| {
+                let segment = log.active_segment();
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                                    ({batch_size} bytes) to {segment}. 
{error}",
+                )
+            })?;
+
+        let indices = log.active_indexes().unwrap();
+        let first_index = indices.get(0).unwrap();
+        let last_index = indices.last().unwrap();
+        let unsaved_indexes_slice = 
log.active_indexes().unwrap().unsaved_slice();
+        let len = unsaved_indexes_slice.len();
+        storage
+            .index_writer
+            .as_ref()
+            .expect("Index writer not initialized")
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .with_error_context(|error| {
+                let segment = log.active_segment();
+                format!("Failed to save index of {len} indexes to {segment}. 
{error}",)
+            })?;
+
+        shard_trace!(
+            shard_id,
+            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, 
for partition with ID: {}, total bytes written: {}.",
+            batch_count,
+            stream_id,
+            topic_id,
+            partition_id,
+            saved
+        );
+
+        Ok((saved, batch_count))
+    }
+}
+
+pub fn update_index_and_increment_stats(
+    saved: IggyByteSize,
+    batch_count: u32,
+    config: &SystemConfig,
+) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
+    move |(_, stats, .., log)| {
+        let segment = log.active_segment_mut();
+        segment.size += saved.as_bytes_u32();
+        log.active_indexes_mut().unwrap().mark_saved();
+        if config.segment.cache_indexes == CacheIndexesConfig::None {
+            log.active_indexes_mut().unwrap().clear();
+        }
+        stats.increment_size_bytes(saved.as_bytes_u64());
+        stats.increment_messages_count(batch_count as u64);
+    }
+}
diff --git a/core/server/src/streaming/partitions/journal.rs 
b/core/server/src/streaming/partitions/journal.rs
index 19d6dd82..9feee8de 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -12,6 +12,8 @@ use std::fmt::Debug;
 pub struct Inner {
     pub base_offset: u64,
     pub current_offset: u64,
+    pub first_timestamp: u64,
+    pub end_timestamp: u64,
     pub messages_count: u32,
     pub size: u32,
 }
@@ -49,8 +51,14 @@ impl Journal for MemoryMessageJournal {
         );
 
         let batch_size = entry.size();
+        let first_timestamp = entry.first_timestamp().unwrap();
+        let last_timestamp = entry.last_timestamp().unwrap();
         self.batches.add_batch(entry);
 
+        if self.inner.first_timestamp == 0 {
+            self.inner.first_timestamp = first_timestamp;
+        }
+        self.inner.end_timestamp = last_timestamp;
         self.inner.messages_count += batch_messages_count;
         self.inner.current_offset = self.inner.base_offset + 
self.inner.messages_count as u64 - 1;
         self.inner.size += batch_size;
@@ -72,10 +80,20 @@ impl Journal for MemoryMessageJournal {
 
     fn commit(&mut self) -> Self::Container {
         self.inner.base_offset = self.inner.current_offset;
+        self.inner.first_timestamp = 0;
+        self.inner.end_timestamp = 0;
         self.inner.size = 0;
         self.inner.messages_count = 0;
         std::mem::take(&mut self.batches)
     }
+
+    fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
 }
 
 pub trait Journal {
@@ -93,6 +111,10 @@ pub trait Journal {
 
     fn commit(&mut self) -> Self::Container;
 
+    fn is_empty(&self) -> bool;
+
+    fn inner(&self) -> &Self::Inner;
+
     // `flush` is only useful in case of an journal that has disk backed WAL.
     // This could be merged together with `append`, but not doing this for two 
reasons.
     // 1. In case of the `Journal` being used as part of structure that 
utilizes interior mutability, async with borrow_mut is not possible.
diff --git a/core/server/src/streaming/partitions/log.rs 
b/core/server/src/streaming/partitions/log.rs
index 87003880..83712975 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -24,7 +24,7 @@ where
     access_map: AllocRingBuffer<usize>,
     cache: (),
     segments: Vec<Segment2>,
-    indexes: Option<IggyIndexesMut>,
+    indexes: Vec<Option<IggyIndexesMut>>,
     storage: Vec<Storage>,
 }
 
@@ -48,7 +48,7 @@ where
             cache: (),
             segments: Vec::with_capacity(SEGMENTS_CAPACITY),
             storage: Vec::with_capacity(SEGMENTS_CAPACITY),
-            indexes: None,
+            indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
         }
     }
 }
@@ -57,6 +57,18 @@ impl<J> SegmentedLog<J>
 where
     J: Journal + Debug,
 {
+    pub fn has_segments(&self) -> bool {
+        !self.segments.is_empty()
+    }
+
+    pub fn segments(&self) -> &Vec<Segment2> {
+        &self.segments
+    }
+
+    pub fn storages(&self) -> &Vec<Storage> {
+        &self.storage
+    }
+
     pub fn active_segment(&self) -> &Segment2 {
         self.segments
             .last()
@@ -81,28 +93,47 @@ where
             .expect("active storage called on empty log")
     }
 
-    pub fn indexes(&self) -> Option<&IggyIndexesMut> {
-        self.indexes.as_ref()
+    pub fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
+        &self.indexes
+    }
+
+    pub fn active_indexes(&self) -> Option<&IggyIndexesMut> {
+        self.indexes
+            .last()
+            .expect("active indexes called on empty log")
+            .as_ref()
     }
 
-    pub fn indexes_mut(&mut self) -> Option<&mut IggyIndexesMut> {
-        self.indexes.as_mut()
+    pub fn active_indexes_mut(&mut self) -> Option<&mut IggyIndexesMut> {
+        self.indexes
+            .last_mut()
+            .expect("active indexes called on empty log")
+            .as_mut()
     }
 
-    pub fn clear_indexes(&mut self) {
-        self.indexes = None;
+    pub fn clear_active_indexes(&mut self) {
+        let indexes = self
+            .indexes
+            .last_mut()
+            .expect("active indexes called on empty log");
+        *indexes = None;
     }
 
     pub fn ensure_indexes(&mut self) {
-        if self.indexes.is_none() {
+        let indexes = self
+            .indexes
+            .last_mut()
+            .expect("active indexes called on empty log");
+        if indexes.is_none() {
             let capacity = SIZE_16MB / INDEX_SIZE;
-            self.indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
+            *indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
         }
     }
 
     pub fn add_persisted_segment(&mut self, segment: Segment2, storage: 
Storage) {
         self.segments.push(segment);
         self.storage.push(storage);
+        self.indexes.push(None);
     }
 }
 
@@ -113,6 +144,10 @@ where
     pub fn journal_mut(&mut self) -> &mut J {
         &mut self.journal
     }
+
+    pub fn journal(&self) -> &J {
+        &self.journal
+    }
 }
 
 impl<J> Log for SegmentedLog<J> where J: Journal + Debug {}
diff --git a/core/server/src/streaming/partitions/messages.rs 
b/core/server/src/streaming/partitions/messages.rs
index bc4da29b..f11a8021 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -32,12 +32,6 @@ impl Partition {
         timestamp: IggyTimestamp,
         count: u32,
     ) -> Result<IggyMessagesBatchSet, IggyError> {
-        trace!(
-            "Getting {count} messages by timestamp: {} for partition: {}...",
-            timestamp.as_micros(),
-            self.partition_id
-        );
-
         if self.segments.is_empty() || count == 0 {
             return Ok(IggyMessagesBatchSet::empty());
         }
@@ -754,5 +748,4 @@ mod tests {
             .expect("Failed to create message with ID")
     }
 }
-
 */
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs 
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 325d561a..cf4df83c 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -31,6 +31,7 @@ use ring::error;
 use std::{
     io::ErrorKind,
     os::unix::fs::FileExt,
+    rc::Rc,
     sync::{
         Arc,
         atomic::{AtomicU64, Ordering},
@@ -43,12 +44,15 @@ use tracing::{error, trace};
 pub struct IndexReader {
     file_path: String,
     file: File,
-    index_size_bytes: AtomicU64,
+    index_size_bytes: Rc<AtomicU64>,
 }
 
+// Safety: We are guaranteeing that IndexReader will never be used from multiple threads
+unsafe impl Send for IndexReader {}
+
 impl IndexReader {
     /// Opens the index file in read-only mode.
-    pub async fn new(file_path: &str, index_size_bytes: AtomicU64) -> 
Result<Self, IggyError> {
+    pub async fn new(file_path: &str, index_size_bytes: Rc<AtomicU64>) -> 
Result<Self, IggyError> {
         let file = OpenOptions::new()
             .read(true)
             .open(file_path)
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index e905eaec..1c14d14f 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -22,6 +22,7 @@ use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
 use iggy_common::INDEX_SIZE;
 use iggy_common::IggyError;
+use std::rc::Rc;
 use std::sync::{
     Arc,
     atomic::{AtomicU64, Ordering},
@@ -35,15 +36,18 @@ use crate::streaming::utils::PooledBuffer;
 pub struct IndexWriter {
     file_path: String,
     file: File,
-    index_size_bytes: AtomicU64,
+    index_size_bytes: Rc<AtomicU64>,
     fsync: bool,
 }
 
+// Safety: We are guaranteeing that IndexWriter will never be used from 
multiple threads
+unsafe impl Send for IndexWriter {}
+
 impl IndexWriter {
     /// Opens the index file in write mode.
     pub async fn new(
         file_path: &str,
-        index_size_bytes: AtomicU64,
+        index_size_bytes: Rc<AtomicU64>,
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs 
b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 4c68dea6..ba38697f 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -256,7 +256,7 @@ impl IggyIndexesMut {
         if relative_start_offset == 0 {
             Some(IggyIndexesMut::from_bytes(slice, self.base_position))
         } else {
-            let position_offset = self.get(relative_start_offset - 
1).unwrap().position();
+            let position_offset: u32 = self.get(relative_start_offset - 
1).unwrap().position();
             Some(IggyIndexesMut::from_bytes(slice, position_offset))
         }
     }
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs 
b/core/server/src/streaming/segments/messages/messages_reader.rs
index a3ffe3e3..ccdfec07 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -24,6 +24,7 @@ use compio::fs::{File, OpenOptions};
 use compio::io::AsyncReadAtExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
+use std::rc::Rc;
 use std::{
     io::ErrorKind,
     sync::{
@@ -38,12 +39,18 @@ use tracing::{error, trace};
 pub struct MessagesReader {
     file_path: String,
     file: File,
-    messages_size_bytes: AtomicU64,
+    messages_size_bytes: Rc<AtomicU64>,
 }
 
+// Safety: We are guaranteeing that MessagesReader will never be used from 
multiple threads
+unsafe impl Send for MessagesReader {}
+
 impl MessagesReader {
     /// Opens the messages file in read mode.
-    pub async fn new(file_path: &str, messages_size_bytes: AtomicU64) -> 
Result<Self, IggyError> {
+    pub async fn new(
+        file_path: &str,
+        messages_size_bytes: Rc<AtomicU64>,
+    ) -> Result<Self, IggyError> {
         let file = OpenOptions::new()
             .read(true)
             .open(file_path)
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index a9bc084d..becc7aea 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -20,9 +20,12 @@ use crate::streaming::segments::{IggyMessagesBatchSet, 
messages::write_batch};
 use compio::fs::{File, OpenOptions};
 use error_set::ErrContext;
 use iggy_common::{IggyByteSize, IggyError};
-use std::sync::{
-    Arc,
-    atomic::{AtomicU64, Ordering},
+use std::{
+    rc::Rc,
+    sync::{
+        Arc,
+        atomic::{AtomicU64, Ordering},
+    },
 };
 use tracing::{error, trace};
 
@@ -31,10 +34,13 @@ use tracing::{error, trace};
 pub struct MessagesWriter {
     file_path: String,
     file: File,
-    messages_size_bytes: AtomicU64,
+    messages_size_bytes: Rc<AtomicU64>,
     fsync: bool,
 }
 
+// Safety: We are guaranteeing that MessagesWriter will never be used from 
multiple threads
+unsafe impl Send for MessagesWriter {}
+
 impl MessagesWriter {
     /// Opens the messages file in write mode.
     ///
@@ -43,7 +49,7 @@ impl MessagesWriter {
     /// Otherwise, the file is retained in `self.file` for synchronous writes.
     pub async fn new(
         file_path: &str,
-        messages_size_bytes: AtomicU64,
+        messages_size_bytes: Rc<AtomicU64>,
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
diff --git a/core/server/src/streaming/segments/storage.rs 
b/core/server/src/streaming/segments/storage.rs
index 271396b9..737b8947 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -1,4 +1,5 @@
 use iggy_common::IggyError;
+use std::rc::Rc;
 use std::sync::atomic::AtomicU64;
 use tracing::warn;
 
@@ -27,25 +28,16 @@ impl Storage {
         index_fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
-        let messages_writer = MessagesWriter::new(
-            messages_path,
-            AtomicU64::new(messages_size),
-            log_fsync,
-            file_exists,
-        )
-        .await?;
+        let size = Rc::new(AtomicU64::new(messages_size));
+        let indexes_size = Rc::new(AtomicU64::new(indexes_size));
+        let messages_writer =
+            MessagesWriter::new(messages_path, size.clone(), log_fsync, 
file_exists).await?;
 
-        let index_writer = IndexWriter::new(
-            index_path,
-            AtomicU64::new(indexes_size),
-            index_fsync,
-            file_exists,
-        )
-        .await?;
+        let index_writer =
+            IndexWriter::new(index_path, indexes_size.clone(), index_fsync, 
file_exists).await?;
 
-        let messages_reader =
-            MessagesReader::new(messages_path, 
AtomicU64::new(messages_size)).await?;
-        let index_reader = IndexReader::new(index_path, 
AtomicU64::new(indexes_size)).await?;
+        let messages_reader = MessagesReader::new(messages_path, size).await?;
+        let index_reader = IndexReader::new(index_path, indexes_size).await?;
 
         Ok(Self {
             messages_writer: Some(messages_writer),

Reply via email to