This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 05ab5a7705ce86a3f08aa95759617c754cc3aec9
Author: numinex <[email protected]>
AuthorDate: Sun Jun 29 10:01:36 2025 +0200

    finish send/poll messages
---
 core/common/src/error/iggy_error.rs                |   5 +
 core/common/src/lib.rs                             |   1 -
 core/common/src/types/confirmation/mod.rs          |  53 ---------
 core/common/src/types/mod.rs                       |   1 -
 .../tests/streaming/get_by_timestamp.rs            |   5 +-
 core/integration/tests/streaming/topic_messages.rs |   5 +-
 core/sdk/src/prelude.rs                            |   2 +-
 .../handlers/messages/poll_messages_handler.rs     | 128 +++++++++++++++++----
 .../handlers/messages/send_messages_handler.rs     | 111 +++++++++++++++---
 core/server/src/configs/defaults.rs                |   6 -
 core/server/src/configs/displays.rs                |   3 +-
 core/server/src/configs/system.rs                  |   3 -
 core/server/src/shard/mod.rs                       |  80 ++++++++++++-
 core/server/src/shard/namespace.rs                 |  11 +-
 core/server/src/shard/system/messages.rs           |  52 ++-------
 core/server/src/shard/system/partitions.rs         |  15 ++-
 core/server/src/shard/system/streams.rs            |   9 ++
 core/server/src/shard/system/topics.rs             |  24 +++-
 core/server/src/shard/transmission/event.rs        |  60 ++++++++++
 core/server/src/shard/transmission/frame.rs        |  25 ++--
 core/server/src/shard/transmission/message.rs      |  28 ++++-
 core/server/src/shard/transmission/mod.rs          |   1 +
 core/server/src/streaming/partitions/messages.rs   |  20 +---
 core/server/src/streaming/partitions/partition.rs  |   8 +-
 .../streaming/segments/messages/messages_reader.rs |  31 +++--
 .../streaming/segments/messages/messages_writer.rs |   2 +-
 core/server/src/streaming/streams/storage.rs       |  14 +--
 core/server/src/streaming/streams/topics.rs        |  14 ++-
 .../server/src/streaming/topics/consumer_groups.rs |   6 +-
 core/server/src/streaming/topics/messages.rs       |  55 +++++----
 core/server/src/streaming/topics/partitions.rs     |  14 +--
 core/server/src/streaming/topics/storage.rs        |   6 +-
 core/server/src/streaming/topics/topic.rs          |  26 +++--
 33 files changed, 530 insertions(+), 294 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index cc8a284b..334e14c7 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -459,6 +459,11 @@ pub enum IggyError {
     CannotReadIndexPosition = 10011,
     #[error("Cannot read index timestamp")]
     CannotReadIndexTimestamp = 10012,
+
+    #[error("Shard not found for stream ID: {0}, topic ID: {1}, partition ID: 
{2}")]
+    ShardNotFound(u32, u32, u32) = 11000,
+    #[error("Shard communication error, shard ID: {0}")]
+    ShardCommunicationError(u16) = 11001,
 }
 
 impl IggyError {
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 6603a7d5..f055476e 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -64,7 +64,6 @@ pub use 
types::configuration::tcp_config::tcp_client_config::*;
 pub use types::configuration::tcp_config::tcp_client_config_builder::*;
 pub use types::configuration::tcp_config::tcp_client_reconnection_config::*;
 pub use types::configuration::tcp_config::tcp_connection_string_options::*;
-pub use types::confirmation::*;
 pub use types::consumer::consumer_group::*;
 pub use types::consumer::consumer_kind::*;
 pub use types::consumer::consumer_offset_info::*;
diff --git a/core/common/src/types/confirmation/mod.rs 
b/core/common/src/types/confirmation/mod.rs
deleted file mode 100644
index bc26cffa..00000000
--- a/core/common/src/types/confirmation/mod.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use serde::{Deserialize, Serialize};
-use strum::{Display, EnumString};
-
-#[derive(Clone, Copy, Debug, Default, Display, Serialize, Deserialize, 
EnumString, PartialEq)]
-#[strum(serialize_all = "snake_case")]
-pub enum Confirmation {
-    #[default]
-    Wait,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::str::FromStr;
-
-    #[test]
-    fn test_to_string() {
-        assert_eq!(Confirmation::Wait.to_string(), "wait");
-        assert_eq!(Confirmation::NoWait.to_string(), "no_wait");
-    }
-
-    #[test]
-    fn test_from_str() {
-        assert_eq!(Confirmation::from_str("wait").unwrap(), 
Confirmation::Wait);
-        assert_eq!(
-            Confirmation::from_str("no_wait").unwrap(),
-            Confirmation::NoWait
-        );
-    }
-
-    #[test]
-    fn test_default() {
-        assert_eq!(Confirmation::default(), Confirmation::Wait);
-    }
-}
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 2376abd6..6940f065 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod client_state;
 pub(crate) mod command;
 pub(crate) mod compression;
 pub(crate) mod configuration;
