This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c1f22af6f9807fd630aa1af818fd83339d5673a1 Author: haze518 <[email protected]> AuthorDate: Thu May 15 09:45:15 2025 +0600 rename batch_size to batch_length --- README.md | 4 +-- core/examples/src/multi-tenant/consumer/main.rs | 4 +-- core/examples/src/multi-tenant/producer/main.rs | 14 +++++----- core/examples/src/new-sdk/consumer/main.rs | 2 +- core/examples/src/new-sdk/producer/main.rs | 2 +- .../stream-builder/stream-consumer-config/main.rs | 2 +- .../stream-builder/stream-producer-config/main.rs | 2 +- .../tests/server/scenarios/system_scenario.rs | 10 +++---- core/integration/tests/streaming/get_by_offset.rs | 14 +++++----- .../tests/streaming/get_by_timestamp.rs | 14 +++++----- core/sdk/src/clients/consumer.rs | 8 +++--- core/sdk/src/clients/consumer_builder.rs | 13 +++++---- core/sdk/src/clients/producer.rs | 16 +++++------ core/sdk/src/clients/producer_builder.rs | 18 ++++++------ .../stream_builder/build/build_iggy_consumer.rs | 4 +-- .../stream_builder/build/build_iggy_producer.rs | 4 +-- .../stream_builder/config/config_iggy_consumer.rs | 32 +++++++++++----------- .../stream_builder/config/config_iggy_producer.rs | 32 +++++++++++----------- .../stream_builder/config/config_iggy_stream.rs | 20 +++++++------- 19 files changed, 109 insertions(+), 106 deletions(-) diff --git a/README.md b/README.md index 899a05d1..ac89de64 100644 --- a/README.md +++ b/README.md @@ -248,7 +248,7 @@ let client = IggyClient::from_connection_string("iggy://user:secret@localhost:80 // Create a producer for the given stream and one of its topics let mut producer = client .producer("dev01", "events")? - .batch_size(1000) + .batch_length(1000) .send_interval(IggyDuration::from_str("1ms")?) .partitioning(Partitioning::balanced()) .build(); @@ -270,7 +270,7 @@ let mut consumer = client .auto_join_consumer_group() .polling_strategy(PollingStrategy::next()) .poll_interval(IggyDuration::from_str("1ms")?) - .batch_size(1000) + .batch_length(1000) .build(); consumer.init().await?; diff --git a/core/examples/src/multi-tenant/consumer/main.rs b/core/examples/src/multi-tenant/consumer/main.rs index 81b0c3eb..af947f60 100644 --- a/core/examples/src/multi-tenant/consumer/main.rs +++ b/core/examples/src/multi-tenant/consumer/main.rs @@ -276,7 +276,7 @@ async fn create_consumers( consumers_count: u32, stream: &str, topics: &[&str], - batch_size: u32, + batch_length: u32, interval: &str, ) -> Result<Vec<TenantConsumer>, IggyError> { let mut consumers = Vec::new(); @@ -284,7 +284,7 @@ async fn create_consumers( for id in 1..=consumers_count { let mut consumer = client .consumer_group(CONSUMER_GROUP, stream, topic)? - .batch_size(batch_size) + .batch_length(batch_length) .poll_interval(IggyDuration::from_str(interval).expect("Invalid duration")) .polling_strategy(PollingStrategy::next()) .auto_join_consumer_group() diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs index b9683c64..2df9daa8 100644 --- a/core/examples/src/multi-tenant/producer/main.rs +++ b/core/examples/src/multi-tenant/producer/main.rs @@ -189,7 +189,7 @@ fn start_producers( tenant_id: u32, producers: Vec<TenantProducer>, batches_count: u64, - batch_size: u32, + batch_length: u32, ) -> Vec<JoinHandle<()>> { let mut tasks = Vec::new(); let topics_count = producers @@ -221,8 +221,8 @@ fn start_producers( _ => panic!("Invalid topic"), }; - let mut messages = Vec::with_capacity(batch_size as usize); - for _ in 1..=batch_size { + let mut messages = Vec::with_capacity(batch_length as usize); + for _ in 1..=batch_length { let payload = format!("{message}-{producer_id}-{message_id}"); let message = IggyMessage::from_str(&payload).expect("Invalid message"); messages.push(message); @@ -230,7 +230,7 @@ fn start_producers( if let Err(error) = producer.producer.send(messages).await { error!( - "Failed to send: {batch_size} message(s) to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}", + "Failed to send: {batch_length} message(s) to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}", producer.stream, producer.topic, ); continue; @@ -238,7 +238,7 @@ fn start_producers( counter += 1; info!( - "Sent: {batch_size} message(s) by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}", + "Sent: {batch_length} message(s) by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}", producer.stream, producer.topic ); } @@ -254,7 +254,7 @@ async fn create_producers( partitions_count: u32, stream: &str, topics: &[&str], - batch_size: u32, + batch_length: u32, interval: &str, ) -> Result<Vec<TenantProducer>, IggyError> { let mut producers = Vec::new(); @@ -262,7 +262,7 @@ async fn create_producers( for id in 1..=producers_count { let mut producer = client .producer(stream, topic)? - .batch_size(batch_size) + .batch_length(batch_length) .send_interval(IggyDuration::from_str(interval).expect("Invalid duration")) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( diff --git a/core/examples/src/new-sdk/consumer/main.rs b/core/examples/src/new-sdk/consumer/main.rs index cd20b9c7..aec1d464 100644 --- a/core/examples/src/new-sdk/consumer/main.rs +++ b/core/examples/src/new-sdk/consumer/main.rs @@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> { .auto_join_consumer_group() .polling_strategy(PollingStrategy::next()) .poll_interval(IggyDuration::from_str(&args.interval)?) - .batch_size(args.messages_per_batch) + .batch_length(args.messages_per_batch) .build(); consumer.init().await?; diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs index b909921c..8e3fd899 100644 --- a/core/examples/src/new-sdk/producer/main.rs +++ b/core/examples/src/new-sdk/producer/main.rs @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> { client.connect().await?; let mut producer = client .producer(&args.stream_id, &args.topic_id)? - .batch_size(args.messages_per_batch) + .batch_length(args.messages_per_batch) .send_interval(IggyDuration::from_str(&args.interval)?) .partitioning(Partitioning::balanced()) .create_topic_if_not_exists( diff --git a/core/examples/src/stream-builder/stream-consumer-config/main.rs b/core/examples/src/stream-builder/stream-consumer-config/main.rs index 8006b0ce..84e86c65 100644 --- a/core/examples/src/stream-builder/stream-consumer-config/main.rs +++ b/core/examples/src/stream-builder/stream-consumer-config/main.rs @@ -48,7 +48,7 @@ async fn main() -> Result<(), IggyError> { // The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data. // Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup. // Note, this only applies to batch send messages. Single messages are sent immediately. - .batch_size(100) + .batch_length(100) // Create the stream if it doesn't exist. .create_stream_if_not_exists(true) // Create the topic if it doesn't exist. diff --git a/core/examples/src/stream-builder/stream-producer-config/main.rs b/core/examples/src/stream-builder/stream-producer-config/main.rs index 5dd6e481..f501a477 100644 --- a/core/examples/src/stream-builder/stream-producer-config/main.rs +++ b/core/examples/src/stream-builder/stream-producer-config/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), IggyError> { // The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data. // Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup. // Note, this only applies to batch send messages. Single messages are sent immediately. - .batch_size(100) + .batch_length(100) // Sets the interval between sending the messages. Affects latency so you want to benchmark this value. .send_interval(IggyDuration::from_str("5ms").unwrap()) // `Partitioning` is used to specify to which partition the messages should be sent. diff --git a/core/integration/tests/server/scenarios/system_scenario.rs b/core/integration/tests/server/scenarios/system_scenario.rs index 1ae40637..c70968c8 100644 --- a/core/integration/tests/server/scenarios/system_scenario.rs +++ b/core/integration/tests/server/scenarios/system_scenario.rs @@ -255,9 +255,9 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 19. Messages should be also polled in the smaller batches let batches_count = 10; - let batch_size = MESSAGES_COUNT / batches_count; + let batch_length = MESSAGES_COUNT / batches_count; for i in 0..batches_count { - let start_offset = (i * batch_size) as u64; + let start_offset = (i * batch_length) as u64; let polled_messages = client .poll_messages( &Identifier::numeric(STREAM_ID).unwrap(), @@ -265,13 +265,13 @@ pub async fn run(client_factory: &dyn ClientFactory) { Some(PARTITION_ID), &consumer, &PollingStrategy::offset(start_offset), - batch_size, + batch_length, false, ) .await .unwrap(); - assert_eq!(polled_messages.messages.len() as u32, batch_size); - for i in 0..batch_size as u64 { + assert_eq!(polled_messages.messages.len() as u32, batch_length); + for i in 0..batch_length as u64 { let offset = start_offset + i; let message = polled_messages.messages.get(i as usize).unwrap(); assert_message(message, offset); diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs index 5652391f..28320d5e 100644 --- a/core/integration/tests/streaming/get_by_offset.rs +++ b/core/integration/tests/streaming/get_by_offset.rs @@ -82,14 +82,14 @@ fn very_large_batches() -> Vec<u32> { #[tokio::test] async fn test_get_messages_by_offset( message_size: IggyByteSize, - batch_sizes: Vec<u32>, + batch_lengths: Vec<u32>, messages_required_to_save: u32, segment_size: IggyByteSize, cache_indexes: CacheIndexesConfig, ) { println!( "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}", - message_size, batch_sizes, messages_required_to_save, segment_size, cache_indexes + message_size, batch_lengths, messages_required_to_save, segment_size, cache_indexes ); let setup = TestSetup::init().await; @@ -97,7 +97,7 @@ async fn test_get_messages_by_offset( let topic_id = 1; let partition_id = 1; - let total_messages_count = batch_sizes.iter().sum(); + let total_messages_count = batch_lengths.iter().sum(); let config = Arc::new(SystemConfig { path: setup.config.path.to_string(), @@ -169,11 +169,11 @@ async fn test_get_messages_by_offset( } // Keep track of offsets after each batch - let mut batch_offsets = Vec::with_capacity(batch_sizes.len()); + let mut batch_offsets = Vec::with_capacity(batch_lengths.len()); let mut current_pos = 0; // Append all batches as defined in the test matrix - for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() { + for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() { // If we've generated too many messages, skip the rest if current_pos + batch_len as usize > all_messages.len() { break; @@ -182,7 +182,7 @@ async fn test_get_messages_by_offset( println!( "Appending batch {}/{} with {} messages", batch_idx + 1, - batch_sizes.len(), + batch_lengths.len(), batch_len ); @@ -221,7 +221,7 @@ async fn test_get_messages_by_offset( // Test 2: Get messages from middle (after 3rd batch) if batch_offsets.len() >= 3 { let middle_offset = batch_offsets[2]; - let prior_batches_sum: u32 = batch_sizes[..3].iter().sum(); + let prior_batches_sum: u32 = batch_lengths[..3].iter().sum(); let remaining_messages = total_sent_messages - prior_batches_sum; let middle_messages = partition diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index d68e4811..c5b9bf01 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -84,14 +84,14 @@ fn very_large_batches() -> Vec<u32> { #[tokio::test] async fn test_get_messages_by_timestamp( message_size: IggyByteSize, - batch_sizes: Vec<u32>, + batch_lentghs: Vec<u32>, messages_required_to_save: u32, segment_size: IggyByteSize, cache_indexes: CacheIndexesConfig, ) { println!( "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}", - message_size, batch_sizes, messages_required_to_save, segment_size, cache_indexes + message_size, batch_lentghs, messages_required_to_save, segment_size, cache_indexes ); let setup = TestSetup::init().await; @@ -99,7 +99,7 @@ async fn test_get_messages_by_timestamp( let topic_id = 1; let partition_id = 1; - let total_messages_count = batch_sizes.iter().sum(); + let total_messages_count = batch_lentghs.iter().sum(); let config = Arc::new(SystemConfig { path: setup.config.path.to_string(), @@ -173,11 +173,11 @@ async fn test_get_messages_by_timestamp( // Timestamp tracking for messages let initial_timestamp = IggyTimestamp::now(); - let mut batch_timestamps = Vec::with_capacity(batch_sizes.len()); + let mut batch_timestamps = Vec::with_capacity(batch_lentghs.len()); let mut current_pos = 0; // Append all batches as defined in the test matrix with separate timestamps - for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() { + for (batch_idx, &batch_len) in batch_lentghs.iter().enumerate() { // Add a small delay between batches to ensure distinct timestamps sleep(std::time::Duration::from_millis(2)); @@ -189,7 +189,7 @@ async fn test_get_messages_by_timestamp( println!( "Appending batch {}/{} with {} messages", batch_idx + 1, - batch_sizes.len(), + batch_lentghs.len(), batch_len ); @@ -241,7 +241,7 @@ async fn test_get_messages_by_timestamp( let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000); // Calculate how many messages should be in batches after the 3rd - let prior_batches_sum: u32 = batch_sizes[..3].iter().sum(); + let prior_batches_sum: u32 = batch_lentghs[..3].iter().sum(); let remaining_messages = total_sent_messages - prior_batches_sum; let middle_messages = partition diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 795f4edc..3b5f2d17 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -103,7 +103,7 @@ pub struct IggyConsumer { partition_id: Option<u32>, polling_strategy: PollingStrategy, poll_interval_micros: u64, - batch_size: u32, + batch_length: u32, auto_commit: AutoCommit, auto_commit_after_polling: bool, auto_join_consumer_group: bool, @@ -137,7 +137,7 @@ impl IggyConsumer { partition_id: Option<u32>, polling_interval: Option<IggyDuration>, polling_strategy: PollingStrategy, - batch_size: u32, + batch_length: u32, auto_commit: AutoCommit, auto_join_consumer_group: bool, create_consumer_group_if_not_exists: bool, @@ -165,7 +165,7 @@ impl IggyConsumer { last_consumed_offsets: Arc::new(DashMap::new()), current_offsets: Arc::new(DashMap::new()), poll_future: None, - batch_size, + batch_length, auto_commit, auto_commit_after_polling: matches!( auto_commit, @@ -609,7 +609,7 @@ impl IggyConsumer { let consumer = self.consumer.clone(); let polling_strategy = self.polling_strategy; let client = self.client.clone(); - let count = self.batch_size; + let count = self.batch_length; let auto_commit_after_polling = self.auto_commit_after_polling; let auto_commit_enabled = self.auto_commit != AutoCommit::Disabled; let interval = self.poll_interval_micros; diff --git a/core/sdk/src/clients/consumer_builder.rs b/core/sdk/src/clients/consumer_builder.rs index 5008270f..908a367c 100644 --- a/core/sdk/src/clients/consumer_builder.rs +++ b/core/sdk/src/clients/consumer_builder.rs @@ -32,7 +32,7 @@ pub struct IggyConsumerBuilder { partition: Option<u32>, polling_strategy: PollingStrategy, polling_interval: Option<IggyDuration>, - batch_size: u32, + batch_length: u32, auto_commit: AutoCommit, auto_join_consumer_group: bool, create_consumer_group_if_not_exists: bool, @@ -63,7 +63,7 @@ impl IggyConsumerBuilder { topic: topic_id, partition: partition_id, polling_strategy: PollingStrategy::next(), - batch_size: 1000, + batch_length: 1000, auto_commit: AutoCommit::IntervalOrWhen( IggyDuration::ONE_SECOND, AutoCommitWhen::PollingMessages, @@ -103,8 +103,11 @@ impl IggyConsumerBuilder { } /// Sets the batch size for polling messages. - pub fn batch_size(self, batch_size: u32) -> Self { - Self { batch_size, ..self } + pub fn batch_length(self, batch_length: u32) -> Self { + Self { + batch_length, + ..self + } } /// Sets the auto-commit configuration for storing the offset on the server. @@ -226,7 +229,7 @@ impl IggyConsumerBuilder { self.partition, self.polling_interval, self.polling_strategy, - self.batch_size, + self.batch_length, self.auto_commit, self.auto_join_consumer_group, self.create_consumer_group_if_not_exists, diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index 7e9443c7..c9c203bc 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -42,7 +42,7 @@ pub struct IggyProducer { stream_name: String, topic_id: Arc<Identifier>, topic_name: String, - batch_size: Option<usize>, + batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, @@ -68,7 +68,7 @@ impl IggyProducer { stream_name: String, topic: Identifier, topic_name: String, - batch_size: Option<usize>, + batch_length: Option<usize>, partitioning: Option<Partitioning>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, @@ -90,7 +90,7 @@ impl IggyProducer { stream_name, topic_id: Arc::new(topic), topic_name, - batch_size, + batch_length, partitioning: partitioning.map(Arc::new), encryptor, partitioner, @@ -302,8 +302,8 @@ impl IggyProducer { ) -> Result<(), IggyError> { self.encrypt_messages(&mut messages)?; let partitioning = self.get_partitioning(&stream, &topic, &messages, partitioning)?; - let batch_size = self.batch_size.unwrap_or(MAX_BATCH_SIZE); - let batches = messages.chunks_mut(batch_size); + let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); + let batches = messages.chunks_mut(batch_length); let mut current_batch = 1; let batches_count = batches.len(); for batch in batches { @@ -339,8 +339,8 @@ impl IggyProducer { trace!("No batch size specified, sending messages immediately."); self.encrypt_messages(&mut messages)?; let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?; - let batch_size = self.batch_size.unwrap_or(MAX_BATCH_SIZE); - if messages.len() <= batch_size { + let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE); + if messages.len() <= batch_length { self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); self.try_send_messages(stream, topic, &partitioning, &mut messages) @@ -348,7 +348,7 @@ impl IggyProducer { return Ok(()); } - for batch in messages.chunks_mut(batch_size) { + for batch in messages.chunks_mut(batch_length) { self.last_sent_at .store(IggyTimestamp::now().into(), ORDERING); self.try_send_messages(stream, topic, &partitioning, batch) diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index a25459cd..66ba3c08 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -31,7 +31,7 @@ pub struct IggyProducerBuilder { stream_name: String, topic: Identifier, topic_name: String, - batch_size: Option<usize>, + batch_length: Option<usize>, partitioning: Option<Partitioning>, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, @@ -63,7 +63,7 @@ impl IggyProducerBuilder { stream_name, topic, topic_name, - batch_size: Some(1000), + batch_length: Some(1000), partitioning: None, encryptor, partitioner, @@ -90,26 +90,26 @@ impl IggyProducerBuilder { } /// Sets the number of messages to batch before sending them, can be combined with `interval`. - pub fn batch_size(self, batch_size: u32) -> Self { + pub fn batch_length(self, batch_length: u32) -> Self { Self { - batch_size: if batch_size == 0 { + batch_length: if batch_length == 0 { None } else { - Some(batch_size.min(MAX_BATCH_SIZE as u32) as usize) + Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize) }, ..self } } /// Clears the batch size. - pub fn without_batch_size(self) -> Self { + pub fn without_batch_length(self) -> Self { Self { - batch_size: None, + batch_length: None, ..self } } - /// Sets the interval between sending the messages, can be combined with `batch_size`. + /// Sets the interval between sending the messages, can be combined with `batch_length`. pub fn send_interval(self, interval: IggyDuration) -> Self { Self { send_interval: Some(interval), @@ -236,7 +236,7 @@ impl IggyProducerBuilder { self.stream_name, self.topic, self.topic_name, - self.batch_size, + self.batch_length, self.partitioning, self.encryptor, self.partitioner, diff --git a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs index b3d79ab4..dd7260bd 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs @@ -48,7 +48,7 @@ pub(crate) async fn build_iggy_consumer( let auto_commit = config.auto_commit(); let consumer_kind = config.consumer_kind(); let consumer_name = config.consumer_name(); - let batch_size = config.batch_size(); + let batch_length = config.batch_length(); let polling_interval = config.polling_interval(); let polling_strategy = config.polling_strategy(); let partition = config.partitions_count(); @@ -64,7 +64,7 @@ pub(crate) async fn build_iggy_consumer( .auto_join_consumer_group() .polling_strategy(polling_strategy) .poll_interval(polling_interval) - .batch_size(batch_size) + .batch_length(batch_length) .polling_retry_interval(polling_retry_interval); if let Some(encryptor) = config.encryptor() { diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs index 6b7f2d18..e8f5a13f 100644 --- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs +++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs @@ -47,7 +47,7 @@ pub(crate) async fn build_iggy_producer( let topic = config.topic_name(); let topic_partitions_count = config.topic_partitions_count(); let topic_replication_factor = config.topic_replication_factor(); - let batch_size = config.batch_size(); + let batch_length = config.batch_length(); let send_interval = config.send_interval(); let partitioning = config.partitioning().to_owned(); let send_retries = config.send_retries_count(); @@ -56,7 +56,7 @@ pub(crate) async fn build_iggy_producer( trace!("Build iggy producer"); let mut builder = client .producer(stream, topic)? - .batch_size(batch_size) + .batch_length(batch_length) .send_interval(send_interval) .partitioning(partitioning) .create_stream_if_not_exists() diff --git a/core/sdk/src/stream_builder/config/config_iggy_consumer.rs b/core/sdk/src/stream_builder/config/config_iggy_consumer.rs index e7496384..4a5ce32c 100644 --- a/core/sdk/src/stream_builder/config/config_iggy_consumer.rs +++ b/core/sdk/src/stream_builder/config/config_iggy_consumer.rs @@ -37,9 +37,9 @@ pub struct IggyConsumerConfig { topic_name: String, /// The auto-commit configuration for storing the message offset on the server. See `AutoCommit` for details. auto_commit: AutoCommit, - /// The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data. + /// The max number of messages to send in a batch. The greater the batch length, the higher the throughput for bulk data. /// Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup. - batch_size: u32, + batch_length: u32, /// Create the stream if it doesn't exist. create_stream_if_not_exists: bool, /// Create the topic if it doesn't exist. @@ -78,7 +78,7 @@ impl Default for IggyConsumerConfig { topic_id, topic_name: "test_topic".to_string(), auto_commit: AutoCommit::When(AutoCommitWhen::PollingMessages), - batch_size: 100, + batch_length: 100, create_stream_if_not_exists: false, create_topic_if_not_exists: false, consumer_name: "test_consumer".to_string(), @@ -105,7 +105,7 @@ impl IggyConsumerConfig { /// * `topic_id` - The topic id. /// * `topic_name` - The topic name. /// * `auto_commit` - The auto commit config. - /// * `batch_size` - The max number of messages to send in a batch. + /// * `batch_length` - The max number of messages to send in a batch. /// * `create_stream_if_not_exists` - Whether to create the stream if it does not exists. /// * `create_topic_if_not_exists` - Whether to create the topic if it does not exists. /// * `consumer_name` - The consumer name. @@ -130,7 +130,7 @@ impl IggyConsumerConfig { topic_id: Identifier, topic_name: String, auto_commit: AutoCommit, - batch_size: u32, + batch_length: u32, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, consumer_name: String, @@ -150,7 +150,7 @@ impl IggyConsumerConfig { topic_id, topic_name, auto_commit, - batch_size, + batch_length, create_stream_if_not_exists, create_topic_if_not_exists, consumer_name, @@ -172,7 +172,7 @@ impl IggyConsumerConfig { /// /// * `stream` - The stream name. /// * `topic` - The topic name. - /// * `batch_size` - The max number of messages to send in a batch. + /// * `batch_length` - The max number of messages to send in a batch. /// * `polling_interval` - The interval between polling for new messages. /// /// Returns: @@ -181,7 +181,7 @@ impl IggyConsumerConfig { pub fn from_stream_topic( stream: &str, topic: &str, - batch_size: u32, + batch_length: u32, polling_interval: IggyDuration, ) -> Result<Self, IggyError> { let stream_id = Identifier::from_str_value(stream)?; @@ -193,7 +193,7 @@ impl IggyConsumerConfig { topic_id, topic_name: topic.to_string(), auto_commit: AutoCommit::When(AutoCommitWhen::PollingMessages), - batch_size, + batch_length, create_stream_if_not_exists: false, create_topic_if_not_exists: false, consumer_name: format!("consumer-{}-{}", stream, topic), @@ -231,8 +231,8 @@ impl IggyConsumerConfig { self.auto_commit } - pub fn batch_size(&self) -> u32 { - self.batch_size + pub fn batch_length(&self) -> u32 { + self.batch_length } pub fn create_stream_if_not_exists(&self) -> bool { self.create_stream_if_not_exists @@ -299,7 +299,7 @@ mod tests { .topic_id(topic_id) .topic_name("test_topic".to_string()) .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) - .batch_size(100) + .batch_length(100) .create_stream_if_not_exists(true) .create_topic_if_not_exists(true) .consumer_name("test_consumer".to_string()) @@ -326,7 +326,7 @@ mod tests { config.auto_commit(), AutoCommit::When(AutoCommitWhen::PollingMessages) ); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert!(config.create_stream_if_not_exists()); assert!(config.create_topic_if_not_exists()); assert_eq!(config.consumer_name(), "test_consumer"); @@ -361,7 +361,7 @@ mod tests { config.auto_commit(), AutoCommit::When(AutoCommitWhen::PollingMessages) ); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert!(!config.create_stream_if_not_exists()); assert!(!config.create_topic_if_not_exists()); assert_eq!(config.consumer_name(), "test_consumer"); @@ -415,7 +415,7 @@ mod tests { config.auto_commit(), AutoCommit::When(AutoCommitWhen::PollingMessages) ); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert!(!config.create_stream_if_not_exists()); assert!(!config.create_topic_if_not_exists()); assert_eq!(config.consumer_name(), "test_consumer"); @@ -449,7 +449,7 @@ mod tests { assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert!(!config.create_stream_if_not_exists()); assert!(!config.create_topic_if_not_exists()); assert_eq!(config.consumer_name(), "consumer-test_stream-test_topic"); diff --git a/core/sdk/src/stream_builder/config/config_iggy_producer.rs b/core/sdk/src/stream_builder/config/config_iggy_producer.rs index 1ca80244..0e5c4f76 100644 --- a/core/sdk/src/stream_builder/config/config_iggy_producer.rs +++ b/core/sdk/src/stream_builder/config/config_iggy_producer.rs @@ -37,8 +37,8 @@ pub struct IggyProducerConfig { /// Set the topic replication factor topic_replication_factor: Option<u8>, /// The max number of messages to send in a batch. Must be greater than 0. - batch_size: u32, - /// Sets the interval between sending the messages, can be combined with `batch_size`. + batch_length: u32, + /// Sets the interval between sending the messages, can be combined with `batch_length`. send_interval: IggyDuration, /// Specifies to which partition the messages should be sent. partitioning: Partitioning, @@ -61,7 +61,7 @@ impl Default for IggyProducerConfig { stream_name: "test_stream".to_string(), topic_id, topic_name: "test_topic".to_string(), - batch_size: 100, + batch_length: 100, send_interval: IggyDuration::from_str("5ms").unwrap(), partitioning: Partitioning::balanced(), topic_partitions_count: 1, @@ -84,7 +84,7 @@ impl IggyProducerConfig { /// * `topic_name` - The topic name. /// * `topic_partitions_count` - The number of partitions to create. /// * `topic_replication_factor` - The replication factor to use. - /// * `batch_size` - The max number of messages to send in a batch. + /// * `batch_length` - The max number of messages to send in a batch. /// * `send_interval` - The interval between messages sent. /// * `partitioning` - The partitioning strategy to use. /// * `encryptor` - The encryptor to use. @@ -102,7 +102,7 @@ impl IggyProducerConfig { topic_name: String, topic_partitions_count: u32, topic_replication_factor: Option<u8>, - batch_size: u32, + batch_length: u32, send_interval: IggyDuration, partitioning: Partitioning, encryptor: Option<Arc<EncryptorKind>>, @@ -116,7 +116,7 @@ impl IggyProducerConfig { topic_name, topic_partitions_count, topic_replication_factor, - batch_size, + batch_length, send_interval, partitioning, encryptor, @@ -132,7 +132,7 @@ impl IggyProducerConfig { /// /// * `stream` - The stream name. /// * `topic` - The topic name. - /// * `batch_size` - The max number of messages to send in a batch. + /// * `batch_length` - The max number of messages to send in a batch. /// * `send_interval` - The interval between messages sent. /// /// Returns: @@ -141,7 +141,7 @@ impl IggyProducerConfig { pub fn from_stream_topic( stream: &str, topic: &str, - batch_size: u32, + batch_length: u32, send_interval: IggyDuration, ) -> Result<Self, IggyError> { let stream_id = Identifier::from_str_value(stream)?; @@ -152,7 +152,7 @@ impl IggyProducerConfig { stream_name: stream.to_string(), topic_id, topic_name: topic.to_string(), - batch_size, + batch_length, send_interval, partitioning: Partitioning::balanced(), topic_partitions_count: 1, @@ -181,8 +181,8 @@ impl IggyProducerConfig { &self.topic_name } - pub fn batch_size(&self) -> u32 { - self.batch_size + pub fn batch_length(&self) -> u32 { + self.batch_length } pub fn send_interval(&self) -> IggyDuration { @@ -230,7 +230,7 @@ mod tests { .topic_id(Identifier::from_str_value(topic).unwrap()) .topic_name(topic) .topic_partitions_count(3) - .batch_size(100) + .batch_length(100) .send_interval(IggyDuration::from_str("5ms").unwrap()) .partitioning(Partitioning::balanced()) .send_retries_count(3) @@ -247,7 +247,7 @@ mod tests { &Identifier::from_str_value("test_topic").unwrap() ); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert_eq!( config.send_interval(), IggyDuration::from_str("5ms").unwrap() @@ -272,7 +272,7 @@ mod tests { assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert_eq!( config.send_interval(), IggyDuration::from_str("5ms").unwrap() @@ -310,7 +310,7 @@ mod tests { assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert_eq!( config.send_interval(), IggyDuration::from_str("5ms").unwrap() @@ -341,7 +341,7 @@ mod tests { assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_id(), &topic_id); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.batch_size(), 100); + assert_eq!(config.batch_length(), 100); assert_eq!( config.send_interval(), IggyDuration::from_str("5ms").unwrap() diff --git a/core/sdk/src/stream_builder/config/config_iggy_stream.rs b/core/sdk/src/stream_builder/config/config_iggy_stream.rs index 2015e60b..49bc570e 100644 --- a/core/sdk/src/stream_builder/config/config_iggy_stream.rs +++ b/core/sdk/src/stream_builder/config/config_iggy_stream.rs @@ -51,7 +51,7 @@ impl IggyStreamConfig { /// /// * `stream` - The stream name. /// * `topic` - The topic name. - /// * `batch_size` - The max number of messages to send in a batch. + /// * `batch_length` - The max number of messages to send in a batch. /// * `send_interval` - The interval between messages sent. /// * `polling_interval` - The interval between polling for new messages. /// @@ -61,15 +61,15 @@ impl IggyStreamConfig { pub fn from_stream_topic( stream: &str, topic: &str, - batch_size: u32, + batch_length: u32, send_interval: IggyDuration, polling_interval: IggyDuration, ) -> Result<Self, IggyError> { let consumer_config = - IggyConsumerConfig::from_stream_topic(stream, topic, batch_size, polling_interval)?; + IggyConsumerConfig::from_stream_topic(stream, topic, batch_length, polling_interval)?; let producer_config = - IggyProducerConfig::from_stream_topic(stream, topic, batch_size, send_interval)?; + IggyProducerConfig::from_stream_topic(stream, topic, batch_length, send_interval)?; Ok(Self { consumer_config, @@ -129,8 +129,8 @@ mod tests { let config = IggyStreamConfig::new(consumer_config, producer_config); assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.consumer_config().batch_size(), 100); - assert_eq!(config.producer_config().batch_size(), 100); + assert_eq!(config.consumer_config().batch_length(), 100); + assert_eq!(config.producer_config().batch_length(), 100); assert_eq!( config.consumer_config().polling_interval(), IggyDuration::from_str("5ms").unwrap() @@ -146,8 +146,8 @@ mod tests { let config = IggyStreamConfig::default(); assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.consumer_config().batch_size(), 100); - assert_eq!(config.producer_config().batch_size(), 100); + assert_eq!(config.consumer_config().batch_length(), 100); + assert_eq!(config.producer_config().batch_length(), 100); assert_eq!( config.consumer_config().polling_interval(), IggyDuration::from_str("5ms").unwrap() @@ -173,8 +173,8 @@ mod tests { assert_eq!(config.stream_name(), "test_stream"); assert_eq!(config.topic_name(), "test_topic"); - assert_eq!(config.consumer_config().batch_size(), 100); - assert_eq!(config.producer_config().batch_size(), 100); + assert_eq!(config.consumer_config().batch_length(), 100); + assert_eq!(config.producer_config().batch_length(), 100); assert_eq!( config.consumer_config().polling_interval(), IggyDuration::from_str("5ms").unwrap()
