This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new cb0a5a6a3 feat(bench): add --message-expiry CLI option for benchmark topics (#2672)
cb0a5a6a3 is described below
commit cb0a5a6a3cc202e08abd6e1e513a90d6282b253f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Feb 3 14:56:23 2026 +0100
feat(bench): add --message-expiry CLI option for benchmark topics (#2672)
Benchmarks previously created topics with hardcoded NeverExpire policy,
preventing testing of expiry-related scenarios.
Add -e/--message-expiry flag to all producer benchmark kinds, allowing
configurable message expiry (e.g., "1s", "5min", "1h"). Consumer-only
benchmarks use the trait default since they don't create topics.
---
core/bench/src/args/common.rs | 6 +++++-
core/bench/src/args/examples.rs | 1 +
core/bench/src/args/kinds/balanced/consumer_group.rs | 2 +-
core/bench/src/args/kinds/balanced/producer.rs | 10 +++++++++-
.../src/args/kinds/balanced/producer_and_consumer_group.rs | 10 +++++++++-
core/bench/src/args/kinds/end_to_end/producing_consumer.rs | 10 +++++++++-
.../src/args/kinds/end_to_end/producing_consumer_group.rs | 10 +++++++++-
core/bench/src/args/kinds/pinned/producer.rs | 10 +++++++++-
core/bench/src/args/kinds/pinned/producer_and_consumer.rs | 10 +++++++++-
core/bench/src/args/props.rs | 5 ++++-
core/bench/src/benchmarks/benchmark.rs | 7 ++++---
11 files changed, 69 insertions(+), 12 deletions(-)
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index ebc54e48b..be983c209 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -30,7 +30,7 @@ use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::numeric_parameter::BenchmarkNumericParameter;
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
-use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol};
+use iggy::prelude::{IggyByteSize, IggyDuration, IggyExpiry, TransportProtocol};
use std::num::NonZeroU32;
use std::str::FromStr;
@@ -291,6 +291,10 @@ impl IggyBenchArgs {
self.benchmark_kind.inner().max_topic_size()
}
+ pub fn message_expiry(&self) -> IggyExpiry {
+ self.benchmark_kind.inner().message_expiry()
+ }
+
pub fn read_amplification(&self) -> Option<f32> {
self.benchmark_kind.inner().read_amplification()
}
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index 7a00929e5..95cb9f9d9 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -68,6 +68,7 @@ const EXAMPLES: &str = r#"EXAMPLES:
--producers (-c): Number of producers
--consumers (-c): Number of consumers
--max-topic-size (-T): Max topic size (e.g., "1GiB")
+ --message-expiry (-e): Topic message expiry time (e.g., "1s", "5min", "1h")
Examples with detailed configuration:
diff --git a/core/bench/src/args/kinds/balanced/consumer_group.rs b/core/bench/src/args/kinds/balanced/consumer_group.rs
index a3c0fe2cc..ceb1d5ad8 100644
--- a/core/bench/src/args/kinds/balanced/consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/consumer_group.rs
@@ -82,7 +82,7 @@ impl BenchmarkKindProps for BalancedConsumerGroupArgs {
cmd.error(
ErrorKind::ArgumentConflict,
format!(
- "In balanced consumer group, consumer groups number ({cg_number}) must be less than the number of streams ({streams})"
+ "In balanced consumer group, consumer groups number ({cg_number}) must be greater than or equal to the number of streams ({streams})"
),
)
.exit();
diff --git a/core/bench/src/args/kinds/balanced/producer.rs b/core/bench/src/args/kinds/balanced/producer.rs
index 02a222288..a49cdd6af 100644
--- a/core/bench/src/args/kinds/balanced/producer.rs
+++ b/core/bench/src/args/kinds/balanced/producer.rs
@@ -26,7 +26,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
/// N producers sending to N separated stream-topic with single partition (one stream per one producer)
@@ -50,6 +50,10 @@ pub struct BalancedProducerArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then the server default will be used.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
}
impl BenchmarkKindProps for BalancedProducerArgs {
@@ -81,6 +85,10 @@ impl BenchmarkKindProps for BalancedProducerArgs {
self.max_topic_size
}
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
+
fn validate(&self) {
let partitions = self.partitions();
let mut cmd = IggyBenchArgs::command();
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index a18c869e8..70e761a4f 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -27,7 +27,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
/// Polling benchmark with consumer group
@@ -60,6 +60,10 @@ pub struct BalancedProducerAndConsumerGroupArgs {
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
+
/// Consumer rate limit multiplier relative to producer rate.
/// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup.
/// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
@@ -124,4 +128,8 @@ impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs {
fn max_topic_size(&self) -> Option<IggyByteSize> {
self.max_topic_size
}
+
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
}
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
index 57c0908d6..19069a9be 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
@@ -23,7 +23,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
#[derive(Parser, Debug, Clone)]
@@ -42,6 +42,10 @@ pub struct EndToEndProducingConsumerArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then the server default will be used.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
}
impl BenchmarkKindProps for EndToEndProducingConsumerArgs {
@@ -73,6 +77,10 @@ impl BenchmarkKindProps for EndToEndProducingConsumerArgs {
self.max_topic_size
}
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
+
fn validate(&self) {
let mut cmd = IggyBenchArgs::command();
let streams = self.streams();
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
index 88cb58c2c..db2d7ba63 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
@@ -27,7 +27,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
#[derive(Parser, Debug, Clone)]
@@ -58,6 +58,10 @@ pub struct EndToEndProducingConsumerGroupArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then topic size will be unlimited.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
}
impl BenchmarkKindProps for EndToEndProducingConsumerGroupArgs {
@@ -89,6 +93,10 @@ impl BenchmarkKindProps for EndToEndProducingConsumerGroupArgs {
self.max_topic_size
}
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
+
fn validate(&self) {
let mut cmd = IggyBenchArgs::command();
let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer.rs b/core/bench/src/args/kinds/pinned/producer.rs
index 138988fb6..89114c00d 100644
--- a/core/bench/src/args/kinds/pinned/producer.rs
+++ b/core/bench/src/args/kinds/pinned/producer.rs
@@ -21,7 +21,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
#[derive(Parser, Debug, Clone)]
@@ -41,6 +41,10 @@ pub struct PinnedProducerArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB". If not provided then the server default will be used.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
}
impl BenchmarkKindProps for PinnedProducerArgs {
@@ -72,6 +76,10 @@ impl BenchmarkKindProps for PinnedProducerArgs {
self.max_topic_size
}
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
+
fn validate(&self) {
let mut cmd = IggyBenchArgs::command();
let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index 6777e253c..296b38331 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -26,7 +26,7 @@ use crate::args::{
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
use std::num::NonZeroU32;
#[derive(Parser, Debug, Clone)]
@@ -55,6 +55,10 @@ pub struct PinnedProducerAndConsumerArgs {
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+ /// Topic message expiry time in human readable format, e.g. "1s", "5min", "1h". If not provided, messages never expire.
+ #[arg(long, short = 'e')]
+ pub message_expiry: Option<IggyExpiry>,
+
/// Consumer rate limit multiplier relative to producer rate.
/// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup.
/// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
@@ -91,6 +95,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
self.max_topic_size
}
+ fn message_expiry(&self) -> IggyExpiry {
+ self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+ }
+
fn read_amplification(&self) -> Option<f32> {
Some(self.read_amplification)
}
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 2230163e8..aaff93db1 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -17,7 +17,7 @@
*/
use super::{output::BenchmarkOutputCommand, transport::BenchmarkTransportCommand};
-use iggy::prelude::{IggyByteSize, TransportProtocol};
+use iggy::prelude::{IggyByteSize, IggyExpiry, TransportProtocol};
pub trait BenchmarkKindProps {
fn streams(&self) -> u32;
@@ -27,6 +27,9 @@ pub trait BenchmarkKindProps {
fn producers(&self) -> u32;
fn transport_command(&self) -> &BenchmarkTransportCommand;
fn max_topic_size(&self) -> Option<IggyByteSize>;
+ fn message_expiry(&self) -> IggyExpiry {
+ IggyExpiry::NeverExpire
+ }
fn validate(&self);
/// Consumer rate limit multiplier relative to producer rate.
diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs
index 6a334b4f0..594a00101 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -112,10 +112,11 @@ pub trait Benchmarkable: Send {
.args()
.max_topic_size()
.map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
+ let message_expiry = self.args().message_expiry();
info!(
- "Creating the test topic '{}' for stream '{}' with max topic size: {:?}",
- topic_name, stream_name, max_topic_size
+ "Creating the test topic '{}' for stream '{}' with max topic size: {:?}, message expiry: {}",
+ topic_name, stream_name, max_topic_size, message_expiry
);
client
@@ -125,7 +126,7 @@ pub trait Benchmarkable: Send {
partitions_count,
CompressionAlgorithm::default(),
None,
- IggyExpiry::NeverExpire,
+ message_expiry,
max_topic_size,
)
.await?;