-pub(crate) mod confirmation;
 pub(crate) mod consumer;
 pub(crate) mod diagnostic;
 pub(crate) mod identifier;
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs 
b/core/integration/tests/streaming/get_by_timestamp.rs
index d0cc2e5b..93a178aa 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -202,10 +202,7 @@ async fn test_get_messages_by_timestamp(
 
         let batch = 
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
         assert_eq!(batch.count(), batch_len);
-        partition
-            .append_messages(batch)
-            .await
-            .unwrap();
+        partition.append_messages(batch).await.unwrap();
 
         // Capture the timestamp of this batch
         batch_timestamps.push(IggyTimestamp::now());
diff --git a/core/integration/tests/streaming/topic_messages.rs 
b/core/integration/tests/streaming/topic_messages.rs
index f21263c4..19de255f 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -62,10 +62,7 @@ async fn assert_polling_messages() {
         .map(|m| m.get_size_bytes())
         .sum::<IggyByteSize>();
     let batch = IggyMessagesBatchMut::from_messages(&messages, 
batch_size.as_bytes_u32());
-    topic
-        .append_messages(&partitioning, batch)
-        .await
-        .unwrap();
+    topic.append_messages(&partitioning, batch).await.unwrap();
 
     let consumer = PollingConsumer::Consumer(1, partition_id);
     let (_, polled_messages) = topic
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 1e1ee111..e04164fe 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -51,7 +51,7 @@ pub use iggy_binary_protocol::{
 };
 pub use iggy_common::{
     Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, 
CacheMetrics,
-    CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, 
Confirmation, Consumer,
+    CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, 
Consumer,
     ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer, 
GlobalPermissions,
     HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, 
Identifier,
     IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, 
IggyIndexView, IggyMessage,
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 9d6d526b..c590dcf2 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,8 +20,12 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::{IggyShard, ShardRequestResult};
 use crate::shard::system::messages::PollingArgs;
+use crate::streaming::segments::IggyMessagesBatchSet;
 use crate::streaming::session::Session;
 use crate::to_iovec;
 use anyhow::Result;
@@ -60,48 +64,128 @@ impl ServerCommandHandler for PollMessages {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let (metadata, messages) = shard
-            .poll_messages(
-                session,
-                &self.consumer,
-                &self.stream_id,
-                &self.topic_id,
-                self.partition_id,
-                PollingArgs::new(self.strategy, self.count, self.auto_commit),
-            )
-            .await
+        let stream = shard
+            .get_stream(&self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {}",
+                    self.stream_id
+                )
+            })?;
+        let topic = stream.get_topic(
+            &self.topic_id,
+        ).with_error_context(|error| format!(
+            "{COMPONENT} (error: {error}) - topic not found for stream ID: {}, 
topic_id: {}",
+            self.stream_id, self.topic_id
+        ))?;
+
+        let PollMessages {
+            consumer,
+            partition_id,
+            strategy,
+            count,
+            auto_commit,
+            ..
+        } = self;
+        let args = PollingArgs::new(strategy, count, auto_commit);
+
+        shard.permissioner
+            .borrow()
+            .poll_messages(session.get_user_id(), topic.stream_id, 
topic.topic_id)
             .with_error_context(|error| format!(
-                "{COMPONENT} (error: {error}) - failed to poll messages for 
consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: 
{session}.",
-                self.consumer, self.stream_id, self.topic_id, self.partition_id
+                "{COMPONENT} (error: {error}) - permission denied to poll 
messages for user {} on stream ID: {}, topic ID: {}",
+                session.get_user_id(),
+                topic.stream_id,
+                topic.topic_id
             ))?;
 
+        if !topic.has_partitions() {
+            return Err(IggyError::NoPartitions(topic.topic_id, 
topic.stream_id));
+        }
+
+        // There might be no partition assigned if the consumer is a consumer group 
member without any partitions.
+        let Some((consumer, partition_id)) = topic
+            .resolve_consumer_with_partition_id(&consumer, session.client_id, 
partition_id, true)
+            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer: {consumer}, client 
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
+                todo!("Send early response");
+            //return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
+        };
+
+        let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
+        let request = ShardRequest::PollMessages {
+            consumer,
+            partition_id,
+            args,
+            count,
+        };
+        let message = ShardMessage::Request(request);
+        let (metadata, batch) = match shard.send_request_to_shard(&namespace, 
message).await {
+            ShardRequestResult::SameShard(message) => {
+                match message {
+                    ShardMessage::Request(request) => {
+                        match request {
+                            ShardRequest::PollMessages {
+                                consumer,
+                                partition_id,
+                                args,
+                                count,
+                            } => {
+                                topic.get_messages(consumer, partition_id, 
args.strategy, count).await?
+                                
+                            }
+                            _ => unreachable!(
+                                "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
+                            ),
+                        }
+                    }
+                    _ => unreachable!(
+                        "Expected a request message inside of an command 
handler, impossible state"
+                    ),
+                }
+            }
+            ShardRequestResult::Result(result) => {
+                match result? {
+                    ShardResponse::PollMessages(response) => {
+                        response
+                    }
+                    ShardResponse::ErrorResponse(err) => {
+                        return Err(err);
+                    }
+                    _ => unreachable!(
+                        "Expected a PollMessages response inside of 
PollMessages handler, impossible state"
+                    ),
+                }
+            }
+        };
+
+
         // Collect all chunks first into a Vec to extend their lifetimes.
         // This ensures the Bytes (in reality Arc<[u8]>) references from each 
IggyMessagesBatch stay alive
         // throughout the async vectored I/O operation, preventing "borrowed 
value does not live
         // long enough" errors while optimizing transmission by using larger 
chunks.
 
         // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for 
messages_count + size of all batches.
-        let response_length = 4 + 8 + 4 + messages.size();
+        let response_length = 4 + 8 + 4 + batch.size();
         let response_length_bytes = response_length.to_le_bytes();
 
         let partition_id = metadata.partition_id.to_le_bytes();
         let current_offset = metadata.current_offset.to_le_bytes();
-        let count = messages.count().to_le_bytes();
+        let count = batch.count().to_le_bytes();
 
-        let mut io_slices = Vec::with_capacity(messages.containers_count() + 
3);
-        io_slices.push(to_iovec(&partition_id));
-        io_slices.push(to_iovec(&current_offset));
-        io_slices.push(to_iovec(&count));
+        let mut iovecs = Vec::with_capacity(batch.containers_count() + 3);
+        iovecs.push(to_iovec(&partition_id));
+        iovecs.push(to_iovec(&current_offset));
+        iovecs.push(to_iovec(&count));
 
-        io_slices.extend(messages.iter().map(|m| to_iovec(&m)));
+        iovecs.extend(batch.iter().map(|m| to_iovec(&m)));
         trace!(
             "Sending {} messages to client ({} bytes) to client",
-            messages.count(),
+            batch.count(),
             response_length
         );
 
         sender
-            .send_ok_response_vectored(&response_length_bytes, io_slices)
+            .send_ok_response_vectored(&response_length_bytes, iovecs)
             .await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index d9a4770f..61e2c39c 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -16,19 +16,25 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::{IggyShard, ShardRequestResult};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::utils::PooledBuffer;
 use anyhow::Result;
 use bytes::BytesMut;
-use iggy_common::INDEX_SIZE;
+use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::Sizeable;
+use iggy_common::{INDEX_SIZE, IdKind};
 use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
 use std::rc::Rc;
+use std::sync::Arc;
 use tracing::instrument;
 
 impl ServerCommandHandler for SendMessages {
@@ -104,19 +110,98 @@ impl ServerCommandHandler for SendMessages {
             indexes,
             messages_buffer,
         );
-
         batch.validate()?;
 
-        shard
-            .append_messages(
-                session,
-                &self.stream_id,
-                &self.topic_id,
-                &self.partitioning,
-                batch,
-            )
-            .await?;
-
+        let stream = shard
+            .get_stream(&self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "Failed to get stream with ID: {} (error: {})",
+                    self.stream_id, error
+                )
+            })?;
+        let topic = stream
+            .get_topic(&self.topic_id)
+            .with_error_context(|error| {
+                format!(
+                    "Failed to get topic with ID: {} (error: {})",
+                    self.topic_id, error
+                )
+            })?;
+        let stream_id = stream.stream_id;
+        let topic_id = topic.topic_id;
+        let partition_id = topic.calculate_partition_id(&self.partitioning)?;
+
+        // Validate permissions for given user on stream and topic.
+        shard.permissioner.borrow().append_messages(
+            session.get_user_id(),
+            stream_id,
+            topic_id
+        ).with_error_context(|error| format!(
+            "{COMPONENT} (error: {error}) - permission denied to append 
messages for user {} on stream ID: {}, topic ID: {}",
+            session.get_user_id(),
+            stream_id,
+            topic_id
+        ))?;
+        let messages_count = batch.count();
+        shard.metrics.increment_messages(messages_count as u64);
+
+        // Encrypt messages if encryptor is enabled in configuration.
+        let batch = shard.maybe_encrypt_messages(batch)?;
+
+        let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
+        let request = ShardRequest::SendMessages {
+            stream_id,
+            topic_id,
+            partition_id,
+            batch,
+        };
+        let message = ShardMessage::Request(request);
+        // Egh... I don't like those nested match statements,
+        // Technically there are only two `request` types that will ever be 
dispatched
+        // to different shards, thus we could get away with generic structs
+        // maybe todo ?
+        // how to make this code reusable, in a sense that we will have 
exactly the same code inside of
+        // `PollMessages` handler, but with different request....
+        match shard.send_request_to_shard(&namespace, message).await {
+            ShardRequestResult::SameShard(message) => {
+                match message {
+                    ShardMessage::Request(request) => {
+                        match request {
+                            ShardRequest::SendMessages {
+                                stream_id,
+                                topic_id,
+                                partition_id,
+                                batch,
+                            } => {
+                                // Just shut up rust analyzer.
+                                let _stream_id = stream_id;
+                                let _topic_id = topic_id;
+                                topic.append_messages(partition_id, 
batch).await?
+                            }
+                            _ => unreachable!(
+                                "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
+                            ),
+                        }
+                    }
+                    _ => unreachable!(
+                        "Expected a request message inside of an command 
handler, impossible state"
+                    ),
+                }
+            }
+            ShardRequestResult::Result(result) => {
+                match result? {
+                    ShardResponse::SendMessages => {
+                        ()
+                    }
+                    ShardResponse::ErrorResponse(err) => {
+                        return Err(err);
+                    }
+                    _ => unreachable!("Expected a SendMessages response inside 
of SendMessages handler, impossible state"),
+                }
+
+            }
+        };
         sender.send_empty_ok_response().await?;
         Ok(())
     }
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index 82b560cf..dd74ade7 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -451,12 +451,6 @@ impl Default for SegmentConfig {
             cache_indexes: 
SERVER_CONFIG.system.segment.cache_indexes.parse().unwrap(),
             message_expiry: 
SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(),
             archive_expired: SERVER_CONFIG.system.segment.archive_expired,
-            server_confirmation: SERVER_CONFIG
-                .system
-                .segment
-                .server_confirmation
-                .parse()
-                .unwrap(),
         }
     }
 }
