This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 5191209e13856cf208bc33444ed99ea7c1bc32e8 Author: haze518 <[email protected]> AuthorDate: Thu May 15 09:54:09 2025 +0600 rename send_interval to linger_time --- README.md | 2 +- core/examples/src/multi-tenant/producer/main.rs | 2 +- core/examples/src/new-sdk/producer/main.rs | 2 +- .../stream-builder/stream-producer-config/main.rs | 2 +- core/sdk/src/clients/producer.rs | 8 ++--- core/sdk/src/clients/producer_builder.rs | 14 ++++---- .../stream_builder/build/build_iggy_producer.rs | 4 +-- .../stream_builder/config/config_iggy_producer.rs | 42 ++++++++-------------- .../stream_builder/config/config_iggy_stream.rs | 12 +++---- 9 files changed, 38 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index ac89de64..5074f509 100644 --- a/README.md +++ b/README.md @@ -249,7 +249,7 @@ let client = IggyClient::from_connection_string("iggy://user:secret@localhost:80 let mut producer = client .producer("dev01", "events")? .batch_length(1000) - .send_interval(IggyDuration::from_str("1ms")?) + .linger_time(IggyDuration::from_str("1ms")?) .partitioning(Partitioning::balanced()) .build(); diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs index 2df9daa8..28dc9759 100644 --- a/core/examples/src/multi-tenant/producer/main.rs +++ b/core/examples/src/multi-tenant/producer/main.rs @@ -263,7 +263,7 @@ async fn create_producers( let mut producer = client .producer(stream, topic)? .batch_length(batch_length) - .send_interval(IggyDuration::from_str(interval).expect("Invalid duration")) + .linger_time(IggyDuration::from_str(interval).expect("Invalid duration")) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( partitions_count, diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs index 8e3fd899..52f60e7c 100644 --- a/core/examples/src/new-sdk/producer/main.rs +++ b/core/examples/src/new-sdk/producer/main.rs @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> { let mut producer = client .producer(&args.stream_id, &args.topic_id)? .batch_length(args.messages_per_batch) - .send_interval(IggyDuration::from_str(&args.interval)?) + .linger_time(IggyDuration::from_str(&args.interval)?) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( 3, diff --git a/core/examples/src/stream-builder/stream-producer-config/main.rs b/core/examples/src/stream-builder/stream-producer-config/main.rs index f501a477..3038d250 100644 --- a/core/examples/src/stream-builder/stream-producer-config/main.rs +++ b/core/examples/src/stream-builder/stream-producer-config/main.rs @@ -46,7 +46,7 @@ async fn main() -> Result<(), IggyError> { // Note, this only applies to batch send messages. Single messages are sent immediately. .batch_length(100) // Sets the interval between sending the messages. Affects latency so you want to benchmark this value. - .send_interval(IggyDuration::from_str("5ms").unwrap()) + .linger_time(IggyDuration::from_str("5ms").unwrap()) // `Partitioning` is used to specify to which partition the messages should be sent. // It has the following kinds: // - `Balanced` - the partition ID is calculated by the server using the round-robin algorithm. diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index c9c203bc..f3706e3a 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -46,7 +46,7 @@ pub struct IggyProducer { partitioning: Option<Arc<Partitioning>>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, - send_interval_micros: u64, + linger_time_micros: u64, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -94,7 +94,7 @@ impl IggyProducer { partitioning: partitioning.map(Arc::new), encryptor, partitioner, - send_interval_micros: interval.map_or(0, |i| i.as_micros()), + linger_time_micros: interval.map_or(0, |i| i.as_micros()), create_stream_if_not_exists, create_topic_if_not_exists, topic_partitions_count, @@ -307,9 +307,9 @@ impl IggyProducer { let mut current_batch = 1; let batches_count = batches.len(); for batch in batches { - if self.send_interval_micros > 0 { + if self.linger_time_micros > 0 { Self::wait_before_sending( - self.send_interval_micros, + self.linger_time_micros, self.last_sent_at.load(ORDERING), ) .await; diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 66ba3c08..76ecfeea 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -35,7 +35,7 @@ pub struct IggyProducerBuilder { partitioning: Option<Partitioning>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, - send_interval: Option<IggyDuration>, + linger_time: Option<IggyDuration>, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -67,7 +67,7 @@ impl IggyProducerBuilder { partitioning: None, encryptor, partitioner, - send_interval: Some(IggyDuration::from(1000)), + linger_time: Some(IggyDuration::from(1000)), create_stream_if_not_exists: true, create_topic_if_not_exists: true, topic_partitions_count: 1, @@ -110,17 +110,17 @@ impl IggyProducerBuilder { } /// Sets the interval between sending the messages, can be combined with `batch_length`. - pub fn send_interval(self, interval: IggyDuration) -> Self { + pub fn linger_time(self, interval: IggyDuration) -> Self { Self { - send_interval: Some(interval), + linger_time: Some(interval), ..self } } /// Clears the interval. - pub fn without_send_interval(self) -> Self { + pub fn without_linger_time(self) -> Self { Self { - send_interval: None, + linger_time: None, ..self } } @@ -240,7 +240,7 @@ impl IggyProducerBuilder { self.partitioning, self.encryptor, self.partitioner, - self.send_interval, + self.linger_time, self.create_stream_if_not_exists, self.create_topic_if_not_exists, self.topic_partitions_count, diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index e8f5a13f..9b9a1cdf 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -48,7 +48,7 @@ pub(crate) async fn build_iggy_producer( let topic_partitions_count = config.topic_partitions_count(); let topic_replication_factor = config.topic_replication_factor(); let batch_length = config.batch_length(); - let send_interval = config.send_interval(); + let linger_time = config.linger_time(); let partitioning = config.partitioning().to_owned(); let send_retries = config.send_retries_count(); let send_retries_interval = config.send_retries_interval(); @@ -57,7 +57,7 @@ pub(crate) async fn build_iggy_producer( let mut builder = client .producer(stream, topic)? .batch_length(batch_length) - .send_interval(send_interval) + .linger_time(linger_time) .partitioning(partitioning) .create_stream_if_not_exists() .send_retries(send_retries, send_retries_interval) diff --git a/core/sdk/src/stream_builder/config/config_iggy_producer.rs b/core/sdk/src/stream_builder/config/config_iggy_producer.rs index 0e5c4f76..0eca7671 100644 --- a/core/sdk/src/stream_builder/config/config_iggy_producer.rs +++ b/core/sdk/src/stream_builder/config/config_iggy_producer.rs @@ -39,7 +39,7 @@ pub struct IggyProducerConfig { /// The max number of messages to send in a batch. Must be greater than 0. batch_length: u32, /// Sets the interval between sending the messages, can be combined with `batch_length`. - send_interval: IggyDuration, + linger_time: IggyDuration, /// Specifies to which partition the messages should be sent. partitioning: Partitioning, /// Sets the maximum number of send retries in case of a message sending failure. @@ -62,7 +62,7 @@ impl Default for IggyProducerConfig { topic_id, topic_name: "test_topic".to_string(), batch_length: 100, - send_interval: IggyDuration::from_str("5ms").unwrap(), + linger_time: IggyDuration::from_str("5ms").unwrap(), partitioning: Partitioning::balanced(), topic_partitions_count: 1, topic_replication_factor: None, @@ -85,7 +85,7 @@ impl IggyProducerConfig { /// * `topic_partitions_count` - The number of partitions to create. /// * `topic_replication_factor` - The replication factor to use. /// * `batch_length` - The max number of messages to send in a batch. - /// * `send_interval` - The interval between messages sent. + /// * `linger_time` - The interval between messages sent. /// * `partitioning` - The partitioning strategy to use. /// * `encryptor` - The encryptor to use. /// * `send_retries_count` - The number of retries to send messages. @@ -103,7 +103,7 @@ impl IggyProducerConfig { topic_partitions_count: u32, topic_replication_factor: Option<u8>, batch_length: u32, - send_interval: IggyDuration, + linger_time: IggyDuration, partitioning: Partitioning, encryptor: Option<Arc<EncryptorKind>>, send_retries_count: Option<u32>, @@ -117,7 +117,7 @@ impl IggyProducerConfig { topic_partitions_count, topic_replication_factor, batch_length, - send_interval, + linger_time, partitioning, encryptor, send_retries_count, @@ -133,7 +133,7 @@ impl IggyProducerConfig { /// * `stream` - The stream name. /// * `topic` - The topic name. /// * `batch_length` - The max number of messages to send in a batch. - /// * `send_interval` - The interval between messages sent. + /// * `linger_time` - The interval between messages sent. /// /// Returns: /// A new `IggyProducerConfig`. @@ -142,7 +142,7 @@ impl IggyProducerConfig { stream: &str, topic: &str, batch_length: u32, - send_interval: IggyDuration, + linger_time: IggyDuration, ) -> Result<Self, IggyError> { let stream_id = Identifier::from_str_value(stream)?; let topic_id = Identifier::from_str_value(topic)?; @@ -153,7 +153,7 @@ impl IggyProducerConfig { topic_id, topic_name: topic.to_string(), batch_length, - send_interval, + linger_time, partitioning: Partitioning::balanced(), topic_partitions_count: 1, topic_replication_factor: None, @@ -185,8 +185,8 @@ impl IggyProducerConfig { self.batch_length } - pub fn send_interval(&self) -> IggyDuration { - self.send_interval + pub fn linger_time(&self) -> IggyDuration { + self.linger_time } pub fn partitioning(&self) -> &Partitioning { @@ -231,7 +231,7 @@ mod tests { .topic_name(topic) .topic_partitions_count(3) .batch_length(100) - .send_interval(IggyDuration::from_str("5ms").unwrap()) + .linger_time(IggyDuration::from_str("5ms").unwrap()) .partitioning(Partitioning::balanced()) .send_retries_count(3) .send_retries_interval(IggyDuration::new_from_secs(1)) @@ -248,10 +248,7 @@ mod tests { ); assert_eq!(config.topic_name(), "test_topic"); assert_eq!(config.batch_length(), 100); - assert_eq!( - config.send_interval(), - IggyDuration::from_str("5ms").unwrap() - ); + assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap()); assert_eq!(config.partitioning(), &Partitioning::balanced()); assert_eq!(config.topic_partitions_count(), 3); assert_eq!(config.topic_replication_factor(), None); @@ -273,10 +270,7 @@ mod tests { assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); assert_eq!(config.batch_length(), 100); - assert_eq!( - config.send_interval(), - IggyDuration::from_str("5ms").unwrap() - ); + assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap()); assert_eq!(config.partitioning(), &Partitioning::balanced()); assert_eq!(config.topic_partitions_count(), 1); assert_eq!(config.topic_replication_factor(), None); @@ -311,10 +305,7 @@ mod tests { assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); assert_eq!(config.batch_length(), 100); - assert_eq!( - config.send_interval(), - IggyDuration::from_str("5ms").unwrap() - ); + assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap()); assert_eq!(config.partitioning(), &Partitioning::balanced()); assert_eq!(config.topic_partitions_count(), 3); assert_eq!(config.topic_replication_factor(), None); @@ -342,10 +333,7 @@ mod tests { assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); assert_eq!(config.batch_length(), 100); - assert_eq!( - config.send_interval(), - IggyDuration::from_str("5ms").unwrap() - ); + assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap()); assert_eq!(config.partitioning(), &Partitioning::balanced()); assert_eq!(config.topic_partitions_count(), 1); assert_eq!(config.topic_replication_factor(), None); diff --git a/core/sdk/src/stream_builder/config/config_iggy_stream.rs b/core/sdk/src/stream_builder/config/config_iggy_stream.rs index 49bc570e..07af9a48 100644 --- a/core/sdk/src/stream_builder/config/config_iggy_stream.rs +++ b/core/sdk/src/stream_builder/config/config_iggy_stream.rs @@ -52,7 +52,7 @@ impl IggyStreamConfig { /// * `stream` - The stream name. /// * `topic` - The topic name. /// * `batch_length` - The max number of messages to send in a batch. - /// * `send_interval` - The interval between messages sent. + /// * `linger_time` - The interval between messages sent. /// * `polling_interval` - The interval between polling for new messages. /// /// Returns: @@ -62,14 +62,14 @@ impl IggyStreamConfig { stream: &str, topic: &str, batch_length: u32, - send_interval: IggyDuration, + linger_time: IggyDuration, polling_interval: IggyDuration, ) -> Result<Self, IggyError> { let consumer_config = IggyConsumerConfig::from_stream_topic(stream, topic, batch_length, polling_interval)?; let producer_config = - IggyProducerConfig::from_stream_topic(stream, topic, batch_length, send_interval)?; + IggyProducerConfig::from_stream_topic(stream, topic, batch_length, linger_time)?; Ok(Self { consumer_config, @@ -136,7 +136,7 @@ mod tests { IggyDuration::from_str("5ms").unwrap() ); assert_eq!( - config.producer_config().send_interval(), + config.producer_config().linger_time(), IggyDuration::from_str("5ms").unwrap() ); } @@ -153,7 +153,7 @@ mod tests { IggyDuration::from_str("5ms").unwrap() ); assert_eq!( - config.producer_config().send_interval(), + config.producer_config().linger_time(), IggyDuration::from_str("5ms").unwrap() ); } @@ -180,7 +180,7 @@ mod tests { IggyDuration::from_str("5ms").unwrap() ); assert_eq!( - config.producer_config().send_interval(), + config.producer_config().linger_time(), IggyDuration::from_str("5ms").unwrap() ); }
