This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch bench-expiry in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 704fb5cc962cac1285a0d9aa7d3bcf0c9bd69ac8 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Feb 3 14:33:16 2026 +0100 feat(bench): add --message-expiry CLI option for benchmark topics Benchmarks previously created topics with hardcoded NeverExpire policy, preventing testing of expiry-related scenarios. Add -e/--message-expiry flag to all producer benchmark kinds, allowing configurable message expiry (e.g., "1s", "5min", "1h"). Consumer-only benchmarks use the trait default since they don't create topics. --- core/bench/src/args/common.rs | 6 +++++- core/bench/src/args/examples.rs | 1 + core/bench/src/args/kinds/balanced/consumer_group.rs | 2 +- core/bench/src/args/kinds/balanced/producer.rs | 10 +++++++++- .../src/args/kinds/balanced/producer_and_consumer_group.rs | 10 +++++++++- core/bench/src/args/kinds/end_to_end/producing_consumer.rs | 10 +++++++++- .../src/args/kinds/end_to_end/producing_consumer_group.rs | 10 +++++++++- core/bench/src/args/kinds/pinned/producer.rs | 10 +++++++++- core/bench/src/args/kinds/pinned/producer_and_consumer.rs | 10 +++++++++- core/bench/src/args/props.rs | 5 ++++- core/bench/src/benchmarks/benchmark.rs | 7 ++++--- 11 files changed, 69 insertions(+), 12 deletions(-) diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index ebc54e48b..be983c209 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -30,7 +30,7 @@ use bench_report::benchmark_kind::BenchmarkKind; use bench_report::numeric_parameter::BenchmarkNumericParameter; use clap::error::ErrorKind; use clap::{CommandFactory, Parser}; -use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol}; +use iggy::prelude::{IggyByteSize, IggyDuration, IggyExpiry, TransportProtocol}; use std::num::NonZeroU32; use std::str::FromStr; @@ -291,6 +291,10 @@ impl IggyBenchArgs { self.benchmark_kind.inner().max_topic_size() } + pub fn message_expiry(&self) -> IggyExpiry { + self.benchmark_kind.inner().message_expiry() + } + pub fn read_amplification(&self) -> Option<f32> { self.benchmark_kind.inner().read_amplification() } diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs index 7a00929e5..ebd39ba91 100644 --- a/core/bench/src/args/examples.rs +++ b/core/bench/src/args/examples.rs @@ -68,6 +68,7 @@ const EXAMPLES: &str = r#"EXAMPLES: --producers (-c): Number of producers --consumers (-c): Number of consumers --max-topic-size (-T): Max topic size (e.g., "1GiB") + --message-expiry (-e): Message expiry time (e.g., "1s", "5min", "1h") Examples with detailed configuration: diff --git a/core/bench/src/args/kinds/balanced/consumer_group.rs b/core/bench/src/args/kinds/balanced/consumer_group.rs index a3c0fe2cc..ceb1d5ad8 100644 --- a/core/bench/src/args/kinds/balanced/consumer_group.rs +++ b/core/bench/src/args/kinds/balanced/consumer_group.rs @@ -82,7 +82,7 @@ impl BenchmarkKindProps for BalancedConsumerGroupArgs { cmd.error( ErrorKind::ArgumentConflict, format!( - "In balanced consumer group, consumer groups number ({cg_number}) must be less than the number of streams ({streams})" + "In balanced consumer group, consumer groups number ({cg_number}) must be greater than or equal to the number of streams ({streams})" ), ) .exit(); diff --git a/core/bench/src/args/kinds/balanced/producer.rs b/core/bench/src/args/kinds/balanced/producer.rs index 02a222288..3190fc9fd 100644 --- a/core/bench/src/args/kinds/balanced/producer.rs +++ b/core/bench/src/args/kinds/balanced/producer.rs @@ -26,7 +26,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; /// N producers sending to N separated stream-topic with single partition (one stream per one producer) @@ -50,6 +50,10 @@ pub struct BalancedProducerArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then the server default will be used. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, } impl BenchmarkKindProps for BalancedProducerArgs { @@ -81,6 +85,10 @@ impl BenchmarkKindProps for BalancedProducerArgs { self.max_topic_size } + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } + fn validate(&self) { let partitions = self.partitions(); let mut cmd = IggyBenchArgs::command(); diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs index a18c869e8..cedf3fff6 100644 --- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs +++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs @@ -27,7 +27,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; /// Polling benchmark with consumer group @@ -60,6 +60,10 @@ pub struct BalancedProducerAndConsumerGroupArgs { #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, + /// Consumer rate limit multiplier relative to producer rate. /// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup. /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable. @@ -124,4 +128,8 @@ impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs { fn max_topic_size(&self) -> Option<IggyByteSize> { self.max_topic_size } + + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } } diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs index 57c0908d6..d4b20645e 100644 --- a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs +++ b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs @@ -23,7 +23,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; #[derive(Parser, Debug, Clone)] @@ -42,6 +42,10 @@ pub struct EndToEndProducingConsumerArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then the server default will be used. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, } impl BenchmarkKindProps for EndToEndProducingConsumerArgs { @@ -73,6 +77,10 @@ impl BenchmarkKindProps for EndToEndProducingConsumerArgs { self.max_topic_size } + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } + fn validate(&self) { let mut cmd = IggyBenchArgs::command(); let streams = self.streams(); diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs index 88cb58c2c..adcfd7e60 100644 --- a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs +++ b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs @@ -27,7 +27,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; #[derive(Parser, Debug, Clone)] @@ -58,6 +58,10 @@ pub struct EndToEndProducingConsumerGroupArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then topic size will be unlimited. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, } impl BenchmarkKindProps for EndToEndProducingConsumerGroupArgs { @@ -89,6 +93,10 @@ impl BenchmarkKindProps for EndToEndProducingConsumerGroupArgs { self.max_topic_size } + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } + fn validate(&self) { let mut cmd = IggyBenchArgs::command(); let streams = self.streams(); diff --git a/core/bench/src/args/kinds/pinned/producer.rs b/core/bench/src/args/kinds/pinned/producer.rs index 138988fb6..e338745fa 100644 --- a/core/bench/src/args/kinds/pinned/producer.rs +++ b/core/bench/src/args/kinds/pinned/producer.rs @@ -21,7 +21,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; #[derive(Parser, Debug, Clone)] @@ -41,6 +41,10 @@ pub struct PinnedProducerArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB". If not provided then the server default will be used. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, } impl BenchmarkKindProps for PinnedProducerArgs { @@ -72,6 +76,10 @@ impl BenchmarkKindProps for PinnedProducerArgs { self.max_topic_size } + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } + fn validate(&self) { let mut cmd = IggyBenchArgs::command(); let streams = self.streams(); diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs index 6777e253c..3e3eb7d47 100644 --- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs +++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs @@ -26,7 +26,7 @@ use crate::args::{ transport::BenchmarkTransportCommand, }; use clap::{CommandFactory, Parser, error::ErrorKind}; -use iggy::prelude::IggyByteSize; +use iggy::prelude::{IggyByteSize, IggyExpiry}; use std::num::NonZeroU32; #[derive(Parser, Debug, Clone)] @@ -55,6 +55,10 @@ pub struct PinnedProducerAndConsumerArgs { #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire. + #[arg(long, short = 'e')] + pub message_expiry: Option<IggyExpiry>, + /// Consumer rate limit multiplier relative to producer rate. /// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup. /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable. @@ -91,6 +95,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs { self.max_topic_size } + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry.unwrap_or(IggyExpiry::NeverExpire) + } + fn read_amplification(&self) -> Option<f32> { Some(self.read_amplification) } diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs index 2230163e8..aaff93db1 100644 --- a/core/bench/src/args/props.rs +++ b/core/bench/src/args/props.rs @@ -17,7 +17,7 @@ */ use super::{output::BenchmarkOutputCommand, transport::BenchmarkTransportCommand}; -use iggy::prelude::{IggyByteSize, TransportProtocol}; +use iggy::prelude::{IggyByteSize, IggyExpiry, TransportProtocol}; pub trait BenchmarkKindProps { fn streams(&self) -> u32; @@ -27,6 +27,9 @@ pub trait BenchmarkKindProps { fn producers(&self) -> u32; fn transport_command(&self) -> &BenchmarkTransportCommand; fn max_topic_size(&self) -> Option<IggyByteSize>; + fn message_expiry(&self) -> IggyExpiry { + IggyExpiry::NeverExpire + } fn validate(&self); /// Consumer rate limit multiplier relative to producer rate. diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs index 6a334b4f0..594a00101 100644 --- a/core/bench/src/benchmarks/benchmark.rs +++ b/core/bench/src/benchmarks/benchmark.rs @@ -112,10 +112,11 @@ pub trait Benchmarkable: Send { .args() .max_topic_size() .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom); + let message_expiry = self.args().message_expiry(); info!( - "Creating the test topic '{}' for stream '{}' with max topic size: {:?}", - topic_name, stream_name, max_topic_size + "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}", + topic_name, stream_name, max_topic_size, message_expiry ); client @@ -125,7 +126,7 @@ pub trait Benchmarkable: Send { partitions_count, CompressionAlgorithm::default(), None, - IggyExpiry::NeverExpire, + message_expiry, max_topic_size, ) .await?;
