This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 52dbed84 feat(io_uring): implement polling messages using new
architecture (#2162)
52dbed84 is described below
commit 52dbed84fb91903255d8b5097c34a652eb031c27
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Sep 10 14:33:17 2025 +0200
feat(io_uring): implement polling messages using new architecture (#2162)
---
core/bench/src/actors/consumer/client/low_level.rs | 12 +-
core/sdk/src/clients/binary_consumer_group.rs | 4 +-
core/sdk/src/clients/client.rs | 6 +-
core/server/src/bootstrap.rs | 4 +-
core/server/src/http/consumer_groups.rs | 6 +-
core/server/src/shard/mod.rs | 334 +++++++++-
core/server/src/shard/namespace.rs | 15 +
core/server/src/shard/system/messages.rs | 732 +++++++++++----------
core/server/src/shard/system/partitions.rs | 18 +-
core/server/src/shard/system/utils.rs | 2 +-
core/server/src/shard/transmission/message.rs | 15 +-
core/server/src/slab/helpers.rs | 6 +-
core/server/src/slab/partitions.rs | 4 +
core/server/src/slab/streams.rs | 231 ++++++-
core/server/src/streaming/cluster/mod.rs | 4 +-
core/server/src/streaming/partitions/helpers.rs | 599 ++++++++++++++++-
core/server/src/streaming/partitions/journal.rs | 22 +
core/server/src/streaming/partitions/log.rs | 55 +-
core/server/src/streaming/partitions/messages.rs | 7 -
.../src/streaming/segments/indexes/index_reader.rs | 8 +-
.../src/streaming/segments/indexes/index_writer.rs | 8 +-
.../src/streaming/segments/indexes/indexes_mut.rs | 2 +-
.../streaming/segments/messages/messages_reader.rs | 11 +-
.../streaming/segments/messages/messages_writer.rs | 16 +-
core/server/src/streaming/segments/storage.rs | 26 +-
25 files changed, 1670 insertions(+), 477 deletions(-)
diff --git a/core/bench/src/actors/consumer/client/low_level.rs
b/core/bench/src/actors/consumer/client/low_level.rs
index 139dae24..088ad42f 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -89,7 +89,7 @@ impl ConsumerClient for LowLevelConsumerClient {
if polled.messages.is_empty() {
return Ok(None);
}
- let message_count = u32::try_from(polled.messages.len()).unwrap();
+ let messages_count = polled.messages.len() as u64;
let latency = if self.config.origin_timestamp_latency_calculation {
let now = IggyTimestamp::now().as_micros();
Duration::from_micros(now -
polled.messages[0].header.origin_timestamp)
@@ -100,10 +100,16 @@ impl ConsumerClient for LowLevelConsumerClient {
let user_bytes = batch_user_size_bytes(&polled);
let total_bytes = batch_total_size_bytes(&polled);
- self.offset += polled.messages.len() as u64;
+ self.offset += messages_count;
+ match self.polling_strategy.kind {
+ PollingKind::Offset => {
+ self.polling_strategy.value += messages_count;
+ }
+ _ => {}
+ }
Ok(Some(BatchMetrics {
- messages: message_count,
+ messages: messages_count as u32,
user_data_bytes: user_bytes,
total_bytes,
latency,
diff --git a/core/sdk/src/clients/binary_consumer_group.rs
b/core/sdk/src/clients/binary_consumer_group.rs
index b0ef8c67..b415e42f 100644
--- a/core/sdk/src/clients/binary_consumer_group.rs
+++ b/core/sdk/src/clients/binary_consumer_group.rs
@@ -20,7 +20,9 @@ use crate::prelude::IggyClient;
use async_dropper::AsyncDrop;
use async_trait::async_trait;
use iggy_binary_protocol::{ConsumerGroupClient, UserClient};
-use iggy_common::{locking::IggyRwLockFn, ConsumerGroup, ConsumerGroupDetails,
Identifier, IggyError};
+use iggy_common::{
+ ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError,
locking::IggyRwLockFn,
+};
#[async_trait]
impl ConsumerGroupClient for IggyClient {
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 6f31dd56..962ff1bc 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -17,8 +17,8 @@
*/
use crate::clients::client_builder::IggyClientBuilder;
-use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
use iggy_common::Consumer;
+use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
use crate::client_wrappers::client_wrapper::ClientWrapper;
use crate::http::http_client::HttpClient;
@@ -31,9 +31,7 @@ use crate::tcp::tcp_client::TcpClient;
use async_broadcast::Receiver;
use async_trait::async_trait;
use iggy_binary_protocol::{Client, SystemClient};
-use iggy_common::{
- ConnectionStringUtils, DiagnosticEvent, Partitioner, TransportProtocol,
-};
+use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner,
TransportProtocol};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::spawn;
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 4678b146..8e3797b2 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -352,7 +352,7 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) ->
Runtime {
proactor
.capacity(4096)
.coop_taskrun(true)
- .taskrun_flag(false); // TODO: Try enabling this.
+ .taskrun_flag(true); // TODO: Try enabling this.
// FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
// This causes a freeze on macOS with compio fs operations
@@ -362,7 +362,7 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) ->
Runtime {
compio::runtime::RuntimeBuilder::new()
.with_proactor(proactor.to_owned())
- .event_interval(69)
+ .event_interval(128)
.thread_affinity(cpu_set)
.build()
.unwrap()
diff --git a/core/server/src/http/consumer_groups.rs
b/core/server/src/http/consumer_groups.rs
index 6c5a49af..56f4ecf4 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -262,9 +262,9 @@ async fn delete_consumer_group(
// Delete using the new API
let consumer_group = state.shard.shard().delete_consumer_group2(
- &session,
- &identifier_stream_id,
- &identifier_topic_id,
+ &session,
+ &identifier_stream_id,
+ &identifier_topic_id,
&identifier_group_id
)
.with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed
to delete consumer group with ID: {group_id} for topic with ID: {topic_id} in
stream with ID: {stream_id}"))?;
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 2a5ec7eb..8794f665 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -32,7 +32,8 @@ use error_set::ErrContext;
use futures::future::try_join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{
- EncryptorKind, Identifier, IggyError, Permissions, UserId, UserStatus,
+ EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions,
PollingKind, UserId,
+ UserStatus,
defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME},
locking::IggyRwLockFn,
};
@@ -50,10 +51,11 @@ use std::{
},
time::{Duration, Instant},
};
-use tracing::{error, info, instrument, warn};
+use tracing::{error, info, instrument, trace, warn};
use transmission::connector::{Receiver, ShardConnector, StopReceiver,
StopSender};
use crate::{
+ binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::server::ServerConfig,
http::http_server,
io::fs_utils,
@@ -77,9 +79,12 @@ use crate::{
streaming::{
clients::client_manager::ClientManager,
diagnostics::metrics::Metrics,
+ partitions,
personal_access_tokens::personal_access_token::PersonalAccessToken,
+ polling_consumer::PollingConsumer,
session::Session,
storage::SystemStorage,
+ streams, topics,
users::{permissioner::Permissioner, user::User},
utils::ptr::EternalPtr,
},
@@ -124,7 +129,7 @@ impl Shard {
// TODO: Maybe pad to cache line size?
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct ShardInfo {
- id: u16,
+ pub id: u16,
}
impl ShardInfo {
@@ -344,24 +349,312 @@ impl IggyShard {
}
async fn handle_request(&self, request: ShardRequest) ->
Result<ShardResponse, IggyError> {
- todo!();
- /*
- let stream =
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
- let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+ let stream_id = request.stream_id;
+ let topic_id = request.topic_id;
let partition_id = request.partition_id;
match request.payload {
ShardRequestPayload::SendMessages { batch } => {
- topic.append_messages(partition_id, batch).await?;
+ let mut batch = self.maybe_encrypt_messages(batch)?;
+ let messages_count = batch.count();
+
+ let current_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::calculate_current_offset(),
+ );
+
+ self.streams2
+ .with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
+ )
+ .await;
+
+ let (journal_messages_count, journal_size) =
+ self.streams2.with_partition_by_id_mut(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::append_to_journal(self.id,
current_offset, batch),
+ )?;
+
+ let unsaved_messages_count_exceeded = journal_messages_count
+ >= self.config.system.partition.messages_required_to_save;
+ let unsaved_messages_size_exceeded = journal_size
+ >= self
+ .config
+ .system
+ .partition
+ .size_of_messages_required_to_save
+ .as_bytes_u64() as u32;
+
+ let is_full = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::is_segment_full(),
+ );
+
+ // Try committing the journal
+ if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded {
+ self.streams2
+ .persist_messages(
+ self.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
+ &self.config.system,
+ )
+ .await?;
+
+ if is_full {
+ self.streams2
+ .handle_full_segment(
+ self.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ &self.config.system,
+ )
+ .await?;
+ self.metrics.increment_messages(messages_count as u64);
+ }
+ }
Ok(ShardResponse::SendMessages)
}
ShardRequestPayload::PollMessages { args, consumer } => {
- let (metadata, batch) = topic
- .get_messages(consumer, partition_id, args.strategy,
args.count)
- .await?;
- Ok(ShardResponse::PollMessages((metadata, batch)))
+ let current_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+ );
+ let metadata = IggyPollMetadata::new(partition_id as u32,
current_offset);
+ let count = args.count;
+ let strategy = args.strategy;
+ let value = strategy.value;
+ let batches = match strategy.kind {
+ PollingKind::Offset => {
+ let offset = value;
+ // We have to remember to keep the invariant from the
if that is on line 496.
+ // Alternatively a better design would be to get rid
of that if and move the validations here.
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Timestamp => {
+ let timestamp = IggyTimestamp::from(value);
+ let timestamp_ts = timestamp.as_micros();
+ trace!(
+ "Getting {count} messages by timestamp: {} for
partition: {}...",
+ timestamp_ts, partition_id
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_timestamp(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ timestamp_ts,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::First => {
+ let first_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.segments()
+ .first()
+ .map(|segment| segment.start_offset)
+ .unwrap_or(0)
+ },
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ first_offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Last => {
+ let (start_offset, actual_count) =
self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, offset, _, _, _)| {
+ let current_offset =
offset.load(Ordering::Relaxed);
+ let mut requested_count = 0;
+ if requested_count > current_offset + 1 {
+ requested_count = current_offset + 1
+ }
+ let start_offset = 1 + current_offset -
requested_count;
+ (start_offset, requested_count as u32)
+ },
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ start_offset,
+ actual_count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Next => {
+ let (consumer_offset, consumer_id) = match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_offset(consumer_id),
+ )
+ .map(|c_offset| c_offset.stored_offset),
+ consumer_id,
+ ),
+ PollingConsumer::ConsumerGroup(cg_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_group_member_offset(
+ cg_id,
+ ),
+ )
+ .map(|cg_offset| cg_offset.stored_offset),
+ cg_id,
+ ),
+ };
+
+ let Some(consumer_offset) = consumer_offset else {
+ return
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+ };
+ let offset = consumer_offset + 1;
+ trace!(
+ "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
+ consumer_id, partition_id, offset
+ );
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ }?;
+
+ let numeric_stream_id = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let numeric_topic_id = self.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+
+ if args.auto_commit && !batches.is_empty() {
+ let offset = batches
+ .last_offset()
+ .expect("Batch set should have at least one batch");
+ trace!(
+ "Last offset: {} will be automatically stored for {},
stream: {}, topic: {}, partition: {}",
+ offset, consumer, numeric_stream_id, numeric_topic_id,
partition_id
+ );
+ match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => {
+ self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::store_consumer_offset(
+ consumer_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2
+ .with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_offset_to_disk(
+ self.id,
+ consumer_id,
+ ),
+ )
+ .await?;
+ }
+ PollingConsumer::ConsumerGroup(cg_id, _) => {
+ self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::store_consumer_group_member_offset(
+ cg_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2.with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+ self.id,
+ cg_id,
+ ),
+ )
+ .await?;
+ }
+ }
+ }
+ Ok(ShardResponse::PollMessages((metadata, batches)))
}
}
- */
}
async fn handle_event(&self, event: ShardEvent) -> Result<(), IggyError> {
@@ -563,13 +856,12 @@ impl IggyShard {
&topic_id,
&polling_consumer,
partition_id,
- );
+ )?;
Ok(())
}
}
}
- /*
pub async fn send_request_to_shard_or_recoil(
&self,
namespace: &IggyNamespace,
@@ -593,13 +885,12 @@ impl IggyShard {
Ok(ShardSendRequestResult::Response(response))
} else {
Err(IggyError::ShardNotFound(
- namespace.stream_id,
- namespace.topic_id,
- namespace.partition_id,
+ namespace.stream_id(),
+ namespace.topic_id(),
+ namespace.partition_id(),
))
}
}
- */
pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) ->
Vec<ShardResponse> {
let mut responses =
Vec::with_capacity(self.get_available_shards_count() as usize);
@@ -669,7 +960,10 @@ impl IggyShard {
}
pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) ->
ShardInfo {
- self.shards_table.remove(namespace).map(|(_, shard_info)|
shard_info).expect("remove_shard_table_record: namespace not found")
+ self.shards_table
+ .remove(namespace)
+ .map(|(_, shard_info)| shard_info)
+ .expect("remove_shard_table_record: namespace not found")
}
pub fn remove_shard_table_records(
diff --git a/core/server/src/shard/namespace.rs
b/core/server/src/shard/namespace.rs
index 127be415..f3a8f2c3 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -63,6 +63,21 @@ impl IggyNamespace {
self.0
}
+ #[inline]
+ pub fn stream_id(&self) -> usize {
+ ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize
+ }
+
+ #[inline]
+ pub fn topic_id(&self) -> usize {
+ ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize
+ }
+
+ #[inline]
+ pub fn partition_id(&self) -> usize {
+ ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize
+ }
+
#[inline]
pub fn new(stream: usize, topic: usize, partition: usize) -> Self {
let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 0c898b26..1a57f4a1 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -20,27 +20,23 @@ use std::sync::atomic::Ordering;
use super::COMPONENT;
use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
-use crate::configs::cache_indexes::CacheIndexesConfig;
use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
-use crate::streaming::partitions::journal::Journal;
-use crate::streaming::partitions::partition2::PartitionRoot;
-use crate::streaming::segments::storage::create_segment_storage;
-use crate::streaming::segments::{
- IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2,
-};
+use crate::shard_trace;
+use crate::streaming::polling_consumer::PollingConsumer;
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
use crate::streaming::session::Session;
use crate::streaming::utils::{PooledBuffer, hash};
-use crate::streaming::{streams, topics};
-use crate::{shard_info, shard_trace};
+use crate::streaming::{partitions, streams, topics};
use error_set::ErrContext;
use iggy_common::{
- BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier,
IggyError, Partitioning,
- PartitioningKind, PollingStrategy,
+ BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier,
IggyError, IggyTimestamp,
+ Partitioning, PartitioningKind, PollingKind, PollingStrategy,
};
use tracing::{error, trace};
@@ -98,10 +94,6 @@ impl IggyShard {
return Ok(());
}
- // Encrypt messages if encryptor is enabled in configuration.
- let mut batch = self.maybe_encrypt_messages(batch)?;
- let messages_count = batch.count();
-
let partition_id =
self.streams2
.with_topic_by_id(
@@ -128,308 +120,113 @@ impl IggyShard {
},
)?;
- // Deduplicate messages and adjust their offsets.
- let current_offset = self
- .streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
- async |(root, _, deduplicator, offset, _, _, log)| {
- let segment = log.active_segment();
- let current_offset = if !root.should_increment_offset() {
- 0
- } else {
- offset.load(Ordering::Relaxed) + 1
- };
- batch
- .prepare_for_persistence(
- segment.start_offset,
- current_offset,
- segment.size,
- deduplicator.as_ref(),
- )
- .await;
- current_offset
- },
- )
- .await;
-
- // Append to the journal.
- let (journal_messages_count, journal_size) =
self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(root, stats, _, offset, .., log)| {
- let segment = log.active_segment_mut();
-
- if segment.end_offset == 0 {
- segment.start_timestamp = batch.first_timestamp().unwrap();
- }
-
- let batch_messages_size = batch.size();
- let batch_messages_count = batch.count();
-
- segment.end_timestamp = batch.last_timestamp().unwrap();
- segment.end_offset = batch.last_offset().unwrap();
- segment.size += batch_messages_size;
-
- let (journal_messages_count, journal_size) =
- log.journal_mut().append(self.id, batch)?;
-
- stats.increment_messages_count(batch_messages_count as u64);
- stats.increment_size_bytes(batch_messages_size as u64);
-
- let last_offset = if batch_messages_count == 0 {
- current_offset
- } else {
- current_offset + batch_messages_count as u64 - 1
- };
-
- if root.should_increment_offset() {
- offset.store(last_offset, Ordering::Relaxed);
- } else {
- root.set_should_increment_offset(true);
- offset.store(last_offset, Ordering::Relaxed);
- }
-
- Ok((journal_messages_count, journal_size))
- },
- )?;
-
- let unsaved_messages_count_exceeded =
- journal_messages_count >=
self.config.system.partition.messages_required_to_save;
- let unsaved_messages_size_exceeded = journal_size
- >= self
- .config
- .system
- .partition
- .size_of_messages_required_to_save
- .as_bytes_u64() as u32;
-
- let is_full =
- self.streams2
- .with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
- log.active_segment().is_full()
- });
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
+ let payload = ShardRequestPayload::SendMessages { batch: batch };
+ let request = ShardRequest::new(stream_id.clone(), topic_id.clone(),
partition_id, payload);
+ let message = ShardMessage::Request(request);
+ match self
+ .send_request_to_shard_or_recoil(&namespace, message)
+ .await?
+ {
+ ShardSendRequestResult::Recoil(message) => {
+ if let ShardMessage::Request(ShardRequest {
+ partition_id,
+ payload,
+ ..
+ }) = message
+ && let ShardRequestPayload::SendMessages { batch } =
payload
+ {
+ // Encrypt messages if encryptor is enabled in
configuration.
+ let mut batch = self.maybe_encrypt_messages(batch)?;
+ let messages_count = batch.count();
- // Try committing the journal
- if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded {
- let batches = self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- let batches = log.journal_mut().commit();
- log.ensure_indexes();
- batches.append_indexes_to(log.indexes_mut().unwrap());
- batches
- },
- );
+ let current_offset = self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ partitions::helpers::calculate_current_offset(),
+ );
- let (saved, batch_count) = self.streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
- async |(.., log)| {
- let reason = if unsaved_messages_count_exceeded {
- format!(
- "unsaved messages count exceeded: {}, max from
config: {}",
- journal_messages_count,
-
self.config.system.partition.messages_required_to_save,
- )
- } else if unsaved_messages_size_exceeded {
- format!(
- "unsaved messages size exceeded: {}, max from
config: {}",
- journal_size,
-
self.config.system.partition.size_of_messages_required_to_save,
- )
- } else {
- format!(
- "segment is full, current size: {}, max from
config: {}",
- log.active_segment().size,
- self.config.system.segment.size,
- )
- };
- shard_trace!(
- self.id,
- "Persisting messages on disk for stream ID: {},
topic ID: {}, partition ID: {} because {}...",
+ self.streams2
+ .with_partition_by_id_async(
stream_id,
topic_id,
partition_id,
- reason
- );
-
- let batch_count = batches.count();
- let batch_size = batches.size();
-
- let storage = log.active_storage();
- let saved = storage
- .messages_writer
- .as_ref()
- .expect("Messages writer not initialized")
- .save_batch_set(batches)
- .await
- .with_error_context(|error| {
- let segment = log.active_segment();
- format!(
- "Failed to save batch of {batch_count}
messages \
- ({batch_size} bytes) to {segment}.
{error}",
- )
- })?;
-
- let unsaved_indexes_slice =
log.indexes().unwrap().unsaved_slice();
- let len = unsaved_indexes_slice.len();
- storage
- .index_writer
- .as_ref()
- .expect("Index writer not initialized")
- .save_indexes(unsaved_indexes_slice)
- .await
- .with_error_context(|error| {
- let segment = log.active_segment();
- format!(
- "Failed to save index of {len} indexes to
{segment}. {error}",
- )
- })?;
-
- shard_trace!(
- self.id,
- "Persisted {} messages on disk for stream ID: {},
topic ID: {}, for partition with ID: {}, total bytes written: {}.",
- batch_count, stream_id, topic_id, partition_id,
saved
- );
-
- Ok((saved, batch_count))
- },
- )
- .await?;
-
- self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(_, stats, .., log)| {
- log.active_segment_mut().size += saved.as_bytes_u32();
- log.indexes_mut().unwrap().mark_saved();
- if self.config.system.segment.cache_indexes ==
CacheIndexesConfig::None {
- log.indexes_mut().unwrap().clear();
- }
- stats.increment_size_bytes(saved.as_bytes_u64());
- stats.increment_messages_count(batch_count as u64);
- },
- );
-
- // Handle possibly full segment after saving.
- let is_segment_full = self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| log.active_segment().is_full(),
- );
-
- if is_segment_full {
- let (start_offset, size, end_offset) =
self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- (
- log.active_segment().start_offset,
- log.active_segment().size,
- log.active_segment().end_offset,
+
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
)
- },
- );
-
- let numeric_stream_id = self
- .streams2
- .with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
- let numeric_topic_id = self.streams2.with_topic_by_id(
- stream_id,
- topic_id,
- topics::helpers::get_topic_id(),
- );
+ .await;
- if self.config.system.segment.cache_indexes ==
CacheIndexesConfig::OpenSegment
- || self.config.system.segment.cache_indexes ==
CacheIndexesConfig::None
- {
- self.streams2.with_partition_by_id_mut(
+ let (journal_messages_count, journal_size) =
+ self.streams2.with_partition_by_id_mut(
+ stream_id,
+ topic_id,
+ partition_id,
+ partitions::helpers::append_to_journal(self.id,
current_offset, batch),
+ )?;
+
+ let unsaved_messages_count_exceeded =
journal_messages_count
+ >=
self.config.system.partition.messages_required_to_save;
+ let unsaved_messages_size_exceeded = journal_size
+ >= self
+ .config
+ .system
+ .partition
+ .size_of_messages_required_to_save
+ .as_bytes_u64() as u32;
+
+ let is_full = self.streams2.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- |(.., log)| {
- log.clear_indexes();
- },
+ partitions::helpers::is_segment_full(),
);
- }
- self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- log.active_segment_mut().sealed = true;
- },
- );
- let (log_writer, index_writer) =
self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| log.active_storage_mut().shutdown(),
- );
-
- compio::runtime::spawn(async move {
- let _ = log_writer.fsync().await;
- })
- .detach();
- compio::runtime::spawn(async move {
- let _ = index_writer.fsync().await;
- drop(index_writer)
- })
- .detach();
-
- shard_info!(
- self.id,
- "Closed segment for stream: {}, topic: {} with start
offset: {}, end offset: {}, size: {} for partition with ID: {}.",
- stream_id,
- topic_id,
- start_offset,
- end_offset,
- size,
- partition_id
- );
-
- let messages_size = 0;
- let indexes_size = 0;
- let segment = Segment2::new(
- end_offset + 1,
- self.config.system.segment.size,
- self.config.system.segment.message_expiry,
- );
-
- let storage = create_segment_storage(
- &self.config.system,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- messages_size,
- indexes_size,
- end_offset + 1,
- )
- .await?;
- self.streams2.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- log.add_persisted_segment(segment, storage);
- },
- );
+ // Try committing the journal
+ if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded
+ {
+ self.streams2
+ .persist_messages(
+ self.id,
+ stream_id,
+ topic_id,
+ partition_id,
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
+ &self.config.system,
+ )
+ .await?;
+
+ if is_full {
+ self.streams2
+ .handle_full_segment(
+ self.id,
+ stream_id,
+ topic_id,
+ partition_id,
+ &self.config.system,
+ )
+ .await?;
+ self.metrics.increment_messages(messages_count as
u64);
+ }
+ }
+ Ok(())
+ } else {
+ unreachable!(
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
+ );
+ }
}
- }
+ ShardSendRequestResult::Response(response) => match response {
+ ShardResponse::SendMessages => Ok(()),
+ ShardResponse::ErrorResponse(err) => Err(err),
+ _ => unreachable!(
+ "Expected a SendMessages response inside of SendMessages
handler, impossible state"
+ ),
+ },
+ }?;
- self.metrics.increment_messages(messages_count as u64);
Ok(())
}
@@ -443,25 +240,16 @@ impl IggyShard {
maybe_partition_id: Option<u32>,
args: PollingArgs,
) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
- todo!();
- /*
- let stream = self.get_stream(stream_id).with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - stream not found for stream
ID: {}",
- stream_id
- )
- })?;
- let stream_id = stream.stream_id;
- let numeric_topic_id = stream.get_topic(topic_id).map(|topic|
topic.topic_id).with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - topic not found for stream ID:
{}, topic_id: {}",
- stream_id, topic_id
- )
- })?;
+ let numeric_stream_id = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let numeric_topic_id =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
self.permissioner
.borrow()
- .poll_messages(user_id, stream_id, numeric_topic_id)
+ .poll_messages(user_id, numeric_stream_id as u32, numeric_topic_id
as u32)
.with_error_context(|error| format!(
"{COMPONENT} (error: {error}) - permission denied to poll
messages for user {} on stream ID: {}, topic ID: {}",
user_id,
@@ -469,49 +257,282 @@ impl IggyShard {
numeric_topic_id
))?;
- // There might be no partition assigned, if it's the consumer group
member without any partitions.
- //return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
-
- let (metadata, batch) = stream.poll_messages(topic_id, client_id,
consumer, maybe_partition_id, args.auto_commit, async |topic, consumer,
partition_id| {
- let namespace = IggyNamespace::new(stream.stream_id,
topic.topic_id, partition_id);
- let payload = ShardRequestPayload::PollMessages {
- consumer,
- args,
- };
- let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
- let message = ShardMessage::Request(request);
-
- match self
- .send_request_to_shard_or_recoil(&namespace, message)
- .await?
+ // Resolve partition ID
+ let Some((consumer, partition_id)) =
self.resolve_consumer_with_partition_id(
+ stream_id,
+ topic_id,
+ &consumer,
+ client_id,
+ maybe_partition_id,
+ true,
+ ) else {
+ return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
+ };
+
+ let has_partition = self
+ .streams2
+ .with_topic_by_id(stream_id, topic_id, |(root, ..)| {
+ root.partitions().exists(partition_id)
+ });
+ if !has_partition {
+ return Err(IggyError::NoPartitions(
+ numeric_topic_id as u32,
+ numeric_stream_id as u32,
+ ));
+ }
+
+ let current_offset = self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+ );
+ if args.strategy.kind == PollingKind::Offset && args.strategy.value >
current_offset
+ || args.count == 0
+ {
+ return Ok((
+ IggyPollMetadata::new(partition_id as u32, current_offset),
+ IggyMessagesBatchSet::empty(),
+ ));
+ }
+
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
+ let payload = ShardRequestPayload::PollMessages { consumer, args };
+ let request = ShardRequest::new(stream_id.clone(), topic_id.clone(),
partition_id, payload);
+ let message = ShardMessage::Request(request);
+ let (metadata, batch) = match self
+ .send_request_to_shard_or_recoil(&namespace, message)
+ .await?
+ {
+ ShardSendRequestResult::Recoil(message) => {
+ if let ShardMessage::Request(ShardRequest {
+ partition_id,
+ payload,
+ ..
+ }) = message
+ && let ShardRequestPayload::PollMessages { consumer, args
} = payload
{
- ShardSendRequestResult::Recoil(message) => {
- if let ShardMessage::Request( ShardRequest {
partition_id, payload, .. } ) = message
- && let ShardRequestPayload::PollMessages {
consumer, args } = payload
- {
- topic.get_messages(consumer, partition_id,
args.strategy, args.count).await.with_error_context(|error| {
- format!("{COMPONENT}: Failed to get messages
for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id},
error: {error})")
- })
- } else {
- unreachable!(
- "Expected a PollMessages request inside of
PollMessages handler, impossible state"
+ let metadata = IggyPollMetadata::new(partition_id as u32,
current_offset);
+ let count = args.count;
+ let strategy = args.strategy;
+ let value = strategy.value;
+ let batches = match strategy.kind {
+ PollingKind::Offset => {
+ let offset = value;
+ // We have to remember to keep the invariant from
the if that is on line 496.
+ // Alternatively a better design would be to get
rid of that if and move the validations here.
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Timestamp => {
+ let timestamp = IggyTimestamp::from(value);
+ let timestamp_ts = timestamp.as_micros();
+ trace!(
+ "Getting {count} messages by timestamp: {} for
partition: {}...",
+ timestamp_ts, partition_id
);
+
+ let batches = self
+ .streams2
+ .get_messages_by_timestamp(
+ stream_id,
+ topic_id,
+ partition_id,
+ timestamp_ts,
+ count,
+ )
+ .await?;
+ Ok(batches)
}
- }
- ShardSendRequestResult::Response(response) => {
- match response {
- ShardResponse::PollMessages(result) => {
Ok(result) }
- ShardResponse::ErrorResponse(err) => {
- Err(err)
- }
- _ => unreachable!(
- "Expected a SendMessages response inside of
SendMessages handler, impossible state"
- ),
+ PollingKind::First => {
+ let first_offset =
self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.segments()
+ .first()
+ .map(|segment| segment.start_offset)
+ .unwrap_or(0)
+ },
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ first_offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
}
+ PollingKind::Last => {
+ let (start_offset, actual_count) =
self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, offset, _, _, _)| {
+ let current_offset =
offset.load(Ordering::Relaxed);
+ let mut requested_count = 0;
+ if requested_count > current_offset + 1 {
+ requested_count = current_offset + 1
+ }
+ let start_offset = 1 + current_offset -
requested_count;
+ (start_offset, requested_count as u32)
+ },
+ );
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ start_offset,
+ actual_count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Next => {
+ let (consumer_offset, consumer_id) = match
consumer {
+ PollingConsumer::Consumer(consumer_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_offset(consumer_id),
+ )
+ .map(|c_offset|
c_offset.stored_offset),
+ consumer_id,
+ ),
+ PollingConsumer::ConsumerGroup(cg_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_group_member_offset(
+ cg_id,
+ ),
+ )
+ .map(|cg_offset|
cg_offset.stored_offset),
+ cg_id,
+ ),
+ };
+
+ let Some(consumer_offset) = consumer_offset else {
+ return
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+ };
+ let offset = consumer_offset + 1;
+ trace!(
+ "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
+ consumer_id, partition_id, offset
+ );
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ }?;
+
+ if args.auto_commit && !batches.is_empty() {
+ let offset = batches
+ .last_offset()
+ .expect("Batch set should have at least one
batch");
+ trace!(
+ "Last offset: {} will be automatically stored for
{}, stream: {}, topic: {}, partition: {}",
+ offset, consumer, numeric_stream_id,
numeric_topic_id, partition_id
+ );
+ match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => {
+ self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ partitions::helpers::store_consumer_offset(
+ consumer_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2
+ .with_partition_by_id_async(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_offset_to_disk(
+ self.id,
+ consumer_id,
+ ),
+ )
+ .await?;
+ }
+ PollingConsumer::ConsumerGroup(cg_id, _) => {
+ self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::store_consumer_group_member_offset(
+ cg_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2.with_partition_by_id_async(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+ self.id,
+ cg_id,
+ ),
+ )
+ .await?;
+ }
+ }
}
+ Ok((metadata, batches))
+ } else {
+ unreachable!(
+ "Expected a PollMessages request inside of
PollMessages handler, impossible state"
+ );
}
- }).await?;
+ }
+ ShardSendRequestResult::Response(response) => match response {
+ ShardResponse::PollMessages(result) => Ok(result),
+ ShardResponse::ErrorResponse(err) => Err(err),
+ _ => unreachable!(
+ "Expected a SendMessages response inside of SendMessages
handler, impossible state"
+ ),
+ },
+ }?;
let batch = if let Some(_encryptor) = &self.encryptor {
//TODO: Bring back decryptor
@@ -522,7 +543,6 @@ impl IggyShard {
};
Ok((metadata, batch))
- */
}
pub async fn flush_unsaved_buffer(
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 892110ab..7b1eed84 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -21,11 +21,8 @@ use crate::shard::IggyShard;
use crate::shard::ShardInfo;
use crate::shard::namespace::IggyNamespace;
use crate::shard_info;
-use crate::slab::traits_ext::EntityComponentSystem;
-use crate::slab::traits_ext::EntityComponentSystemMutCell;
use crate::slab::traits_ext::EntityMarker;
use crate::slab::traits_ext::IntoComponents;
-use crate::slab::traits_ext::IntoComponentsById;
use crate::streaming::partitions;
use crate::streaming::partitions::helpers::create_message_deduplicator;
use crate::streaming::partitions::journal::MemoryMessageJournal;
@@ -113,7 +110,6 @@ impl IggyShard {
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
- // TODO: Figure out how to do this operation in a batch.
for partition_id in partitions.iter().map(|p| p.id()) {
// TODO: Create shard table recordsj.
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
partition_id);
@@ -259,6 +255,12 @@ impl IggyShard {
topic_id: &Identifier,
partitions: Vec<partition2::Partition>,
) -> Result<(), IggyError> {
+ let numeric_stream_id = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let numeric_topic_id =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
for partition in partitions {
let actual_id = partition.id();
let id = self.insert_partition_mem(stream_id, topic_id, partition);
@@ -266,7 +268,13 @@ impl IggyShard {
id, actual_id,
"create_partitions_bypass_auth: partition mismatch ID, wrong
creation order ?!"
);
- self.init_log(stream_id, topic_id, id).await?;
+ let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
id);
+ let shard_info = self
+ .find_shard_table_record(&ns)
+ .expect("create_partitions_bypass_auth: missing shard table
record");
+ if self.id == shard_info.id {
+ self.init_log(stream_id, topic_id, id).await?;
+ }
}
Ok(())
}
diff --git a/core/server/src/shard/system/utils.rs
b/core/server/src/shard/system/utils.rs
index 1aa2857e..0c9b637b 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -61,7 +61,7 @@ impl IggyShard {
) -> Option<(PollingConsumer, usize)> {
match consumer.kind {
ConsumerKind::Consumer => {
- let partition_id = partition_id.unwrap_or(1);
+ let partition_id = partition_id.unwrap_or(0);
Some((
PollingConsumer::consumer(&consumer.id, partition_id as
usize),
partition_id as usize,
diff --git a/core/server/src/shard/transmission/message.rs
b/core/server/src/shard/transmission/message.rs
index 73d2a4e8..a4e4bb3a 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,6 +1,6 @@
use std::{rc::Rc, sync::Arc};
-use iggy_common::PollingStrategy;
+use iggy_common::{Identifier, PollingStrategy};
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,6 +24,7 @@ use crate::{
system::messages::PollingArgs,
transmission::{event::ShardEvent, frame::ShardResponse},
},
+ slab::partitions,
streaming::{polling_consumer::PollingConsumer,
segments::IggyMessagesBatchMut},
};
@@ -41,17 +42,17 @@ pub enum ShardMessage {
#[derive(Debug)]
pub struct ShardRequest {
- pub stream_id: u32,
- pub topic_id: u32,
- pub partition_id: u32,
+ pub stream_id: Identifier,
+ pub topic_id: Identifier,
+ pub partition_id: usize,
pub payload: ShardRequestPayload,
}
impl ShardRequest {
pub fn new(
- stream_id: u32,
- topic_id: u32,
- partition_id: u32,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partition_id: partitions::ContainerId,
payload: ShardRequestPayload,
) -> Self {
Self {
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 3f72b20a..31a5a8b7 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -27,11 +27,11 @@ where
async |(root, ..)| f(root.topics()).await
}
-pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRefMut>) -> O
+pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
where
- F: for<'a> FnOnce(&'a mut Topics) -> O,
+ F: for<'a> FnOnce(&'a Topics) -> O,
{
- |(mut root, ..)| f(root.topics_mut())
+ |(root, ..)| f(root.topics())
}
pub fn partitions<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRef>) -> O
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index d8ee0adf..7c7f15b7 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -196,6 +196,10 @@ impl Partitions {
self.with_components_by_id(id, |components| f(components))
}
+ pub fn exists(&self, id: ContainerId) -> bool {
+ self.root.contains(id)
+ }
+
pub fn with_partition_by_id_mut<T>(
&mut self,
id: ContainerId,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 1cd2989e..0c84f7a3 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,4 +1,5 @@
use crate::{
+ configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
slab::{
Keyed,
consumer_groups::ConsumerGroups,
@@ -6,30 +7,33 @@ use crate::{
partitions::{self, Partitions},
topics::Topics,
traits_ext::{
- ComponentsById, ComponentsByIdMapping, ComponentsMapping,
DeleteCell,
- EntityComponentSystem, EntityComponentSystemMutCell, InsertCell,
InteriorMutability,
- IntoComponents,
+ ComponentsById, DeleteCell, EntityComponentSystem,
EntityComponentSystemMutCell,
+ InsertCell, InteriorMutability, IntoComponents,
},
},
streaming::{
- partitions::{
- journal::MemoryMessageJournal,
- log::{Log, SegmentedLog},
- partition2::{PartitionRef, PartitionRefMut},
- },
+ partitions::partition2::{PartitionRef, PartitionRefMut},
+ segments::{IggyMessagesBatchSet, Segment2,
storage::create_segment_storage},
stats::stats::StreamStats,
- streams::stream2::{self, StreamRef, StreamRefMut},
+ streams::{
+ self,
+ stream2::{self, StreamRef, StreamRefMut},
+ },
topics::{
+ self,
consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
- topic2::{self, TopicRef, TopicRefMut},
+ topic2::{TopicRef, TopicRefMut},
},
},
};
use ahash::AHashMap;
-use iggy_common::Identifier;
+use iggy_common::{Identifier, IggyError};
use slab::Slab;
use std::{cell::RefCell, sync::Arc};
+// Import streaming partitions helpers for the persist_messages method
+use crate::streaming::partitions as streaming_partitions;
+
const CAPACITY: usize = 1024;
pub type ContainerId = usize;
@@ -94,7 +98,7 @@ impl DeleteCell for Streams {
type Idx = ContainerId;
type Item = stream2::Stream;
- fn delete(&self, id: Self::Idx) -> Self::Item {
+ fn delete(&self, _id: Self::Idx) -> Self::Item {
todo!()
}
}
@@ -213,12 +217,8 @@ impl Streams {
self.with_stream_by_id_async(stream_id, helpers::topics_async(f))
}
- pub fn with_topics_mut<T>(
- &self,
- stream_id: &Identifier,
- f: impl FnOnce(&mut Topics) -> T,
- ) -> T {
- self.with_stream_by_id_mut(stream_id, helpers::topics_mut(f))
+ pub fn with_topics_mut<T>(&self, stream_id: &Identifier, f: impl
FnOnce(&Topics) -> T) -> T {
+ self.with_stream_by_id(stream_id, helpers::topics_mut(f))
}
pub fn with_topic_by_id<T>(
@@ -395,4 +395,199 @@ impl Streams {
pub fn len(&self) -> usize {
self.root.borrow().len()
}
+
+ pub async fn get_messages_by_offset(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ offset: u64,
+ count: u32,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ use crate::streaming::partitions::helpers;
+ let range = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ helpers::get_segment_range_by_offset(offset),
+ );
+
+ self.with_partition_by_id_async(
+ stream_id,
+ topic_id,
+ partition_id,
+ helpers::get_messages_by_offset_range(offset, count, range),
+ )
+ .await
+ }
+
+ pub async fn get_messages_by_timestamp(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ timestamp: u64,
+ count: u32,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ use crate::streaming::partitions::helpers;
+ let range = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ helpers::get_segment_range_by_timestamp(timestamp),
+ );
+
+ self.with_partition_by_id_async(
+ stream_id,
+ topic_id,
+ partition_id,
+ helpers::get_messages_by_timestamp_range(timestamp, count, range),
+ )
+ .await
+ }
+
+ pub async fn handle_full_segment(
+ &self,
+ shard_id: u16,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ config: &crate::configs::system::SystemConfig,
+ ) -> Result<(), IggyError> {
+ let numeric_stream_id =
+ self.with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
+ let numeric_topic_id =
+ self.with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+
+ if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+ || config.segment.cache_indexes == CacheIndexesConfig::None
+ {
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id,
|(.., log)| {
+ log.clear_active_indexes();
+ });
+ }
+
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.active_segment_mut().sealed = true;
+ });
+ let (log_writer, index_writer) =
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id,
|(.., log)| {
+ log.active_storage_mut().shutdown()
+ });
+
+ compio::runtime::spawn(async move {
+ let _ = log_writer.fsync().await;
+ })
+ .detach();
+ compio::runtime::spawn(async move {
+ let _ = index_writer.fsync().await;
+ drop(index_writer)
+ })
+ .detach();
+
+ let (start_offset, size, end_offset) =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ (
+ log.active_segment().start_offset,
+ log.active_segment().size,
+ log.active_segment().end_offset,
+ )
+ });
+
+ crate::shard_info!(
+ shard_id,
+ "Closed segment for stream: {}, topic: {} with start offset: {},
end offset: {}, size: {} for partition with ID: {}.",
+ stream_id,
+ topic_id,
+ start_offset,
+ end_offset,
+ size,
+ partition_id
+ );
+
+ let messages_size = 0;
+ let indexes_size = 0;
+ let segment = Segment2::new(
+ end_offset + 1,
+ config.segment.size,
+ config.segment.message_expiry,
+ );
+
+ let storage = create_segment_storage(
+ &config,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ messages_size,
+ indexes_size,
+ end_offset + 1,
+ )
+ .await?;
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.add_persisted_segment(segment, storage);
+ });
+
+ Ok(())
+ }
+
+ pub async fn persist_messages(
+ &self,
+ shard_id: u16,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ unsaved_messages_count_exceeded: bool,
+ unsaved_messages_size_exceeded: bool,
+ journal_messages_count: u32,
+ journal_size: u32,
+ config: &SystemConfig,
+ ) -> Result<(), IggyError> {
+ let batches = self.with_partition_by_id_mut(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::commit_journal(),
+ );
+
+ let reason = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::persist_reason(
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
+ config,
+ ),
+ );
+ let (saved, batch_count) = self
+ .with_partition_by_id_async(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::persist_batch(
+ shard_id,
+ stream_id,
+ topic_id,
+ partition_id,
+ batches,
+ reason,
+ ),
+ )
+ .await?;
+
+ self.with_partition_by_id_mut(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::update_index_and_increment_stats(
+ saved,
+ batch_count,
+ config,
+ ),
+ );
+
+ Ok(())
+ }
}
diff --git a/core/server/src/streaming/cluster/mod.rs
b/core/server/src/streaming/cluster/mod.rs
index 61b376c7..036ebe1c 100644
--- a/core/server/src/streaming/cluster/mod.rs
+++ b/core/server/src/streaming/cluster/mod.rs
@@ -16,7 +16,7 @@
* under the License.
*/
- /*
+/*
use crate::streaming::session::Session;
use crate::streaming::systems::system::System;
use iggy_common::IggyError;
@@ -68,4 +68,4 @@ impl System {
})
}
}
- */
\ No newline at end of file
+ */
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index 213f7706..bb37b568 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -1,9 +1,14 @@
-use std::sync::atomic::Ordering;
-
-use iggy_common::{ConsumerOffsetInfo, IggyError};
+use error_set::ErrContext;
+use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::{
+ ops::{AsyncFnOnce, Index},
+ sync::atomic::Ordering,
+};
+use sysinfo::Component;
use crate::{
- configs::system::SystemConfig,
+ configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
+ shard_trace,
slab::{
partitions::{self, Partitions},
traits_ext::{
@@ -14,9 +19,11 @@ use crate::{
deduplication::message_deduplicator::MessageDeduplicator,
partitions::{
consumer_offset::ConsumerOffset,
- partition2::{self, PartitionRef},
+ journal::{Journal, MemoryMessageJournal},
+ partition2::{self, PartitionRef, PartitionRefMut},
storage2,
},
+ segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
storage::Storage},
},
};
@@ -212,7 +219,7 @@ pub fn delete_consumer_group_member_offset_from_disk(
}
pub fn purge_segments_mem() -> impl FnOnce(&mut Partitions) {
- |partitions| {
+ |_partitions| {
// TODO:
//partitions.segments_mut()
}
@@ -235,3 +242,583 @@ pub fn create_message_deduplicator(config: &SystemConfig)
-> Option<MessageDedup
Some(MessageDeduplicator::new(max_entries, expiry))
}
+
+pub async fn get_messages_by_offset(
+ storage: &Storage,
+ journal: &MemoryMessageJournal,
+ index: &Option<IggyIndexesMut>,
+ offset: u64,
+ end_offset: u64,
+ count: u32,
+ segment_start_offset: u64,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+ if count == 0 {
+ return Ok(IggyMessagesBatchSet::default());
+ }
+
+ // Case 0: Accumulator is empty, so all messages have to be on disk
+ if journal.is_empty() {
+ return load_messages_from_disk_by_offset(
+ storage,
+ index,
+ offset,
+ count,
+ segment_start_offset,
+ )
+ .await;
+ }
+
+ let journal_first_offset = journal.inner().base_offset;
+ let journal_last_offset = journal.inner().current_offset;
+
+ // Case 1: All messages are in accumulator buffer
+ if offset >= journal_first_offset && end_offset <= journal_last_offset {
+ return Ok(journal.get(|batches| batches.get_by_offset(offset, count)));
+ }
+
+ // Case 2: All messages are on disk
+ if end_offset < journal_first_offset {
+ return load_messages_from_disk_by_offset(
+ storage,
+ index,
+ offset,
+ count,
+ segment_start_offset,
+ )
+ .await;
+ }
+
+ // Case 3: Messages span disk and accumulator buffer boundary
+ // Calculate how many messages we need from disk
+ let disk_count = if offset < journal_first_offset {
+ ((journal_first_offset - offset) as u32).min(count)
+ } else {
+ 0
+ };
+
+ let mut combined_batch_set = IggyMessagesBatchSet::empty();
+
+ // Load messages from disk if needed
+ if disk_count > 0 {
+ let disk_messages = load_messages_from_disk_by_offset(storage, index,
offset, disk_count, segment_start_offset)
+ .await
+ .with_error_context(|error| {
+ format!("Failed to load messages from disk, start offset:
{offset}, count: {disk_count}, error: {error}")
+ })?;
+
+ if !disk_messages.is_empty() {
+ combined_batch_set.add_batch_set(disk_messages);
+ }
+ }
+
+ // Calculate how many more messages we need from the accumulator
+ let remaining_count = count - combined_batch_set.count();
+
+ if remaining_count > 0 {
+ let accumulator_start_offset = std::cmp::max(offset,
journal_first_offset);
+
+ let accumulator_messages =
+ journal.get(|batches|
batches.get_by_offset(accumulator_start_offset, remaining_count));
+
+ if !accumulator_messages.is_empty() {
+ combined_batch_set.add_batch_set(accumulator_messages);
+ }
+ }
+
+ Ok(combined_batch_set)
+}
+
+async fn load_messages_from_disk_by_offset(
+ storage: &Storage,
+ index: &Option<IggyIndexesMut>,
+ start_offset: u64,
+ count: u32,
+ segment_start_offset: u64,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+ // Convert start_offset to relative offset within the segment
+ let relative_start_offset = (start_offset - segment_start_offset) as u32;
+
+ // Load indexes first
+ let indexes_to_read = if let Some(indexes) = index {
+ if !indexes.is_empty() {
+ indexes.slice_by_offset(relative_start_offset, count)
+ } else {
+ storage
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .load_from_disk_by_offset(relative_start_offset, count)
+ .await?
+ }
+ } else {
+ storage
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .load_from_disk_by_offset(relative_start_offset, count)
+ .await?
+ };
+
+ if indexes_to_read.is_none() {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ let indexes_to_read = indexes_to_read.unwrap();
+ let first = indexes_to_read.get(0).unwrap();
+ let last = indexes_to_read.last().unwrap();
+
+ let batch = storage
+ .messages_reader
+ .as_ref()
+ .expect("Messages reader not initialized")
+ .load_messages_from_disk(indexes_to_read)
+ .await
+ .with_error_context(|error| format!("Failed to load messages from
disk: {error}"))?;
+
+ batch
+ .validate_checksums_and_offsets(start_offset)
+ .with_error_context(|error| {
+ format!("Failed to validate messages read from disk! error:
{error}")
+ })?;
+
+ Ok(IggyMessagesBatchSet::from(batch))
+}
+
+pub fn get_messages_by_offset_range(
+ offset: u64,
+ count: u32,
+ range: std::ops::Range<usize>,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) ->
Result<IggyMessagesBatchSet, IggyError> {
+ async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
+ let segments = log.segments().iter();
+ let storages = log.storages().iter();
+ let journal = log.journal();
+ let indexes = log.indexes().iter();
+
+ let mut remaining_count = count;
+ let mut batches = IggyMessagesBatchSet::empty();
+ let mut current_offset = offset;
+
+ for (segment, storage, index) in segments
+ .zip(storages)
+ .zip(indexes)
+ .map(|((a, b), c)| (a, b, c))
+ .skip(range.start)
+ .take(range.end - range.start)
+ {
+ let start_offset = if current_offset < segment.start_offset {
+ segment.start_offset
+ } else {
+ current_offset
+ };
+
+ let mut end_offset = start_offset + (remaining_count - 1) as u64;
+ if end_offset > segment.end_offset {
+ end_offset = segment.end_offset;
+ }
+
+ // Calculate the actual count to request from this segment
+ let count: u32 = ((end_offset - start_offset + 1) as
u32).min(remaining_count);
+
+ let messages = get_messages_by_offset(
+ storage,
+ journal,
+ index,
+ start_offset,
+ end_offset,
+ count,
+ segment.start_offset,
+ )
+ .await?;
+
+ let messages_count = messages.count();
+ if messages_count == 0 {
+ current_offset = segment.end_offset + 1;
+ continue;
+ }
+
+ remaining_count = remaining_count.saturating_sub(messages_count);
+
+ if let Some(last_offset) = messages.last_offset() {
+ current_offset = last_offset + 1;
+ } else if messages_count > 0 {
+ current_offset += messages_count as u64;
+ }
+
+ batches.add_batch_set(messages);
+
+ if remaining_count == 0 {
+ break;
+ }
+ }
+ Ok(batches)
+ }
+}
+
+pub fn get_segment_range_by_offset(
+ offset: u64,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
+ move |(.., log)| {
+ let start = log
+ .segments()
+ .iter()
+ .enumerate()
+ .filter(|(_, segment)| segment.start_offset <= offset)
+ .map(|(index, _)| index)
+ .next()
+ .expect("get_segment_range_by_offset: start segment not found");
+ let end = log.segments().len();
+ start..end
+ }
+}
+
+pub fn get_segment_range_by_timestamp(
+ timestamp: u64,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
+ move |(.., log)| -> std::ops::Range<usize> {
+ let start = log
+ .segments()
+ .iter()
+ .enumerate()
+ .find(|(_, segment)| segment.end_timestamp >= timestamp)
+ .map(|(index, _)| index)
+ .expect("get_segment_range_by_timestamp: start segment not found");
+ let end = log.segments().len();
+ start..end
+ }
+}
+
+pub async fn load_messages_from_disk_by_timestamp(
+ storage: &Storage,
+ index: &Option<IggyIndexesMut>,
+ timestamp: u64,
+ count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+ let indexes_to_read = if let Some(indexes) = index {
+ if !indexes.is_empty() {
+ indexes.slice_by_timestamp(timestamp, count)
+ } else {
+ storage
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .load_from_disk_by_timestamp(timestamp, count)
+ .await?
+ }
+ } else {
+ storage
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .load_from_disk_by_timestamp(timestamp, count)
+ .await?
+ };
+
+ if indexes_to_read.is_none() {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ let indexes_to_read = indexes_to_read.unwrap();
+
+ let batch = storage
+ .messages_reader
+ .as_ref()
+ .expect("Messages reader not initialized")
+ .load_messages_from_disk(indexes_to_read)
+ .await
+ .with_error_context(|error| {
+ format!("Failed to load messages from disk by timestamp: {error}")
+ })?;
+
+ Ok(IggyMessagesBatchSet::from(batch))
+}
+
+pub async fn get_messages_by_timestamp(
+ storage: &Storage,
+ journal: &MemoryMessageJournal,
+ index: &Option<IggyIndexesMut>,
+ timestamp: u64,
+ count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+ if count == 0 {
+ return Ok(IggyMessagesBatchSet::default());
+ }
+
+ // Case 0: Accumulator is empty, so all messages have to be on disk
+ if journal.is_empty() {
+ return load_messages_from_disk_by_timestamp(storage, index, timestamp,
count).await;
+ }
+
+ let journal_first_timestamp = journal.inner().first_timestamp;
+ let journal_last_timestamp = journal.inner().end_timestamp;
+
+ // Case 1: All messages are in accumulator buffer
+ if timestamp > journal_last_timestamp {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ if timestamp >= journal_first_timestamp {
+ return Ok(journal.get(|batches| batches.get_by_timestamp(timestamp,
count)));
+ }
+
+ // Case 2: All messages are on disk (timestamp is before journal's first
timestamp)
+ let disk_messages =
+ load_messages_from_disk_by_timestamp(storage, index, timestamp,
count).await?;
+
+ if disk_messages.count() >= count {
+ return Ok(disk_messages);
+ }
+
+ // Case 3: Messages span disk and accumulator buffer boundary
+ let remaining_count = count - disk_messages.count();
+ let journal_messages =
+ journal.get(|batches| batches.get_by_timestamp(timestamp,
remaining_count));
+
+ let mut combined_batch_set = disk_messages;
+ if !journal_messages.is_empty() {
+ combined_batch_set.add_batch_set(journal_messages);
+ }
+ return Ok(combined_batch_set);
+}
+
/// Builds an async closure that reads up to `count` messages with timestamp >= `timestamp`
/// from the segments whose indices fall inside `range`, walking segments in order and
/// stitching results together until the requested count is satisfied.
///
/// Each visited segment is queried via `get_messages_by_timestamp`, which itself merges
/// on-disk messages with any messages still sitting in the journal (accumulator).
pub fn get_messages_by_timestamp_range(
    timestamp: u64,
    count: u32,
    range: std::ops::Range<usize>,
) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<IggyMessagesBatchSet, IggyError> {
    async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
        // Segments, storages and indexes are parallel vectors: entry i of each
        // describes the same segment, hence the triple zip below.
        let segments = log.segments().iter();
        let storages = log.storages().iter();
        let journal = log.journal();
        let indexes = log.indexes().iter();

        let mut remaining_count = count;
        let mut batches = IggyMessagesBatchSet::empty();

        // Flatten ((a, b), c) from the nested zips into (segment, storage, index),
        // then restrict iteration to the caller-supplied segment range.
        for (segment, storage, index) in segments
            .zip(storages)
            .zip(indexes)
            .map(|((a, b), c)| (a, b, c))
            .skip(range.start)
            .take(range.end - range.start)
        {
            if remaining_count == 0 {
                break;
            }

            // Skip segments that end before our timestamp
            if segment.end_timestamp < timestamp {
                continue;
            }

            let messages =
                get_messages_by_timestamp(storage, journal, index, timestamp, remaining_count)
                    .await?;

            let messages_count = messages.count();
            if messages_count == 0 {
                continue;
            }

            // saturating_sub guards against a segment returning more than asked for.
            remaining_count = remaining_count.saturating_sub(messages_count);
            batches.add_batch_set(messages);

            if remaining_count == 0 {
                break;
            }
        }

        Ok(batches)
    }
}
+
+pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>)
-> u64 {
+ |(root, _, _, offset, ..)| {
+ if !root.should_increment_offset() {
+ 0
+ } else {
+ offset.load(Ordering::Relaxed) + 1
+ }
+ }
+}
+
/// Builds an async closure that prepares `batch` for persistence into the active
/// segment, assigning offsets starting at `current_offset` and running the batch
/// through the partition's deduplicator.
///
/// Mutates `batch` in place; produces no value.
pub fn deduplicate_messages(
    current_offset: u64,
    batch: &mut IggyMessagesBatchMut,
) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) {
    // NOTE(review): the tuple pattern relies on the deduplicator sitting exactly
    // six positions before the end of ComponentsById — confirm against PartitionRef.
    async move |(.., deduplicator, _, _, _, log)| {
        let segment = log.active_segment();
        batch
            .prepare_for_persistence(
                segment.start_offset,
                current_offset,
                segment.size,
                deduplicator.as_ref(),
            )
            .await;
    }
}
+
+pub fn append_to_journal(
+ shard_id: u16,
+ current_offset: u64,
+ batch: IggyMessagesBatchMut,
+) -> impl FnOnce(ComponentsById<PartitionRefMut>) -> Result<(u32, u32),
IggyError> {
+ move |(root, stats, _, offset, .., log)| {
+ let segment = log.active_segment_mut();
+
+ if segment.end_offset == 0 {
+ segment.start_timestamp = batch.first_timestamp().unwrap();
+ }
+
+ let batch_messages_size = batch.size();
+ let batch_messages_count = batch.count();
+
+ segment.end_timestamp = batch.last_timestamp().unwrap();
+ segment.end_offset = batch.last_offset().unwrap();
+
+ let (journal_messages_count, journal_size) =
log.journal_mut().append(shard_id, batch)?;
+
+ stats.increment_messages_count(batch_messages_count as u64);
+ stats.increment_size_bytes(batch_messages_size as u64);
+
+ let last_offset = if batch_messages_count == 0 {
+ current_offset
+ } else {
+ current_offset + batch_messages_count as u64 - 1
+ };
+
+ if root.should_increment_offset() {
+ offset.store(last_offset, Ordering::Relaxed);
+ } else {
+ root.set_should_increment_offset(true);
+ offset.store(last_offset, Ordering::Relaxed);
+ }
+
+ Ok((journal_messages_count, journal_size))
+ }
+}
+
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) ->
IggyMessagesBatchSet {
+ |(.., log)| {
+ let batches = log.journal_mut().commit();
+ log.ensure_indexes();
+ batches.append_indexes_to(log.active_indexes_mut().unwrap());
+ batches
+ }
+}
+
+pub fn is_segment_full() -> impl FnOnce(ComponentsById<PartitionRef>) -> bool {
+ |(.., log)| log.active_segment().is_full()
+}
+
+pub fn persist_reason(
+ unsaved_messages_count_exceeded: bool,
+ unsaved_messages_size_exceeded: bool,
+ journal_messages_count: u32,
+ journal_size: u32,
+ config: &SystemConfig,
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> String {
+ move |(.., log)| {
+ if unsaved_messages_count_exceeded {
+ format!(
+ "unsaved messages count exceeded: {}, max from config: {}",
+ journal_messages_count,
config.partition.messages_required_to_save,
+ )
+ } else if unsaved_messages_size_exceeded {
+ format!(
+ "unsaved messages size exceeded: {}, max from config: {}",
+ journal_size,
config.partition.size_of_messages_required_to_save,
+ )
+ } else {
+ format!(
+ "segment is full, current size: {}, max from config: {}",
+ log.active_segment().size,
+ &config.segment.size,
+ )
+ }
+ }
+}
+
+pub fn persist_batch(
+ shard_id: u16,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ batches: IggyMessagesBatchSet,
+ reason: String,
+) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize,
u32), IggyError> {
+ async move |(.., log)| {
+ shard_trace!(
+ shard_id,
+ "Persisting messages on disk for stream ID: {}, topic ID: {},
partition ID: {} because {}...",
+ stream_id,
+ topic_id,
+ partition_id,
+ reason
+ );
+
+ let batch_count = batches.count();
+ let batch_size = batches.size();
+
+ let storage = log.active_storage();
+ let saved = storage
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer not initialized")
+ .save_batch_set(batches)
+ .await
+ .with_error_context(|error| {
+ let segment = log.active_segment();
+ format!(
+ "Failed to save batch of {batch_count} messages \
+ ({batch_size} bytes) to {segment}.
{error}",
+ )
+ })?;
+
+ let indices = log.active_indexes().unwrap();
+ let first_index = indices.get(0).unwrap();
+ let last_index = indices.last().unwrap();
+ let unsaved_indexes_slice =
log.active_indexes().unwrap().unsaved_slice();
+ let len = unsaved_indexes_slice.len();
+ storage
+ .index_writer
+ .as_ref()
+ .expect("Index writer not initialized")
+ .save_indexes(unsaved_indexes_slice)
+ .await
+ .with_error_context(|error| {
+ let segment = log.active_segment();
+ format!("Failed to save index of {len} indexes to {segment}.
{error}",)
+ })?;
+
+ shard_trace!(
+ shard_id,
+ "Persisted {} messages on disk for stream ID: {}, topic ID: {},
for partition with ID: {}, total bytes written: {}.",
+ batch_count,
+ stream_id,
+ topic_id,
+ partition_id,
+ saved
+ );
+
+ Ok((saved, batch_count))
+ }
+}
+
+pub fn update_index_and_increment_stats(
+ saved: IggyByteSize,
+ batch_count: u32,
+ config: &SystemConfig,
+) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
+ move |(_, stats, .., log)| {
+ let segment = log.active_segment_mut();
+ segment.size += saved.as_bytes_u32();
+ log.active_indexes_mut().unwrap().mark_saved();
+ if config.segment.cache_indexes == CacheIndexesConfig::None {
+ log.active_indexes_mut().unwrap().clear();
+ }
+ stats.increment_size_bytes(saved.as_bytes_u64());
+ stats.increment_messages_count(batch_count as u64);
+ }
+}
diff --git a/core/server/src/streaming/partitions/journal.rs
b/core/server/src/streaming/partitions/journal.rs
index 19d6dd82..9feee8de 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -12,6 +12,8 @@ use std::fmt::Debug;
pub struct Inner {
pub base_offset: u64,
pub current_offset: u64,
+ pub first_timestamp: u64,
+ pub end_timestamp: u64,
pub messages_count: u32,
pub size: u32,
}
@@ -49,8 +51,14 @@ impl Journal for MemoryMessageJournal {
);
let batch_size = entry.size();
+ let first_timestamp = entry.first_timestamp().unwrap();
+ let last_timestamp = entry.last_timestamp().unwrap();
self.batches.add_batch(entry);
+ if self.inner.first_timestamp == 0 {
+ self.inner.first_timestamp = first_timestamp;
+ }
+ self.inner.end_timestamp = last_timestamp;
self.inner.messages_count += batch_messages_count;
self.inner.current_offset = self.inner.base_offset +
self.inner.messages_count as u64 - 1;
self.inner.size += batch_size;
@@ -72,10 +80,20 @@ impl Journal for MemoryMessageJournal {
fn commit(&mut self) -> Self::Container {
self.inner.base_offset = self.inner.current_offset;
+ self.inner.first_timestamp = 0;
+ self.inner.end_timestamp = 0;
self.inner.size = 0;
self.inner.messages_count = 0;
std::mem::take(&mut self.batches)
}
+
+ fn is_empty(&self) -> bool {
+ self.batches.is_empty()
+ }
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
}
pub trait Journal {
@@ -93,6 +111,10 @@ pub trait Journal {
fn commit(&mut self) -> Self::Container;
+ fn is_empty(&self) -> bool;
+
+ fn inner(&self) -> &Self::Inner;
+
// `flush` is only useful in case of an journal that has disk backed WAL.
// This could be merged together with `append`, but not doing this for two
reasons.
// 1. In case of the `Journal` being used as part of structure that
utilizes interior mutability, async with borrow_mut is not possible.
diff --git a/core/server/src/streaming/partitions/log.rs
b/core/server/src/streaming/partitions/log.rs
index 87003880..83712975 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -24,7 +24,7 @@ where
access_map: AllocRingBuffer<usize>,
cache: (),
segments: Vec<Segment2>,
- indexes: Option<IggyIndexesMut>,
+ indexes: Vec<Option<IggyIndexesMut>>,
storage: Vec<Storage>,
}
@@ -48,7 +48,7 @@ where
cache: (),
segments: Vec::with_capacity(SEGMENTS_CAPACITY),
storage: Vec::with_capacity(SEGMENTS_CAPACITY),
- indexes: None,
+ indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
}
}
}
@@ -57,6 +57,18 @@ impl<J> SegmentedLog<J>
where
J: Journal + Debug,
{
+ pub fn has_segments(&self) -> bool {
+ !self.segments.is_empty()
+ }
+
+ pub fn segments(&self) -> &Vec<Segment2> {
+ &self.segments
+ }
+
+ pub fn storages(&self) -> &Vec<Storage> {
+ &self.storage
+ }
+
pub fn active_segment(&self) -> &Segment2 {
self.segments
.last()
@@ -81,28 +93,47 @@ where
.expect("active storage called on empty log")
}
- pub fn indexes(&self) -> Option<&IggyIndexesMut> {
- self.indexes.as_ref()
+ pub fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
+ &self.indexes
+ }
+
+ pub fn active_indexes(&self) -> Option<&IggyIndexesMut> {
+ self.indexes
+ .last()
+ .expect("active indexes called on empty log")
+ .as_ref()
}
- pub fn indexes_mut(&mut self) -> Option<&mut IggyIndexesMut> {
- self.indexes.as_mut()
+ pub fn active_indexes_mut(&mut self) -> Option<&mut IggyIndexesMut> {
+ self.indexes
+ .last_mut()
+ .expect("active indexes called on empty log")
+ .as_mut()
}
- pub fn clear_indexes(&mut self) {
- self.indexes = None;
+ pub fn clear_active_indexes(&mut self) {
+ let indexes = self
+ .indexes
+ .last_mut()
+ .expect("active indexes called on empty log");
+ *indexes = None;
}
pub fn ensure_indexes(&mut self) {
- if self.indexes.is_none() {
+ let indexes = self
+ .indexes
+ .last_mut()
+ .expect("active indexes called on empty log");
+ if indexes.is_none() {
let capacity = SIZE_16MB / INDEX_SIZE;
- self.indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
+ *indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
}
}
pub fn add_persisted_segment(&mut self, segment: Segment2, storage:
Storage) {
self.segments.push(segment);
self.storage.push(storage);
+ self.indexes.push(None);
}
}
@@ -113,6 +144,10 @@ where
pub fn journal_mut(&mut self) -> &mut J {
&mut self.journal
}
+
+ pub fn journal(&self) -> &J {
+ &self.journal
+ }
}
impl<J> Log for SegmentedLog<J> where J: Journal + Debug {}
diff --git a/core/server/src/streaming/partitions/messages.rs
b/core/server/src/streaming/partitions/messages.rs
index bc4da29b..f11a8021 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -32,12 +32,6 @@ impl Partition {
timestamp: IggyTimestamp,
count: u32,
) -> Result<IggyMessagesBatchSet, IggyError> {
- trace!(
- "Getting {count} messages by timestamp: {} for partition: {}...",
- timestamp.as_micros(),
- self.partition_id
- );
-
if self.segments.is_empty() || count == 0 {
return Ok(IggyMessagesBatchSet::empty());
}
@@ -754,5 +748,4 @@ mod tests {
.expect("Failed to create message with ID")
}
}
-
*/
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 325d561a..cf4df83c 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -31,6 +31,7 @@ use ring::error;
use std::{
io::ErrorKind,
os::unix::fs::FileExt,
+ rc::Rc,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
@@ -43,12 +44,15 @@ use tracing::{error, trace};
pub struct IndexReader {
file_path: String,
file: File,
- index_size_bytes: AtomicU64,
+ index_size_bytes: Rc<AtomicU64>,
}
+// Safety: We are guaranteeing that IndexReader will never be used from
multiple threads
+unsafe impl Send for IndexReader {}
+
impl IndexReader {
/// Opens the index file in read-only mode.
- pub async fn new(file_path: &str, index_size_bytes: AtomicU64) ->
Result<Self, IggyError> {
+ pub async fn new(file_path: &str, index_size_bytes: Rc<AtomicU64>) ->
Result<Self, IggyError> {
let file = OpenOptions::new()
.read(true)
.open(file_path)
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs
b/core/server/src/streaming/segments/indexes/index_writer.rs
index e905eaec..1c14d14f 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -22,6 +22,7 @@ use compio::io::AsyncWriteAtExt;
use error_set::ErrContext;
use iggy_common::INDEX_SIZE;
use iggy_common::IggyError;
+use std::rc::Rc;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
@@ -35,15 +36,18 @@ use crate::streaming::utils::PooledBuffer;
pub struct IndexWriter {
file_path: String,
file: File,
- index_size_bytes: AtomicU64,
+ index_size_bytes: Rc<AtomicU64>,
fsync: bool,
}
+// Safety: We are guaranteeing that IndexWriter will never be used from
multiple threads
+unsafe impl Send for IndexWriter {}
+
impl IndexWriter {
/// Opens the index file in write mode.
pub async fn new(
file_path: &str,
- index_size_bytes: AtomicU64,
+ index_size_bytes: Rc<AtomicU64>,
fsync: bool,
file_exists: bool,
) -> Result<Self, IggyError> {
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs
b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 4c68dea6..ba38697f 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -256,7 +256,7 @@ impl IggyIndexesMut {
if relative_start_offset == 0 {
Some(IggyIndexesMut::from_bytes(slice, self.base_position))
} else {
- let position_offset = self.get(relative_start_offset -
1).unwrap().position();
+ let position_offset: u32 = self.get(relative_start_offset -
1).unwrap().position();
Some(IggyIndexesMut::from_bytes(slice, position_offset))
}
}
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs
b/core/server/src/streaming/segments/messages/messages_reader.rs
index a3ffe3e3..ccdfec07 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -24,6 +24,7 @@ use compio::fs::{File, OpenOptions};
use compio::io::AsyncReadAtExt;
use error_set::ErrContext;
use iggy_common::IggyError;
+use std::rc::Rc;
use std::{
io::ErrorKind,
sync::{
@@ -38,12 +39,18 @@ use tracing::{error, trace};
pub struct MessagesReader {
file_path: String,
file: File,
- messages_size_bytes: AtomicU64,
+ messages_size_bytes: Rc<AtomicU64>,
}
+// Safety: We are guaranteeing that MessagesReader will never be used from
multiple threads
+unsafe impl Send for MessagesReader {}
+
impl MessagesReader {
/// Opens the messages file in read mode.
- pub async fn new(file_path: &str, messages_size_bytes: AtomicU64) ->
Result<Self, IggyError> {
+ pub async fn new(
+ file_path: &str,
+ messages_size_bytes: Rc<AtomicU64>,
+ ) -> Result<Self, IggyError> {
let file = OpenOptions::new()
.read(true)
.open(file_path)
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index a9bc084d..becc7aea 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -20,9 +20,12 @@ use crate::streaming::segments::{IggyMessagesBatchSet,
messages::write_batch};
use compio::fs::{File, OpenOptions};
use error_set::ErrContext;
use iggy_common::{IggyByteSize, IggyError};
-use std::sync::{
- Arc,
- atomic::{AtomicU64, Ordering},
+use std::{
+ rc::Rc,
+ sync::{
+ Arc,
+ atomic::{AtomicU64, Ordering},
+ },
};
use tracing::{error, trace};
@@ -31,10 +34,13 @@ use tracing::{error, trace};
pub struct MessagesWriter {
file_path: String,
file: File,
- messages_size_bytes: AtomicU64,
+ messages_size_bytes: Rc<AtomicU64>,
fsync: bool,
}
+// Safety: We are guaranteeing that MessagesWriter will never be used from
multiple threads
+unsafe impl Send for MessagesWriter {}
+
impl MessagesWriter {
/// Opens the messages file in write mode.
///
@@ -43,7 +49,7 @@ impl MessagesWriter {
/// Otherwise, the file is retained in `self.file` for synchronous writes.
pub async fn new(
file_path: &str,
- messages_size_bytes: AtomicU64,
+ messages_size_bytes: Rc<AtomicU64>,
fsync: bool,
file_exists: bool,
) -> Result<Self, IggyError> {
diff --git a/core/server/src/streaming/segments/storage.rs
b/core/server/src/streaming/segments/storage.rs
index 271396b9..737b8947 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -1,4 +1,5 @@
use iggy_common::IggyError;
+use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use tracing::warn;
@@ -27,25 +28,16 @@ impl Storage {
index_fsync: bool,
file_exists: bool,
) -> Result<Self, IggyError> {
- let messages_writer = MessagesWriter::new(
- messages_path,
- AtomicU64::new(messages_size),
- log_fsync,
- file_exists,
- )
- .await?;
+ let size = Rc::new(AtomicU64::new(messages_size));
+ let indexes_size = Rc::new(AtomicU64::new(indexes_size));
+ let messages_writer =
+ MessagesWriter::new(messages_path, size.clone(), log_fsync,
file_exists).await?;
- let index_writer = IndexWriter::new(
- index_path,
- AtomicU64::new(indexes_size),
- index_fsync,
- file_exists,
- )
- .await?;
+ let index_writer =
+ IndexWriter::new(index_path, indexes_size.clone(), index_fsync,
file_exists).await?;
- let messages_reader =
- MessagesReader::new(messages_path,
AtomicU64::new(messages_size)).await?;
- let index_reader = IndexReader::new(index_path,
AtomicU64::new(indexes_size)).await?;
+ let messages_reader = MessagesReader::new(messages_path, size).await?;
+ let index_reader = IndexReader::new(index_path, indexes_size).await?;
Ok(Self {
messages_writer: Some(messages_writer),