diff --git a/core/server/src/configs/displays.rs 
b/core/server/src/configs/displays.rs
index de4805a8..51873421 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -289,12 +289,11 @@ impl Display for SegmentConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, 
archive_expired: {}, server_confirmation: {} }}",
+            "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, 
archive_expired: {} }}",
             self.size,
             self.cache_indexes,
             self.message_expiry,
             self.archive_expired,
-            self.server_confirmation,
         )
     }
 }
diff --git a/core/server/src/configs/system.rs 
b/core/server/src/configs/system.rs
index fc263a26..a5193294 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -17,7 +17,6 @@
  */
 
 use super::cache_indexes::CacheIndexesConfig;
-use iggy_common::Confirmation;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyExpiry;
 use iggy_common::MaxTopicSize;
@@ -141,8 +140,6 @@ pub struct SegmentConfig {
     #[serde_as(as = "DisplayFromStr")]
     pub message_expiry: IggyExpiry,
     pub archive_expired: bool,
-    #[serde_as(as = "DisplayFromStr")]
-    pub server_confirmation: Confirmation,
 }
 
 #[serde_as]
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index dbd94b68..7976cca3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -47,14 +47,12 @@ use crate::{
     shard::{
         system::info::SystemInfo,
         transmission::{
-            frame::ShardFrame,
-            message::{ShardEvent, ShardMessage},
+            frame::{ShardFrame, ShardResponse},
+            message::{ShardEvent, ShardMessage, ShardRequest},
         },
     },
     state::{
-        StateKind,
-        file::FileState,
-        system::{StreamState, SystemState, UserState},
+        file::FileState, system::{StreamState, SystemState, UserState}, 
StateKind
     },
     streaming::{
         clients::client_manager::ClientManager,
@@ -85,12 +83,37 @@ impl Shard {
             connection,
         }
     }
+
+    pub async fn send_request(&self, message: ShardMessage) -> 
Result<ShardResponse, IggyError> {
+        let (sender, receiver) = async_channel::bounded(1);
+        self.connection
+            .sender
+            .send(ShardFrame::new(message, Some(sender.clone()))); // 
Apparently sender needs to be cloned, otherwise channel will close...
+        //TODO: Fixme
+        let response = receiver.recv().await
+            .map_err(|err| {
+                error!("Failed to receive response from shard: {err}");
+                IggyError::ShardCommunicationError(self.id)
+            })?;
+        Ok(response)
+    }
 }
 
 struct ShardInfo {
     id: u16,
 }
 
+impl ShardInfo {
+    pub fn new(id: u16) -> Self {
+        Self { id }
+    }
+}
+
+pub enum ShardRequestResult<T, E> {
+    SameShard(ShardMessage),
+    Result(Result<T, E>),
+}
+
 pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
