This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch impl_polling_messages
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/impl_polling_messages by this
push:
new 89d668a7 polishing
89d668a7 is described below
commit 89d668a740ef1f8a19193ec860e56d88c8afea5d
Author: numminex <[email protected]>
AuthorDate: Tue Sep 9 20:24:54 2025 +0200
polishing
---
core/server/src/bootstrap.rs | 4 +-
core/server/src/shard/mod.rs | 316 +++++++++++++++++++++++++++++--
core/server/src/shard/system/messages.rs | 45 +++--
core/server/src/slab/helpers.rs | 6 +-
core/server/src/slab/streams.rs | 110 +++++------
5 files changed, 381 insertions(+), 100 deletions(-)
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 4678b146..dd88f721 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -350,9 +350,9 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
let mut proactor = compio::driver::ProactorBuilder::new();
proactor
- .capacity(4096)
+ .capacity(1024)
.coop_taskrun(true)
- .taskrun_flag(false); // TODO: Try enabling this.
+ .taskrun_flag(true); // TODO: Try enabling this.
// FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
// This causes a freeze on macOS with compio fs operations
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8263942a..8794f665 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -32,7 +32,8 @@ use error_set::ErrContext;
use futures::future::try_join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{
- EncryptorKind, Identifier, IggyError, Permissions, UserId, UserStatus,
+    EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions, PollingKind, UserId,
+    UserStatus,
defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME},
locking::IggyRwLockFn,
};
@@ -50,10 +51,11 @@ use std::{
},
time::{Duration, Instant},
};
-use tracing::{error, info, instrument, warn};
+use tracing::{error, info, instrument, trace, warn};
use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender};
use crate::{
+ binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::server::ServerConfig,
http::http_server,
io::fs_utils,
@@ -77,7 +79,9 @@ use crate::{
streaming::{
clients::client_manager::ClientManager,
diagnostics::metrics::Metrics,
+ partitions,
personal_access_tokens::personal_access_token::PersonalAccessToken,
+ polling_consumer::PollingConsumer,
session::Session,
storage::SystemStorage,
streams, topics,
@@ -345,24 +349,312 @@ impl IggyShard {
}
    async fn handle_request(&self, request: ShardRequest) -> Result<ShardResponse, IggyError> {
- todo!();
- /*
- let stream =
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
- let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+ let stream_id = request.stream_id;
+ let topic_id = request.topic_id;
let partition_id = request.partition_id;
match request.payload {
ShardRequestPayload::SendMessages { batch } => {
- topic.append_messages(partition_id, batch).await?;
+ let mut batch = self.maybe_encrypt_messages(batch)?;
+ let messages_count = batch.count();
+
+ let current_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::calculate_current_offset(),
+ );
+
+ self.streams2
+ .with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::deduplicate_messages(current_offset, &mut batch),
+ )
+ .await;
+
+ let (journal_messages_count, journal_size) =
+ self.streams2.with_partition_by_id_mut(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::append_to_journal(self.id,
current_offset, batch),
+ )?;
+
+ let unsaved_messages_count_exceeded = journal_messages_count
+ >= self.config.system.partition.messages_required_to_save;
+ let unsaved_messages_size_exceeded = journal_size
+ >= self
+ .config
+ .system
+ .partition
+ .size_of_messages_required_to_save
+ .as_bytes_u64() as u32;
+
+ let is_full = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::is_segment_full(),
+ );
+
+ // Try committing the journal
+ if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded {
+ self.streams2
+ .persist_messages(
+ self.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
+ &self.config.system,
+ )
+ .await?;
+
+ if is_full {
+ self.streams2
+ .handle_full_segment(
+ self.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ &self.config.system,
+ )
+ .await?;
+ self.metrics.increment_messages(messages_count as u64);
+ }
+ }
Ok(ShardResponse::SendMessages)
}
ShardRequestPayload::PollMessages { args, consumer } => {
- let (metadata, batch) = topic
- .get_messages(consumer, partition_id, args.strategy,
args.count)
- .await?;
- Ok(ShardResponse::PollMessages((metadata, batch)))
+ let current_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
+ );
+ let metadata = IggyPollMetadata::new(partition_id as u32,
current_offset);
+ let count = args.count;
+ let strategy = args.strategy;
+ let value = strategy.value;
+ let batches = match strategy.kind {
+ PollingKind::Offset => {
+ let offset = value;
+ // We have to remember to keep the invariant from the
if that is on line 496.
+ // Alternatively a better design would be to get rid
of that if and move the validations here.
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Timestamp => {
+ let timestamp = IggyTimestamp::from(value);
+ let timestamp_ts = timestamp.as_micros();
+ trace!(
+ "Getting {count} messages by timestamp: {} for
partition: {}...",
+ timestamp_ts, partition_id
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_timestamp(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ timestamp_ts,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::First => {
+ let first_offset = self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.segments()
+ .first()
+ .map(|segment| segment.start_offset)
+ .unwrap_or(0)
+ },
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ first_offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Last => {
+ let (start_offset, actual_count) =
self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(_, _, _, offset, _, _, _)| {
+ let current_offset =
offset.load(Ordering::Relaxed);
+ let mut requested_count = 0;
+ if requested_count > current_offset + 1 {
+ requested_count = current_offset + 1
+ }
+ let start_offset = 1 + current_offset -
requested_count;
+ (start_offset, requested_count as u32)
+ },
+ );
+
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ start_offset,
+ actual_count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ PollingKind::Next => {
+ let (consumer_offset, consumer_id) = match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_offset(consumer_id),
+ )
+ .map(|c_offset| c_offset.stored_offset),
+ consumer_id,
+ ),
+ PollingConsumer::ConsumerGroup(cg_id, _) => (
+ self.streams2
+ .with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_group_member_offset(
+ cg_id,
+ ),
+ )
+ .map(|cg_offset| cg_offset.stored_offset),
+ cg_id,
+ ),
+ };
+
+ let Some(consumer_offset) = consumer_offset else {
+ return
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+ };
+ let offset = consumer_offset + 1;
+ trace!(
+ "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
+ consumer_id, partition_id, offset
+ );
+ let batches = self
+ .streams2
+ .get_messages_by_offset(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ offset,
+ count,
+ )
+ .await?;
+ Ok(batches)
+ }
+ }?;
+
+ let numeric_stream_id = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let numeric_topic_id = self.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+
+ if args.auto_commit && !batches.is_empty() {
+ let offset = batches
+ .last_offset()
+ .expect("Batch set should have at least one batch");
+ trace!(
+ "Last offset: {} will be automatically stored for {},
stream: {}, topic: {}, partition: {}",
+ offset, consumer, numeric_stream_id, numeric_topic_id,
partition_id
+ );
+ match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => {
+ self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ partitions::helpers::store_consumer_offset(
+ consumer_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2
+ .with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_offset_to_disk(
+ self.id,
+ consumer_id,
+ ),
+ )
+ .await?;
+ }
+ PollingConsumer::ConsumerGroup(cg_id, _) => {
+ self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::store_consumer_group_member_offset(
+ cg_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ offset,
+ &self.config.system,
+ ),
+ );
+ self.streams2.with_partition_by_id_async(
+ &stream_id,
+ &topic_id,
+ partition_id,
+
partitions::helpers::persist_consumer_group_member_offset_to_disk(
+ self.id,
+ cg_id,
+ ),
+ )
+ .await?;
+ }
+ }
+ }
+ Ok(ShardResponse::PollMessages((metadata, batches)))
}
}
- */
}
async fn handle_event(&self, event: ShardEvent) -> Result<(), IggyError> {
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 0acea3ee..a3b3e2c6 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -26,14 +26,12 @@ use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
+use crate::shard_trace;
use crate::streaming::polling_consumer::PollingConsumer;
-use crate::streaming::segments::{
- IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
-};
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
use crate::streaming::session::Session;
use crate::streaming::utils::{PooledBuffer, hash};
use crate::streaming::{partitions, streams, topics};
-use crate::shard_trace;
use error_set::ErrContext;
use iggy_common::{
@@ -186,26 +184,30 @@ impl IggyShard {
// Try committing the journal
if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded
{
- self.streams2.persist_messages(
- self.id,
- stream_id,
- topic_id,
- partition_id,
- unsaved_messages_count_exceeded,
- unsaved_messages_size_exceeded,
- journal_messages_count,
- journal_size,
- &self.config.system,
- ).await?;
-
- if is_full {
- self.streams2.handle_full_segment(
+ self.streams2
+ .persist_messages(
self.id,
stream_id,
topic_id,
partition_id,
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
&self.config.system,
- ).await?;
+ )
+ .await?;
+
+ if is_full {
+ self.streams2
+ .handle_full_segment(
+ self.id,
+ stream_id,
+ topic_id,
+ partition_id,
+ &self.config.system,
+ )
+ .await?;
self.metrics.increment_messages(messages_count as
u64);
}
}
@@ -270,7 +272,10 @@ impl IggyShard {
let has_partition = self
.streams2
.with_topic_by_id(stream_id, topic_id, |(root, ..)| {
- root.partitions().exists(partition_id)
+ let partitions = root.partitions();
+ tracing::warn!("partitions: {:?}, looking for part_id: {}",
partitions, partition_id);
+ partitions.exists(partition_id)
+ //root.partitions().exists(partition_id)
});
if !has_partition {
return Err(IggyError::NoPartitions(
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 3f72b20a..31a5a8b7 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -27,11 +27,11 @@ where
async |(root, ..)| f(root.topics()).await
}
-pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRefMut>) -> O
+pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
where
- F: for<'a> FnOnce(&'a mut Topics) -> O,
+ F: for<'a> FnOnce(&'a Topics) -> O,
{
- |(mut root, ..)| f(root.topics_mut())
+ |(root, ..)| f(root.topics())
}
pub fn partitions<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRef>) -> O
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index de085238..36dadb79 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,18 +1,30 @@
use crate::{
- configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig}, slab::{
- consumer_groups::ConsumerGroups, helpers, partitions::{self,
Partitions}, topics::Topics, traits_ext::{
+ configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
+ slab::{
+ Keyed,
+ consumer_groups::ConsumerGroups,
+ helpers,
+ partitions::{self, Partitions},
+ topics::Topics,
+ traits_ext::{
ComponentsById, DeleteCell, EntityComponentSystem,
EntityComponentSystemMutCell,
InsertCell, InteriorMutability, IntoComponents,
- }, Keyed
- }, streaming::{
+ },
+ },
+ streaming::{
partitions::partition2::{PartitionRef, PartitionRefMut},
-        segments::{storage::create_segment_storage, IggyMessagesBatchSet, Segment2},
+        segments::{IggyMessagesBatchSet, Segment2, storage::create_segment_storage},
stats::stats::StreamStats,
- streams::{self, stream2::{self, StreamRef, StreamRefMut}},
+ streams::{
+ self,
+ stream2::{self, StreamRef, StreamRefMut},
+ },
topics::{
- self, consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
topic2::{TopicRef, TopicRefMut}
+ self,
+ consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
+ topic2::{TopicRef, TopicRefMut},
},
- }
+ },
};
use ahash::AHashMap;
use iggy_common::{Identifier, IggyError};
@@ -208,9 +220,9 @@ impl Streams {
pub fn with_topics_mut<T>(
&self,
stream_id: &Identifier,
- f: impl FnOnce(&mut Topics) -> T,
+ f: impl FnOnce(&Topics) -> T,
) -> T {
- self.with_stream_by_id_mut(stream_id, helpers::topics_mut(f))
+ self.with_stream_by_id(stream_id, helpers::topics_mut(f))
}
pub fn with_topic_by_id<T>(
@@ -446,44 +458,26 @@ impl Streams {
partition_id: partitions::ContainerId,
config: &crate::configs::system::SystemConfig,
) -> Result<(), IggyError> {
- let numeric_stream_id = self
- .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
- let numeric_topic_id = self.with_topic_by_id(
- stream_id,
- topic_id,
- topics::helpers::get_topic_id(),
- );
+ let numeric_stream_id =
+ self.with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
+ let numeric_topic_id =
+ self.with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
- if config.segment.cache_indexes
- == CacheIndexesConfig::OpenSegment
- || config.segment.cache_indexes
- == CacheIndexesConfig::None
+ if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+ || config.segment.cache_indexes == CacheIndexesConfig::None
{
- self.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- log.clear_active_indexes();
- },
- );
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id,
|(.., log)| {
+ log.clear_active_indexes();
+ });
}
- self.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- log.active_segment_mut().sealed = true;
- },
- );
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.active_segment_mut().sealed = true;
+ });
let (log_writer, index_writer) =
- self.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| log.active_storage_mut().shutdown(),
- );
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id,
|(.., log)| {
+ log.active_storage_mut().shutdown()
+ });
compio::runtime::spawn(async move {
let _ = log_writer.fsync().await;
@@ -496,18 +490,13 @@ impl Streams {
.detach();
let (start_offset, size, end_offset) =
- self.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- (
- log.active_segment().start_offset,
- log.active_segment().size,
- log.active_segment().end_offset,
- )
- },
- );
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ (
+ log.active_segment().start_offset,
+ log.active_segment().size,
+ log.active_segment().end_offset,
+ )
+ });
crate::shard_info!(
shard_id,
@@ -538,14 +527,9 @@ impl Streams {
end_offset + 1,
)
.await?;
- self.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
- |(.., log)| {
- log.add_persisted_segment(segment, storage);
- },
- );
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.add_persisted_segment(segment, storage);
+ });
Ok(())
}