This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 05ab5a7705ce86a3f08aa95759617c754cc3aec9 Author: numinex <[email protected]> AuthorDate: Sun Jun 29 10:01:36 2025 +0200 finish send/poll messages --- core/common/src/error/iggy_error.rs | 5 + core/common/src/lib.rs | 1 - core/common/src/types/confirmation/mod.rs | 53 --------- core/common/src/types/mod.rs | 1 - .../tests/streaming/get_by_timestamp.rs | 5 +- core/integration/tests/streaming/topic_messages.rs | 5 +- core/sdk/src/prelude.rs | 2 +- .../handlers/messages/poll_messages_handler.rs | 128 +++++++++++++++++---- .../handlers/messages/send_messages_handler.rs | 111 +++++++++++++++--- core/server/src/configs/defaults.rs | 6 - core/server/src/configs/displays.rs | 3 +- core/server/src/configs/system.rs | 3 - core/server/src/shard/mod.rs | 80 ++++++++++++- core/server/src/shard/namespace.rs | 11 +- core/server/src/shard/system/messages.rs | 52 ++------- core/server/src/shard/system/partitions.rs | 15 ++- core/server/src/shard/system/streams.rs | 9 ++ core/server/src/shard/system/topics.rs | 24 +++- core/server/src/shard/transmission/event.rs | 60 ++++++++++ core/server/src/shard/transmission/frame.rs | 25 ++-- core/server/src/shard/transmission/message.rs | 28 ++++- core/server/src/shard/transmission/mod.rs | 1 + core/server/src/streaming/partitions/messages.rs | 20 +--- core/server/src/streaming/partitions/partition.rs | 8 +- .../streaming/segments/messages/messages_reader.rs | 31 +++-- .../streaming/segments/messages/messages_writer.rs | 2 +- core/server/src/streaming/streams/storage.rs | 14 +-- core/server/src/streaming/streams/topics.rs | 14 ++- .../server/src/streaming/topics/consumer_groups.rs | 6 +- core/server/src/streaming/topics/messages.rs | 55 +++++---- core/server/src/streaming/topics/partitions.rs | 14 +-- core/server/src/streaming/topics/storage.rs | 6 +- core/server/src/streaming/topics/topic.rs | 26 +++-- 33 files changed, 530 insertions(+), 294 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index cc8a284b..334e14c7 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -459,6 +459,11 @@ pub enum IggyError { CannotReadIndexPosition = 10011, #[error("Cannot read index timestamp")] CannotReadIndexTimestamp = 10012, + + #[error("Shard not found for stream ID: {0}, topic ID: {1}, partition ID: {2}")] + ShardNotFound(u32, u32, u32) = 11000, + #[error("Shard communication error, shard ID: {0}")] + ShardCommunicationError(u16) = 11001, } impl IggyError { diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 6603a7d5..f055476e 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -64,7 +64,6 @@ pub use types::configuration::tcp_config::tcp_client_config::*; pub use types::configuration::tcp_config::tcp_client_config_builder::*; pub use types::configuration::tcp_config::tcp_client_reconnection_config::*; pub use types::configuration::tcp_config::tcp_connection_string_options::*; -pub use types::confirmation::*; pub use types::consumer::consumer_group::*; pub use types::consumer::consumer_kind::*; pub use types::consumer::consumer_offset_info::*; diff --git a/core/common/src/types/confirmation/mod.rs b/core/common/src/types/confirmation/mod.rs deleted file mode 100644 index bc26cffa..00000000 --- a/core/common/src/types/confirmation/mod.rs +++ /dev/null @@ -1,53 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use serde::{Deserialize, Serialize}; -use strum::{Display, EnumString}; - -#[derive(Clone, Copy, Debug, Default, Display, Serialize, Deserialize, EnumString, PartialEq)] -#[strum(serialize_all = "snake_case")] -pub enum Confirmation { - #[default] - Wait, -} - -#[cfg(test)] -mod tests { - use super::*; - use std::str::FromStr; - - #[test] - fn test_to_string() { - assert_eq!(Confirmation::Wait.to_string(), "wait"); - assert_eq!(Confirmation::NoWait.to_string(), "no_wait"); - } - - #[test] - fn test_from_str() { - assert_eq!(Confirmation::from_str("wait").unwrap(), Confirmation::Wait); - assert_eq!( - Confirmation::from_str("no_wait").unwrap(), - Confirmation::NoWait - ); - } - - #[test] - fn test_default() { - assert_eq!(Confirmation::default(), Confirmation::Wait); - } -} diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs index 2376abd6..6940f065 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/mod.rs @@ -21,7 +21,6 @@ pub(crate) mod client_state; pub(crate) mod command; pub(crate) mod compression; pub(crate) mod configuration; -pub(crate) mod confirmation; pub(crate) mod consumer; pub(crate) mod diagnostic; pub(crate) mod identifier; diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index d0cc2e5b..93a178aa 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -202,10 +202,7 @@ async fn test_get_messages_by_timestamp( let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size); assert_eq!(batch.count(), batch_len); - partition - .append_messages(batch) - .await - .unwrap(); + partition.append_messages(batch).await.unwrap(); // Capture the timestamp of this batch batch_timestamps.push(IggyTimestamp::now()); diff --git a/core/integration/tests/streaming/topic_messages.rs b/core/integration/tests/streaming/topic_messages.rs index f21263c4..19de255f 100644 --- a/core/integration/tests/streaming/topic_messages.rs +++ b/core/integration/tests/streaming/topic_messages.rs @@ -62,10 +62,7 @@ async fn assert_polling_messages() { .map(|m| m.get_size_bytes()) .sum::<IggyByteSize>(); let batch = IggyMessagesBatchMut::from_messages(&messages, batch_size.as_bytes_u32()); - topic - .append_messages(&partitioning, batch) - .await - .unwrap(); + topic.append_messages(&partitioning, batch).await.unwrap(); let consumer = PollingConsumer::Consumer(1, partition_id); let (_, polled_messages) = topic diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 1e1ee111..e04164fe 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -51,7 +51,7 @@ pub use iggy_binary_protocol::{ }; pub use iggy_common::{ Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, CacheMetrics, - CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, Confirmation, Consumer, + CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, Identifier, IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, IggyIndexView, IggyMessage, diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 9d6d526b..c590dcf2 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -20,8 +20,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; -use crate::shard::IggyShard; +use crate::shard::namespace::IggyNamespace; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ShardMessage, ShardRequest}; +use crate::shard::{IggyShard, ShardRequestResult}; use crate::shard::system::messages::PollingArgs; +use crate::streaming::segments::IggyMessagesBatchSet; use crate::streaming::session::Session; use crate::to_iovec; use anyhow::Result; @@ -60,48 +64,128 @@ impl ServerCommandHandler for PollMessages { ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let (metadata, messages) = shard - .poll_messages( - session, - &self.consumer, - &self.stream_id, - &self.topic_id, - self.partition_id, - PollingArgs::new(self.strategy, self.count, self.auto_commit), - ) - .await + let stream = shard + .get_stream(&self.stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {}", + self.stream_id + ) + })?; + let topic = stream.get_topic( + &self.topic_id, + ).with_error_context(|error| format!( + "{COMPONENT} (error: {error}) - topic not found for stream ID: {}, topic_id: {}", + self.stream_id, self.topic_id + ))?; + + let PollMessages { + consumer, + partition_id, + strategy, + count, + auto_commit, + .. + } = self; + let args = PollingArgs::new(strategy, count, auto_commit); + + shard.permissioner + .borrow() + .poll_messages(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - failed to poll messages for consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: {session}.", - self.consumer, self.stream_id, self.topic_id, self.partition_id + "{COMPONENT} (error: {error}) - permission denied to poll messages for user {} on stream ID: {}, topic ID: {}", + session.get_user_id(), + topic.stream_id, + topic.topic_id ))?; + if !topic.has_partitions() { + return Err(IggyError::NoPartitions(topic.topic_id, topic.stream_id)); + } + + // There might be no partition assigned, if it's the consumer group member without any partitions. + let Some((consumer, partition_id)) = topic + .resolve_consumer_with_partition_id(&consumer, session.client_id, partition_id, true) + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))? else { + todo!("Send early response"); + //return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); + }; + + let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); + let request = ShardRequest::PollMessages { + consumer, + partition_id, + args, + count, + }; + let message = ShardMessage::Request(request); + let (metadata, batch) = match shard.send_request_to_shard(&namespace, message).await { + ShardRequestResult::SameShard(message) => { + match message { + ShardMessage::Request(request) => { + match request { + ShardRequest::PollMessages { + consumer, + partition_id, + args, + count, + } => { + topic.get_messages(consumer, partition_id, args.strategy, count).await? + + } + _ => unreachable!( + "Expected a SendMessages request inside of SendMessages handler, impossible state" + ), + } + } + _ => unreachable!( + "Expected a request message inside of an command handler, impossible state" + ), + } + } + ShardRequestResult::Result(result) => { + match result? { + ShardResponse::PollMessages(response) => { + response + } + ShardResponse::ErrorResponse(err) => { + return Err(err); + } + _ => unreachable!( + "Expected a PollMessages response inside of PollMessages handler, impossible state" + ), + } + } + }; + + // Collect all chunks first into a Vec to extend their lifetimes. // This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive // throughout the async vectored I/O operation, preventing "borrowed value does not live // long enough" errors while optimizing transmission by using larger chunks. // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for messages_count + size of all batches. - let response_length = 4 + 8 + 4 + messages.size(); + let response_length = 4 + 8 + 4 + batch.size(); let response_length_bytes = response_length.to_le_bytes(); let partition_id = metadata.partition_id.to_le_bytes(); let current_offset = metadata.current_offset.to_le_bytes(); - let count = messages.count().to_le_bytes(); + let count = batch.count().to_le_bytes(); - let mut io_slices = Vec::with_capacity(messages.containers_count() + 3); - io_slices.push(to_iovec(&partition_id)); - io_slices.push(to_iovec(¤t_offset)); - io_slices.push(to_iovec(&count)); + let mut iovecs = Vec::with_capacity(batch.containers_count() + 3); + iovecs.push(to_iovec(&partition_id)); + iovecs.push(to_iovec(¤t_offset)); + iovecs.push(to_iovec(&count)); - io_slices.extend(messages.iter().map(|m| to_iovec(&m))); + iovecs.extend(batch.iter().map(|m| to_iovec(&m))); trace!( "Sending {} messages to client ({} bytes) to client", - messages.count(), + batch.count(), response_length ); sender - .send_ok_response_vectored(&response_length_bytes, io_slices) + .send_ok_response_vectored(&response_length_bytes, iovecs) .await?; Ok(()) } diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index d9a4770f..61e2c39c 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -16,19 +16,25 @@ * under the License. */ +use super::COMPONENT; use crate::binary::command::{BinaryServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; -use crate::shard::IggyShard; +use crate::shard::namespace::IggyNamespace; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ShardMessage, ShardRequest}; +use crate::shard::{IggyShard, ShardRequestResult}; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::utils::PooledBuffer; use anyhow::Result; use bytes::BytesMut; -use iggy_common::INDEX_SIZE; +use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::Sizeable; +use iggy_common::{INDEX_SIZE, IdKind}; use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; use std::rc::Rc; +use std::sync::Arc; use tracing::instrument; impl ServerCommandHandler for SendMessages { @@ -104,19 +110,98 @@ impl ServerCommandHandler for SendMessages { indexes, messages_buffer, ); - batch.validate()?; - shard - .append_messages( - session, - &self.stream_id, - &self.topic_id, - &self.partitioning, - batch, - ) - .await?; - + let stream = shard + .get_stream(&self.stream_id) + .with_error_context(|error| { + format!( + "Failed to get stream with ID: {} (error: {})", + self.stream_id, error + ) + })?; + let topic = stream + .get_topic(&self.topic_id) + .with_error_context(|error| { + format!( + "Failed to get topic with ID: {} (error: {})", + self.topic_id, error + ) + })?; + let stream_id = stream.stream_id; + let topic_id = topic.topic_id; + let partition_id = topic.calculate_partition_id(&self.partitioning)?; + + // Validate permissions for given user on stream and topic. + shard.permissioner.borrow().append_messages( + session.get_user_id(), + stream_id, + topic_id + ).with_error_context(|error| format!( + "{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", + session.get_user_id(), + stream_id, + topic_id + ))?; + let messages_count = batch.count(); + shard.metrics.increment_messages(messages_count as u64); + + // Encrypt messages if encryptor is enabled in configuration. + let batch = shard.maybe_encrypt_messages(batch)?; + + let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); + let request = ShardRequest::SendMessages { + stream_id, + topic_id, + partition_id, + batch, + }; + let message = ShardMessage::Request(request); + // Egh... I don't like those nested match statements, + // Technically there is only two `request` types that will ever be dispatched + // to different shards, thus we could get away with generic structs + // maybe todo ? + // how to make this code reusable, in a sense that we will have exactly the same code inside of + // `PollMessages` handler, but with different request.... + match shard.send_request_to_shard(&namespace, message).await { + ShardRequestResult::SameShard(message) => { + match message { + ShardMessage::Request(request) => { + match request { + ShardRequest::SendMessages { + stream_id, + topic_id, + partition_id, + batch, + } => { + // Just shut up rust analyzer. + let _stream_id = stream_id; + let _topic_id = topic_id; + topic.append_messages(partition_id, batch).await? + } + _ => unreachable!( + "Expected a SendMessages request inside of SendMessages handler, impossible state" + ), + } + } + _ => unreachable!( + "Expected a request message inside of an command handler, impossible state" + ), + } + } + ShardRequestResult::Result(result) => { + match result? { + ShardResponse::SendMessages => { + () + } + ShardResponse::ErrorResponse(err) => { + return Err(err); + } + _ => unreachable!("Expected a SendMessages response inside of SendMessages handler, impossible state"), + } + + } + }; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 82b560cf..dd74ade7 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -451,12 +451,6 @@ impl Default for SegmentConfig { cache_indexes: SERVER_CONFIG.system.segment.cache_indexes.parse().unwrap(), message_expiry: SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(), archive_expired: SERVER_CONFIG.system.segment.archive_expired, - server_confirmation: SERVER_CONFIG - .system - .segment - .server_confirmation - .parse() - .unwrap(), } } } diff --git a/core/server/src/configs/displays.rs b/core/server/src/configs/displays.rs index de4805a8..51873421 100644 --- a/core/server/src/configs/displays.rs +++ b/core/server/src/configs/displays.rs @@ -289,12 +289,11 @@ impl Display for SegmentConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, archive_expired: {}, server_confirmation: {} }}", + "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {}, archive_expired: {} }}", self.size, self.cache_indexes, self.message_expiry, self.archive_expired, - self.server_confirmation, ) } } diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs index fc263a26..a5193294 100644 --- a/core/server/src/configs/system.rs +++ b/core/server/src/configs/system.rs @@ -17,7 +17,6 @@ */ use super::cache_indexes::CacheIndexesConfig; -use iggy_common::Confirmation; use iggy_common::IggyByteSize; use iggy_common::IggyExpiry; use iggy_common::MaxTopicSize; @@ -141,8 +140,6 @@ pub struct SegmentConfig { #[serde_as(as = "DisplayFromStr")] pub message_expiry: IggyExpiry, pub archive_expired: bool, - #[serde_as(as = "DisplayFromStr")] - pub server_confirmation: Confirmation, } #[serde_as] diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index dbd94b68..7976cca3 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -47,14 +47,12 @@ use crate::{ shard::{ system::info::SystemInfo, transmission::{ - frame::ShardFrame, - message::{ShardEvent, ShardMessage}, + frame::{ShardFrame, ShardResponse}, + message::{ShardEvent, ShardMessage, ShardRequest}, }, }, state::{ - StateKind, - file::FileState, - system::{StreamState, SystemState, UserState}, + file::FileState, system::{StreamState, SystemState, UserState}, StateKind }, streaming::{ clients::client_manager::ClientManager, @@ -85,12 +83,37 @@ impl Shard { connection, } } + + pub async fn send_request(&self, message: ShardMessage) -> Result<ShardResponse, IggyError> { + let (sender, receiver) = async_channel::bounded(1); + self.connection + .sender + .send(ShardFrame::new(message, Some(sender.clone()))); // Apparently sender needs to be cloned, otherwise channel will close... + //TODO: Fixme + let response = receiver.recv().await + .map_err(|err| { + error!("Failed to receive response from shard: {err}"); + IggyError::ShardCommunicationError(self.id) + })?; + Ok(response) + } } struct ShardInfo { id: u16, } +impl ShardInfo { + pub fn new(id: u16) -> Self { + Self { id } + } +} + +pub enum ShardRequestResult<T, E> { + SameShard(ShardMessage), + Result(Result<T, E>), +} + pub struct IggyShard { pub id: u16, shards: Vec<Shard>, @@ -426,6 +449,53 @@ impl IggyShard { self.shards.len() as u32 } + pub async fn send_request_to_shard( + &self, + namespace: &IggyNamespace, + message: ShardMessage, + ) -> ShardRequestResult<ShardResponse, IggyError> { + if let Some(shard) = self.find_shard(namespace) { + if shard.id == self.id { + return ShardRequestResult::SameShard(message); + } + + let response = match shard.send_request(message).await { + Ok(response) => response, + Err(err) => { + error!( + "{COMPONENT} - failed to send request to shard with ID: {}, error: {err}", + shard.id + ); + return ShardRequestResult::Result(Err(err)); + } + }; + ShardRequestResult::Result(Ok(response)) + } else { + ShardRequestResult::Result(Err(IggyError::ShardNotFound( + namespace.stream_id, + namespace.topic_id, + namespace.partition_id, + ))) + } + } + + fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> { + let shards_table = self.shards_table.borrow(); + shards_table.get(namespace).map(|shard_info| { + self.shards + .iter() + .find(|shard| shard.id == shard_info.id) + .expect("Shard not found in the shards table.") + }) + } + + pub fn insert_shard_table_records( + &self, + records: impl Iterator<Item = (IggyNamespace, ShardInfo)>, + ) { + self.shards_table.borrow_mut().extend(records); + } + pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: ShardEvent) { self.shards .iter() diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index a7c899aa..6b3b5b7d 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -17,19 +17,18 @@ */ use hash32::{Hasher, Murmur3Hasher}; -use iggy_common::Identifier; use std::hash::Hasher as _; //TODO: Will probably want to move it to separate crate so we can share it with sdk. #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct IggyNamespace { - pub(crate) stream_id: Identifier, - pub(crate) topic_id: Identifier, + pub(crate) stream_id: u32, + pub(crate) topic_id: u32, pub(crate) partition_id: u32, } impl IggyNamespace { - pub fn new(stream_id: Identifier, topic_id: Identifier, partition_id: u32) -> Self { + pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self { Self { stream_id, topic_id, @@ -39,8 +38,8 @@ impl IggyNamespace { pub fn generate_hash(&self) -> u32 { let mut hasher = Murmur3Hasher::default(); - hasher.write(&self.stream_id.value); - hasher.write(&self.topic_id.value); + hasher.write_u32(self.stream_id); + hasher.write_u32(self.topic_id); hasher.write_u32(self.partition_id); hasher.finish32() } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 9a23a9bb..76337a1a 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -25,7 +25,7 @@ use crate::streaming::utils::PooledBuffer; use async_zip::tokio::read::stream; use error_set::ErrContext; use iggy_common::{ - BytesSerializable, Confirmation, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, + BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, Partitioning, PollingStrategy, }; use tracing::{error, trace}; @@ -98,47 +98,6 @@ impl IggyShard { Ok((metadata, batch_set)) } - pub async fn append_messages( - &self, - session: &Session, - stream_id: &Identifier, - topic_id: &Identifier, - partitioning: &Partitioning, - messages: IggyMessagesBatchMut, - ) -> Result<(), IggyError> { - self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - stream not found for stream_id: {stream_id}") - })?; - let stream_id = stream.stream_id; - let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.borrow().append_messages( - session.get_user_id(), - topic.stream_id, - topic.topic_id - ).with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", - session.get_user_id(), - topic.stream_id, - topic.topic_id - ))?; - let messages_count = messages.count(); - - // Encrypt messages if encryptor is configured - let messages = if let Some(encryptor) = &self.encryptor { - self.encrypt_messages(messages, encryptor)? - } else { - messages - }; - - topic - .append_messages(partitioning, messages) - .await?; - - self.metrics.increment_messages(messages_count as u64); - Ok(()) - } - pub async fn flush_unsaved_buffer( &self, session: &Session, @@ -167,7 +126,7 @@ impl IggyShard { Ok(()) } - async fn decrypt_messages( + pub async fn decrypt_messages( &self, batches: IggyMessagesBatchSet, encryptor: &EncryptorKind, @@ -206,11 +165,14 @@ impl IggyShard { Ok(IggyMessagesBatchSet::from_vec(decrypted_batches)) } - fn encrypt_messages( + pub fn maybe_encrypt_messages( &self, batch: IggyMessagesBatchMut, - encryptor: &EncryptorKind, ) -> Result<IggyMessagesBatchMut, IggyError> { + let encryptor = match self.encryptor.as_ref() { + Some(encryptor) => encryptor, + None => return Ok(batch), + }; let mut encrypted_messages = PooledBuffer::with_capacity(batch.size() as usize * 2); let count = batch.count(); let mut indexes = IggyIndexesMut::with_capacity(batch.count() as usize, 0); diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 01e02968..78ef2144 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -18,6 +18,8 @@ use super::COMPONENT; use crate::shard::IggyShard; +use crate::shard::ShardInfo; +use crate::shard::namespace::IggyNamespace; use crate::streaming::session::Session; use error_set::ErrContext; use iggy_common::Identifier; @@ -54,6 +56,7 @@ impl IggyShard { let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") })?; + let stream_id = stream.stream_id; let topic = stream .get_topic_mut(topic_id) .with_error_context(|error| { @@ -61,15 +64,25 @@ impl IggyShard { "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}" ) })?; + let topic_id = topic.topic_id; // TODO: Make add persisted partitions to topic sync, and extract the storage persister out of it // perform disk i/o outside of the borrow_mut of the stream. - topic + let partition_ids = topic .add_persisted_partitions(partitions_count) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to add persisted partitions, topic: {topic}") })?; + let records = partition_ids.into_iter().map(|partition_id| { + let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); + let hash = namespace.generate_hash(); + let shard_id = hash % self.get_available_shards_count(); + let shard_info = ShardInfo::new(shard_id as u16); + (namespace, shard_info) + }); + self.insert_shard_table_records(records); + topic.reassign_consumer_groups(); self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 3cb0da5b..fecec536 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -112,6 +112,15 @@ impl IggyShard { } } + pub fn try_get_topic_id(&self, stream_id: &Identifier, name: &str) -> Option<u32> { + let stream = self.get_stream(stream_id).ok()?; + stream.topics_ids.get(name).and_then(|id| Some(*id)) + } + + pub fn try_get_stream_id(&self, name: &str) -> Option<u32> { + self.streams_ids.borrow().get(name).and_then(|id| Some(*id)) + } + fn try_get_stream_by_name(&self, name: &str) -> Option<Ref<'_, Stream>> { self.streams_ids .borrow() diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 477308bc..252dae30 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -17,7 +17,8 @@ */ use super::COMPONENT; -use crate::shard::IggyShard; +use crate::shard::namespace::IggyNamespace; +use crate::shard::{IggyShard, ShardInfo}; use crate::streaming::session::Session; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; @@ -131,8 +132,11 @@ impl IggyShard { // TODO: Make create topic sync, and extract the storage persister out of it // perform disk i/o outside of the borrow_mut of the stream. - let created_topic_id = self - .get_stream_mut(stream_id)? + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let (topic_id, partition_ids) = stream .create_topic( topic_id, name, @@ -146,12 +150,24 @@ impl IggyShard { .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to create topic with name: {name} in stream ID: {stream_id}") })?; + let records = partition_ids.into_iter().map(|partition_id| { + let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); + // TODO: This setup isn't deterministic. + // Imagine a scenario where client creates partition using `String` identifiers, + // but then for poll_messages requests uses numeric ones. + // the namespace wouldn't match, therefore we would get miss in the shard table. + let hash = namespace.generate_hash(); + let shard_id = hash % self.get_available_shards_count(); + let shard_info = ShardInfo::new(shard_id as u16); + (namespace, shard_info) + }); + self.insert_shard_table_records(records); self.metrics.increment_topics(1); self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); - Ok(Identifier::numeric(created_topic_id)?) + Ok(Identifier::numeric(topic_id)?) } #[allow(clippy::too_many_arguments)] diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs new file mode 100644 index 00000000..45815a50 --- /dev/null +++ b/core/server/src/shard/transmission/event.rs @@ -0,0 +1,60 @@ +use std::net::SocketAddr; + +use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize}; + +use crate::streaming::clients::client_manager::Transport; + +pub enum ShardEvent { + CreatedStream { + stream_id: Option<u32>, + topic_id: Identifier + }, + //DeletedStream(Identifier), + //UpdatedStream(Identifier, String), + //PurgedStream(Identifier), + //CreatedPartitions(Identifier, Identifier, u32), + //DeletedPartitions(Identifier, Identifier, u32), + CreatedTopic( + Identifier, + Option<u32>, + String, + u32, + IggyExpiry, + CompressionAlgorithm, + MaxTopicSize, + Option<u8>, + ), + //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String), + //DeletedConsumerGroup(Identifier, Identifier, Identifier), + /* + UpdatedTopic( + Identifier, + Identifier, + String, + IggyExpiry, + CompressionAlgorithm, + MaxTopicSize, + Option<u8>, + ), + */ + //PurgedTopic(Identifier, Identifier), + //DeletedTopic(Identifier, Identifier), + //CreatedUser(String, String, UserStatus, Option<Permissions>), + //DeletedUser(Identifier), + LoginUser { + username: String, + password: String, + }, + //LogoutUser, + //UpdatedUser(Identifier, Option<String>, Option<UserStatus>), + //ChangedPassword(Identifier, String, String), + //CreatedPersonalAccessToken(String, IggyExpiry), + //DeletedPersonalAccessToken(String), + //LoginWithPersonalAccessToken(String), + //StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64), + NewSession { + user_id: u32, + socket_addr: SocketAddr, + transport: Transport, + }, +} \ No newline at end of file diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index ec63daf0..ff519ab4 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -16,38 +16,33 @@ * under the License. */ use async_channel::Sender; -use bytes::Bytes; use iggy_common::IggyError; -use crate::shard::transmission::message::ShardMessage; +use crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata, shard::transmission::message::ShardMessage, streaming::segments::IggyMessagesBatchSet}; + +#[derive(Debug)] +pub enum ShardResponse { + PollMessages((IggyPollMetadata, IggyMessagesBatchSet)), + SendMessages, + ShardEvent, + ErrorResponse(IggyError), +} #[derive(Debug)] pub struct ShardFrame { - pub client_id: u32, pub message: ShardMessage, pub response_sender: Option<Sender<ShardResponse>>, } impl ShardFrame { - pub fn new( - client_id: u32, - message: ShardMessage, - response_sender: Option<Sender<ShardResponse>>, - ) -> Self { + pub fn new(message: ShardMessage, response_sender: Option<Sender<ShardResponse>>) -> Self { Self { - client_id, message, response_sender, } } } -#[derive(Debug)] -pub enum ShardResponse { - BinaryResponse(Bytes), - ErrorResponse(IggyError), -} - #[macro_export] macro_rules! handle_response { ($sender:expr, $response:expr) => { diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index cbf4fdd4..360e7a3c 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -1,5 +1,7 @@ use std::rc::Rc; +use iggy_common::PollingStrategy; + /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,11 +19,11 @@ use std::rc::Rc; * specific language governing permissions and limitations * under the License. */ -use crate::{binary::command::ServerCommand, streaming::session::Session}; +use crate::{shard::system::messages::PollingArgs, streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, session::Session}}; #[derive(Debug)] pub enum ShardMessage { - Command(ServerCommand), + Request(ShardRequest), Event(ShardEvent), } @@ -30,9 +32,25 @@ pub enum ShardEvent { NewSession(), } -impl From<ServerCommand> for ShardMessage { - fn from(command: ServerCommand) -> Self { - ShardMessage::Command(command) +#[derive(Debug)] +pub enum ShardRequest { + SendMessages { + stream_id: u32, + topic_id: u32, + partition_id: u32, + batch: IggyMessagesBatchMut, + }, + PollMessages { + partition_id: u32, + args: PollingArgs, + consumer: PollingConsumer, + count: u32, + }, +} + +impl From<ShardRequest> for ShardMessage { + fn from(request: ShardRequest) -> Self { + ShardMessage::Request(request) } } diff --git a/core/server/src/shard/transmission/mod.rs b/core/server/src/shard/transmission/mod.rs index 1f9a79ed..4ed9bd1e 100644 --- a/core/server/src/shard/transmission/mod.rs +++ b/core/server/src/shard/transmission/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +pub mod event; pub mod connector; pub mod frame; pub mod message; diff --git a/core/server/src/streaming/partitions/messages.rs b/core/server/src/streaming/partitions/messages.rs index cc961370..3b2d942b 100644 --- a/core/server/src/streaming/partitions/messages.rs +++ b/core/server/src/streaming/partitions/messages.rs @@ -21,7 +21,7 @@ use crate::streaming::partitions::partition::Partition; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::segments::*; use error_set::ErrContext; -use iggy_common::{Confirmation, IggyError, IggyTimestamp, Sizeable}; +use iggy_common::{IggyError, IggyTimestamp, Sizeable}; use std::sync::atomic::Ordering; use tracing::trace; @@ -218,10 +218,7 @@ impl Partition { Ok(batches) } - pub async fn append_messages( - &mut self, - batch: IggyMessagesBatchMut, - ) -> Result<(), IggyError> { + pub async fn append_messages(&mut self, batch: IggyMessagesBatchMut) -> Result<(), IggyError> { if batch.count() == 0 { return Ok(()); } @@ -564,10 +561,7 @@ mod tests { .map(|m| m.get_size_bytes().as_bytes_u32()) .sum(); let initial_batch = IggyMessagesBatchMut::from_messages(&initial_messages, initial_size); - partition - .append_messages(initial_batch) - .await - .unwrap(); + partition.append_messages(initial_batch).await.unwrap(); // Now try to add only duplicates let duplicate_messages = vec![ @@ -582,10 +576,7 @@ mod tests { .sum(); let duplicate_batch = IggyMessagesBatchMut::from_messages(&duplicate_messages, duplicate_size); - partition - .append_messages(duplicate_batch) - .await - .unwrap(); + partition.append_messages(duplicate_batch).await.unwrap(); let loaded_messages = partition.get_messages_by_offset(0, 10).await.unwrap(); @@ -739,8 +730,7 @@ mod tests { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await, + ), temp_dir, ) } diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs index 1b8bacf8..4a573d14 100644 --- a/core/server/src/streaming/partitions/partition.rs +++ b/core/server/src/streaming/partitions/partition.rs @@ -84,7 +84,7 @@ impl ConsumerOffset { impl Partition { #[allow(clippy::too_many_arguments)] - pub async fn create( + pub fn create( stream_id: u32, topic_id: u32, partition_id: u32, @@ -247,8 +247,7 @@ mod tests { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); assert_eq!(partition.stream_id, stream_id); assert_eq!(partition.topic_id, topic_id); @@ -290,8 +289,7 @@ mod tests { Arc::new(AtomicU64::new(0)), Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), - ) - .await; + ); assert!(partition.segments.is_empty()); } } diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs index 0f413650..4091e8e3 100644 --- a/core/server/src/streaming/segments/messages/messages_reader.rs +++ b/core/server/src/streaming/segments/messages/messages_reader.rs @@ -16,9 +16,9 @@ * under the License. */ -use crate::io::file::{IggyFile}; +use crate::io::file::IggyFile; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; -use crate::streaming::utils::{file, PooledBuffer}; +use crate::streaming::utils::{PooledBuffer, file}; use bytes::BytesMut; use error_set::ErrContext; use iggy_common::IggyError; @@ -174,19 +174,18 @@ impl MessagesReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let mut buf = PooledBuffer::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; - result?; - Ok(buf) - - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; - result?; - Ok(PooledBuffer::from_existing(buf)) - } + if use_pool { + let mut buf = PooledBuffer::with_capacity(len as usize); + unsafe { buf.set_len(len as usize) }; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; + Ok(buf) + } else { + let mut buf = BytesMut::with_capacity(len as usize); + unsafe { buf.set_len(len as usize) }; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; + Ok(PooledBuffer::from_existing(buf)) + } } } diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index 0429817f..63e424a9 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -21,7 +21,7 @@ use crate::{ streaming::segments::{IggyMessagesBatchSet, messages::write_batch}, }; use error_set::ErrContext; -use iggy_common::{Confirmation, IggyByteSize, IggyError}; +use iggy_common::{IggyByteSize, IggyError}; use monoio::fs::{File, OpenOptions}; use std::sync::{ Arc, diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs index 53639de9..ebd4946a 100644 --- a/core/server/src/streaming/streams/storage.rs +++ b/core/server/src/streaming/streams/storage.rs @@ -20,6 +20,7 @@ use crate::state::system::StreamState; use crate::streaming::storage::StreamStorage; use crate::streaming::streams::COMPONENT; use crate::streaming::streams::stream::Stream; +use crate::streaming::topics::topic::CreatedTopicInfo; use crate::streaming::topics::topic::Topic; use ahash::AHashSet; use error_set::ErrContext; @@ -83,7 +84,7 @@ impl StreamStorage for FileStreamStorage { } let topic_state = topic_state.unwrap(); - let topic = Topic::empty( + let topic_info = Topic::empty( stream.stream_id, topic_id, &topic_state.name, @@ -92,9 +93,8 @@ impl StreamStorage for FileStreamStorage { stream.segments_count.clone(), stream.config.clone(), stream.storage.clone(), - ) - .await; - unloaded_topics.push(topic); + ); + unloaded_topics.push(topic_info.topic); } let state_topic_ids = state.topics.keys().copied().collect::<AHashSet<u32>>(); @@ -123,7 +123,7 @@ impl StreamStorage for FileStreamStorage { ); for topic_id in missing_ids { let topic_state = state.topics.get(&topic_id).unwrap(); - let topic = Topic::empty( + let topic_info = Topic::empty( stream.stream_id, topic_id, &topic_state.name, @@ -132,8 +132,8 @@ impl StreamStorage for FileStreamStorage { stream.segments_count.clone(), stream.config.clone(), stream.storage.clone(), - ) - .await; + ); + let topic = topic_info.topic; topic.persist().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") })?; diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs index 1d47cc36..6a201c7a 100644 --- a/core/server/src/streaming/streams/topics.rs +++ b/core/server/src/streaming/streams/topics.rs @@ -18,6 +18,7 @@ use crate::streaming::streams::COMPONENT; use crate::streaming::streams::stream::Stream; +use crate::streaming::topics::topic::CreatedTopicInfo; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; use iggy_common::CompressionAlgorithm; @@ -44,7 +45,7 @@ impl Stream { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: u8, - ) -> Result<u32, IggyError> { + ) -> Result<(u32, Vec<u32>), IggyError> { let max_topic_size = Topic::get_max_topic_size(max_topic_size, &self.config)?; if self.topics_ids.contains_key(name) { return Err(IggyError::TopicNameAlreadyExists( @@ -74,7 +75,10 @@ impl Stream { return Err(IggyError::TopicIdAlreadyExists(id, self.stream_id)); } - let topic = Topic::create( + let CreatedTopicInfo { + topic, + partition_ids, + } = Topic::create( self.stream_id, id, name, @@ -88,15 +92,15 @@ impl Stream { compression_algorithm, max_topic_size, replication_factor, - ) - .await?; + )?; + topic.persist().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") })?; info!("Created topic {}", topic); self.topics_ids.insert(name.to_owned(), id); self.topics.insert(id, topic); - Ok(id) + Ok((id, partition_ids)) } pub async fn update_topic( diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index 28a8705c..79c7f9f3 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -458,7 +458,7 @@ mod tests { let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0)); let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0)); - Topic::create( + let topic_info = Topic::create( stream_id, id, name, @@ -473,7 +473,7 @@ mod tests { MaxTopicSize::ServerDefault, 1, ) - .await - .unwrap() + .unwrap(); + topic_info.topic } } diff --git a/core/server/src/streaming/topics/messages.rs b/core/server/src/streaming/topics/messages.rs index acbe8c28..2f8f0557 100644 --- a/core/server/src/streaming/topics/messages.rs +++ b/core/server/src/streaming/topics/messages.rs @@ -25,7 +25,7 @@ use crate::streaming::utils::hash; use ahash::AHashMap; use error_set::ErrContext; use iggy_common::locking::IggySharedMutFn; -use iggy_common::{Confirmation, IggyTimestamp, PollingStrategy}; +use iggy_common::{IggyTimestamp, PollingStrategy}; use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind, PollingKind}; use std::sync::atomic::Ordering; use tracing::trace; @@ -78,8 +78,8 @@ impl Topic { pub async fn append_messages( &self, - partitioning: &Partitioning, - messages: IggyMessagesBatchMut, + partition_id: u32, + batch: IggyMessagesBatchMut, ) -> Result<(), IggyError> { if !self.has_partitions() { return Err(IggyError::NoPartitions(self.topic_id, self.stream_id)); @@ -91,24 +91,11 @@ impl Topic { return Err(IggyError::TopicFull(self.topic_id, self.stream_id)); } - if messages.is_empty() { + if batch.is_empty() { return Ok(()); } - let partition_id = match partitioning.kind { - PartitioningKind::Balanced => self.get_next_partition_id(), - PartitioningKind::PartitionId => u32::from_le_bytes( - partitioning.value[..partitioning.length as usize] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ), - PartitioningKind::MessagesKey => { - self.calculate_partition_id_by_messages_key_hash(&partitioning.value) - } - }; - - self.append_messages_to_partition(messages, partition_id) - .await + self.append_messages_to_partition(batch, partition_id).await } pub async fn flush_unsaved_buffer( @@ -152,6 +139,20 @@ impl Topic { Ok(()) } + pub fn calculate_partition_id(&self, partitioning: &Partitioning) -> Result<u32, IggyError> { + match partitioning.kind { + PartitioningKind::Balanced => Ok(self.get_next_partition_id()), + PartitioningKind::PartitionId => Ok(u32::from_le_bytes( + partitioning.value[..partitioning.length as usize] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + )), + PartitioningKind::MessagesKey => { + Ok(self.calculate_partition_id_by_messages_key_hash(&partitioning.value)) + } + } + } + fn get_next_partition_id(&self) -> u32 { let mut partition_id = self.current_partition_id.fetch_add(1, Ordering::SeqCst); let partitions_count = self.partitions.len() as u32; @@ -227,10 +228,8 @@ mod tests { .build() .expect("Failed to create message with valid payload and headers"); let messages = IggyMessagesBatchMut::from_messages(&[message], 1); - topic - .append_messages(&partitioning, messages) - .await - .unwrap(); + let partition = topic.calculate_partition_id(&partitioning).unwrap(); + topic.append_messages(partition, messages).await.unwrap(); } let partitions = topic.get_partitions(); @@ -260,10 +259,10 @@ mod tests { .build() .expect("Failed to create message with valid payload and headers"); let messages = IggyMessagesBatchMut::from_messages(&[message], 1); - topic - .append_messages(&partitioning, messages) - .await - .unwrap(); + let partition_id = topic + .calculate_partition_id(&partitioning) + .expect("Failed to calculate partition ID"); + topic.append_messages(partition_id, messages).await.unwrap(); } let mut read_messages_count = 0; @@ -337,7 +336,7 @@ mod tests { let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0)); let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0)); - let topic = Topic::create( + let created_info = Topic::create( stream_id, id, name, @@ -352,8 +351,8 @@ mod tests { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); + let topic = created_info.topic; topic.persist().await.unwrap(); topic } diff --git a/core/server/src/streaming/topics/partitions.rs b/core/server/src/streaming/topics/partitions.rs index dfedf0a9..b9e56331 100644 --- a/core/server/src/streaming/topics/partitions.rs +++ b/core/server/src/streaming/topics/partitions.rs @@ -34,7 +34,7 @@ impl Topic { self.partitions.len() as u32 } - pub async fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>, IggyError> { + pub fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>, IggyError> { if count == 0 { return Ok(vec![]); } @@ -60,8 +60,7 @@ impl Topic { self.size_bytes.clone(), self.segments_count_of_parent_stream.clone(), IggyTimestamp::now(), - ) - .await; + ); self.partitions .insert(partition_id, IggyRwLock::new(partition)); partition_ids.push(partition_id) @@ -71,12 +70,9 @@ impl Topic { } pub async fn add_persisted_partitions(&mut self, count: u32) -> Result<Vec<u32>, IggyError> { - let partition_ids = self - .add_partitions(count) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to add partitions, count: {count}") - })?; + let partition_ids = self.add_partitions(count).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to add partitions, count: {count}") + })?; for partition_id in &partition_ids { let partition = self.partitions.get(partition_id).unwrap(); let mut partition = partition.write().await; diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs index db74fc98..827ba6c9 100644 --- a/core/server/src/streaming/topics/storage.rs +++ b/core/server/src/streaming/topics/storage.rs @@ -113,8 +113,7 @@ impl TopicStorage for FileTopicStorage { topic.size_bytes.clone(), topic.segments_count_of_parent_stream.clone(), partition_state.created_at, - ) - .await; + ); unloaded_partitions.push(partition); } @@ -160,8 +159,7 @@ impl TopicStorage for FileTopicStorage { topic.size_bytes.clone(), topic.segments_count_of_parent_stream.clone(), partition_state.created_at, - ) - .await; + ); partition.persist().await.with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to persist partition: {partition}" diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index 2e9e38c9..43b28c49 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -63,9 +63,14 @@ pub struct Topic { pub created_at: IggyTimestamp, } +pub struct CreatedTopicInfo { + pub topic: Topic, + pub partition_ids: Vec<u32>, +} + impl Topic { #[allow(clippy::too_many_arguments)] - pub async fn empty( + pub fn empty( stream_id: u32, topic_id: u32, name: &str, @@ -74,7 +79,7 @@ impl Topic { segments_count_of_parent_stream: Arc<AtomicU32>, config: Arc<SystemConfig>, storage: Rc<SystemStorage>, - ) -> Topic { + ) -> CreatedTopicInfo { Topic::create( stream_id, topic_id, @@ -90,12 +95,11 @@ impl Topic { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap() } #[allow(clippy::too_many_arguments)] - pub async fn create( + pub fn create( stream_id: u32, topic_id: u32, name: &str, @@ -109,7 +113,7 @@ impl Topic { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: u8, - ) -> Result<Topic, IggyError> { + ) -> Result<CreatedTopicInfo, IggyError> { let path = config.get_topic_path(stream_id, topic_id); let partitions_path = config.get_partitions_path(stream_id, topic_id); let mut topic = Topic { @@ -142,8 +146,11 @@ impl Topic { message_expiry, topic.message_expiry ); - topic.add_partitions(partitions_count).await?; - Ok(topic) + let partition_ids = topic.add_partitions(partitions_count)?; + Ok(CreatedTopicInfo { + topic, + partition_ids, + }) } pub fn is_full(&self) -> bool { @@ -313,7 +320,7 @@ mod tests { let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0)); let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0)); - let topic = Topic::create( + let topic_info = Topic::create( stream_id, topic_id, name, @@ -328,9 +335,8 @@ mod tests { max_topic_size, replication_factor, ) - .await .unwrap(); - + let topic = topic_info.topic; assert_eq!(topic.stream_id, stream_id); assert_eq!(topic.topic_id, topic_id); assert_eq!(topic.path, path);