@@ -426,6 +449,53 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
+    pub async fn send_request_to_shard(
+        &self,
+        namespace: &IggyNamespace,
+        message: ShardMessage,
+    ) -> ShardRequestResult<ShardResponse, IggyError> {
+        if let Some(shard) = self.find_shard(namespace) {
+            if shard.id == self.id {
+                return ShardRequestResult::SameShard(message);
+            }
+
+            let response = match shard.send_request(message).await {
+                Ok(response) => response,
+                Err(err) => {
+                    error!(
+                        "{COMPONENT} - failed to send request to shard with 
ID: {}, error: {err}",
+                        shard.id
+                    );
+                    return ShardRequestResult::Result(Err(err));
+                }
+            };
+            ShardRequestResult::Result(Ok(response))
+        } else {
+            ShardRequestResult::Result(Err(IggyError::ShardNotFound(
+                namespace.stream_id,
+                namespace.topic_id,
+                namespace.partition_id,
+            )))
+        }
+    }
+
+    fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
+        let shards_table = self.shards_table.borrow();
+        shards_table.get(namespace).map(|shard_info| {
+            self.shards
+                .iter()
+                .find(|shard| shard.id == shard_info.id)
+                .expect("Shard not found in the shards table.")
+        })
+    }
+
+    pub fn insert_shard_table_records(
+        &self,
+        records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+    ) {
+        self.shards_table.borrow_mut().extend(records);
+    }
+
     pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: 
ShardEvent) {
         self.shards
             .iter()
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index a7c899aa..6b3b5b7d 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -17,19 +17,18 @@
  */
 
 use hash32::{Hasher, Murmur3Hasher};
-use iggy_common::Identifier;
 use std::hash::Hasher as _;
 
 //TODO: Will probably want to move it to separate crate so we can share it 
with sdk.
 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub struct IggyNamespace {
-    pub(crate) stream_id: Identifier,
-    pub(crate) topic_id: Identifier,
+    pub(crate) stream_id: u32,
+    pub(crate) topic_id: u32,
     pub(crate) partition_id: u32,
 }
 
 impl IggyNamespace {
-    pub fn new(stream_id: Identifier, topic_id: Identifier, partition_id: u32) 
-> Self {
+    pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self {
         Self {
             stream_id,
             topic_id,
@@ -39,8 +38,8 @@ impl IggyNamespace {
 
     pub fn generate_hash(&self) -> u32 {
         let mut hasher = Murmur3Hasher::default();
-        hasher.write(&self.stream_id.value);
-        hasher.write(&self.topic_id.value);
+        hasher.write_u32(self.stream_id);
+        hasher.write_u32(self.topic_id);
         hasher.write_u32(self.partition_id);
         hasher.finish32()
     }
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 9a23a9bb..76337a1a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,7 +25,7 @@ use crate::streaming::utils::PooledBuffer;
 use async_zip::tokio::read::stream;
 use error_set::ErrContext;
 use iggy_common::{
-    BytesSerializable, Confirmation, Consumer, EncryptorKind, 
IGGY_MESSAGE_HEADER_SIZE, Identifier,
+    BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier,
     IggyError, Partitioning, PollingStrategy,
 };
 use tracing::{error, trace};
@@ -98,47 +98,6 @@ impl IggyShard {
         Ok((metadata, batch_set))
     }
 
-    pub async fn append_messages(
-        &self,
-        session: &Session,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partitioning: &Partitioning,
-        messages: IggyMessagesBatchMut,
-    ) -> Result<(), IggyError> {
-        self.ensure_authenticated(session)?;
-        let stream = self.get_stream(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - stream not found for 
stream_id: {stream_id}")
-        })?;
-        let stream_id = stream.stream_id;
-        let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
-        self.permissioner.borrow().append_messages(
-            session.get_user_id(),
-            topic.stream_id,
-            topic.topic_id
-        ).with_error_context(|error| format!(
-            "{COMPONENT} (error: {error}) - permission denied to append 
messages for user {} on stream ID: {}, topic ID: {}",
-            session.get_user_id(),
-            topic.stream_id,
-            topic.topic_id
-        ))?;
-        let messages_count = messages.count();
-
-        // Encrypt messages if encryptor is configured
-        let messages = if let Some(encryptor) = &self.encryptor {
-            self.encrypt_messages(messages, encryptor)?
-        } else {
-            messages
-        };
-
-        topic
-            .append_messages(partitioning, messages)
-            .await?;
-
-        self.metrics.increment_messages(messages_count as u64);
-        Ok(())
-    }
-
     pub async fn flush_unsaved_buffer(
         &self,
         session: &Session,
@@ -167,7 +126,7 @@ impl IggyShard {
         Ok(())
     }
 
-    async fn decrypt_messages(
+    pub async fn decrypt_messages(
         &self,
         batches: IggyMessagesBatchSet,
         encryptor: &EncryptorKind,
@@ -206,11 +165,14 @@ impl IggyShard {
         Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
     }
 
-    fn encrypt_messages(
+    pub fn maybe_encrypt_messages(
         &self,
         batch: IggyMessagesBatchMut,
-        encryptor: &EncryptorKind,
     ) -> Result<IggyMessagesBatchMut, IggyError> {
+        let encryptor = match self.encryptor.as_ref() {
+            Some(encryptor) => encryptor,
+            None => return Ok(batch),
+        };
         let mut encrypted_messages = PooledBuffer::with_capacity(batch.size() 
as usize * 2);
         let count = batch.count();
         let mut indexes = IggyIndexesMut::with_capacity(batch.count() as 
usize, 0);
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 01e02968..78ef2144 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -18,6 +18,8 @@
 
 use super::COMPONENT;
 use crate::shard::IggyShard;
+use crate::shard::ShardInfo;
+use crate::shard::namespace::IggyNamespace;
 use crate::streaming::session::Session;
 use error_set::ErrContext;
 use iggy_common::Identifier;
@@ -54,6 +56,7 @@ impl IggyShard {
         let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
         })?;
+        let stream_id = stream.stream_id;
         let topic = stream
             .get_topic_mut(topic_id)
             .with_error_context(|error| {
@@ -61,15 +64,25 @@ impl IggyShard {
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {stream_id}"
                 )
             })?;
+        let topic_id = topic.topic_id;
 
         // TODO: Make add persisted partitions to topic sync, and extract the 
storage persister out of it
         // perform disk i/o outside of the borrow_mut of the stream.
-        topic
+        let partition_ids = topic
             .add_persisted_partitions(partitions_count)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to add 
persisted partitions, topic: {topic}")
             })?;
+        let records = partition_ids.into_iter().map(|partition_id| {
+            let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
+            let hash = namespace.generate_hash();
+            let shard_id = hash % self.get_available_shards_count();
+            let shard_info = ShardInfo::new(shard_id as u16);
+            (namespace, shard_info)
+        });
+        self.insert_shard_table_records(records);
+
         topic.reassign_consumer_groups();
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 3cb0da5b..fecec536 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -112,6 +112,15 @@ impl IggyShard {
         }
     }
 
+    pub fn try_get_topic_id(&self, stream_id: &Identifier, name: &str) -> 
Option<u32> {
+        let stream = self.get_stream(stream_id).ok()?;
+        stream.topics_ids.get(name).and_then(|id| Some(*id))
+    }
+
+    pub fn try_get_stream_id(&self, name: &str) -> Option<u32> {
+        self.streams_ids.borrow().get(name).and_then(|id| Some(*id))
+    }
+
     fn try_get_stream_by_name(&self, name: &str) -> Option<Ref<'_, Stream>> {
         self.streams_ids
             .borrow()
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 477308bc..252dae30 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -17,7 +17,8 @@
  */
 
 use super::COMPONENT;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::{IggyShard, ShardInfo};
 use crate::streaming::session::Session;
 use crate::streaming::streams::stream::Stream;
 use crate::streaming::topics::topic::Topic;
@@ -131,8 +132,11 @@ impl IggyShard {
 
         // TODO: Make create topic sync, and extract the storage persister out 
of it
         // perform disk i/o outside of the borrow_mut of the stream.
-        let created_topic_id = self
-            .get_stream_mut(stream_id)?
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let (topic_id, partition_ids) = stream
             .create_topic(
                 topic_id,
                 name,
@@ -146,12 +150,24 @@ impl IggyShard {
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to create topic 
with name: {name} in stream ID: {stream_id}")
             })?;
+        let records = partition_ids.into_iter().map(|partition_id| {
+            let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
+            // TODO: This setup isn't deterministic.
+            // Imagine a scenario where client creates partition using 
`String` identifiers,
+            // but then for poll_messages requests uses numeric ones.
+            // the namespace wouldn't match, therefore we would get miss in 
the shard table.
+            let hash = namespace.generate_hash();
+            let shard_id = hash % self.get_available_shards_count();
+            let shard_info = ShardInfo::new(shard_id as u16);
+            (namespace, shard_info)
+        });
+        self.insert_shard_table_records(records);
 
         self.metrics.increment_topics(1);
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        Ok(Identifier::numeric(created_topic_id)?)
+        Ok(Identifier::numeric(topic_id)?)
     }
 
     #[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
new file mode 100644
index 00000000..45815a50
--- /dev/null
+++ b/core/server/src/shard/transmission/event.rs
@@ -0,0 +1,60 @@
+use std::net::SocketAddr;
+
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+
+use crate::streaming::clients::client_manager::Transport;
+
+pub enum ShardEvent {
+    CreatedStream {
+        stream_id: Option<u32>,
+        topic_id: Identifier
+    },
+    //DeletedStream(Identifier),
+    //UpdatedStream(Identifier, String),
+    //PurgedStream(Identifier),
+    //CreatedPartitions(Identifier, Identifier, u32),
+    //DeletedPartitions(Identifier, Identifier, u32),
+    CreatedTopic(
+        Identifier,
+        Option<u32>,
+        String,
+        u32,
+        IggyExpiry,
+        CompressionAlgorithm,
+        MaxTopicSize,
+        Option<u8>,
+    ),
+    //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
+    //DeletedConsumerGroup(Identifier, Identifier, Identifier),
+    /*
+    UpdatedTopic(
+        Identifier,
+        Identifier,
+        String,
+        IggyExpiry,
+        CompressionAlgorithm,
+        MaxTopicSize,
+        Option<u8>,
+    ),
+    */
+    //PurgedTopic(Identifier, Identifier),
+    //DeletedTopic(Identifier, Identifier),
+    //CreatedUser(String, String, UserStatus, Option<Permissions>),
+    //DeletedUser(Identifier),
+    LoginUser {
+        username: String,
+        password: String,
+    },
+    //LogoutUser,
+    //UpdatedUser(Identifier, Option<String>, Option<UserStatus>),
+    //ChangedPassword(Identifier, String, String),
+    //CreatedPersonalAccessToken(String, IggyExpiry),
+    //DeletedPersonalAccessToken(String),
+    //LoginWithPersonalAccessToken(String),
+    //StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64),
+    NewSession {
+        user_id: u32,
+        socket_addr: SocketAddr,
+        transport: Transport,
+    },
+}
\ No newline at end of file
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index ec63daf0..ff519ab4 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -16,38 +16,33 @@
  * under the License.
  */
 use async_channel::Sender;
-use bytes::Bytes;
 use iggy_common::IggyError;
 
-use crate::shard::transmission::message::ShardMessage;
+use 
crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata, 
shard::transmission::message::ShardMessage, 
streaming::segments::IggyMessagesBatchSet};
+
+#[derive(Debug)]
+pub enum ShardResponse {
+    PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
+    SendMessages,
+    ShardEvent,
+    ErrorResponse(IggyError),
+}
 
 #[derive(Debug)]
 pub struct ShardFrame {
-    pub client_id: u32,
     pub message: ShardMessage,
     pub response_sender: Option<Sender<ShardResponse>>,
 }
 
 impl ShardFrame {
-    pub fn new(
-        client_id: u32,
-        message: ShardMessage,
-        response_sender: Option<Sender<ShardResponse>>,
-    ) -> Self {
+    pub fn new(message: ShardMessage, response_sender: 
Option<Sender<ShardResponse>>) -> Self {
         Self {
-            client_id,
             message,
             response_sender,
         }
     }
 }
 
-#[derive(Debug)]
-pub enum ShardResponse {
-    BinaryResponse(Bytes),
-    ErrorResponse(IggyError),
-}
-
 #[macro_export]
 macro_rules! handle_response {
     ($sender:expr, $response:expr) => {
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index cbf4fdd4..360e7a3c 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,5 +1,7 @@
 use std::rc::Rc;
 
+use iggy_common::PollingStrategy;
+
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,11 +19,11 @@ use std::rc::Rc;
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::{binary::command::ServerCommand, streaming::session::Session};
+use crate::{shard::system::messages::PollingArgs, 
streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, 
session::Session}};
 
 #[derive(Debug)]
 pub enum ShardMessage {
-    Command(ServerCommand),
+    Request(ShardRequest),
     Event(ShardEvent),
 }
 
@@ -30,9 +32,25 @@ pub enum ShardEvent {
     NewSession(),
 }
 
-impl From<ServerCommand> for ShardMessage {
-    fn from(command: ServerCommand) -> Self {
-        ShardMessage::Command(command)
+#[derive(Debug)]
+pub enum ShardRequest {
+    SendMessages {
+        stream_id: u32,
+        topic_id: u32,
+        partition_id: u32,
+        batch: IggyMessagesBatchMut,
+    },
+    PollMessages {
+        partition_id: u32,
+        args: PollingArgs,
+        consumer: PollingConsumer,
+        count: u32,
+    },
+}
+
+impl From<ShardRequest> for ShardMessage {
+    fn from(request: ShardRequest) -> Self {
+        ShardMessage::Request(request)
     }
 }
 
diff --git a/core/server/src/shard/transmission/mod.rs 
b/core/server/src/shard/transmission/mod.rs
index 1f9a79ed..4ed9bd1e 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod event;
 pub mod connector;
 pub mod frame;
 pub mod message;
diff --git a/core/server/src/streaming/partitions/messages.rs 
b/core/server/src/streaming/partitions/messages.rs
index cc961370..3b2d942b 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -21,7 +21,7 @@ use crate::streaming::partitions::partition::Partition;
 use crate::streaming::polling_consumer::PollingConsumer;
 use crate::streaming::segments::*;
 use error_set::ErrContext;
-use iggy_common::{Confirmation, IggyError, IggyTimestamp, Sizeable};
+use iggy_common::{IggyError, IggyTimestamp, Sizeable};
 use std::sync::atomic::Ordering;
 use tracing::trace;
 
@@ -218,10 +218,7 @@ impl Partition {
         Ok(batches)
     }
 
-    pub async fn append_messages(
-        &mut self,
-        batch: IggyMessagesBatchMut,
-    ) -> Result<(), IggyError> {
+    pub async fn append_messages(&mut self, batch: IggyMessagesBatchMut) -> 
Result<(), IggyError> {
         if batch.count() == 0 {
             return Ok(());
         }
@@ -564,10 +561,7 @@ mod tests {
             .map(|m| m.get_size_bytes().as_bytes_u32())
             .sum();
         let initial_batch = 
IggyMessagesBatchMut::from_messages(&initial_messages, initial_size);
-        partition
-            .append_messages(initial_batch)
-            .await
-            .unwrap();
+        partition.append_messages(initial_batch).await.unwrap();
 
         // Now try to add only duplicates
         let duplicate_messages = vec![
@@ -582,10 +576,7 @@ mod tests {
             .sum();
         let duplicate_batch =
             IggyMessagesBatchMut::from_messages(&duplicate_messages, 
duplicate_size);
-        partition
-            .append_messages(duplicate_batch)
-            .await
-            .unwrap();
+        partition.append_messages(duplicate_batch).await.unwrap();
 
         let loaded_messages = partition.get_messages_by_offset(0, 
10).await.unwrap();
 
@@ -739,8 +730,7 @@ mod tests {
                 Arc::new(AtomicU64::new(0)),
                 Arc::new(AtomicU32::new(0)),
                 IggyTimestamp::now(),
-            )
-            .await,
+            ),
             temp_dir,
         )
     }
diff --git a/core/server/src/streaming/partitions/partition.rs 
b/core/server/src/streaming/partitions/partition.rs
index 1b8bacf8..4a573d14 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -84,7 +84,7 @@ impl ConsumerOffset {
 
 impl Partition {
     #[allow(clippy::too_many_arguments)]
-    pub async fn create(
+    pub fn create(
         stream_id: u32,
         topic_id: u32,
         partition_id: u32,
@@ -247,8 +247,7 @@ mod tests {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
 
         assert_eq!(partition.stream_id, stream_id);
         assert_eq!(partition.topic_id, topic_id);
@@ -290,8 +289,7 @@ mod tests {
             Arc::new(AtomicU64::new(0)),
             Arc::new(AtomicU32::new(0)),
             IggyTimestamp::now(),
-        )
-        .await;
+        );
         assert!(partition.segments.is_empty());
     }
 }
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs 
b/core/server/src/streaming/segments/messages/messages_reader.rs
index 0f413650..4091e8e3 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -16,9 +16,9 @@
  * under the License.
  */
 
-use crate::io::file::{IggyFile};
+use crate::io::file::IggyFile;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
-use crate::streaming::utils::{file, PooledBuffer};
+use crate::streaming::utils::{PooledBuffer, file};
 use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -174,19 +174,18 @@ impl MessagesReader {
         len: u32,
         use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-           if use_pool {
-                let mut buf = PooledBuffer::with_capacity(len as usize);
-                unsafe { buf.set_len(len as usize) };
-                let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
-                result?;
-                Ok(buf)
-
-            } else {
-                let mut buf = BytesMut::with_capacity(len as usize);
-                unsafe { buf.set_len(len as usize) };
-                let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
-                result?;
-                Ok(PooledBuffer::from_existing(buf))
-            }
+        if use_pool {
+            let mut buf = PooledBuffer::with_capacity(len as usize);
+            unsafe { buf.set_len(len as usize) };
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            result?;
+            Ok(buf)
+        } else {
+            let mut buf = BytesMut::with_capacity(len as usize);
+            unsafe { buf.set_len(len as usize) };
+            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await;
+            result?;
+            Ok(PooledBuffer::from_existing(buf))
+        }
     }
 }
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 0429817f..63e424a9 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -21,7 +21,7 @@ use crate::{
     streaming::segments::{IggyMessagesBatchSet, messages::write_batch},
 };
 use error_set::ErrContext;
-use iggy_common::{Confirmation, IggyByteSize, IggyError};
+use iggy_common::{IggyByteSize, IggyError};
 use monoio::fs::{File, OpenOptions};
 use std::sync::{
     Arc,
diff --git a/core/server/src/streaming/streams/storage.rs 
b/core/server/src/streaming/streams/storage.rs
index 53639de9..ebd4946a 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -20,6 +20,7 @@ use crate::state::system::StreamState;
 use crate::streaming::storage::StreamStorage;
 use crate::streaming::streams::COMPONENT;
 use crate::streaming::streams::stream::Stream;
+use crate::streaming::topics::topic::CreatedTopicInfo;
 use crate::streaming::topics::topic::Topic;
 use ahash::AHashSet;
 use error_set::ErrContext;
@@ -83,7 +84,7 @@ impl StreamStorage for FileStreamStorage {
             }
 
             let topic_state = topic_state.unwrap();
-            let topic = Topic::empty(
+            let topic_info = Topic::empty(
                 stream.stream_id,
                 topic_id,
                 &topic_state.name,
@@ -92,9 +93,8 @@ impl StreamStorage for FileStreamStorage {
                 stream.segments_count.clone(),
                 stream.config.clone(),
                 stream.storage.clone(),
-            )
-            .await;
-            unloaded_topics.push(topic);
+            );
+            unloaded_topics.push(topic_info.topic);
         }
 
         let state_topic_ids = 
state.topics.keys().copied().collect::<AHashSet<u32>>();
@@ -123,7 +123,7 @@ impl StreamStorage for FileStreamStorage {
                 );
                 for topic_id in missing_ids {
                     let topic_state = state.topics.get(&topic_id).unwrap();
-                    let topic = Topic::empty(
+                    let topic_info = Topic::empty(
                         stream.stream_id,
                         topic_id,
                         &topic_state.name,
@@ -132,8 +132,8 @@ impl StreamStorage for FileStreamStorage {
                         stream.segments_count.clone(),
                         stream.config.clone(),
                         stream.storage.clone(),
-                    )
-                    .await;
+                    );
+                    let topic = topic_info.topic;
                     topic.persist().await.with_error_context(|error| {
                         format!("{COMPONENT} (error: {error}) - failed to 
persist topic: {topic}")
                     })?;
diff --git a/core/server/src/streaming/streams/topics.rs 
b/core/server/src/streaming/streams/topics.rs
index 1d47cc36..6a201c7a 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -18,6 +18,7 @@
 
 use crate::streaming::streams::COMPONENT;
 use crate::streaming::streams::stream::Stream;
+use crate::streaming::topics::topic::CreatedTopicInfo;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
 use iggy_common::CompressionAlgorithm;
@@ -44,7 +45,7 @@ impl Stream {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: u8,
-    ) -> Result<u32, IggyError> {
+    ) -> Result<(u32, Vec<u32>), IggyError> {
         let max_topic_size = Topic::get_max_topic_size(max_topic_size, 
&self.config)?;
         if self.topics_ids.contains_key(name) {
             return Err(IggyError::TopicNameAlreadyExists(
@@ -74,7 +75,10 @@ impl Stream {
             return Err(IggyError::TopicIdAlreadyExists(id, self.stream_id));
         }
 
-        let topic = Topic::create(
+        let CreatedTopicInfo {
+            topic,
+            partition_ids,
+        } = Topic::create(
             self.stream_id,
             id,
             name,
@@ -88,15 +92,15 @@ impl Stream {
             compression_algorithm,
             max_topic_size,
             replication_factor,
-        )
-        .await?;
+        )?;
+
         topic.persist().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
         })?;
         info!("Created topic {}", topic);
         self.topics_ids.insert(name.to_owned(), id);
         self.topics.insert(id, topic);
-        Ok(id)
+        Ok((id, partition_ids))
     }
 
     pub async fn update_topic(
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index 28a8705c..79c7f9f3 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -458,7 +458,7 @@ mod tests {
         let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
         let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
 
-        Topic::create(
+        let topic_info = Topic::create(
             stream_id,
             id,
             name,
@@ -473,7 +473,7 @@ mod tests {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
-        .unwrap()
+        .unwrap();
+        topic_info.topic
     }
 }
diff --git a/core/server/src/streaming/topics/messages.rs 
b/core/server/src/streaming/topics/messages.rs
index acbe8c28..2f8f0557 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -25,7 +25,7 @@ use crate::streaming::utils::hash;
 use ahash::AHashMap;
 use error_set::ErrContext;
 use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{Confirmation, IggyTimestamp, PollingStrategy};
+use iggy_common::{IggyTimestamp, PollingStrategy};
 use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind, 
PollingKind};
 use std::sync::atomic::Ordering;
 use tracing::trace;
@@ -78,8 +78,8 @@ impl Topic {
 
     pub async fn append_messages(
         &self,
-        partitioning: &Partitioning,
-        messages: IggyMessagesBatchMut,
+        partition_id: u32,
+        batch: IggyMessagesBatchMut,
     ) -> Result<(), IggyError> {
         if !self.has_partitions() {
             return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
@@ -91,24 +91,11 @@ impl Topic {
             return Err(IggyError::TopicFull(self.topic_id, self.stream_id));
         }
 
-        if messages.is_empty() {
+        if batch.is_empty() {
             return Ok(());
         }
 
-        let partition_id = match partitioning.kind {
-            PartitioningKind::Balanced => self.get_next_partition_id(),
-            PartitioningKind::PartitionId => u32::from_le_bytes(
-                partitioning.value[..partitioning.length as usize]
-                    .try_into()
-                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
-            ),
-            PartitioningKind::MessagesKey => {
-                
self.calculate_partition_id_by_messages_key_hash(&partitioning.value)
-            }
-        };
-
-        self.append_messages_to_partition(messages, partition_id)
-            .await
+        self.append_messages_to_partition(batch, partition_id).await
     }
 
     pub async fn flush_unsaved_buffer(
@@ -152,6 +139,20 @@ impl Topic {
         Ok(())
     }
 
+    pub fn calculate_partition_id(&self, partitioning: &Partitioning) -> 
Result<u32, IggyError> {
+        match partitioning.kind {
+            PartitioningKind::Balanced => Ok(self.get_next_partition_id()),
+            PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+                partitioning.value[..partitioning.length as usize]
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
+            )),
+            PartitioningKind::MessagesKey => {
+                
Ok(self.calculate_partition_id_by_messages_key_hash(&partitioning.value))
+            }
+        }
+    }
+
     fn get_next_partition_id(&self) -> u32 {
         let mut partition_id = self.current_partition_id.fetch_add(1, 
Ordering::SeqCst);
         let partitions_count = self.partitions.len() as u32;
@@ -227,10 +228,8 @@ mod tests {
                 .build()
                 .expect("Failed to create message with valid payload and 
headers");
             let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
-            topic
-                .append_messages(&partitioning, messages)
-                .await
-                .unwrap();
+            let partition = 
topic.calculate_partition_id(&partitioning).unwrap();
+            topic.append_messages(partition, messages).await.unwrap();
         }
 
         let partitions = topic.get_partitions();
@@ -260,10 +259,10 @@ mod tests {
                 .build()
                 .expect("Failed to create message with valid payload and 
headers");
             let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
-            topic
-                .append_messages(&partitioning, messages)
-                .await
-                .unwrap();
+            let partition_id = topic
+                .calculate_partition_id(&partitioning)
+                .expect("Failed to calculate partition ID");
+            topic.append_messages(partition_id, messages).await.unwrap();
         }
 
         let mut read_messages_count = 0;
@@ -337,7 +336,7 @@ mod tests {
         let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
         let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
 
-        let topic = Topic::create(
+        let created_info = Topic::create(
             stream_id,
             id,
             name,
@@ -352,8 +351,8 @@ mod tests {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap();
+        let topic = created_info.topic;
         topic.persist().await.unwrap();
         topic
     }
diff --git a/core/server/src/streaming/topics/partitions.rs 
b/core/server/src/streaming/topics/partitions.rs
index dfedf0a9..b9e56331 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -34,7 +34,7 @@ impl Topic {
         self.partitions.len() as u32
     }
 
-    pub async fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>, 
IggyError> {
+    pub fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>, 
IggyError> {
         if count == 0 {
             return Ok(vec![]);
         }
@@ -60,8 +60,7 @@ impl Topic {
                 self.size_bytes.clone(),
                 self.segments_count_of_parent_stream.clone(),
                 IggyTimestamp::now(),
-            )
-            .await;
+            );
             self.partitions
                 .insert(partition_id, IggyRwLock::new(partition));
             partition_ids.push(partition_id)
@@ -71,12 +70,9 @@ impl Topic {
     }
 
     pub async fn add_persisted_partitions(&mut self, count: u32) -> 
Result<Vec<u32>, IggyError> {
-        let partition_ids = self
-            .add_partitions(count)
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to add 
partitions, count: {count}")
-            })?;
+        let partition_ids = 
self.add_partitions(count).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to add partitions, 
count: {count}")
+        })?;
         for partition_id in &partition_ids {
             let partition = self.partitions.get(partition_id).unwrap();
             let mut partition = partition.write().await;
diff --git a/core/server/src/streaming/topics/storage.rs 
b/core/server/src/streaming/topics/storage.rs
index db74fc98..827ba6c9 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -113,8 +113,7 @@ impl TopicStorage for FileTopicStorage {
                 topic.size_bytes.clone(),
                 topic.segments_count_of_parent_stream.clone(),
                 partition_state.created_at,
-            )
-            .await;
+            );
             unloaded_partitions.push(partition);
         }
 
@@ -160,8 +159,7 @@ impl TopicStorage for FileTopicStorage {
                         topic.size_bytes.clone(),
                         topic.segments_count_of_parent_stream.clone(),
                         partition_state.created_at,
-                    )
-                    .await;
+                    );
                     partition.persist().await.with_error_context(|error| {
                         format!(
                             "{COMPONENT} (error: {error}) - failed to persist 
partition: {partition}"
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index 2e9e38c9..43b28c49 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -63,9 +63,14 @@ pub struct Topic {
     pub created_at: IggyTimestamp,
 }
 
+pub struct CreatedTopicInfo {
+    pub topic: Topic,
+    pub partition_ids: Vec<u32>,
+}
+
 impl Topic {
     #[allow(clippy::too_many_arguments)]
-    pub async fn empty(
+    pub fn empty(
         stream_id: u32,
         topic_id: u32,
         name: &str,
@@ -74,7 +79,7 @@ impl Topic {
         segments_count_of_parent_stream: Arc<AtomicU32>,
         config: Arc<SystemConfig>,
         storage: Rc<SystemStorage>,
-    ) -> Topic {
+    ) -> CreatedTopicInfo {
         Topic::create(
             stream_id,
             topic_id,
@@ -90,12 +95,11 @@ impl Topic {
             MaxTopicSize::ServerDefault,
             1,
         )
-        .await
         .unwrap()
     }
 
     #[allow(clippy::too_many_arguments)]
-    pub async fn create(
+    pub fn create(
         stream_id: u32,
         topic_id: u32,
         name: &str,
@@ -109,7 +113,7 @@ impl Topic {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: u8,
-    ) -> Result<Topic, IggyError> {
+    ) -> Result<CreatedTopicInfo, IggyError> {
         let path = config.get_topic_path(stream_id, topic_id);
         let partitions_path = config.get_partitions_path(stream_id, topic_id);
         let mut topic = Topic {
@@ -142,8 +146,11 @@ impl Topic {
             message_expiry, topic.message_expiry
         );
 
-        topic.add_partitions(partitions_count).await?;
-        Ok(topic)
+        let partition_ids = topic.add_partitions(partitions_count)?;
+        Ok(CreatedTopicInfo {
+            topic,
+            partition_ids,
+        })
     }
 
     pub fn is_full(&self) -> bool {
@@ -313,7 +320,7 @@ mod tests {
         let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
         let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
 
-        let topic = Topic::create(
+        let topic_info = Topic::create(
             stream_id,
             topic_id,
             name,
@@ -328,9 +335,8 @@ mod tests {
             max_topic_size,
             replication_factor,
         )
-        .await
         .unwrap();
-
+        let topic = topic_info.topic;
         assert_eq!(topic.stream_id, stream_id);
         assert_eq!(topic.topic_id, topic_id);
         assert_eq!(topic.path, path);

Reply via email to